-- Another sequence of simple examples demonstrating Iteratees
-- in comparison with Lazy and Handle IO
--
-- We implement progressively more complex examples
-- by freely combining independently written, primitive iteratees.
-- We stress assembling of the whole program from independent
-- building blocks, replacing and reshuffling the blocks to
-- suit the task.
--
-- Each primitive block may be stateful; it encapsulates and manages
-- its state without leaking it. The overall program state --
-- composed of the state of the participating building blocks --
-- is left implicit. We never have to worry about initializing
-- the state at the beginning of the program; each iteratee
-- will take care of the initialization, accumulation and
-- finalization of its own state.
--
-- The examples revolve around reading text data and
-- counting specific words and whitespace. The text may
-- contain a terminator sequence. We should stop as soon
-- as we recognize the sequence. We should not read even
-- a single character after the terminator. (If the input
-- comes from a pipe, reading an extra character may lead
-- to a deadlock.)
--
-- To compile this code
-- ghc --make -O2 -rtsopts -main-is IterDemo1.twsthei1 IterDemo1.hs
-- To run this code
-- GHCRTS="-tstderr" /usr/bin/time ./IterDemo1

module IterDemo1 where

import System.IO
import System.IO.Error (isEOFError)
import Data.Char (isSpace)
import Data.List (isPrefixOf)
import Control.Exception (try, throw, IOException, fromException, bracket)
import Control.Monad.Trans (lift)

import IterateeM as IM

-- ------------------------------------------------------------------------
-- We start with a trivial example: counting white space
-- characters in a file.

-- Lazy IO example
-- Standard pattern-matching on a string
-- (see below for better code)
countWS_lazy :: String -> Int
countWS_lazy "" = 0
countWS_lazy (c:str) | isSpace c = 1 + countWS_lazy str
countWS_lazy (_:str) = countWS_lazy str

-- The complete example, counting white-space characters in a file
run_countWSL fname = readFile fname >>= print . countWS_lazy

twsl = run_countWSL "/etc/motd"
-- 190

-- Handle-based IO
-- We now differentiate EOF from other IO errors:
-- we could not do that with the Lazy IO.
-- The code is much more explicit -- with error handling and
-- ensuring that the handle is always closed --
-- but is still quite simple.
countWS_handle :: Handle -> IO Int
countWS_handle h = loop 0
 where
 loop n = try (hGetChar h) >>= check n
 check n (Right c) = loop (if isSpace c then n+1 else n)
 check n (Left e) | Just ioe <- fromException e,
                    isEOFError ioe = return n
 check _ (Left e) = throw e

run_countWSH fname =
  bracket (openFile fname ReadMode) hClose $ \h ->
    countWS_handle h >>= print

twsh = run_countWSH "/etc/motd"

-- The Iteratee IO
-- We use pattern-matching on the stream, like in the Lazy IO case.
-- The stream is implicit here. The code is tail-recursive.
-- Again, this is not the best code we can write: see below for
-- a better version.
-- We should make n+1 strict -- as usual in Haskell.
countWS_iter :: Monad m => Iteratee Char m Int
countWS_iter = loop 0
 where
 loop n = getchar >>= check n
 check n Nothing  = return n
 check n (Just c) = loop (if isSpace c then n+1 else n)

-- The code is close to that for Lazy IO and Handle IO.
-- We did not have to deal with errors: getchar will
-- take care of that. IO errors other than EOF will be reported
-- explicitly (like with the Handle IO)

run_countWSI fname = print =<< run =<< enum_file fname countWS_iter
-- No bracketing is needed: there is no handle to leak

twsi = run_countWSI "/etc/motd"

-- countWS_iter = loop 0
--  where loop n = IM.dropWhile (not . isSpace) >>
--                 IM.break (not .
--                 isSpace) >>= check n
--        check n "" = return n
--        check n l  = loop (n + length l)

-- ------------------------------------------------------------------------
-- More elegant solution to the white-space counting problem

-- Lazy IO permits a far more elegant solution
-- It is a one-liner, using the standard Prelude functions on lists
countWS'_lazy :: String -> Int
countWS'_lazy = length . filter isSpace

run_countWSL' fname = readFile fname >>= print . countWS'_lazy

twsl' = run_countWSL' "/etc/motd"
-- 54

-- Iteratee IO affords the same elegance
-- We use nested streams: en_filter is like
--   ===+===
--      |
-- connector, transforming the `thick' stream to the `thin' stream.
-- When the thin stream consumer is finished, the thick stream continues
-- through the right end. We don't need such a behavior here; so we use
-- (.|) to plug the right end.
countWS'_iter :: Monad m => Iteratee Char m Int
countWS'_iter = id .| (en_filter isSpace) count_i

run_countWSI' fname = print =<< run =<< enum_file fname countWS'_iter

twsi' = run_countWSI' "/etc/motd"

-- ------------------------------------------------------------------------
-- A more elaborate problem: counting the occurrences of the word ``the''
-- (assuming the input is text with words of bounded size)
-- We must count ``the'' as the word by itself, not being a part of
-- another word. That is, in order for ``the'' to be counted, the
-- character before and after (if exist) must be whitespace.

-- Lazy IO
-- We use the standard library function `words' to parse the input string
-- into a list of words. We filter out the words other than ``the''
-- and count the remainder.
countTHE_lazy :: String -> Int
countTHE_lazy = length . filter (=="the") . words

run_countTHEL fname = readFile fname >>= print .
  countTHE_lazy

tthel = run_countTHEL "/etc/motd"

-- The IterateeIO does the same stream processing as Lazy IO,
-- converting one stream to another (characters to words, words to
-- filtered words)
countTHE_iter :: Monad m => Iteratee Char m Int
countTHE_iter = id .| enum_words .| en_filter (=="the") count_i

run_countTHEI fname = print =<< run =<< enum_file fname countTHE_iter

tthei = run_countTHEI "/etc/motd"

-- Handle-based IO
-- We have little choice but to implement a finite state machine
-- More abstraction (even in the form of a separate tool: lexer generator)
-- is direly needed.
-- One may think of cheating: since line terminators are word terminators,
-- we could use the standard library function hGetLine to read whole lines,
-- using `words' to split the line into words and count `the' words.
-- Alas, that cheating won't work for our following example.
countTHE_handle :: Handle -> IO Int
countTHE_handle h = getchar >>= s1 0
 where
 s1 n (Just c) | isSpace c = getchar >>= s1 n
 s1 n (Just 't') = getchar >>= st n
 s1 n (Just _)   = getchar >>= sskip n
 s1 n Nothing    = return n

 st n (Just 'h') = getchar >>= sth n
 st n x          = sskip n x

 sth n (Just 'e') = getchar >>= sthe n
 sth n x          = sskip n x

 sthe n (Just c) | isSpace c = getchar >>= s1 (n+1)
 sthe n Nothing  = return (n+1)
 sthe n _        = getchar >>= sskip n

 sskip n Nothing = return n
 sskip n (Just c) | isSpace c = getchar >>= s1 n
 sskip n _       = getchar >>= sskip n

 getchar :: IO (Maybe Char)
 getchar = try (hGetChar h) >>= \c -> case c of
    Right x -> return $ Just x
    Left e | Just ioe <- fromException e,
             isEOFError ioe -> return Nothing
    Left e -> throw e

run_countTHEH fname =
  bracket (openFile fname ReadMode) hClose $ \h ->
    countTHE_handle h >>= print

ttheh = run_countTHEH "/etc/motd"

-- ------------------------------------------------------------------------
-- What if the input may contain words of any size?
-- For example, take this input
--   dd bs=1024 count=2048 if=/dev/zero of=/tmp/z
--   echo " the " >> /tmp/z
-- which has a 2MB-long ``word'' (made of zeros)

-- The enumeratee `words' (just like List.words) is implemented in terms of
-- break. Here is that code from GHC's library:
words' :: String -> [String]
words' s = case Prelude.dropWhile isSpace s of
             "" -> []
             s' -> w : words' s''
               where (w, s'') = break' isSpace s'

-- We can implement `break' tail-recursively.
-- IterateeM.break implements the same tail-recursive algorithm
break' :: (a -> Bool) -> [a] -> ([a],[a])
break' p = loop []
 where
 loop acc [] = (reverse acc,[])
 loop acc xs@(x:_) | p x = (reverse acc, xs)
 loop acc (x:xs) = loop (x:acc) xs

-- It is clear that we have to load the whole word in memory.
-- That is indeed what happens, using the Iteratee library
tthei1 = run_countTHEI "/tmp/z"
-- (41MB was needed)
{-
./IterDemo1 +RTS -tstderr
opened file /tmp/z
closed file /tmp/z
1
<>
        1.14 real         1.04 user         0.10 sys
-}

-- or using lazy IO
tthel2 = readFile "/tmp/z" >>= print . length . filter (=="the") . words'
-- (53MB was needed)
{-
./IterDemo1 +RTS -tstderr
1
<>
        1.16 real         1.01 user         0.14 sys
-}

-- However, break in the GHC standard library was implemented differently,
-- without tail recursion:
{- GHC/HBC break
break' :: (a -> Bool) -> [a] -> ([a],[a])
break' _ xs@[] = (xs, xs)
break' p xs@(x:xs')
       | p x       = ([],xs)
       | otherwise = let (ys,zs) = break' p xs' in (x:ys,zs)
-}

-- For the present example, it really helps:
tthel1 = run_countTHEL "/tmp/z"
-- Only 1MB was needed (and the max and avg residency is the same,
-- meaning constant memory processing)
{-
./IterDemo1 +RTS -tstderr
1
<>
        0.55 real         0.52 user         0.02 sys
-}

-- Alas, it was mere luck: if we change the example slightly
ttl1 = readFile "/tmp/z" >>= print . map length . filter (/="the") . words
-- we observe that the whole word is loaded in memory, although the
-- whole program clearly could be executed in constant space.
-- That is the problem with lazy IO: its space usage is unreliable
-- and very hard to predict. A small change in a program can make a huge
-- difference in space use.
{-
./IterDemo1 +RTS -tstderr
[2097152]
<>
        1.08 real         0.97 user         0.10 sys
-}

-- What can we do with iteratees?
-- If we really mean that words should be of reasonable size and
-- 2MB `words' are the evidence of a denial of service attack,
-- we can easily catch and report the words longer than a threshold.
-- Moreover, we can add that guard without changing the main processing
-- pipeline! We merely pair the main counting pipeline with an iteratee
-- that counts the distance between two separators, exiting or
-- throwing an error whenever that distance goes above the threshold.
-- See en_pair below for an example of pairing. See also IterDemo.hs
-- for an example of a guard for the main processing pipeline.

-- If we do expect that the input may contain arbitrarily long words
-- we can arrange for incremental processing
countTHE_incr :: Monad m => Iteratee Char m Int
countTHE_incr = id .| en_group (word (eqv "the")) .| en_filter id count_i
 where
 eqv str = do
   c <- heads str
   f <- peek
   return $ c == length str && maybe True (const False) f

run_countTHEX fname = print =<< run =<< enum_file fname countTHE_incr

tthex = run_countTHEX "/etc/motd"
tthex1 = run_countTHEX "/tmp/z"
-- All the signs of constant-memory processing: avg residency = max residency,
-- only 1M is used:
{-
./IterDemo1 +RTS -tstderr
opened file /tmp/z
closed file /tmp/z
1
<>
        0.63 real         0.60 user         0.02 sys
-}

-- We ensure incremental processing -- there are no more surprises and
-- no second-guessing what GHC might do.
-- For example, earlier we were surprised by the following lazy IO program:
-- ttl1 = readFile "/tmp/z" >>= print . map length . filter (/="the") . words
ttx1 = print =<< run =<< enum_file "/tmp/z" .|
         en_group (word (eqv "the" `enumPair` count_i)) .|
         en_filter (not .
           fst) stream2list
 where
 eqv str = do
   c <- heads str
   f <- peek
   return $ c == length str && maybe True (const False) f
-- It now runs in constant memory.
{-
./IterDemo1 +RTS -tstderr
opened file /tmp/z
closed file /tmp/z
[(False,2097152),(False,0)]
<>
        0.78 real         0.76 user         0.01 sys
-}

-- ------------------------------------------------------------------------
-- counting the occurrences of the word ``the'' in the sequence of files
-- assuming the files are concatenated together.
-- Therefore, we will count the word `the' if the letter `t' is in one
-- file but `he' is in the next one.

-- Lazy IO: we re-use the previously written counting function countTHE_lazy
-- We will pass it a string that is the concatenation of files' contents.
run_manyTHEL fnames = mapM readFile fnames >>= print . countTHE_lazy . concat

tmanyl = run_manyTHEL ["/etc/motd","/etc/motd"]

-- The code is elegant.
-- The files are read incrementally; the second file will be read only after
-- the first one finished. Alas, we have to open all of the files first!
-- (the readFile action is performed first, which opens the file and prepares
-- it for lazy reading).
-- Therefore, we need as many descriptors as there are files in the fnames
-- list. If the list is obtained from scanning a directory tree,
-- we may run out of file descriptors! That is particularly disturbing given
-- that we only need one file descriptor, opening and closing it as we go.
-- We get the first intimation of how Lazy IO mis-manages resources -- sometimes
-- taking much more than needed, and giving the programmer no facilities
-- to control the resources.

-- The Handle IO solution
-- The first approximation
run_manyTHEH fnames = mapM counter fnames >>= print . sum
 where
 counter fname = bracket (openFile fname ReadMode) hClose countTHE_handle
-- We only need one file descriptor for the whole operation: the next
-- file is opened only after the previous file is closed.
-- Alas, this solution is deficient since EOF is counted as a word
-- terminator. We can't handle the case of the word `the' split across the
-- files. We have to re-write our state machine to perform an action
-- upon the detection of EOF to re-associate the handle with another file.
-- This re-writing is left as an exercise to the reader, to drive home
-- the point of how low-level Handle IO is.

tmanyh = run_manyTHEH ["/etc/motd","/etc/motd"]

-- Iteratee IO
-- Composing enumerators (via the Kleisli composition) concatenates their
-- sources
run_manyTHEI fnames =
  print =<< run =<< foldr1 (>>>) (map enum_file fnames) countTHE_iter

-- The iteratee code again looks pretty much like Lazy IO code. However,
-- only one file descriptor is used for all processing.
-- enum_file prints the debugging message when the file is opened
-- and closed. We clearly see that, unlike Lazy IO, only one file is
-- open at any given time.

tmanyi = run_manyTHEI ["/etc/motd","/etc/motd"]

-- ------------------------------------------------------------------------
-- Counting the occurrences of the word ``the'' and the white space, together

-- Lazy IO
-- We just combine the previously written counters.
-- It looks great!
run_countPairL fname = do
  str <- readFile fname
  print (countWS'_lazy str, countTHE_lazy str)

twsthel = run_countPairL "/etc/motd"
-- (190,8)

-- Alas, it doesn't work great! Now, the whole file is loaded in memory.
-- The processing is no longer incremental. We have come across one
-- of many surprises of Lazy IO.

-- The file /usr/share/dict/words is 2,435K long
-- The memory statistics below show that Lazy IO loads the whole file
-- in memory (a Char is represented by about 10 bytes in GHC run-time).
-- Max residency is 28 MB
twsthel1 = run_countPairL "/usr/share/dict/words"
{-
(235923,1)
<>
        1.43 real         1.32 user         0.10 sys
-}

-- The handle IO code is dreadful to contemplate.
-- Cheating like using
-- hGetLine does not help since hGetLine does not report if the last
-- line in the file was terminated with a line terminator. So, we can't
-- use hGetLine to reliably count whitespace. We have to make the product
-- of two finite automata (the-counter and whitespace counter) -- by hand.
-- Automation is direly needed.

-- The Iteratee code
-- Like Lazy IO, we re-use the previously written counters, pairing them.
-- Unlike Lazy IO, the processing remains incremental. As we read a
-- block from file, we send the block to two iteratees for handling.
run_countPairI fname =
  print =<< run =<< enum_file fname (countWS'_iter `en_pair` countTHE_iter)

twsthei = run_countPairI "/etc/motd"
-- (190,8)

-- Now, the max residency is about 80K and is almost the same as the
-- average residency. The whole processing is done in constant memory.
-- Clearly, the file was NOT loaded in memory (since that would take
-- about 20MB).
twsthei1 = run_countPairI "/usr/share/dict/words"
{-
./IterDemo1 +RTS -tstderr
opened file /usr/share/dict/words
closed file /usr/share/dict/words
(235923,1)
<>
        1.41 real         1.35 user         0.05 sys
-}

-- ------------------------------------------------------------------------
-- Early termination:
-- Counting the occurrences of the word ``the'' and the white space
-- within the prefix of the stream of the size at most N. The example
-- is abstracted from reading HTTP request content with the
-- explicitly specified Content-Length.
-- We should not attempt to read even a single byte after N since
-- the web client expects the reply first, before it will send
-- the next request. If we attempt to read ahead after N bytes,
-- the deadlock ensues.

run_ntermL n fname = do
  str0 <- readFile fname
  let str = Prelude.take n str0
  print (countWS_lazy str, countTHE_lazy str)

tnterml = run_ntermL 160 "/etc/motd"

-- As with run_countPairL before, this code does not run in constant memory.
-- There is another problem: since we may read only a part of the file,
-- the file descriptor will not be closed (until a finalizer is run, which
-- may happen very late). There is a real danger of running out of
-- file descriptors (which regularly happens in practice with Lazy IO).
-- There is a third problem: the run-time system may speculatively
-- read-ahead, at any time and for any reason. The programmer has
-- no way whatsoever to control this read-ahead or even be informed about it.
-- Therefore, deadlock may happen (and does routinely happen in practice, when
-- using lazy IO for interactive services).
-- Lazy IO was designed to give the impression that IO is not even happening.
-- Alas, when dealing with request-response servers and multiple IO
-- operations, even reading is an observable effect.

-- Iteratee IO
run_ntermI n fname =
  print =<< run =<< enum_file fname .|
    IM.take n (countWS_iter `en_pair` countTHE_iter)

tntermi = run_ntermI 160 "/etc/motd"

-- The enumeratee IM.take will ensure that no more than n characters
-- are read; afterwards, it will not speculatively ask for any further
-- file data.

-- ------------------------------------------------------------------------
-- Early termination:
-- Counting the occurrences of the word ``the'' and the white space
-- up to the occurrence of the terminating string ``the end''
-- (``the'' in the terminating string should be counted).
-- Exercise: don't count ``the'' occurring in the terminator.
-- The example is abstracted from reading HTTP multi-part messages,
-- which are terminated by a so-called boundary string.

-- Lazy IO
run_btermL fname = do
  str0 <- readFile fname
  let str = until "the end" str0
  print (countWS_lazy str, countTHE_lazy str)
 where
 until :: Eq a => [a] -> [a] -> [a]
 until pattern [] = []
 until pattern str | isPrefixOf pattern str = pattern
 until pattern (h:t) = h : until pattern t

-- All the problems of run_ntermL occur here.
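
-- A minimal sketch, not part of the original code: the terminator cut-off
-- done by the local function `until' of run_btermL, restated as a top-level
-- pure function and applied to an in-memory string. The names ex_upTo and
-- ex_bterm and the sample text are hypothetical; isPrefixOf is the
-- Data.List import at the top of this module.
ex_upTo :: Eq a => [a] -> [a] -> [a]
ex_upTo _   [] = []
ex_upTo pat str | pat `isPrefixOf` str = pat  -- keep the terminator, drop the rest
ex_upTo pat (h:t) = h : ex_upTo pat t

-- The terminator is part of the result, so its ``the'' is counted too.
ex_bterm :: (String, Int)
ex_bterm = (s, length (filter (== "the") (words s)))
 where s = ex_upTo "the end" "see the cat the end of input"
-- ex_bterm evaluates to ("see the cat the end",2)
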
tbterml  = run_btermL "IterDemo1.hs"
tbterml' = run_btermL "IterDemo.hs"
-- Again, it does not run in constant memory: the whole file
-- is read in memory

-- We won't even think of the Handle-based solution: too complex

-- The iteratee solution 1
run_bterm1I fname =
  print =<< run =<< enum_file fname .|
    until_the_end (countWS_iter `en_pair` countTHE_iter)
 where
 -- The following function takes advantage of the fact that
 -- no suffix of "the end" is its prefix. Generally, we should pre-process
 -- the pattern and so back-off by a smaller amount.
 -- KMP search has been implemented with iteratees.
 until_the_end :: Monad m => Enumeratee Char Char m a
 until_the_end (IE_cont Nothing k) = heads pattern >>= check k
 until_the_end i = return i
 pattern = "the end"
 check k n | n == length pattern = yield_to k pattern
 check k 0 = IM.peek >>= finished k
 check k n = yield_to k (Prelude.take n pattern) >>= until_the_end
 finished k Nothing  = return (ie_cont k)
 finished k (Just c) = IM.head >> yield_to k [c] >>= until_the_end
 -- we define yield_to to make the analogy with generators clearer
 yield_to k v = lift (feedI k (Chunk v))

tbterm1i  = run_bterm1I "IterDemo1.hs"
tbterm1i' = run_bterm1I "IterDemo.hs"

-- The iteratee solution 2
-- We use a general take_until_match
run_bterm2I fname =
  print =<< run =<< enum_file fname .|
    take_until_match "the end" (countWS_iter `en_pair` countTHE_iter)

tbterm2i  = run_bterm2I "IterDemo1.hs"
tbterm2i' = run_bterm2I "IterDemo.hs"

-- ------------------------------------------------------------------------
-- The following functions are general and should be in an iteratee
-- library (and in some libraries, they are)

-- Count the stream
-- This iteratee does not do any IO (betrayed by its type, polymorphic
-- over the monad m). The counting iteratee is also polymorphic over stream
-- elements: it can count any streams.
count_i :: Monad m => Iteratee el m Int
count_i = ie_cont $ step 0
 where
 step acc (Chunk [])  = ie_contM (step acc)
 step acc (Chunk [_]) = ie_contM (step $! succ acc)
 step acc (Chunk ls)  = ie_contM (step $! acc + length ls)
 step acc stream      = ie_doneM acc stream

-- Like a combination of head and peek
-- (Some Iteratee libraries may provide this function)
-- It is also easy to write.
getchar :: Monad m => Iteratee el m (Maybe el)
getchar = headM

en_filter :: Monad m => (el -> Bool) -> Enumeratee el el m a
en_filter test i@(IE_cont Nothing k) = ie_cont step
 where
 step (Chunk l) = case filter test l of
   [] -> ie_contM step
   l  -> feedI k (Chunk l) >>= \i -> return (en_filter test i, empty_stream)
 step s = ie_doneM i s
en_filter _ i = return i

-- Parallel composition of Iteratees
-- The following enumeratee enum2' applies two iteratees to the same stream,
-- in effect, splitting the stream in two.
-- There is no buffering however: enum2' receives a chunk and passes
-- it to both iteratees, before asking for the next chunk.
-- The enumeratee enum2' finishes either when the stream is over or
-- when at least one of the given iteratees finish.
-- The IterateeM.hs library has the similar combinator enum2,
-- which continues requesting stream data so long as at least
-- one of the iteratees wants them.
enum2' :: Monad m => Iteratee el m a -> Iteratee el m b
       -> Iteratee el m (Iteratee el m a, Iteratee el m b)
enum2' (IE_cont Nothing k1) (IE_cont Nothing k2) = ie_cont step
 where
 step (Chunk []) = ie_contM step
 step str@Chunk{} = feedI k1 str >>= \i1 ->
                    feedI k2 str >>= \i2 ->
                    ie_ret $ enum2' i1 i2
 step str = ie_doneM (ie_cont k1, ie_cont k2) str
-- at least one iteratee is finished and doesn't want any data
enum2' i1 i2 = return (i1,i2)

-- A convenient combinator for the frequently occurring pattern
en_pair :: Monad m => Iteratee el m a -> Iteratee el m b -> Iteratee el m (a,b)
en_pair i1 i2 = enum2' i1 i2 >>= runI2

-- The incremental version of `span' (or, not break): create a substream
-- of the elements that satisfy the predicate
-- Like IM.take, we read up the elements that satisfy the predicate
-- regardless of whether the inner iteratee wants them
take_while :: Monad m => (el -> Bool) -> Enumeratee el el m a
take_while cpred i = ie_cont (step i)
 where
 step i (Chunk []) = ie_contM (step i)
 step i (Chunk str) =
   case Prelude.span cpred str of
     (_,[])     -> try_feedI i str >>= ie_contM . step
     (str,tail) -> try_feedI i str >>= \i -> ie_doneM i (Chunk tail)
 step i stream = ie_doneM i stream
 try_feedI (IE_cont Nothing k) v = feedI k (Chunk v)
 try_feedI i _ = return i

-- A substream of characters that constitute a word
word :: Monad m => Enumeratee Char Char m a
word i = IM.dropWhile isSpace >> take_while (not . isSpace) i

-- Make a stream out of the results of processing of each substream,
-- something like map f .
-- groupBy
en_group :: Monad m =>
            Iteratee elo m (Iteratee eli m el) -> Enumeratee elo el m a
en_group ii i@(IE_cont Nothing k) =
  is_stream_finished >>= maybe (handle .| ii) (\_ -> return i)
 where
 handle r = r >>= \v -> lift (feedI k (Chunk [v])) >>= en_group ii
en_group ii i = return i

-- A different implementation of enum_words
enum_words' :: Monad m => Enumeratee Char String m a
enum_words' = en_group (word stream2list)

-- A few tests

-- the first word of /etc/motd
tw1 = print =<< run =<< enum_file "/etc/motd" .| word stream2list

-- all words of /etc/motd
tw2 = print =<< run =<< enum_file "/etc/motd" .| enum_words' stream2list
-- tw2' = print =<< run =<< enum_file "/tmp/c" .| enum_words' stream2list

putBack :: Monad m => [e] -> Iteratee e m ()
putBack pbs = ie_cont step
 where
 step (Chunk s) = ie_doneM () (Chunk (pbs ++ s))
 step stream    = ie_doneM () stream

-- The incremental version of `span' (or, not break): create a substream
-- of the elements until the stream matches the pattern
-- Like IM.take, we read up the elements until the pattern is found
-- or the stream has ended, regardless of whether the inner iteratee wants them.
-- That is the behavior we need for reading multi-part messages, for example.
-- The code is not very efficient, but it mimics the simplicity
-- of the Lazy IO code run_btermL above.
take_until_match :: (Eq e, Monad m) => [e] -> Enumeratee e e m a
take_until_match [] i = return i
take_until_match [e] i = take_while (/= e) i
take_until_match pattern i = heads pattern >>= check i
 where
 -- check how much of the pattern matched
 check i n | n == length pattern = try_feedI i pattern
 check i 0 = headM >>= finished i   -- Nothing matched
 check i 1 = try_feedI i [Prelude.head pattern] >>= take_until_match pattern
 check i n = putBack (Prelude.take (n-1) (tail pattern)) >>
             try_feedI i [Prelude.head pattern] >>= take_until_match pattern
 finished i Nothing  = return i     -- EOF
 finished i (Just c) = try_feedI i [c] >>= take_until_match pattern
 try_feedI (IE_cont Nothing k) v = lift $ feedI k (Chunk v)
 try_feedI i _ = return i
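
-- A list-based sketch of the back-off logic of take_until_match, not part
-- of the original code. It assumes that `heads pat' returns the number of
-- leading pattern elements found on the stream and removes them from the
-- stream; ex_heads models that assumption on plain lists. The names
-- ex_heads and ex_takeUntil are hypothetical. The sketch follows the
-- general clause only: for a one-element pattern it keeps the terminator,
-- whereas take_until_match delegates that case to take_while.
ex_heads :: Eq e => [e] -> [e] -> (Int, [e])
ex_heads pat s = (n, Prelude.drop n s)
 where n = length (Prelude.takeWhile id (zipWith (==) pat s))

ex_takeUntil :: Eq e => [e] -> [e] -> [e]
ex_takeUntil []  _ = []
ex_takeUntil pat s = case ex_heads pat s of
   (n, _) | n == length pat -> pat           -- full match: emit it and stop
   (0, [])     -> []                         -- nothing matched, end of input
   (0, c:rest) -> c : ex_takeUntil pat rest  -- nothing matched: pass one element on
   (n, rest)   ->                            -- partial match of n elements:
     -- emit the first pattern element and put the other n-1 back
     Prelude.head pat :
       ex_takeUntil pat (Prelude.take (n-1) (tail pat) ++ rest)
-- ex_takeUntil "the end" "a th the end b" evaluates to "a th the end"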