-- Haskell98!

-- Pure Iteratees and parsing combinators

-- The running example, parts 1 and 2
-- Part 1 is reading the headers, the sequence of lines terminated by an
-- empty line. Each line is terminated by CR, LF, or CRLF.
-- We should return the headers in order. In the case of error,
-- we should return the headers read so far and the description of the error.
-- Part 2 is reading the headers and reading all the lines from the
-- HTTP-chunk-encoded content that follows the headers. Part 2 thus
-- verifies layering of streams, and processing of one stream
-- embedded (chunk encoded) into another stream.

module Iteratee where

import System.Posix
import Foreign.C
import Foreign.Ptr
import Foreign.Marshal.Alloc
import Data.List (splitAt)

import Prelude hiding (head, drop, dropWhile, take, break, catch)
import qualified Prelude
import Control.Monad
import Data.Char (isHexDigit, digitToInt)
import Control.Monad.Identity
import Control.Exception
import Data.Typeable

import LowLevelIO

-- A stream is a (continuing) sequence of elements bundled in Chunks.
-- The first variant means no more data will be coming: the stream
-- is exhausted, either due to EOF or some error.
-- Chunk str gives the currently available part of the stream:
-- The stream is not terminated yet.
-- The case (Chunk "") signifies a stream with no currently available
-- data but which is still continuing. A stream processor should,
-- informally speaking, ``suspend itself'' and wait for more data
-- to arrive.

type ErrMsg = SomeException
data Stream = EOF (Maybe ErrMsg) | Chunk String deriving Show

empty_stream = Chunk ""

-- Iteratee -- a generic stream processor, what is being folded over
-- a stream
-- Iteratee exists in one of the three states:
--    -- Done state: the iteratee has computed the result
--       and doesn't want any more data;
--    -- More data state, holding the continuation (the `step function')
--       to process a chunk of data and yield a new state;
--    -- A message to the stream producer (e.g., to rewind the stream)
--       or an error indication.
-- We assume that all iteratees are `good' -- given bounded input,
-- they do the bounded amount of computation and take the bounded amount
-- of resources.
-- We also assume that given a terminated stream, an iteratee
-- moves to the done state, so the results computed so far could be returned.
-- The basic design suggests several implementations. Here is the latest:

data Iteratee a = IE_done a
                | IE_cont (Maybe ErrMsg) (Stream -> (Iteratee a,Stream))

-- It is quite obvious that the second argument of IE_cont is
-- the State monad over stream: the step function consumes
-- (part of) the stream, computes the new state and returns
-- the unconsumed part of the stream.
-- The first argument to IE_cont, if non-empty, is the error indicator,
-- or a control message in general.
-- Since we carry the continuation anyway, the exception/error
-- is generally recoverable: we get resumable exceptions by default.
-- Indeed, the EOF error is essentially a control message, a request
-- for more data. If more data do become available, the error can be recovered
-- from.

-- It turns out, Iteratee forms a monad. That should not be too surprising
-- given that the step function does look like the State monad, and
-- IE_cont is like the `operating system call' to read in another chunk.
-- So, Iteratee is a State + Cont monad (with a back-trackable state).
-- We can use the familiar `do' notation for composing Iteratees.
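-- For instance, here is a small sketch (not part of the original library;
-- the name read_two is hypothetical) that sequences two of the primitive
-- iteratees defined later in this file:
--   read_two :: Iteratee (Char,Char)
--   read_two = do
--     c1 <- head
--     c2 <- head
--     return (c1,c2)
-- Each `head' consumes one character; the bind threads the unconsumed
-- part of the current chunk from the first `head' to the second.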

instance Monad Iteratee where
    return = IE_done

    IE_done a   >>= f = f a
    IE_cont e k >>= f = IE_cont e (docase . k)
     where
     docase (IE_done a, stream) = case f a of
                   IE_cont Nothing k -> k stream
                   i                 -> (i,stream)
     docase (i, s)  = (i >>= f, s)

-- It is easy to see that the monad laws are satisfied

{- The second design

data Iteratee a = IE_done a Stream
                | IE_cont (Stream -> Iteratee a) (Maybe ErrMsg)

had a drawback: a Done Iteratee returned by an Enumerator could
contain the ``rest of the stream''. This is meaningless since a stream
is produced by the enumerator and is the resource of the enumerator,
disposed at the end of the traversal. We should not leak any of
enumerator's state and should return Chunk [] as the second argument
of the IE_done constructor. However, we could not enforce that requirement.
Another drawback was a more complex Monad instance, shown below.

-- The second argument to IE_done is the remaining part of the stream;
-- it should really be the remaining part of the chunk that the Iteratee
-- has received earlier. We can enforce this linearity restriction using
-- the rank-2 types. If the stream chunk is a string or any array for that
-- matter, we can write just IE_done a String; the empty array indicates
-- the absence of the remainder.

instance Monad Iteratee where
    return x = IE_done x (Chunk "")

    IE_done x (Chunk "") >>= f = f x
    IE_done x stream     >>= f = case f x of
                   IE_done y _       -> IE_done y stream
                   IE_cont k Nothing -> k stream
                   i                 -> i
    IE_cont k e >>= f = IE_cont ((>>= f) . k) e

-- Justification for the case IE_done x s >>= f:
-- if (f x) is IE_done x s', then s' must be (Chunk "") because an
-- iteratee must always return the remainder of the received input.
-- Since (f x) returned IE_done without reading any input, the rest
-- of the stream s remains unconsumed.
-- If the linearity of the stream is enforced statically,
-- this return value of f x is likewise enforced.
-- If f x returns a control message, the remainder of the input s
-- is disregarded: the input buffer is flushed before the control.
-}

-- Throw an irrecoverable error
throwErr :: ErrMsg -> Iteratee a
throwErr e = IE_cont (Just e) (\s -> (throwErr e, s))

-- Throw a recoverable error
throwRecoverableErr :: ErrMsg -> (Stream -> (Iteratee a,Stream)) -> Iteratee a
throwRecoverableErr e k = IE_cont (Just e) k

-- Send EOF to Iteratee and disregard the unconsumed part of the stream
run :: Iteratee a -> a
run (IE_done x) = x
run (IE_cont Nothing k) = check . fst $ k (EOF Nothing)
 where
 check (IE_done x)   = x
 check (IE_cont e _) = error $ "possibly divergent, control message: " ++ show e
run (IE_cont (Just e) _) = error $ "control message: " ++ show e

{- Here is the third implementation

newtype Iteratee a = Iteratee{runIter:: Stream -> IterV a}
data IterV a = IE_done a Stream
             | IE_cont (Iteratee a) (Maybe ErrMsg)

instance Monad Iteratee where
    return x = Iteratee (IE_done x)

    Iteratee m >>= f = Iteratee $ \s ->
       case m s of
         IE_done x s -> runIter (f x) s
         IE_cont m e -> IE_cont (m >>= f) e

-- See IterateeM.hs for the discussion of its drawbacks.

-- Throw an unrecoverable error
throwErr :: ErrMsg -> Iteratee a
throwErr e = Iteratee (\s -> IE_cont (throwErr e) (Just e))

-- Throw a recoverable error
throwRecoverableErr :: ErrMsg -> Iteratee a -> Iteratee a
throwRecoverableErr e i = Iteratee (\_ -> IE_cont i (Just e))

-- Send EOF to Iteratee and disregard the unconsumed part of the stream
run :: Iteratee a -> a
run iter = case runIter iter (EOF Nothing) of
             IE_done x _ -> x
             IE_cont _ e -> error $ "control message: " ++ show e
-}

-- Produce the EOF error message to be passed to throwErr.
-- If the stream was terminated because of an error, keep the original
-- error message.
setEOF :: Stream -> ErrMsg
setEOF (EOF (Just e)) = e
setEOF _              = exc_EOF

exc_EOF = toException $ ErrorCall "EOF"
exc_divergent = toException $ ErrorCall "divergent iteratee"

{-
tryIter :: Iteratee a -> Iteratee (Either ErrMsg a)
tryIter (Iteratee m) = Iteratee $ \s ->
    case m s of
      IE_done x s        -> IE_done (Right x) s
      IE_cont m Nothing  -> IE_cont (tryIter m) Nothing
      IE_cont _ (Just e) -> IE_cont (Iteratee (IE_done (Left e))) Nothing
-- The last line could have been like the one below:
--    IE_cont _ (Just e) -> IE_done (Left e) s
-- That amounts to back-tracking on the stream. By default, we avoid any
-- backtracking, as a possible memory leak. Also, the stream in question
-- may be a linear stream and backtracking is not possible.
-- If one wishes to use a back-tracking parser, one has to transform
-- the stream into the one that permits backtracking and has the backing
-- storage for that.
-}

-- ------------------------------------------------------------------------
-- Primitive iteratees: parser combinators

-- Useful combinators for implementing iteratees and enumerators
ie_cont :: (Stream -> (Iteratee a, Stream)) -> Iteratee a
ie_cont = IE_cont Nothing

ie_contM :: (Stream -> (Iteratee a, Stream)) -> (Iteratee a, Stream)
ie_contM k = (IE_cont Nothing k, empty_stream)

-- The analogue of List.break
-- It takes a character predicate and returns a string of characters,
-- which is the (possibly empty) prefix of the stream. None of the
-- characters in the string satisfy the character predicate.
-- If the stream is not terminated, the first character of the remaining
-- stream satisfies the predicate
break :: (Char -> Bool) -> Iteratee String
break cpred = ie_cont (step "")
 where
 step before (Chunk "") = ie_contM (step before)
 step before (Chunk str) =
     case Prelude.break cpred str of
       (_,"")     -> ie_contM (step (before ++ str))
       (str,tail) -> (IE_done (before ++ str), (Chunk tail))
 step before stream = (IE_done before, stream)

-- Look ahead at the next element of the stream, without removing
-- it from the stream.
-- Return (Just c) if successful, return Nothing if the stream is
-- terminated (by EOF or an error)
peek :: Iteratee (Maybe Char)
peek = ie_cont step
 where
 step (Chunk "")      = (peek, empty_stream)
 step s@(Chunk (c:_)) = (IE_done (Just c), s)
 step stream          = (IE_done Nothing, stream)

-- Attempt to read the next element of the stream and return it
-- Raise a (recoverable) error if the stream is terminated
head :: Iteratee Char
head = ie_cont step
 where
 step (Chunk "")    = (head, empty_stream)
 step (Chunk (c:t)) = (IE_done c, (Chunk t))
 step stream        = (IE_cont (Just (setEOF stream)) step, stream)

-- Given a sequence of characters, attempt to match them against
-- the characters on the stream. Return the count of how many
-- characters matched. The matched characters are removed from the
-- stream.
-- For example, if the stream contains "abd", then (heads "abc")
-- will remove the characters "ab" and return 2.
heads :: String -> Iteratee Int
heads str = loop 0 str
 where
 loop cnt ""  = return cnt
 loop cnt str = ie_cont (step cnt str)
 step cnt str (Chunk "") = (loop cnt str, empty_stream)
 step cnt (c:t) s@(Chunk (c':t')) =
     if c == c' then step (succ cnt) t (Chunk t')
        else (IE_done cnt, s)
 step cnt _ stream = (IE_done cnt, stream)

-- Skip the rest of the stream
skip_till_eof :: Iteratee ()
skip_till_eof = ie_cont step
 where
 step (Chunk _) = (skip_till_eof, empty_stream)
 step s         = (IE_done (), s)

-- Skip n elements of the stream, if there are that many
-- This is the analogue of List.drop
drop :: Int -> Iteratee ()
drop 0 = return ()
drop n = ie_cont step
 where
 step (Chunk str) | length str <= n = (drop (n - length str), empty_stream)
 step (Chunk str) = (IE_done (), (Chunk (Prelude.drop n str)))
 step stream      = (IE_done (), stream)

-- Check if the stream is finished
is_finished :: Iteratee (Maybe ErrMsg)
is_finished = ie_cont check
 where
 check (Chunk "") = (is_finished, empty_stream)
 check s@(EOF e)  = (IE_done (Just $ setEOF s), s)
 check s          = (IE_done Nothing, s)

-- ------------------------------------------------------------------------
-- Combining the primitive iteratees to solve the running problem:
-- reading the headers and the content from an HTTP-like stream

type Line = String      -- The line of text, terminators are not included

-- Read a sequence of lines from the stream up to the empty line
-- The line can be terminated by CR, LF or CRLF
-- or by EOF or stream error.
-- Return the read lines, in order, not including the terminating empty line
-- Upon EOF or stream error, return the complete, terminated lines accumulated
-- so far.
-- Compare the code below with GHCBufferIO.line_lazy

read_lines :: Iteratee (Either [Line] [Line])
read_lines = lines' []
 where
 lines' acc = break (\c -> c == '\r' || c == '\n') >>=
              \l -> terminators >>= check acc l
 check acc _  0 = return . Left . reverse $ acc  -- no terminator was found
 check acc "" _ = return . Right .
     reverse $ acc
 check acc l  _ = lines' (l:acc)

 terminators = heads "\r\n" >>= \n -> if n == 0 then heads "\n" else return n

-- ------------------------------------------------------------------------
-- Enumerators
-- Each enumerator takes an iteratee and returns an iteratee:
-- an Enumerator is an iteratee transformer.
-- The enumerator normally stops when the stream is terminated
-- or when the iteratee moves to the done state, whichever comes first.
-- When to stop is of course up to the enumerator...

-- It is typical for an enumerator to disregard the remaining-stream
-- part of the state of the Iteratee computations. Some enumerators
-- may use this remaining stream data to report the location of an error
-- in stream terms, for example.

-- We have two choices of composition: compose iteratees or compose
-- enumerators. The latter is useful when one iteratee
-- reads from the concatenation of two data sources.

type Enumerator a = Iteratee a -> Iteratee a

-- In the present file, Iteratees don't have any effects. Enumerators may:
-- they may need to read from a file. Hence we need the following type.
-- In the main library IterateeM.hs, both Iteratees and Enumerators
-- could be effectful, and we don't need two types of Enumerators
type EnumeratorM m a = Iteratee a -> m (Iteratee a)

-- The most primitive enumerator: applies the iteratee to the terminated
-- stream. The result is the iteratee usually in the done state.
-- A `good' iteratee must move to the done state upon receiving the EOF
-- It is almost the same as 'run' -- modulo error handling
-- The two enumerators show a pattern that will occur again and again
-- (see the GUIDELINES in IterateeM.hs)
enum_eof :: Enumerator a
enum_eof (IE_cont Nothing k) = check .
    fst $ k (EOF Nothing)
 where
 check i@IE_done{}            = i
 check i@(IE_cont (Just _) _) = i
 check _                      = throwErr exc_divergent
enum_eof i = i

-- Another primitive enumerator: tell the Iteratee the stream terminated
-- with an error
enum_err :: ErrMsg -> Enumerator a
enum_err e (IE_cont Nothing k) = check . fst $ k (EOF (Just e))
 where
 check i@IE_done{}            = i
 check i@(IE_cont (Just _) _) = i
 check _                      = throwErr exc_divergent
enum_err _ i = i

-- The composition of two enumerators: just the functional composition
-- or its monadic alternative.
-- It is convenient to flip the order of the arguments of the composition
-- though: in e1 >>> e2, e1 is executed first
-- The operator (>>>) for left-to-right composition is also defined in
-- Control.Category

(>>>):: Enumerator a -> Enumerator a -> Enumerator a
(>>>) = flip (.)

-- The pure 1-chunk enumerator
-- It passes a given string to the iteratee in one chunk
-- This enumerator does no IO and is useful for testing of base parsing
enum_pure_1chunk :: String -> Enumerator a
enum_pure_1chunk str (IE_cont Nothing k) = fst $ k (Chunk str)
enum_pure_1chunk _   iter = iter

-- The pure n-chunk enumerator
-- It passes a given string to the iteratee in chunks no larger than n.
-- This enumerator does no IO and is useful for testing of base parsing
-- and handling of chunk boundaries
enum_pure_nchunk :: String -> Int -> Enumerator a
enum_pure_nchunk str@(_:_) n (IE_cont Nothing k) =
    enum_pure_nchunk s2 n . fst $ k (Chunk s1)
 where (s1,s2) = splitAt n str
enum_pure_nchunk _ _ iter = iter

-- The enumerator of a POSIX Fd
-- Unlike fdRead (which allocates a new buffer on
-- each invocation), we use the same buffer all throughout
enum_fd :: Fd -> EnumeratorM IO a
enum_fd fd iter = allocaBytes (fromIntegral buffer_size) (loop iter)
 where
  buffer_size = 5  -- for tests; in real life, there should be 1024 or so
  loop (IE_cont Nothing k) = do_read k
  loop iter = \p -> return iter
  do_read k p = do
   n <- myfdRead fd p buffer_size
   case n of
    Left errno -> return .
        fst $ k (EOF (Just (toException (ErrorCall "IO error"))))
    Right 0 -> return $ ie_cont k
    Right n -> do
         str <- peekCAStringLen (p,fromIntegral n)
         loop (fst $ k (Chunk str)) p

-- ------------------------------------------------------------------------
-- Stream adapters: Iteratees and Enumerators at the same time
--
-- Stream adapters, or Enumeratees, handle nested (or, encapsulated) streams.
-- Stream nesting is rather common: buffering, character encoding, compression,
-- encryption, SSL are all examples of stream nesting. On one
-- hand, an Enumeratee is an Enumerator of a nested stream:
-- it takes an iteratee for a nested stream, feeds it some data,
-- returning the resulting iteratee when the nested stream is finished
-- or when the iteratee is done. On the other hand, an Enumeratee
-- is an Iteratee for the outer stream, taking data from the parent
-- enumerator.
-- One can view an Enumeratee as an AC/DC or voltage converter, or as
-- a `vertical' composition of iteratees (compared to monadic bind, which
-- plumbs two iteratees `horizontally')
--
-- In the trivial case (e.g., Word8 to Char conversion), one element
-- of the outer stream is mapped to one element of the nested stream.
-- Generally, we may need to read several elements from the outer stream to
-- produce one element for the nested stream. Sometimes we can produce
-- several nested stream elements from a single outer stream element.
--
-- That many-to-many correspondence between the outer and nested streams
-- brings a complication. Suppose that the enumeratee has received EOF on
-- its own stream, that is, the outer stream. The enumeratee, as the outer
-- iteratee, must move to the Done state. Yet the nested iteratee is not
-- finished. The enumeratee then has to return the nested iteratee as its
-- result.

type Enumeratee a = Iteratee a -> Iteratee (Iteratee a)

-- One of the simplest Enumeratees: the nested stream is a prefix
-- of the outer stream exactly n elements long.
-- Such nesting arises
-- when several independent streams are concatenated.
--
-- Read n elements from a stream and apply the given (nested) iteratee to the
-- stream of the read elements. Unless the stream is terminated early, we
-- read exactly n elements (even if the iteratee has accepted fewer).
-- The latter property implies that
--      take n i1 >> take m i2 /= take (n+m) (i1 >> i2)
--
-- which should not surprise us since we already have
--      atomically (m1 >> m2) /= atomically m1 >> atomically m2
-- or
--      round (x1 + x2) /= round x1 + round x2

take :: Int -> Enumeratee a
take 0 iter@IE_cont{} = return iter
take n (IE_cont Nothing k) = ie_cont (step n k)
 where
 step n k (Chunk "") = ie_contM (step n k)
 step n k chunk@(Chunk str) | length str < n =
     (take (n - length str) . fst $ k chunk, empty_stream)
 step n k (Chunk str) = (IE_done (fst $ k (Chunk s1)), (Chunk s2))
   where (s1,s2) = splitAt n str
 step n k stream = (IE_done (fst $ k stream), stream)
take n iter = drop n >> return iter

-- HTTP chunk decoding
-- Each chunk has the following format:
--
--      <chunk-size> CRLF <chunk-data> CRLF
--
-- where <chunk-size> is the hexadecimal number; <chunk-data> is a
-- sequence of <chunk-size> bytes.
-- The last chunk (so-called EOF chunk) has the format
-- 0 CRLF CRLF (where 0 is an ASCII zero, a character with the decimal code 48).
-- For more detail, see "Chunked Transfer Coding", Sec 3.6.1 of
-- the HTTP/1.1 standard:
-- http://www.w3.org/Protocols/rfc2616/rfc2616-sec3.html#sec3.6.1

-- We have a decision to make: Suppose an iteratee has finished (either because
-- it obtained all needed data or encountered an error that makes further
-- processing meaningless). While skipping the rest of the stream/the trailer,
-- we encountered a framing error (e.g., missing CRLF after chunk data).
-- What do we do? Earlier, we chose to disregard the problem.
-- Rationale: when the iteratee has finished, we are in the process
-- of skipping up to the EOF (draining the source).
-- Disregarding the errors seems OK then.
-- Also, the iteratee may have found an error and decided to abort further
-- processing. Flushing the remainder of the input is reasonable then.
-- Upon further consideration, I reversed the earlier decision:
-- if we detected a framing error, we can't trust the rest of the stream.
-- We can't skip till the EOF chunk as we aren't even sure we can
-- recognize the EOF chunk any more.
-- So, we just report the _recoverable_ error upstream:
-- the recovery will be to report the accumulated nested iteratee.

enum_chunk_decoded :: Enumeratee a
enum_chunk_decoded iter = read_size
 where
 read_size = break (== '\r') >>= checkCRLF iter . check_size
 checkCRLF iter m = do
   n <- heads "\r\n"
   if n == 2 then m else frame_err (exc "Bad Chunk: no CRLF") iter
 check_size "0" = checkCRLF iter (return iter)
 check_size str@(_:_) =
     maybe (frame_err (exc ("Bad chunk size: " ++ str)) iter) read_chunk $
       read_hex 0 str
 check_size _ = frame_err (exc "Error reading chunk size") iter

 read_chunk size = take size iter >>= \r -> checkCRLF r $ enum_chunk_decoded r

 read_hex acc "" = Just acc
 read_hex acc (d:rest) | isHexDigit d = read_hex (16*acc + digitToInt d) rest
 read_hex acc _ = Nothing

 frame_err e iter = throwRecoverableErr (exc "Frame error")
                    (\s -> (return (enum_err e iter), s))
 exc msg = toException (ErrorCall $ "Chunk decoding exc: " ++ msg)

-- ------------------------------------------------------------------------
-- Tests

-- Pure tests, requiring no IO

test_str1 =
    "header1: v1\rheader2: v2\r\nheader3: v3\nheader4: v4\n" ++
    "header5: v5\r\nheader6: v6\r\nheader7: v7\r\n\nrest\n"

read_lines_rest :: Iteratee (Either [Line] [Line], String)
read_lines_rest = do
  ls <- read_lines
  rest <- break (const False)
  return (ls,rest)

testp1 =
    let (Right lines, rest) = run $ enum_pure_1chunk test_str1 read_lines_rest
    in
    lines == ["header1: v1","header2: v2","header3: v3","header4: v4",
              "header5: v5","header6: v6","header7: v7"]
    && rest == "rest\n"

testp2 =
    let (Right lines, rest) = run $ enum_pure_nchunk
            test_str1 5 read_lines_rest
    in
    lines == ["header1: v1","header2: v2","header3: v3","header4: v4",
              "header5: v5","header6: v6","header7: v7"]
    && rest == "rest\n"

-- Test Fd driver

test_driver filepath = do
  fd <- openFd filepath ReadOnly Nothing defaultFileFlags
  putStrLn "About to read headers"
  result <- fmap run $ enum_fd fd read_lines_and_one_more_line
  closeFd fd
  putStrLn "Finished reading headers"
  case result of
   (Right headers,after,_) -> do
      putStrLn $ "The line after headers is: " ++ show after
      putStrLn "Complete headers"
      print headers
   (Left headers,_,status) -> do
      putStrLn $ "Problem " ++ show status
      putStrLn "Incomplete headers"
      print headers
 where
  read_lines_and_one_more_line = do
     lines <- read_lines
     after <- break (\c -> c == '\r' || c == '\n')
     status <- is_finished
     return (lines,after,status)

test11 = test_driver "test1.txt" -- Complete headers, up to "header7: v7"
test12 = test_driver "test2.txt" -- The same
test13 = test_driver "test3.txt" -- "header3: v3", then EOF
test14 = test_driver "/dev/null" -- Incomplete headers [], EOF

-- Run the complete test, reading the headers and the body

test_driver_full filepath = do
  fd <- openFd filepath ReadOnly Nothing defaultFileFlags
  putStrLn "About to read headers"
  result <- fmap run $ enum_fd fd read_headers_body
  closeFd fd
  putStrLn "Finished reading"
  case result of
   (Right headers,Right body,_) -> do
      putStrLn "Complete headers"
      print headers
      putStrLn "\nComplete body"
      print body
   (Left headers,_,status) -> do
      putStrLn $ "Problem " ++ show status
      putStrLn "Incomplete headers"
      print headers
   (Right headers,Left body,status) -> do
      putStrLn "Complete headers"
      print headers
      putStrLn $ "Problem " ++ show status
      putStrLn "Incomplete body"
      print body
 where
  read_headers_body = do
     headers <- read_lines
     body <- return .
             run =<< enum_chunk_decoded read_lines
     status <- is_finished
     return (headers,body,status)

test21 = test_driver_full "test_full1.txt"
{-
Complete headers
["header1: v1","header2: v2","header3: v3","header4: v4"]
Problem Just EOF
Incomplete body
["body line 1","body line 2","body line 3","body line 4"]
-}

test22 = test_driver_full "test_full2.txt"
-- *** Exception: control message: Just "Frame error"

test23 = test_driver_full "test_full3.txt"
{-
Complete headers
["header1: v1","header2: v2","header3: v3","header4: v4"]
Problem Just EOF
Incomplete body
["body line 1","body line 2","body line 3","body line 4","body line 5"]
-}

-- LocalWords: Enumeratee enumeratee
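
-- ------------------------------------------------------------------------
-- Two further pure tests, offered as a sketch and not part of the original
-- test suite. The names testp3, heads_rest, test_chunked_str, testp4 and
-- decode_body are hypothetical; everything else is defined above.

-- testp3 replays the example from the comment on `heads': the stream
-- "abd" is delivered one character per chunk, (heads "abc") should match
-- and remove "ab", and the remaining "d" should stay on the stream.
-- It is expected to evaluate to True.
testp3 = run (enum_pure_nchunk "abd" 1 heads_rest) == (2,"d")
 where
 heads_rest = do
   n <- heads "abc"
   rest <- break (const False)
   return (n,rest)

-- testp4 exercises the layering of streams without any IO. The outer
-- stream carries two HTTP chunks (of sizes 3 and 4) followed by the EOF
-- chunk; the nested stream seen by read_lines is "ab\ncd\n\n", that is,
-- two lines and the terminating empty line.
-- It is expected to evaluate to True.
test_chunked_str = "3\r\nab\n\r\n4\r\ncd\n\n\r\n0\r\n\r\n"

testp4 = run (enum_pure_1chunk test_chunked_str decode_body)
           == Right ["ab","cd"]
 where
 decode_body = enum_chunk_decoded read_lines >>= return . run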