{-# LANGUAGE Rank2Types #-} -- Re-writing Iteratee.hs into CPS -- The delimited-control nature of Iteratees becomes apparent. -- We will often quote the code from Iteratee.hs in comments, -- to show that rewriting from Iteratee to IterateeCPS is largely -- mechanical. -- We stress that although the type of Iteratee and the implementation -- of primitive Iteratees (such as head, peek, etc) changed, the -- code for high-level Iteratees such as read_lines and the code -- for all enumerators remains the same! The last part of this file -- (from read_lines onward, including all enumerators and tests) -- is copied *verbatim* from Iteratee.hs -- (some comments are abbreviated though and extra tests are added). -- This file is not meant for production; it is meant only to illustrate -- CPS and the likely performance problem. The frequently used -- function lift_shift causes piling up of the layers of interpretation, -- similar to piling up of the layers when emulating `control' with `shift'. -- We need a different CPS transform, the one that expresses control -- directly. -- OTH, I wonder if it would be more efficient, since that different -- CPS typically involves the stack of the continuations. module IterateeCPS where import System.Posix import Foreign.C import Foreign.Ptr import Foreign.Marshal.Alloc import Prelude hiding (head, drop, dropWhile, take, break, catch) import qualified Prelude import Data.Char (isHexDigit, digitToInt) import Control.Monad.Identity import LowLevelIO -- The same type as in Iteratee.hs type ErrMsg = String data Stream = EOF (Maybe ErrMsg) | Chunk String deriving Show {- newtype Iteratee a = Iteratee{runIter:: Stream -> IterV a} data IterV a = IE_done a Stream | IE_cont (Iteratee a) (Maybe ErrMsg) -} -- In CPS, the Iteratee looks as follows. The type shows that Iteratee -- (monad) is really State + CPS newtype Iteratee a = Iteratee{unIter :: forall r. (a -> Stream -> IterV r) -> Stream -> IterV r} data IterV a = IE_done a Stream | IE_cont (Stream -> IterV a) (Maybe ErrMsg) instance Monad Iteratee where return x = Iteratee $ \k -> k x -- eta-reduce the state m >>= f = Iteratee $ \k -> unIter m (\x -> unIter (f x) k) -- ditto -- This is essentially the reset control operator runIter :: Iteratee a -> Stream -> IterV a runIter m = unIter m IE_done -- Alas, we need this (see enum_pure_nchunk) because we emulate -- shift rather than `control' -- We have to `re-interpret' the IterV for the `higher' reset lift_shift :: (Stream -> IterV a) -> Iteratee a lift_shift sr = Iteratee(\k s -> check k (sr s)) where check k (IE_done x s) = k x s check k (IE_cont sr e) = IE_cont (check k . sr) e -- The equational law: -- lift_shift (runIter iter) === iter {- -- Throw an unrecoverable error throwErr :: ErrMsg -> Iteratee a throwErr e = Iteratee (\s -> IE_cont (throwErr e) (Just e)) -} throwErr :: ErrMsg -> Iteratee a throwErr e = Iteratee (\k s -> step) where step = IE_cont (\_ -> step) (Just e) -- Throw a recoverable error throwRecoverableErr :: ErrMsg -> Iteratee a -> Iteratee a throwRecoverableErr e i = Iteratee (\k s -> IE_cont (unIter i k) (Just e)) -- Produce the EOF error message to be passed to throwErr. -- If the stream was terminated because of an error, keep the original -- error message. -- The same as in Iteratee.hs setEOF :: Stream -> ErrMsg setEOF (EOF (Just e)) = e setEOF _ = "EOF" -- The same as in Iteratee.hs run :: Iteratee a -> a run iter = case runIter iter (EOF Nothing) of IE_done x _ -> x IE_cont _ e -> error $ "control message: " ++ show e -- ------------------------------------------------------------------------ -- Primitive iteratees: parser combinators -- Quoted from Iteratee.hs: {- break :: (Char -> Bool) -> Iteratee String break cpred = Iteratee (step "") where step before (Chunk "") = IE_cont (Iteratee (step before)) Nothing step before (Chunk str) = case Prelude.break cpred str of (_,"") -> IE_cont (Iteratee (step (before ++ str))) Nothing (str,tail) -> IE_done (before ++ str) (Chunk tail) step before stream = IE_done before stream -} -- In CPS, the same code looks as follows. -- The claimed benefit: no need to add IE_done constructor -- Consequently, the bind operator does not have to remove IE_done break :: (Char -> Bool) -> Iteratee String break cpred = Iteratee (step "") where step before k (Chunk "") = IE_cont (step before k) Nothing step before k (Chunk str) = case Prelude.break cpred str of (_,"") -> IE_cont (step (before ++ str) k) Nothing (str,tail) -> k (before ++ str) (Chunk tail) step before k stream = k before stream -- Quoted from Iteratee.hs: {- -- Look ahead at the next element of the stream, without removing -- it from the stream. -- Return (Just c) if successful, return Nothing if the stream is -- terminated (by EOF or an error) peek :: Iteratee (Maybe Char) peek = Iteratee step where step (Chunk "") = IE_cont peek Nothing step s@(Chunk (c:_)) = IE_done (Just c) s step stream = IE_done Nothing stream -} -- The same in CPS peek :: Iteratee (Maybe Char) peek = Iteratee step where step k (Chunk "") = IE_cont (step k) Nothing step k s@(Chunk (c:_)) = k (Just c) s step k stream = k Nothing stream -- Compare with head in Iteratee.hs head :: Iteratee Char head = Iteratee step where step k (Chunk "") = IE_cont (step k) Nothing step k (Chunk (c:t)) = k c (Chunk t) step k stream = IE_cont (step k) (Just (setEOF stream)) -- Compare with heads in Iteratee.hs heads :: String -> Iteratee Int heads "" = return 0 heads str = Iteratee (step 0 str) where step cnt str k (Chunk "") = IE_cont (step cnt str k) Nothing step cnt (c:t) k s@(Chunk (c':t')) = if c == c' then step (succ cnt) t k (Chunk t') else k cnt s step cnt _ k stream = k cnt stream -- Compare with skip_till_eof in Iteratee.hs skip_till_eof :: Iteratee () skip_till_eof = Iteratee step where step k (Chunk _) = IE_cont (step k) Nothing step k s = k () s -- Original Iteratee code {- -- Skip n elements of the stream, if there are that many -- This is the analogue of List.drop drop :: Int -> Iteratee () drop 0 = return () drop n = Iteratee step where step (Chunk str) | length str <= n = IE_cont (drop (n - length str)) Nothing step (Chunk str) = IE_done () (Chunk (Prelude.drop n str)) step stream = IE_done () stream -} -- The translation from Iteratee to CPS is mechanical drop :: Int -> Iteratee () drop 0 = return () drop n = Iteratee step where step k (Chunk str) | length str <= n = IE_cont (unIter (drop (n - length str)) k) Nothing step k (Chunk str) = k () (Chunk (Prelude.drop n str)) step k stream = k () stream -- Check if the stream is finished -- The translation to CPS is mechanical is_finished :: Iteratee (Maybe ErrMsg) is_finished = Iteratee check where check k s@(EOF e) = k (Just $ maybe "EOF" id e) s check k s = k Nothing s -- Quoted from Iteratee.hs: {- -- The following pattern seems to be appearing often -- I don't know the name for it yet... See however IterateeCPS.hs -- As common, we need pure and monadic versions, for use -- in pure and effectful enumerators check_if_doneM :: Monad m => (Iteratee a -> m (Iteratee a)) -> IterV a -> m (Iteratee a) check_if_doneM _ (IE_done x _) = return $ return x check_if_doneM k (IE_cont x Nothing) = k x check_if_doneM _ (IE_cont _ (Just e)) = return $ throwErr e -} -- Hmm, the analogy with lift_shift now gives me the idea what -- check_if_done is doing check_if_doneM :: Monad m => (Iteratee a -> m (Iteratee a)) -> IterV a -> m (Iteratee a) check_if_doneM _ (IE_done x _) = return $ return x check_if_doneM k (IE_cont x Nothing) = k . lift_shift $ x check_if_doneM _ (IE_cont _ (Just e)) = return $ throwErr e check_if_done :: (Iteratee a -> Iteratee a) -> IterV a -> Iteratee a check_if_done k iv = runIdentity $ check_if_doneM (Identity . k) iv -- Read n elements from a stream and apply the given iteratee to the -- stream of the read elements. Unless the stream is terminated early, we -- read exactly n elements (even if the iteratee has accepted fewer). -- This procedure shows a different way of composing two iteratees: -- `vertical' rather than `horizontal' take :: Int -> Iteratee a -> Iteratee (Iteratee a) take n iter = Iteratee (step n) where step 0 k str = k iter str step n k (Chunk "") = IE_cont (step n k) Nothing step n k chunk@(Chunk str) | length str <= n = IE_cont (check (n - length str) k $ runIter iter chunk) Nothing step n k (Chunk str) = done k (Chunk s1) (Chunk s2) where (s1,s2) = splitAt n str step n k stream = done k stream stream -- check n k (IE_done x _) = unIter (drop n) (\_ -> k $ return x) check n k (IE_done x _) = unIter (drop n >> return (return x)) k check n k (IE_cont x Nothing) = unIter (take n (lift_shift x)) k check n k (IE_cont _ (Just e)) = unIter (drop n >> throwErr e) k done k s1 s2 = k (check_if_done id $ runIter iter s1) s2 -- ======================================================================== -- From this moment on, the code is the same as in Iteratee.hs -- (modulo extra tests) -- ------------------------------------------------------------------------ -- Combining the primitive iteratees to solve the running problem: -- Reading headers and the content from an HTTP-like stream -- This section is identical to that in Iteratee.hs: the iteratee -- combinators remain the same, regardless how primitive iteratees -- are implemented. type Line = String -- The line of text, terminators are not included read_lines :: Iteratee (Either [Line] [Line]) read_lines = lines' [] where lines' acc = break (\c -> c == '\r' || c == '\n') >>= \l -> terminators >>= check acc l check acc _ 0 = return . Left . reverse $ acc -- no terminator was found check acc "" _ = return . Right . reverse $ acc check acc l _ = lines' (l:acc) terminators = heads "\r\n" >>= \n -> if n == 0 then heads "\n" else return n -- ------------------------------------------------------------------------ -- Enumerators type Enumerator a = Iteratee a -> Iteratee a type EnumeratorM m a = Iteratee a -> m (Iteratee a) -- The same as in Iteratee.hs enum_eof :: Enumerator a enum_eof iter = check $ runIter iter (EOF Nothing) where check (IE_done x _) = return x check (IE_cont _ e) = throwErr (maybe "Divergent Iteratee" id e) enum_err :: ErrMsg -> Enumerator a enum_err e iter = check $ runIter iter (EOF (Just e)) where check (IE_done x _) = return x check (IE_cont _ e) = throwErr (maybe "Divergent Iteratee" id e) (>.):: Enumerator a -> Enumerator a -> Enumerator a (>.) = flip (.) (>>.):: Monad m => EnumeratorM m a -> EnumeratorM m a -> EnumeratorM m a e1 >>. e2 = (>>= (e2 $)) . e1 liftE:: Monad m => Enumerator a -> EnumeratorM m a liftE f = return . f enum_pure_1chunk :: String -> Enumerator a enum_pure_1chunk str iter = check_if_done id $ runIter iter (Chunk str) -- The pure n-chunk enumerator -- It passes a given string to the iteratee in chunks no larger than n. -- This enumerator does no IO and is useful for testing of base parsing -- and handling of chunk boundaries enum_pure_nchunk :: String -> Int -> Enumerator a enum_pure_nchunk "" _ iter = iter enum_pure_nchunk str n iter = check_if_done (enum_pure_nchunk s2 n) $ runIter iter (Chunk s1) where (s1,s2) = splitAt n str -- The enumerator of a POSIX Fd -- Unlike fdRead (which allocates a new buffer on -- each invocation), we use the same buffer all throughout enum_fd :: Fd -> EnumeratorM IO a enum_fd fd iter = allocaBytes (fromIntegral buffer_size) (loop iter) where buffer_size = 5 -- for tests; in real life, there should be 1024 or so loop iter p = do n <- myfdRead fd p buffer_size case n of Left errno -> return $ enum_err "IO error" iter Right 0 -> return iter Right n -> do str <- peekCAStringLen (p,fromIntegral n) check_if_doneM (\i -> loop i p) $ runIter iter (Chunk str) enum_chunk_decoded :: Iteratee a -> Iteratee a enum_chunk_decoded iter = read_size where read_size = break (== '\r') >>= checkCRLF iter . check_size checkCRLF iter m = do n <- heads "\r\n" if n == 2 then m else frame_err "Bad Chunk: no CRLF" iter check_size "0" = checkCRLF iter (enum_eof iter) check_size str@(_:_) = maybe (frame_err ("Bad chunk size: " ++ str) iter) read_chunk $ read_hex 0 str check_size _ = frame_err "Error reading chink size" iter read_chunk size = take size iter >>= \r -> checkCRLF r $ enum_chunk_decoded r read_hex acc "" = Just acc read_hex acc (d:rest) | isHexDigit d = read_hex (16*acc + digitToInt d) rest read_hex acc _ = Nothing frame_err e iter = throwRecoverableErr "Frame error" (enum_err e iter) -- ------------------------------------------------------------------------ -- Primitive Tests test_iteratee = do drop 1 v1 <- head drop 2 v2 <- head return (v1,v2) test1 = run $ enum_pure_nchunk "abcde" 5 test_iteratee -- ('b','e') test2 = run $ enum_pure_nchunk "abcde" 2 test_iteratee -- ('b','e') -- Here we test passing `the state of parsing' from one enumerator -- to the other test3 = run . enum_pure_nchunk "de" 1 . enum_pure_nchunk "abc" 2 $ test_iteratee -- ('b','e') -- ------------------------------------------------------------------------ -- Tests -- Pure tests, requiring no IO test_str1 = "header1: v1\rheader2: v2\r\nheader3: v3\nheader4: v4\n" ++ "header5: v5\r\nheader6: v6\r\nheader7: v7\r\n\nrest\n" read_lines_rest :: Iteratee (Either [Line] [Line], String) read_lines_rest = do ls <- read_lines rest <- break (const False) return (ls,rest) testp1 = let (Right lines, rest) = run $ enum_pure_1chunk test_str1 read_lines_rest in lines == ["header1: v1","header2: v2","header3: v3","header4: v4", "header5: v5","header6: v6","header7: v7"] && rest == "rest\n" testp2 = let (Right lines, rest) = run $ enum_pure_nchunk test_str1 5 read_lines_rest in lines == ["header1: v1","header2: v2","header3: v3","header4: v4", "header5: v5","header6: v6","header7: v7"] && rest == "rest\n" -- Test Fd driver test_driver filepath = do fd <- openFd filepath ReadOnly Nothing defaultFileFlags putStrLn "About to read headers" result <- fmap run $ enum_fd fd read_lines_and_one_more_line closeFd fd putStrLn "Finished reading headers" case result of (Right headers,after,_) -> do putStrLn $ "The line after headers is: " ++ show after putStrLn "Complete headers" print headers (Left headers,_,status) -> do putStrLn $ "Problem " ++ show status putStrLn "Incomplete headers" print headers where read_lines_and_one_more_line = do lines <- read_lines after <- break (\c -> c == '\r' || c == '\n') status <- is_finished return (lines,after,status) test11 = test_driver "test1.txt" -- Complete headers, up to "header7: v7" test12 = test_driver "test2.txt" -- The same test13 = test_driver "test3.txt" -- "header3: v3", then EOF test14 = test_driver "/dev/null" -- Incomplete headers [], EOF -- Run the complete test, reading the headers and the body test_driver_full filepath = do fd <- openFd filepath ReadOnly Nothing defaultFileFlags putStrLn "About to read headers" result <- fmap run $ enum_fd fd read_headers_body closeFd fd putStrLn "Finished reading" case result of (Right headers,Right body,_) -> do putStrLn "Complete headers" print headers putStrLn "\nComplete body" print body (Left headers,_,status) -> do putStrLn $ "Problem " ++ show status putStrLn "Incomplete headers" print headers (Right headers,Left body,status) -> do putStrLn "Complete headers" print headers putStrLn $ "Problem " ++ show status putStrLn "Incomplete body" print body where read_headers_body = do headers <- read_lines body <- enum_chunk_decoded read_lines status <- is_finished return (headers,body,status) test21 = test_driver_full "test_full1.txt" {- Complete headers ["header1: v1","header2: v2","header3: v3","header4: v4"] Problem Just "EOF" Incomplete body ["body line 1","body line 2","body line 3","body line 4"] -} test22 = test_driver_full "test_full2.txt" -- *** Exception: control message: Just "Frame error" test23 = test_driver_full "test_full3.txt" {- Complete headers ["header1: v1","header2: v2","header3: v3","header4: v4"] Problem Just "EOF" Incomplete body ["body line 1","body line 2","body line 3","body line 4","body line 5"] -}