-- Processing two streams in parallel by one iteratee

module IterateeN where

import IterateeM
import Control.Monad.Trans
import Control.Monad.Identity
import Prelude hiding (head, drop, dropWhile)
import Data.List (isInfixOf)

-- Example 1:
-- Taking heads of two streams. Both streams should be read incrementally.
-- Neither stream should be read beyond the first element.
-- We use only high-level (monadic) operations on Iteratees,
-- abstracting over the implementation.

-- The first nested Iteratee: take the head of the inner stream (el1),
-- then, through lift, the head of the outer stream (el2), and pair them.
i1 :: Monad m => Iteratee el1 (Iteratee el2 m) (el1, el2)
i1 = do
  e1 <- head
  e2 <- lift head
  return (e1, e2)

-- (all signatures can be and have been inferred; we write
-- them for clarity)

-- First we pass i1 to enumerator.
-- The (inferred) type shows the nesting: el2 is the type of the elements
-- in the outer stream; the inner stream contains Ints.
t11 :: (Monad m) =>
       Iteratee el2 m (Iteratee Int (Iteratee el2 m) (Int, el2))
t11 = enum_pure_1chunk [1::Int,2,3,4] i1

-- We complete the enumeration by running it.
-- The result is the Iteratee (of the outer stream), which
-- is suitable for giving to an enumerator.
t12 :: (Monad m) => Iteratee el2 m (Int, el2)
t12 = run =<< enum_pure_1chunk [1::Int,2,3,4] i1

-- Enumerate the outer stream of Chars over the result of the inner
-- enumeration.
t13 :: (Monad m) => m (Iteratee Char m (Int, Char))
t13 = enum_pure_1chunk "5678" $
        run =<< enum_pure_1chunk [1::Int,2,3,4] i1

-- we can run the result, the outer stream enumeration
t14 :: (Monad m) => m (Int, Char)
t14 = run =<< (enum_pure_1chunk "5678" $
                 run =<< enum_pure_1chunk [1::Int,2,3,4] i1)

-- Here is the complete example, taking head from both streams
t15 :: Bool
t15 = (== (1,'5')) . runIdentity $ t14

-- Here is how we prove that neither stream is read beyond what
-- is needed to produce the result: the tail of each list is undefined.
t16 :: Bool
t16 = (== (1,'5')) . runIdentity $
  run =<< (enum_pure_1chunk ("5"++undefined) $
             run =<< enum_pure_1chunk ([1::Int]++undefined) i1)
-- True

-- In order for enum_pure_nchunk to be able to split a given list
-- into 2-element chunks, it (or, splitAt, actually) needs at
-- least three defined cons cells.
t17 :: Bool
t17 = (== (1,'5')) . runIdentity $
  run =<< (enum_pure_nchunk ("567"++undefined) 2 $
             run =<< enum_pure_nchunk ([1::Int,2,3]++undefined) 2 i1)
-- True


-- Example 2:
-- implementing a `zip' function, zipping two streams
-- together stopping when at least one stream is exhausted.
-- Again, both streams should be read incrementally.
-- Again, we should use only high-level stream operations.

-- Peek at both streams; while both have an element, print the pair,
-- consume one element from each stream and repeat. Stop as soon as
-- either peek yields Nothing.
z1 :: (MonadIO m, Show el1, Show el2) =>
      Iteratee el1 (Iteratee el2 m) ()
z1 = do
  e1 <- peek                    -- Attempt to read
  e2 <- lift peek               -- from both streams
  case (e1, e2) of
    (Just v1, Just v2) -> do
      liftIO $ print (v1, v2)
      _ <- head                 -- now we consume both heads
      _ <- lift head            -- (the values were already peeked)
      z1                        -- and continue
    _ -> return ()

tz1 :: IO ()
tz1 = run =<< (enum_pure_1chunk "56780" $
                 (run =<< enum_pure_1chunk [1::Int,2,3,4] z1))
-- (1,'5')
-- (2,'6')
-- (3,'7')
-- (4,'8')

tz2 :: IO ()
tz2 = run =<< (enum_pure_1chunk "5678" $
                 (run =<< enum_pure_1chunk [1::Int,2,3,4,0] z1))
-- the same

tz3 :: IO ()
tz3 = run =<< (enum_pure_nchunk ("5678000"++undefined) 2 $
                 (run =<< enum_pure_1chunk [1::Int,2,3,4] z1))
-- the same


-- Example 3:
-- Read two streams, one stream by words and the other by lines.
-- Write a sequence of pairs, of a word from one stream and a line from
-- the other. The example requires reading streams in parallel,
-- and at different speeds.

-- The action, zipping, remains the same:
-- We use the existing zipping iteratee z1
tlw1 :: (MonadIO m, Show el2) =>
        Iteratee el2 m (Iteratee Char (Iteratee el2 m) ())
tlw1 = enum_file_gen "test2.txt" (runI =<< enum_lines z1)

tlw2 :: (MonadIO m, Show el2) => Iteratee el2 m ()
tlw2 = run =<< enum_file_gen "test2.txt" (runI =<< enum_lines z1)

-- We see that tlw2 is of the type of the normal Iteratee.
-- So we can use tlw2 for the outer stream, like we used z1 before.
tlw3 :: (MonadIO m) => m ()
tlw3 = run =<< enum_file_gen "test1.txt" (runI =<< enum_lines tlw2)

-- tlw3 specialized to IO, so that it can be run directly.
tlw4 :: IO ()
tlw4 = tlw3
{-
 -- The last few lines of the output show we indeed zipped two files
 -- while reading them incrementally

Read buffer, size 5
Read buffer, size 5
("header6: v6","header6: v6")
Read buffer, size 5
Read buffer, size 5
Read buffer, size 5
Read buffer, size 5
Read buffer, size 5
("header7: v7","header7: v7")
closed file test2.txt
closed file test1.txt
-}

-- But we wanted to read the other file by words. No problem,
-- we just change the enumeratee.
tlw5 :: IO ()
tlw5 = run =<< enum_file_gen "test1.txt" (runI =<< enum_words tlw2)
{-
Read buffer, size 5
("header6: v6","v3")
Read buffer, size 5
Read buffer, size 5
Read buffer, size 5
Read buffer, size 5
("header7: v7","header4:")
Read buffer, size 5
closed file test2.txt
closed file test1.txt
-}


-- Example 4:
-- Dependent reading of two streams, not in lock-step.
-- The amount to read from one stream depends on what
-- we have read from the other.
-- We will read the nth-word from the file test2.txt
-- and find the line from the file test1.txt that
-- contains that word. We return that line paired
-- with the word.

-- We write our iteratee in the most straightforward way.
-- The inner stream carries words (String), the outer stream carries
-- lines (Line); operations on the outer stream go through lift.
zwl :: (MonadIO m) =>
       Int -> Iteratee String (Iteratee Line m) (Line, String)
zwl n = do
  drop n        -- Drop the words from the word stream before we
  w <- head     -- get the desired n-th word
  liftIO $ putStrLn $ "Got the word: " ++ w
  -- From the line stream, drop the lines that
  -- do not contain the word w
  lift $ dropWhile (not . isInfixOf w)
  -- Read the first line that does contain w
  l <- lift head
  return (l, w)

-- The inner stream is the stream of words. Feed it to zwl.
-- Enumerate the words of test2.txt into (zwl n) and run that inner
-- enumeration; what remains is an Iteratee over the stream of lines.
tzwlI :: (MonadIO m) => Int -> Iteratee Line m (Line, String)
tzwlI n = run =<< enum_file_gen "test2.txt" (runI =<< enum_words (zwl n))

-- Feed the outer stream, of lines from test1.txt
tzwlO :: (MonadIO m) => Int -> m (Line, String)
tzwlO n = run =<< enum_file_gen "test1.txt" (runI =<< enum_lines (tzwlI n))

-- Run the example with n = 5 in IO and print the resulting pair.
tzwlO5 :: IO ()
tzwlO5 = tzwlO 5 >>= print
{-
*IterateeN> tzwlO5
opened file test1.txt
Read buffer, size 5
Read buffer, size 5
Read buffer, size 5
opened file test2.txt
Read buffer, size 5
Read buffer, size 5
Read buffer, size 5
Read buffer, size 5
Read buffer, size 5
Read buffer, size 5
Read buffer, size 5
Read buffer, size 5
Got the word: v3
Read buffer, size 5
Read buffer, size 5
Read buffer, size 5
Read buffer, size 5
Read buffer, size 5
closed file test2.txt
closed file test1.txt
("header3: v3","v3")
(0.03 secs, 1595188 bytes)
-}