{-# LANGUAGE Rank2Types #-}

-- Monadic and General Iteratees:
-- incremental input parsers, processors and transformers
--
-- Re-writing IterateeM.hs into CPS
-- The delimited-control nature of Iteratees becomes apparent.
-- We will often quote the code from IterateeM.hs in comments,
-- to highlight the differences, or the lack of them. Although the type
-- of Iteratee and the implementation of primitive Iteratees
-- (such as head, peek, etc) changed, the code for high-level Iteratees
-- such as line, print_lines and the code for all enumeratees remains
-- almost the same!

-- The running example, parts 1 and 2
-- Part 1 is reading the headers, the sequence of lines terminated by an
-- empty line. Each line is terminated by CR, LF, or CRLF.
-- We should return the headers in order. In the case of error,
-- we should return the headers read so far and the description of the error.
-- Part 2 is reading the headers and reading all the lines from the
-- HTTP-chunk-encoded content that follows the headers. Part 2 thus
-- verifies layering of streams, and processing of one stream
-- embedded (chunk encoded) into another stream.

module IterateeMCPS where

import System.Posix
import Foreign.C
import Foreign.Ptr
import Foreign.Marshal.Alloc

import Data.List (splitAt)
import Prelude hiding (head, drop, dropWhile, take, break, catch)
import qualified Prelude
import Data.Char (isHexDigit, digitToInt, isSpace)
import Control.Applicative (Applicative(..))
import Control.Monad (ap, liftM)
import Control.Monad.Trans
import Control.Monad.Identity
import Control.Exception
import Data.Typeable

import LowLevelIO

-- A stream is a (continuing) sequence of elements bundled in Chunks.
-- The first variant means no more data will be coming: the stream
-- is exhausted, either due to EOF or some error.
-- Chunk [a] gives the currently available part of the stream.
-- The stream is not terminated yet.
-- The case (Chunk []) signifies a stream with no currently available
-- data but which is still continuing. A stream processor should,
-- informally speaking, ``suspend itself'' and wait for more data
-- to arrive.
-- The Stream data type is the same type as in Iteratee.hs

type ErrMsg = SomeException

data Stream el = EOF (Maybe ErrMsg) | Chunk [el] deriving Show

-- Iteratee -- a generic stream processor, what is being folded over
-- a stream
{-
-- Quoted from IterateeM.hs
newtype Iteratee el m a = Iteratee{runIter:: m (IterV el m a)}
data IterV el m a = IE_done a (Stream el)
                  | IE_cont (Stream el -> Iteratee el m a) (Maybe ErrMsg)
-}
-- In the CPS encoding an iteratee is given two continuations:
--   * the first is invoked when the iteratee is done; it receives the
--     result and the unconsumed part of the stream;
--   * the second is invoked when the iteratee wants more data (or reports
--     an error); it receives the function to resume with the next Stream
--     value and an optional error message.
newtype Iteratee el m a = Iteratee{runIter :: forall r.
     (a -> Stream el -> m r) ->
     ((Stream el -> Iteratee el m a) -> Maybe ErrMsg -> m r) ->
     m r}

-- It turns out, Iteratee forms a monad. We can use the familiar do
-- notation for composing Iteratees
{-
-- Quoted from IterateeM.hs
instance Monad m => Monad (Iteratee el m) where
    return x = ie_done x (Chunk [])
    m >>= f = Iteratee $ runIter m >>= docase
     where
     docase (IE_done a (Chunk [])) = runIter (f a)
     docase (IE_done a stream) = runIter (f a) >>= \r -> case r of
                                   IE_done x _       -> return $ IE_done x stream
                                   IE_cont k Nothing -> runIter $ k stream
                                   iv                -> return iv
     docase (IE_cont k e) = return $ IE_cont ((>>= f) . k) e
-}

-- Functor and Applicative are superclasses of Monad since GHC 7.10.
-- Both instances are derived from the Monad instance below, so they add
-- no behaviour of their own.
instance Monad m => Functor (Iteratee el m) where
    fmap = liftM

instance Monad m => Applicative (Iteratee el m) where
    pure  = return
    (<*>) = ap

instance Monad m => Monad (Iteratee el m) where
    {-# INLINE return #-}
    return x = Iteratee $ \on_done _ -> on_done x (Chunk [])

    {-# INLINE (>>=) #-}
    m >>= f = Iteratee $ \on_done on_cont ->
        let -- m finished and left nothing over: just continue with (f a)
            m_done a (Chunk []) = runIter (f a) on_done on_cont
            -- m finished with a left-over stream: (f a) must see that
            -- left-over before any new data
            m_done a stream     = runIter (f a) (\x _ -> on_done x stream) f_cont
              where f_cont k Nothing = runIter (k stream) on_done on_cont
                    f_cont k e       = on_cont k e
        in runIter m m_done (\k -> on_cont ((>>= f) . k))

instance MonadTrans (Iteratee el) where
    {-# INLINE lift #-}
    -- lift m = Iteratee (m >>= runIter . return)
    lift m = Iteratee $ \on_done _ -> m >>= \x -> on_done x (Chunk [])

-- Throw an irrecoverable error: resuming the iteratee re-throws the
-- same error.
throwErr :: Monad m => ErrMsg -> Iteratee el m a
throwErr e = Iteratee $ \_ on_cont -> on_cont (\_ -> throwErr e) (Just e)

-- Throw a recoverable error: the supplied function is used to resume
-- the computation.
throwRecoverableErr :: Monad m =>
                       ErrMsg -> (Stream el -> Iteratee el m a) ->
                       Iteratee el m a
-- throwRecoverableErr e i = Iteratee (return $ IE_cont i (Just e))
throwRecoverableErr e i = Iteratee $ \_ on_cont -> on_cont i (Just e)

-- Produce the EOF error message to be passed to throwErr.
-- If the stream was terminated because of an error, keep the original
-- error message.
setEOF :: Stream el -> ErrMsg
setEOF (EOF (Just e)) = e
setEOF _              = exc_EOF

exc_EOF :: ErrMsg
exc_EOF = toException $ ErrorCall "EOF"

exc_divergent :: ErrMsg
exc_divergent = toException $ ErrorCall "divergent iteratee"

-- To run this code without new Exceptions, uncomment the following
-- type SomeException = Exception
-- toException = id

-- Useful combinators for implementing iteratees and enumerators

-- An iteratee in the done state, with the given result and left-over stream
ie_done :: a -> Stream el -> Iteratee el m a
ie_done x str = Iteratee $ \on_done _ -> on_done x str

-- An iteratee in the continuation state, with an optional error message
ie_cont :: (Stream el -> Iteratee el m a) -> Maybe ErrMsg -> Iteratee el m a
ie_cont k e = Iteratee $ \_ on_cont -> on_cont k e

{-
-- Quoted from IterateeM.hs
liftI :: Monad m => IterateePure el m a -> Iteratee el m a
liftI k = retI $ IE_cont (retI . k) Nothing
-}
-- This turns out to be just an instance of ie_cont:
-- liftI k = ie_cont k Nothing
liftI :: Monad m => (Stream el -> Iteratee el m a) -> Iteratee el m a
liftI k = Iteratee $ \_ on_cont -> on_cont k Nothing

-- Monadic versions, frequently used by enumerators
ie_doneM :: Monad m => a -> Stream el -> m (Iteratee el m a)
ie_doneM x str = return $ Iteratee $ \on_done _ -> on_done x str

ie_contM :: Monad m =>
            (Stream el -> Iteratee el m a) -> Maybe ErrMsg ->
            m (Iteratee el m a)
ie_contM k e = return $ Iteratee $ \_ on_cont -> on_cont k e

-- Unlike IterateeM.hs, we no longer need ($$) and (>>==) combinators,
-- because we do not distinguish Iteratee from IterV

-- The following is a `variant' of join in the Iteratee el m monad.
-- When el' is the same as el, the type of joinI is indeed that of
-- true monadic join. However, joinI is subtly different: since
-- generally el' is different from el, it makes no sense to
-- continue using the internal, Iteratee el' m a: we no longer
-- have elements of the type el' to feed to that iteratee.
-- We thus send EOF to the internal Iteratee and propagate its result.
-- This join function is useful for Enumeratees, for embedded/nested streams.
-- For example, the common pattern is
--     do
--       lines <- joinI $ enum_lines stream2list
-- The tests below show many examples (e.g., read_lines_and_one_more_line).
--
-- joinI can be implemented as
--     joinI outer = outer >>= lift . run
-- The following is an optimized implementation, obtained by inlining.
-- Flatten a nested iteratee: run the outer iteratee, then terminate the
-- inner one by sending it EOF and propagate the inner result.
-- An error reported by the inner iteratee is re-thrown in the outer one.
-- An inner iteratee that still asks for data after EOF is `divergent'.
joinI :: Monad m => Iteratee el m (Iteratee el' m a) -> Iteratee el m a
joinI outer_iter = outer_iter >>= \inner -> Iteratee $ \od oc ->
    let on_done  x _        = od x (Chunk [])
        on_cont  k Nothing  = runIter (k (EOF Nothing)) on_done on_cont'
        on_cont  _ (Just e) = runIter (throwErr e) od oc
        -- the inner iteratee has already been given EOF
        on_cont' _ e        = runIter (throwErr (maybe exc_divergent id e)) od oc
    in runIter inner on_done on_cont

{-
-- Send EOF to Iteratee and disregard the unconsumed part of the stream
run' :: Monad m => Iteratee el m a -> m a
run' iter = runIter (joinI (return $$ iter)) >>= check
  where
  check (IE_done x _) = return x
  check (IE_cont _ e) = error $ "control message: " ++ show e
-}
-- The following is a more optimized implementation.
-- Send EOF to the iteratee (if it still wants data) and return its result,
-- disregarding the unconsumed part of the stream. Calls `error' if the
-- iteratee reports an error or keeps asking for data after EOF.
run :: Monad m => Iteratee el m a -> m a
run iter = runIter iter on_done on_cont
 where
   on_done x _        = return x
   on_cont k Nothing  = runIter (k (EOF Nothing)) on_done on_cont'
   on_cont _ e        = error $ "control message: " ++ show e
   on_cont' _ e       = error $ "control message: " ++ show e

-- If we need to convert Iteratee in this file to Iteratee in IterateeM.hs
-- (so we can do IO incrementally and interleave iteratees)
-- We can do so easily.
-- The following data type and function is used in test_driver_mux
-- near the end of the file.
data IterStatus el m a = IS_done a
                       | IS_cont (Stream el -> Iteratee el m a)

-- Report whether the iteratee is done or waiting for data.
-- NOTE: on an iteratee error the message is printed and the result
-- is (IS_done undefined) -- the value must not be forced by the caller.
obtain_status :: Iteratee el IO a -> IO (IterStatus el IO a)
obtain_status iter = runIter iter (\x _ -> return $ IS_done x) on_cont
 where
   on_cont k Nothing = return $ IS_cont k
   -- Error handling: enough for the test_driver_mux
   -- It could be generalized if needed.
   on_cont k e = print ("Iteratee error: " ++ show e) >>
                 return (IS_done undefined)

-- ------------------------------------------------------------------------
-- Primitive iteratees

-- Read a stream to the end and return all of its elements as a list
-- This primitive iteratee is quite useful when writing test cases.
{-
-- Quoted from IterateeM.hs
stream2list :: Monad m => Iteratee el m [el]
stream2list = liftI $ step []
 where
 step acc (Chunk [])  = IE_cont (retI . step acc) Nothing
 step acc (Chunk ls)  = IE_cont (retI . (step $ acc ++ ls)) Nothing
 step acc stream      = IE_done acc stream
-}
stream2list :: Monad m => Iteratee el m [el]
stream2list = liftI $ step []
 where
   -- `chunks' holds the chunks received so far, newest first; they are
   -- concatenated once at the end rather than with repeated (acc ++ ls),
   -- which is quadratic in the length of the stream.
   step chunks (Chunk []) = liftI (step chunks)
   step chunks (Chunk ls) = liftI (step (ls : chunks))
   step chunks stream     = ie_done (concat (reverse chunks)) stream

-- Check if the stream is finished or harbors an error
-- Returns Nothing for a live stream; otherwise Just the stream error
-- (or the generic EOF exception for a natural end). The stream itself
-- is left as it was.
is_stream_finished :: Monad m => Iteratee el m (Maybe ErrMsg)
is_stream_finished = liftI check
 where
   check s@(EOF e) = ie_done (Just $ maybe exc_EOF id e) s
   check s         = ie_done Nothing s

-- ------------------------------------------------------------------------
-- Primitive iteratees: parser combinators

-- The analogue of List.break
-- It takes an el predicate and returns a string of els,
-- which is the (possibly empty) prefix of the stream. None of the
-- characters in the string satisfy the el predicate.
-- If the stream is not terminated, the first el of the remaining
-- stream satisfies the predicate
{-
-- Quoted from IterateeM.hs
break cpred = liftI $ step []
 where
 step before (Chunk []) = IE_cont (retI . step before) Nothing
 step before (Chunk str) =
     case Prelude.break cpred str of
       (_,[])     -> IE_cont (retI . step (before ++ str)) Nothing
       (str,tail) -> IE_done (before ++ str) (Chunk tail)
 step before stream = IE_done before stream
-}
break :: Monad m => (el -> Bool) -> Iteratee el m [el]
break cpred = liftI $ step []
 where
   -- `before' holds the already accepted pieces of the prefix, newest
   -- first; `flush' assembles the prefix once, when it is complete.
   step before (Chunk [])  = liftI (step before)
   step before (Chunk str) =
       case Prelude.break cpred str of
         (_, [])        -> liftI (step (str : before))
         (prefix, rest) -> ie_done (flush (prefix : before)) (Chunk rest)
   step before stream      = ie_done (flush before) stream
   flush pieces = concat (reverse pieces)

-- A particular optimized case of the above: skip all elements of the stream
-- satisfying the given predicate -- until the first element
-- that does not satisfy the predicate, or the end of the stream.
-- This is the analogue of List.dropWhile
-- Discard the leading elements of the stream for which the predicate
-- holds. Stops at the first element failing the predicate (which stays
-- on the stream) or when the stream is terminated.
dropWhile :: Monad m => (el -> Bool) -> Iteratee el m ()
dropWhile cpred = liftI go
 where
   go (Chunk xs) =
       case Prelude.dropWhile cpred xs of
         []   -> liftI go                  -- chunk exhausted: ask for more
         rest -> ie_done () (Chunk rest)
   go stream     = ie_done () stream

-- Look ahead at the next element of the stream, without removing
-- it from the stream.
-- Return (Just c) if successful, return Nothing if the stream is
-- terminated (by EOF or an error)
peek :: Monad m => Iteratee el m (Maybe el)
peek = liftI $ \stream ->
    case stream of
      Chunk []      -> peek
      Chunk (c : _) -> ie_done (Just c) stream
      _             -> ie_done Nothing stream

-- Attempt to read the next element of the stream and return it
-- Raise a (recoverable) error if the stream is terminated
{-
-- Quoted from IterateeM.hs
head = liftI step
 where
 step (Chunk [])     = IE_cont (retI . step) Nothing
 step (Chunk (c:t))  = IE_done c (Chunk t)
 step stream         = IE_cont (retI . step) (Just (setEOF stream))
-}
head :: Monad m => Iteratee el m el
head = liftI next
 where
   next stream =
       case stream of
         Chunk []         -> head
         Chunk (c : rest) -> ie_done c (Chunk rest)
         -- terminated stream: report the error, but allow a retry
         _                -> throwRecoverableErr (setEOF stream) next

-- Given a sequence of elements, attempt to match them against
-- the elements on the stream. Return the count of how many
-- elements matched. The matched elements are removed from the
-- stream.
-- For example, if the stream contains "abd", then (heads "abc")
-- will remove the characters "ab" and return 2.
-- Match the given sequence against the head of the stream, removing the
-- matched elements; return the number of elements that matched.
heads :: (Eq el, Monad m) => [el] -> Iteratee el m Int
heads str = loop 0 str
 where
   loop cnt []  = return cnt
   loop cnt pat = liftI (step cnt pat)

   -- The whole pattern has matched: finish right away. This clause must
   -- come before the (Chunk []) one; otherwise a match ending exactly at
   -- a chunk boundary would needlessly wait for one more chunk.
   step cnt [] stream = ie_done cnt stream
   step cnt pat (Chunk []) = liftI (step cnt pat)
   step cnt (c:t) s@(Chunk (c':t'))
     | c == c'   = step (succ cnt) t (Chunk t')
     | otherwise = ie_done cnt s
   step cnt _ stream = ie_done cnt stream

-- Skip the rest of the stream
skip_till_eof :: Monad m => Iteratee el m ()
skip_till_eof = liftI step
 where
   step (Chunk _) = skip_till_eof
   step s         = ie_done () s

-- Skip n elements of the stream, if there are that many
-- This is the analogue of List.drop
drop :: Monad m => Int -> Iteratee el m ()
drop 0 = return ()
drop n = liftI (skip n)
 where
   skip remaining (Chunk str)
     | available < remaining = liftI (skip (remaining - available))
     | otherwise             = ie_done () (Chunk (Prelude.drop remaining str))
     where available = length str
   skip _ stream = ie_done () stream

-- ------------------------------------------------------------------------
-- Combining the primitive iteratees to solve the running problem:
-- Reading headers and the content from an HTTP-like stream

type Line = String      -- The line of text, terminators are not included

-- Read the line of text from the stream
-- The line can be terminated by CR, LF or CRLF.
-- Return (Right Line) if successful. Return (Left Line) if EOF or
-- a stream error were encountered before the terminator is seen.
-- The returned line is the string read so far.

-- This is a totally high-level Iteratee, built by composing low-level
-- ones. It knows nothing about the representation of Iteratees.
-- Compare the code below with GHCBufferIO.line_lazy
-- Read one line: everything up to (not including) the terminator, which
-- is CR, LF or CRLF. The result is Right when a terminator was consumed
-- and Left when the stream ended before any terminator was seen.
line :: Monad m => Iteratee Char m (Either Line Line)
line = do
    l <- break is_terminator
    n <- terminators
    return (if n == 0 then Left l     -- no terminator was found
                      else Right l)
 where
   is_terminator c = c == '\r' || c == '\n'
   -- try CR / CRLF first, then a bare LF
   terminators = do
       n <- heads "\r\n"
       if n == 0 then heads "\n" else return n

-- Line iteratees: processors of a stream whose elements are made of Lines

-- Collect all read lines and return them as a list
-- see stream2list

-- Print lines as they are received. This is the first `impure' iteratee
-- with non-trivial actions during chunk processing
print_lines :: Iteratee Line IO ()
print_lines = liftI consume
 where
   consume stream =
       case stream of
         Chunk []    -> liftI consume
         Chunk ls    -> lift (mapM_ report ls) >> liftI consume
         EOF Nothing -> lift (putStrLn ">> natural end") >>
                        ie_done () (EOF Nothing)
         _           -> lift (putStrLn ">> unnatural end") >>
                        ie_done () stream
   report l = putStrLn $ ">> read line: " ++ l

-- ------------------------------------------------------------------------
-- Enumerators
-- Each enumerator takes an iteratee and returns an iteratee:
-- an Enumerator is an iteratee transformer.
-- The enumerator normally stops when the stream is terminated
-- or when the iteratee moves to the done state, whichever comes first.
-- When to stop is of course up to the enumerator...

-- We have two choices of composition: compose iteratees or compose
-- enumerators. The latter is useful when one iteratee
-- reads from the concatenation of two data sources.

-- We can define Enumerator to be literally an Iteratee transformer:
-- type Enumerator el m a = Iteratee el m a -> Iteratee el m a
-- See the explanation in IterateeM.hs why that definition is not a good
-- idea. That explanation justifies the following type:
type Enumerator el m a = Iteratee el m a -> m (Iteratee el m a)

-- The most primitive enumerator: applies the iteratee to the terminated
-- stream. The result is the iteratee usually in the done state.
-- A `good' iteratee must move to the done state upon receiving the EOF
enum_eof :: Monad m => Enumerator el m a
{-
-- Quoted from IterateeM.hs
enum_eof = check False
 where
 check _     (IE_done x _)          = retI $ IE_done x (EOF Nothing)
 check _     i@(IE_cont _ (Just _)) = retI i
 check False (IE_cont k Nothing)    = k (EOF Nothing) >>== check True
 check True  _                      = throwErr exc_divergent
-}
enum_eof iter = runIter iter finished suspended
 where
   finished x _        = return (ie_done x (EOF Nothing))
   -- first suspension: deliver EOF
   suspended k Nothing = runIter (k (EOF Nothing)) finished stubborn
   suspended k err     = return (ie_cont k err)
   -- still asking for data after EOF: the iteratee is divergent
   stubborn _ Nothing  = return (throwErr exc_divergent)
   stubborn k err      = return (ie_cont k err)

{-
enum_eof iter = Iteratee $ \on_done on_cont ->
    let i_done x _ = on_done x (EOF Nothing)
        i_cont k Nothing = runIter (k (EOF Nothing)) i_done i_cont'
        i_cont k e       = on_cont k e
        i_cont' k Nothing = runIter (throwErr exc_divergent) on_done on_cont
        i_cont' k e       = on_cont k e
    in runIter iter i_done i_cont
-}

-- Another primitive enumerator: tell the Iteratee the stream terminated
-- with an error
enum_err :: Monad m => ErrMsg -> Enumerator el m a
enum_err e iter = runIter iter finished suspended
 where
   terminated          = EOF (Just e)
   finished x _        = return (ie_done x terminated)
   suspended k Nothing = runIter (k terminated) finished stubborn
   suspended k err     = return (ie_cont k err)
   stubborn _ Nothing  = return (throwErr exc_divergent)
   stubborn k err      = return (ie_cont k err)

-- The composition of two enumerators: just the functional composition.
-- It is convenient to flip the order of the arguments of the composition
-- though: in e1 >>> e2, e1 is executed first.
-- This operation is similar to the left-to-right composition in
-- Control.Category.
-- The composition of enumerators is not exactly (.): we take care
-- to force the result of the enumerator e1 before passing it to e2.
-- We are thus certain that all effects of enumerating e1 happen before
-- the effects of e2.
-- Left-to-right composition of enumerators: in e1 >>> e2 the enumerator
-- e1 is run first and the resulting iteratee is handed over to e2.
(>>>):: Monad m => Enumerator el m a -> Enumerator el m a -> Enumerator el m a
-- (>.) = flip (.)
e1 >>> e2 = \i -> e1 i >>= e2

-- The pure 1-chunk enumerator
-- It passes a given string to the iteratee in one chunk
-- This enumerator does no IO and is useful for testing of base parsing
enum_pure_1chunk :: Monad m => [el] -> Enumerator el m a
enum_pure_1chunk str iter = runIter iter ie_doneM on_cont
 where
   on_cont k Nothing = return (k (Chunk str))
   on_cont k e       = return $ ie_cont k e

-- The pure n-chunk enumerator
-- It passes a given string to the iteratee in chunks no larger than n.
-- This enumerator does no IO and is useful for testing of base parsing
-- and handling of chunk boundaries
-- A chunk size below 1 is treated as 1: a zero-sized chunk would never
-- make progress and the enumeration would not terminate.
enum_pure_nchunk :: Monad m => [el] -> Int -> Enumerator el m a
enum_pure_nchunk str@(_:_) n iter = runIter iter ie_doneM on_cont
 where
   on_cont k Nothing = enum_pure_nchunk s2 n (k (Chunk s1))
   on_cont k e       = return $ ie_cont k e
   (s1, s2)          = splitAt (max 1 n) str
enum_pure_nchunk _ _ i = return i

{-
       check_if_done (enum_pure_nchunk s2 n) $ runIter iter (Chunk s1)
-}

-- The enumerator of a POSIX Fd
-- Unlike fdRead (which allocates a new buffer on
-- each invocation), we use the same buffer all throughout enumeration
enum_fd :: Fd -> Enumerator Char IO a
{-
-- Quoted from IterateeM.hs
enum_fd fd iv = Iteratee (allocaBytes (fromIntegral buffer_size) (loop iv))
 where
  -- buffer_size = 4096
  buffer_size = 5 -- for tests; in real life, there should be 1024 or so
  loop (IE_cont k Nothing) = do_read k
  loop i = \p -> return i
  do_read k p = do
   n <- myfdRead fd p buffer_size
   putStrLn $ "Read buffer, size " ++ either (const "IO err") show n
   case n of
    Left errno -> runIter $ k (EOF (Just (toException (ErrorCall "IO error"))))
    Right 0 -> return $ IE_cont k Nothing
    Right n -> do
         str <- peekCAStringLen (p,fromIntegral n)
         runIter (k (Chunk str)) >>= \iv -> loop iv p
-}
-- The read buffer is obtained with allocaBytes, so it is released even
-- if the iteratee or the read itself raises an exception (a plain
-- mallocBytes/free pair would leak the buffer in that case).
enum_fd fd iter = allocaBytes (fromIntegral buffer_size) (\p -> loop p iter)
 where
   -- buffer_size = 4096
   buffer_size = 5 -- for tests; in real life, there should be 1024 or so
   -- keep reading for as long as the iteratee asks for data
   loop p i = runIter i ie_doneM (on_cont p)
   on_cont p k Nothing = do_read k p
   on_cont _ k e       = return $ ie_cont k e
   do_read k p = do
     n <- myfdRead fd p buffer_size
     putStrLn $ "Read buffer, size " ++ either (const "IO err") show n
     case n of
       -- read error: tell the iteratee the stream ended abnormally
       Left _errno -> return $ k (EOF (Just (toException (ErrorCall "IO error"))))
       -- nothing read: the source is exhausted; leave the iteratee
       -- waiting so that another enumerator may continue
       Right 0 -> return $ liftI k
       Right m -> do
         str <- peekCAStringLen (p, fromIntegral m)
         loop p (k (Chunk str))

-- Enumerate the contents of a file. The descriptor is closed even when
-- the enumeration raises an exception.
enum_file :: FilePath -> Enumerator Char IO a
enum_file filepath iter = do
    r <- bracket open_file closeFd (\fd -> enum_fd fd iter)
    putStrLn $ "closed file " ++ filepath
    return r
 where
   open_file = do
     putStrLn $ "opened file " ++ filepath
     openFd filepath ReadOnly Nothing defaultFileFlags

-- ------------------------------------------------------------------------
-- Stream adapters: Iteratees and Enumerators at the same time
--
{-
The file IterateeM.hs has justified the following type of Enumeratee:

type Enumeratee elo eli m a =
    IterV eli m a -> Iteratee elo m (IterV eli m a)
-}
type Enumeratee elo eli m a =
    Iteratee eli m a -> Iteratee elo m (Iteratee eli m a)

-- One of the simplest Enumeratee: the nested stream is a prefix
-- of the outer stream exactly n elements long. Such nesting arises
-- when several independent streams are concatenated.
--
-- Read n elements from a stream and apply the given (nested) iteratee to the
-- stream of the read elements. Unless the stream is terminated early, we
-- read exactly n elements (even if the iteratee has accepted fewer).
-- Feed exactly n elements of the outer stream to the nested iteratee.
-- If the nested iteratee finishes (or fails) early, the remainder of the
-- n elements is still removed from the outer stream.
take :: Monad m => Int -> Enumeratee el el m a
{-
-- Quoted from IterateeM.hs
take 0 iter@IE_cont{} = return iter
take n (IE_done x _) = drop n >> return (IE_done x (Chunk []))
take n (IE_cont _ (Just e)) = drop n >> throwErr e
take n (IE_cont k Nothing) = ie_cont (step n k)
 where
 step n k (Chunk []) = ie_cont (step n k)
 step n k chunk@(Chunk str) | length str < n =
                            k chunk >>== take (n - length str)
 step n k (Chunk str) = k (Chunk s1) >>== \i -> ie_done i (Chunk s2)
  where (s1,s2) = splitAt n str
 step n k stream = k stream >>== \i -> ie_done i stream
-}
take n iter = Iteratee $ \od oc ->
    let -- run an outer-stream iteratee with the caller's continuations
        resume i = runIter i od oc
        -- the nested iteratee is done: skip what is left of the prefix
        finished x _ = resume (drop n >> return (return x))
        suspended k Nothing
          | n == 0    = od (liftI k) (Chunk [])
          | otherwise = resume (liftI (feed n k))
        -- the nested iteratee failed: skip the prefix, then re-throw
        suspended _ (Just e) = resume (drop n >> throwErr e)
    in runIter iter finished suspended
 where
   feed left k (Chunk []) = liftI (feed left k)
   feed left k chunk@(Chunk str)
     -- the whole chunk belongs to the prefix; more is needed
     | len < left = take (left - len) (k chunk)
     -- the prefix ends within this chunk; the rest is left over
     | otherwise  = ie_done (k (Chunk s1)) (Chunk s2)
     where len      = length str
           (s1, s2) = splitAt left str
   -- the outer stream is terminated: pass the termination on
   feed _ k stream = ie_done (k stream) stream

-- The analogue of List.take is a simple composition of take above with
-- stream2list (see testw2 below).
-- Our take is more general though, and seems more useful.
-- The general take is needed for the HTTP chunked decoding below -- or
-- for any similar decoding where the overall stream is composed of
-- chunks/blocks with the known length. Such chunked encodings are
-- quite common (e.g., all IFF formats such as TIFF, RIFF, AIFF; ELF, etc.)
-- The following pattern appears often in Enumeratee code
-- If the nested iteratee is done, return it as it is; if it reports an
-- error, re-throw the error in the outer iteratee; otherwise hand its
-- continuation over to the supplied function.
{-# INLINE enee_check_if_done #-}
{-
-- Quoted from IterateeM.hs
enee_check_if_done:: Monad m =>
                ((Stream el -> Iteratee el m a) ->
                 Iteratee el' m (IterV el m a)) ->
                IterV el m a -> Iteratee el' m (IterV el m a)
enee_check_if_done _ i@IE_done{}          = return i
enee_check_if_done f (IE_cont k Nothing)  = f k
enee_check_if_done _ (IE_cont _ (Just e)) = throwErr e
-}
enee_check_if_done:: Monad m =>
                ((Stream eli -> Iteratee eli m a) ->
                 Iteratee elo m (Iteratee eli m a)) ->
                Enumeratee elo eli m a
enee_check_if_done f inner = Iteratee $ \od oc ->
    runIter inner
        (\x s -> od (ie_done x s) (Chunk []))
        (\k merr -> runIter (maybe (f k) throwErr merr) od oc)

{-
-- Map the stream: yet another Enumeratee
-- Given the stream of elements of the type elo and the function elo->eli,
-- build a nested stream of elements of the type eli and apply the
-- given iteratee to it.
-- Note the contravariance.
-}
map_stream :: Monad m => (elo -> eli) -> Enumeratee elo eli m a
{-
-- Quoted from IterateeM.hs
map_stream f = enee_check_if_done (liftI . step)
 where
 step k (Chunk [])  = liftI (step k)
 step k (Chunk str) = map_stream f $$ k (Chunk (map f str))
 step k s           = ie_done (IE_cont k Nothing) s
-}
map_stream f = enee_check_if_done (\k -> liftI (feed k))
 where
   feed k stream =
       case stream of
         Chunk []  -> liftI (feed k)
         Chunk els -> map_stream f (k (Chunk (map f els)))
         -- outer stream terminated: return the nested iteratee as it is
         _         -> ie_done (liftI k) stream

-- Convert one stream into another, not necessarily in `lockstep'
-- The transformer map_stream maps one element of the outer stream
-- to one element of the nested stream. The transformer below is more
-- general: it may take several elements of the outer stream to produce
-- one element of the inner stream.
-- The transformation from one stream to the other is specified as
-- Iteratee elo m eli.
-- This is a generalization for Monad.sequence
-- Repeatedly run the given iteratee on the outer stream and feed each of
-- its results, as a one-element chunk, to the nested iteratee. Stops when
-- the nested iteratee is done or the outer stream is terminated.
sequence_stream :: Monad m => Iteratee elo m eli -> Enumeratee elo eli m a
sequence_stream fi = enee_check_if_done check
 where
   check k = do
       finished <- is_stream_finished
       case finished of
         Nothing -> do v <- fi
                       sequence_stream fi (k (Chunk [v]))
         Just e  -> ie_done (liftI k) (EOF (Just e))

-- ------------------------------------------------------------------------
-- Combining the primitive iteratees to solve the running problem:
-- Reading headers and the content from an HTTP-like stream

-- Convert the stream of characters to the stream of lines, and
-- apply the given iteratee to enumerate the latter.
-- The stream of lines is normally terminated by the empty line.
-- When the stream of characters is terminated, the stream of lines
-- is also terminated, abnormally.
-- This is the first proper Enumeratee: it is the iteratee of the
-- character stream and the enumerator of the line stream.
-- More generally, we could have used sequence_stream to implement enum_lines.
enum_lines :: Monad m => Enumeratee Char Line m a
{-
-- Quoted from IterateeM.hs
enum_lines = enee_check_if_done (\k -> line >>= check_line k)
 where
 check_line k (Right "") = return (IE_cont k Nothing) -- empty line, normal exit
 check_line k (Right l)  = enum_lines $$ k (Chunk [l])
 check_line k (Left "")  = return $$ k (EOF (Just exc_EOF)) -- abnormal termin
 check_line k (Left l)   = return $$ enum_err exc_EOF $$ k (Chunk [l])
-}
enum_lines = enee_check_if_done $ \k -> do
    l <- line
    case l of
      Right ""  -> return (liftI k)                    -- empty line, normal exit
      Right txt -> enum_lines (k (Chunk [txt]))
      Left ""   -> return (k (EOF (Just exc_EOF)))     -- abnormal termin
      -- an unterminated last line: deliver it, then signal the error
      Left txt  -> lift (enum_err exc_EOF (k (Chunk [txt])))

-- Convert the stream of characters to the stream of words, and
-- apply the given iteratee to enumerate the latter.
-- Words are delimited by white space.
-- This is the analogue of List.words
-- It is instructive to compare the code below with the code of
-- List.words, which is:
-- words                   :: String -> [String]
-- words s                 =  case dropWhile isSpace s of
--                                 "" -> []
--                                 s' -> w : words s''
--                                       where (w, s'') =
--                                              break isSpace s'
-- One should keep in mind that enum_words is a more general, monadic
-- function.
-- More generally, we could have used sequence_stream to implement enum_words.
enum_words :: Monad m => Enumeratee Char String m a
enum_words = enee_check_if_done $ \k -> do
    dropWhile isSpace
    word <- break isSpace
    if null word
      then return (liftI k)                   -- finished
      else enum_words (k (Chunk [word]))

-- HTTP chunk decoding
-- Each chunk has the following format:
--
--      <chunk-size> CRLF <chunk-data> CRLF
--
-- where <chunk-size> is the hexadecimal number; <chunk-data> is a
-- sequence of <chunk-size> bytes.
-- The last chunk (so-called EOF chunk) has the format
-- 0 CRLF CRLF (where 0 is an ASCII zero, a character with the decimal code 48).
-- For more detail, see "Chunked Transfer Coding", Sec 3.6.1 of
-- the HTTP/1.1 standard:
-- http://www.w3.org/Protocols/rfc2616/rfc2616-sec3.html#sec3.6.1

-- The following enum_chunk_decoded has the signature of the enumerator
-- of the nested (encapsulated and chunk-encoded) stream. It receives
-- an iteratee for the embedded stream and returns the iteratee for
-- the base, embedding stream. Thus what is an enumerator and what
-- is an iteratee may be a matter of perspective.

-- We have a decision to make: Suppose an iteratee has finished (either because
-- it obtained all needed data or encountered an error that makes further
-- processing meaningless). While skipping the rest of the stream/the trailer,
-- we encountered a framing error (e.g., missing CRLF after chunk data).
-- What do we do? We chose to disregard the latter problem.
-- Rationale: when the iteratee has finished, we are in the process
-- of skipping up to the EOF (draining the source).
-- Disregarding the errors seems OK then.
-- Also, the iteratee may have found an error and decided to abort further
-- processing. Flushing the remainder of the input is reasonable then.
-- One can make a different choice...

-- The code is almost identical to that in IterateeM.hs
-- Decode an HTTP chunk-encoded stream: read the hexadecimal chunk size,
-- feed that many characters to the nested iteratee (via take), check the
-- CRLF after the chunk data and repeat until the zero-sized chunk.
enum_chunk_decoded :: Monad m => Enumeratee Char Char m a
enum_chunk_decoded iter = read_size
 where
   read_size = break (== '\r') >>= checkCRLF iter . check_size
   -- consume CRLF and continue with m; anything else is a framing error
   checkCRLF iter m = do
     n <- heads "\r\n"
     if n == 2 then m else frame_err (exc "Bad Chunk: no CRLF") iter
   check_size "0" = checkCRLF iter (return iter)
   check_size str@(_:_) =
       maybe (frame_err (exc ("Bad chunk size: " ++ str)) iter) read_chunk $
       read_hex 0 str
   check_size _ = frame_err (exc "Error reading chunk size") iter

   read_chunk size = take size iter >>= \r -> checkCRLF r $ enum_chunk_decoded r

   -- parse a non-negative hexadecimal number; Nothing on a bad digit
   read_hex acc "" = Just acc
   read_hex acc (d:rest) | isHexDigit d = read_hex (16*acc + digitToInt d) rest
   read_hex acc _ = Nothing

   exc msg = toException (ErrorCall $ "Chunk decoding exc: " ++ msg)
   -- If the processing is restarted, we report the frame error to the inner
   -- Iteratee, and exit
   frame_err e iter = throwRecoverableErr (exc "Frame error")
                      (\_ -> lift $ enum_err e iter)

-- ------------------------------------------------------------------------
-- Primitive Tests

-- Skip one element, read one, skip two, read one
test_iteratee = do
  drop 1
  v1 <- head
  drop 2
  v2 <- head
  return (v1,v2)

testt1 = runIdentity $ enum_pure_nchunk "abcde" 5 test_iteratee >>= run
-- ('b','e')

testt2 = runIdentity $ enum_pure_nchunk "abcde" 2 test_iteratee >>= run
-- ('b','e')

-- Here we test passing `the state of parsing' from one enumerator
-- to the other
testt3 = runIdentity $
         (enum_pure_nchunk "abc" 2 >>> enum_pure_nchunk "de" 1) test_iteratee
         >>= run
-- ('b','e')

-- ------------------------------------------------------------------------
-- Tests

exc_IOErr = toException $ ErrorCall "IO Error"

-- Pure tests, requiring no IO

test_str1 =
    "header1: v1\rheader2: v2\r\nheader3: v3\nheader4: v4\n" ++
    "header5: v5\r\nheader6: v6\r\nheader7: v7\r\n\nrest\n"

-- Read the lines up to the empty line, and then one more line
read_lines_and_one_more_line = do
  lines <- joinI $ enum_lines stream2list
  after <- line
  return (lines,after)

-- Run the iteratee and also report the state of the stream afterwards
with_err iter = do
  v <- iter
  e <- is_stream_finished
  return (v,e)

testp1 =
    let (lines,rest) = runIdentity (run =<< enum_pure_1chunk test_str1
                                            read_lines_and_one_more_line)
    in
    lines == ["header1: v1","header2: v2","header3: v3","header4: v4",
              "header5: v5","header6: v6","header7: v7"]
    && rest == Right "rest"

testp2 =
    let (lines,rest) = runIdentity (run =<< enum_pure_nchunk test_str1 5
                                            read_lines_and_one_more_line)
    in
    lines == ["header1: v1","header2: v2","header3: v3","header4: v4",
              "header5: v5","header6: v6","header7: v7"]
    && rest == Right "rest"

testw1 =
    let test_str = "header1: v1\rheader2: v2\r\nheader3:\t v3"
        expected = ["header1:","v1","header2:","v2","header3:","v3"] in
    let run_test test_str =
            runIdentity (run =<< enum_pure_nchunk test_str 5
                                 (joinI (enum_words stream2list)))
    in
    and [run_test test_str == expected,
         run_test (test_str ++ " ") == expected]

testw2 =
    let test_str = "header1: v1\rheader2: v2\r\nheader3:\t v3"
        expected = ["header1:","v1","header2:","v2","header3:"] in
    -- The analogue of List.take
    let take_as_list n = joinI $ take n stream2list in
    let run_test test_str =
            runIdentity (run =<< enum_pure_nchunk test_str 5
                                 (joinI (enum_words $ take_as_list 5)))
    in
    run_test test_str == expected

-- Test Fd driver
-- Read the headers from the file with the given line collector and report
-- what has been read. The descriptor is closed by bracket, hence also when
-- the enumeration or `run' raises an exception.
test_driver line_collector filepath = do
  result <- bracket (openFd filepath ReadOnly Nothing defaultFileFlags)
                    closeFd $ \fd -> do
              putStrLn "About to read headers"
              run =<< enum_fd fd read_lines_and_one_more_line
  putStrLn "Finished reading headers"
  case result of
    ((headers,Nothing),after) ->
        do
          putStrLn $ "The line after headers is: " ++ show after
          putStrLn "Complete headers"
          print headers
    ((headers,Just e),after) ->
        do
          putStrLn $ "Problem " ++ show e
          putStrLn "Incomplete headers"
          print headers
 where
   read_lines_and_one_more_line = do
     lines <- joinI $ enum_lines line_collector
     e <- is_stream_finished
     after <- line
     return ((lines,e),after)
-- Complete headers, up to "header7: v7"
test11 = test_driver stream2list "test1.txt"
-- The same
test12 = test_driver stream2list "test2.txt"
-- "header3: v3", then EOF
test13 = test_driver stream2list "test3.txt"
-- Incomplete headers [], EOF
test14 = test_driver stream2list "/dev/null"

-- The same four files, with print_lines as the line collector
test21 = test_driver print_lines "test1.txt"
test22 = test_driver print_lines "test2.txt"
test23 = test_driver print_lines "test3.txt"
test24 = test_driver print_lines "/dev/null"


-- Run the complete test, reading the headers and the body

-- This simple iteratee is used to process a variety of streams:
-- embedded, interleaved, etc.
line_printer = enum_lines print_lines

-- Two sample processors

-- Read the headers, print the headers, read the lines of the chunk-encoded
-- body and print each line as it has been read
read_headers_print_body = do
  headers <- with_err . joinI $ enum_lines stream2list
  case headers of
    (headers, Nothing) -> lift $ do
      putStrLn "Complete headers"
      print headers
    (headers, Just err) -> lift $ do
      putStrLn $ "Incomplete headers due to " ++ show err
      print headers
  lift $ putStrLn "\nLines of the body follow"
  joinI $ enum_chunk_decoded line_printer

-- Read the headers and print the header right after it has been read
-- Read the lines of the chunk-encoded body and print each line as
-- it has been read
print_headers_print_body = do
  lift $ putStrLn "\nLines of the headers follow"
  line_printer
  lift $ putStrLn "\nLines of the body follow"
  joinI $ enum_chunk_decoded line_printer

-- Open filepath read-only and enumerate it with enum_fd into iter;
-- the result of running iter is discarded.
-- The descriptor is managed by bracket: it is closed on the normal path
-- (before "Finished reading" is printed, as before) and also when the
-- enumeration throws an exception, so that it no longer leaks.
test_driver_full iter filepath = do
  bracket
    (openFd filepath ReadOnly Nothing defaultFileFlags)
    closeFd
    (\fd -> do
        putStrLn "About to read headers"
        run =<< enum_fd fd iter)
  putStrLn "Finished reading"

test31 = test_driver_full read_headers_print_body "test_full1.txt"
{-
Complete headers
["header1: v1","header2: v2","header3: v3","header4: v4"]
Problem Just "EOF"
Incomplete body
["body line 1","body line 2","body line 3","body line 4"]
-}

test32 = test_driver_full read_headers_print_body "test_full2.txt"
--
-- *** Exception: control message: Just Chunk decoding exc: Frame error

test33 = test_driver_full read_headers_print_body "test_full3.txt"
{-
Complete headers
["header1: v1","header2: v2","header3: v3","header4: v4"]
Problem Just "EOF"
Incomplete body
["body line 1","body line 2","body line 3","body line 4","body line 5"]
-}

test34 = test_driver_full print_headers_print_body "test_full3.txt"


-- Interleaved reading from two descriptors using select
--
-- If the two arguments are the names of regular files, the driver
-- does simple round-robin interleaving, reading a block from one
-- file and a block from the other file. If the arguments name
-- pipes or devices, the reading becomes truly supply-driven.
-- We use select for multiplexing.
-- The first argument is the reader-iteratee. It is exactly
-- the same iteratee that is being used in the `sequential' tests above.
-- By design, two Fds are being read independently and in parallel,
-- closely emulating two OS processes each reading from their own file.
-- The code below is a simple, round-robin OS scheduler.
--
-- Both descriptors are managed by bracket, so that they are closed
-- even if an exception is raised while reading (and the first one is
-- closed if opening the second one fails). They are released in the
-- reverse order of opening.
test_driver_mux iter fpath1 fpath2 = do
  bracket (open_ro fpath1) closeFd $ \fd1 ->
    bracket (open_ro fpath2) closeFd $ \fd2 -> do
      let fds = [fd1,fd2]
      putStrLn $ "Opened file descriptors: " ++ show fds
      -- the initial job queue: each Fd paired with the status of
      -- its own run of the reader iteratee
      fjque <- mapM (\fd -> obtain_status iter >>= \ist -> return (fd,ist)) fds
      allocaBytes (fromIntegral buffer_size) (loop fjque)
  putStrLn "Closed file descriptors. All done"
 where
  open_ro fpath = openFd fpath ReadOnly Nothing defaultFileFlags

  -- we use one single IO buffer for reading
  buffer_size = 5 -- for tests; in real life, there should be 1024 or so

  -- The scheduler: stop when no unfinished iteratee remains;
  -- otherwise select among their Fds and serve one that is ready
  loop fjque buf
    | null fds  = return ()
    | otherwise = do
        selected <- select'read'pending fds
        case selected of
          Left _    -> do
            putStrLn "IO Error"
            tell_iteratee_err exc_IOErr fjque
          Right []  -> loop fjque buf
          Right sel -> process buf sel fjque
    where fds = get_fds fjque

  -- get Fds from the jobqueue for the unfinished iteratees
  -- (an entry whose status is not IS_cont fails the pattern and is skipped)
  get_fds jq = [fd | (fd,IS_cont _) <- jq]

  -- find the first ready jobqueue element,
  -- that is, the job queue element whose Fd is in selected.
  -- Return the element and the rest of the queue
  get_ready selected jq = (e, before ++ after)
    where (before,e:after) = Prelude.break (\(fd,_) -> fd `elem` selected) jq

  -- Read one buffer from the ready Fd and give the data to its iteratee.
  -- The patterns below rely on `selected' being drawn from the Fds of
  -- get_fds, whose entries are all IS_cont.
  process buf selected fjque = do
    let ((fd,IS_cont step),fjrest) = get_ready selected fjque
    n <- myfdRead fd buf buffer_size
    putStrLn $ unwords ["Read buffer, size", either (const "IO err") show n,
                        "from fd", show fd]
    case n of
      -- on a read error or EOF, tell the iteratee and drop the job
      Left _  -> obtain_status (step (EOF (Just exc_IOErr))) >> loop fjrest buf
      Right 0 -> obtain_status (step (EOF Nothing)) >> loop fjrest buf
      Right m -> do
        str <- peekCAStringLen (buf,fromIntegral m)
        im  <- obtain_status $ step (Chunk str)
        loop (fjrest ++ [(fd,im)]) buf    -- round-robin

  -- Send the error to every unfinished iteratee in the job queue.
  -- The queue may also hold finished jobs (process re-queues a job
  -- whatever its new status is); those are skipped rather than
  -- causing a pattern-match failure.
  tell_iteratee_err err jq =
    sequence_ [enum_err err (liftI k) | (_,IS_cont k) <- jq]

-- Running these tests shows true interleaving, of reading from the
-- two file descriptors and of printing the results. All IO is interleaved,
-- and yet it is safe. No unsafe operations are used.
testm1 = test_driver_mux line_printer "test1.txt" "test3.txt"
testm2 = test_driver_mux print_headers_print_body
           "test_full2.txt" "test_full3.txt"