De-biforestation
Deforestation is usually defined as the elimination of an intermediate
data structure between a single producer and a single consumer. Jorge
Adriano [1] showed an interesting example of one producer feeding two
independent consumers. The two streams of data are mutually
dependent. Furthermore, the rate of production is non-uniform:
generating an element in one stream may require generating a trillion
items in the other stream. If the first consumer is being evaluated,
that consumer causes the evaluation of the producer until the latter
yields an item. The trillion items incidentally produced in the
other stream have to be stored somewhere. In more OS-centric terms,
the evaluator can't generally reduce two expressions in parallel. If
it chooses one stream consumer, the other consumer is set aside.
While the first stream consumer "waits" for a value, the second stream
consumer is "blocked", and hence the corresponding stream must be
buffered, sometimes at great expense.
[1] http://www.haskell.org/pipermail/haskell-cafe/2002-November/003546.html
We will consider the problem of one producer and two consumers in the
general setting. We derive a deforested version, which no longer needs
to buffer any produced items. The retainer profile shows no space
leaks. We also consider "parallel" writing of streams into two
distinct files. Our solution is safe but the i/o is effectively
interleaved. The deforested solution is indeed _derived_, via a
sequence of equivalent transformations. In fact, the derived code
worked on the first try.
In the original example, the two consumers were writing received data
into their own files. Let us first generalize the problem. We
assume that the two mutually-recursive streams are specified by
a transition function
transition:: (a,b) -> (a,b)
on the state of a _pre-stream_. The state of the latter is a pair of
values. We obtain two streams in question by splitting the pre-stream
into the fst and the snd components, and applying two different
filters. The filtering part makes the rate of production non-uniform.
> import System.IO
> infixr 0 $^$
> -- Can't do pattern-matching-deconstruction on state because
> -- pattern-matching is eager!
> (f,g) $^$ state = (f $ fst state, g $ snd state)
>
> stream transition (p,q) state@(xstate,ystate) =
> ((filter p) . (xstate :), (filter q) . (ystate :)) $^$
> (stream transition (p,q) $ transition state)
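The same pair of streams can also be read as "iterate the transition
function, then split and filter". The following is a minimal sketch of
that reading, not the code used in the rest of this article. The name
streamViaIterate is hypothetical. The function trans1 repeats the
transition function that the text uses for illustration. Note that both
result lists share the list of states, so this formulation has the same
buffering problem.

```haskell
-- Sketch: build the pre-stream with iterate, then split and filter it.
-- 'streamViaIterate' is a hypothetical name, not part of the article's code.
streamViaIterate
  :: ((a, b) -> (a, b))        -- transition function on the pre-stream state
  -> (a -> Bool, b -> Bool)    -- one filter per stream
  -> (a, b)                    -- initial state
  -> ([a], [b])
streamViaIterate transition (p, q) state =
  (filter p (map fst states), filter q (map snd states))
  where
    -- the pre-stream: state, transition state, transition (transition state), ...
    states = iterate transition state

-- The illustration transition function from the text.
trans1 :: (Integer, Integer) -> (Integer, Integer)
trans1 (x, y) = (y + 1, x)
```

Taking a prefix of either component, for example
take 9 (fst (streamViaIterate trans1 (const True, const True) (0,0))),
forces only as much of the shared state list as that prefix needs.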
For illustration, we will try the following transition function and
the filters:
> trans1 (x,y) = (y+1,x)
> st_p x = x `mod` 1997 == 0
> st_q = const True
The streams are: without filtering
Main> take 9 $ fst $ stream trans1 (\x->True,\x->True) (0,0)
[0,1,1,2,2,3,3,4,4]
Main> take 9 $ snd $ stream trans1 (\x->True,\x->True) (0,0)
[0,0,1,1,2,2,3,3,4]
and with filtering:
Main> take 9 $ snd $ stream trans1 (st_p,st_q) (0,0)
[0,0,1,1,2,2,3,3,4]
(443 reductions, 782 cells)
Main> take 9 $ fst $ stream trans1 (st_p,st_q) (0,0)
[0,1997,1997,3994,3994,5991,5991,7988,7988]
(6262603 reductions, 9198209 cells, 42 garbage collections)
Generating each item in the first stream indeed generates quite a few
(thousand) items in the second stream.
In our first example, the consumers will sum the corresponding streams:
> consumers_sum stream n = (f,f) $^$ stream trans1 (st_p,st_q) (0,0)
> where f = sum . take n
More generally, we can write a consumer using a function sfoldl (stream
fold). The function is similar to the regular foldl but can be applied
to infinite streams, so it takes a predicate sf that tells when to
stop.
> sfoldl sf st z (x:_) | sf x z = z
> sfoldl sf st z (x:xs) = sfoldl sf st (st z x) xs
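As a small standalone illustration of sfoldl, here is a sketch that
sums the first n elements of an infinite list. The helper sumFirst is
hypothetical. The explicit clause for the empty list is an addition
made only so that the failure mode is spelled out.

```haskell
-- sfoldl as in the text: stop as soon as the predicate sf holds
-- for the next element and the current accumulator.
sfoldl :: (x -> z -> Bool) -> (z -> x -> z) -> z -> [x] -> z
sfoldl sf _  z (x:_) | sf x z = z
sfoldl sf st z (x:xs)         = sfoldl sf st (st z x) xs
sfoldl _  _  _ []             = error "sfoldl: empty stream"  -- added for clarity

-- Hypothetical helper: sum the first n elements of a (possibly infinite) list.
-- The accumulator is a pair (running sum, number of items consumed).
sumFirst :: Int -> [Integer] -> Integer
sumFirst n = fst . sfoldl sf st (0, 0)
  where
    sf _ (_, count)     = count >= n
    st (acc, count) val = (acc + val, count + 1)
```

Note that the predicate is tested against the next element before it is
consumed, so sumFirst n inspects n+1 elements of the stream.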
Our consumers_sum function can be written as
> consumers_sum' stream n =
> (fst,fst) $^$ (f,f) $^$ stream trans1 (st_p,st_q) (0,0)
> where
> f = sfoldl sf st (0,0)
> sf x (_,count) = count >= n
> st (sum,count) val = (val+sum,count+1)
In general, the essence of producing and consuming two streams is captured
by an expression
(sfoldl sf1 st1 z1, sfoldl sf2 st2 z2) $^$ stream trans (p,q) (x,y)
=== { definition of stream }
=== (sfoldl sf1 st1 z1, sfoldl sf2 st2 z2) $^$
( ((filter p) . (x :), (filter q) . (y :)) $^$
stream trans (p,q) $ trans (x,y) )
=== { We know a$(b$c) === (a.b) $ c
Similarly,
a $^$ (b $^$ c) === ((((.),(.)) $^$ a) $^$ b) $^$ c
or, in a less bizarre notation
a $^$ (b $^$ c) === ((fst a . fst b),(snd a . snd b)) $^$ c
}
===
((sfoldl sf1 st1 z1) . (filter p) . (x :),
(sfoldl sf2 st2 z2) . (filter q) . (y :)) $^$
stream trans (p,q) $ trans (x,y)
where we also took advantage of the associativity of composition. We
note that two folds, sfoldl and the fold implicit in filter, can be fused:
sfoldlf sf st p z
=def=
(sfoldl sf st z) . (filter p)
Then sfoldlf sf st p z (x:xs)
=== (sfoldl sf st z) $ (filter p (x:xs))
=== { definition of filter, lifting the condition }
if p x then (sfoldl sf st z (x:filter p xs))
else (sfoldl sf st z (filter p xs))
=== { definition of sfoldlf }
if p x then (sfoldl sf st z (x:filter p xs))
else sfoldlf sf st p z xs
=== { definition of sfoldl }
if p x then
if sf x z then z
else sfoldl sf st (st z x) (filter p xs)
else sfoldlf sf st p z xs
===
if p x then
if sf x z then z
else sfoldlf sf st p (st z x) xs
else sfoldlf sf st p z xs
We have just obtained a recursive equation for sfoldlf. Note that
sfoldlf (just like sfoldl) is undefined for empty lists.
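The recursive equation can be transcribed directly into code and
compared with the unfused composition. The following is a sketch under
that reading. The names stopAfter, addItem, unfused and fused are
hypothetical, and the empty-list clauses are additions.

```haskell
-- The unfused building block, as in the text.
sfoldl :: (x -> z -> Bool) -> (z -> x -> z) -> z -> [x] -> z
sfoldl sf _  z (x:_) | sf x z = z
sfoldl sf st z (x:xs)         = sfoldl sf st (st z x) xs
sfoldl _  _  _ []             = error "sfoldl: empty stream"

-- Fused fold-and-filter, transcribed from the recursive equation:
-- elements rejected by p are skipped without building a filtered list.
sfoldlf :: (x -> z -> Bool) -> (z -> x -> z) -> (x -> Bool) -> z -> [x] -> z
sfoldlf sf st p z (x:xs)
  | p x       = if sf x z then z else sfoldlf sf st p (st z x) xs
  | otherwise = sfoldlf sf st p z xs
sfoldlf _ _ _ _ [] = error "sfoldlf: empty stream"

-- Hypothetical consumer: sum items until 'n' of them have been consumed.
stopAfter :: Int -> Integer -> (Integer, Int) -> Bool
stopAfter n _ (_, count) = count >= n

addItem :: (Integer, Int) -> Integer -> (Integer, Int)
addItem (acc, count) val = (acc + val, count + 1)

-- Both sum the first four even numbers of [1..].
unfused, fused :: (Integer, Int)
unfused = (sfoldl (stopAfter 4) addItem (0, 0) . filter even) [1 ..]
fused   = sfoldlf (stopAfter 4) addItem even (0, 0) [1 ..]
```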
We can now write our producer-consumers problem as
(sfoldl sf1 st1 z1, sfoldl sf2 st2 z2) $^$ stream trans (p,q) (x,y)
===
((sfoldlf sf1 st1 p z1) . (x :),
(sfoldlf sf2 st2 q z2) . (y :)) $^$
stream trans (p,q) $ trans (x,y)
We already see a notable simplification. The streams are now producing
at the same rate! So we can replace a pair of two streams with one
stream of pairs.
But we can do better. Let's look at the equation for sfoldlf again
sfoldlf sf st p z (x:xs)
===
if p x then
if sf x z then z
else sfoldlf sf st p (st z x) xs
else sfoldlf sf st p z xs
The equation suggests a useful generalization of sfoldl:
> sfoldl' sf st True z _ = z
> sfoldl' sf st done z (x:xs) | sf x z = sfoldl' sf st True z xs
> sfoldl' sf st done z (x:xs) = sfoldl' sf st done (st z x) xs
We can now introduce
sfoldlf' sf st p done z =def= (sfoldl' sf st done z) . (filter p)
which obeys the equation
sfoldlf' sf st p done z (x:xs)
===
if done then z
else
if p x then
if sf x z then sfoldlf' sf st p True z xs
else sfoldlf' sf st p done (st z x) xs
else sfoldlf' sf st p done z xs
=== { pushing conditions inside }
sfoldlf' sf st p
(done || (p x && sf x z))
(ss sf st p done z x)
xs
where
ss sf st p done z x =
if done then z
else if p x then if sf x z then z else (st z x)
else z
Something remarkable has happened: we've got the left recursion!
We should note that the introduction of a 'done' argument is not
unlike a common trick in physics of converting a value into a
functional with the help of the Dirac delta-function.
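To make the equation for sfoldlf' concrete, here is a sketch of it as a
runnable function. It assumes the argument order sf st p done z used in
the equation. The base case for done == True is written as a separate
clause, because the "pushed inside" form alone never stops on an
infinite list. The names stopAfter and addItem are hypothetical, and
the empty-list clause is an addition.

```haskell
-- One step of the state update, as in the 'where' clause of the equation.
ss :: (x -> z -> Bool) -> (z -> x -> z) -> (x -> Bool) -> Bool -> z -> x -> z
ss sf st p done z x
  | done      = z
  | p x       = if sf x z then z else st z x
  | otherwise = z

-- The fused fold with a 'done' flag: every recursive call is a tail call
-- on the rest of the stream, whatever p and sf say about the current item.
sfoldlf' :: (x -> z -> Bool) -> (z -> x -> z) -> (x -> Bool) -> Bool -> z -> [x] -> z
sfoldlf' _  _  _ True z _      = z
sfoldlf' sf st p done z (x:xs) =
  sfoldlf' sf st p (done || (p x && sf x z)) (ss sf st p done z x) xs
sfoldlf' _ _ _ False _ []      = error "sfoldlf': empty stream"

-- Hypothetical consumer: sum items until 'n' of them have been consumed.
stopAfter :: Int -> Integer -> (Integer, Int) -> Bool
stopAfter n _ (_, count) = count >= n

addItem :: (Integer, Int) -> Integer -> (Integer, Int)
addItem (acc, count) val = (acc + val, count + 1)
```

For instance, sfoldlf' (stopAfter 4) addItem even False (0,0) [1..]
sums the first four even numbers, and starting with done == True
returns the seed unchanged.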
We can now write
sfoldlfs sf1 st1 done1 z1 sf2 st2 done2 z2 (p,q) trans state@(x,y)
=def=
(sfoldl' sf1 st1 done1 z1, sfoldl' sf2 st2 done2 z2)
$^$ stream trans (p,q) (x,y)
===
((sfoldl' sf1 st1 done1 z1) . (filter p) . (x :),
(sfoldl' sf2 st2 done2 z2) . (filter q) . (y :)) $^$
stream trans (p,q) $ trans (x,y)
===
(\tail-> sfoldlf' sf1 st1 p done1 z1 (x:tail),
\tail-> sfoldlf' sf2 st2 q done2 z2 (y:tail) ) $^$
stream trans (p,q) $ trans (x,y)
===
(\tail-> sfoldlf' sf1 st1 p (done1 || (p x && sf1 x z1))
(ss sf1 st1 p done1 z1 x) tail,
\tail-> sfoldlf' sf2 st2 q (done2 || (q y && sf2 y z2))
(ss sf2 st2 q done2 z2 y) tail ) $^$
stream trans (p,q) $ trans (x,y)
=== { eta reduction }
(sfoldlf' sf1 st1 p (done1 || (p x && sf1 x z1))
(ss sf1 st1 p done1 z1 x),
sfoldlf' sf2 st2 q (done2 || (q y && sf2 y z2))
(ss sf2 st2 q done2 z2 y) ) $^$
stream trans (p,q) $ trans (x,y)
=== { definition of sfoldlfs }
sfoldlfs sf1 st1 (done1 || (p x && sf1 x z1))
(ss sf1 st1 p done1 z1 x)
sf2 st2 (done2 || (q y && sf2 y z2))
(ss sf2 st2 q done2 z2 y)
(p,q) trans (trans state)
And we have obtained the primitive recursive solution with no
lists whatsoever. We have achieved the full deforestation!
Now we just have to program it:
> ss:: (x->z->Bool) -> (z->x->z) -> (x->Bool) -> Bool -> z -> x -> z
> ss sf st p done z x =
> if done then z
> else if p x then (if sf x z then z else (st z x))
> else z
*> sfoldlfs sf1 st1 True z1 sf2 st2 True z2 (p,q) trans state = (z1,z2)
*> sfoldlfs sf1 st1 done1 z1 sf2 st2 done2 z2 flt@(p,q) trans state@(x,y) =
*> sfoldlfs sf1 st1 (done1 || (p x && sf1 x z1)) (ss sf1 st1 p done1 z1 x)
*> sf2 st2 (done2 || (q y && sf2 y z2)) (ss sf2 st2 q done2 z2 y)
*> flt trans (trans state)
Or, factoring out common arguments
> sfoldlfs sf1 st1 done1 z1 sf2 st2 done2 z2 flt@(p,q) trans state =
> aux done1 z1 done2 z2 state
> where
> aux True z1 True z2 _ = (z1,z2)
> aux done1 z1 done2 z2 state@(x,y) =
> aux (done1 || (p x && sf1 x z1)) (ss sf1 st1 p done1 z1 x)
> (done2 || (q y && sf2 y z2)) (ss sf2 st2 q done2 z2 y)
> (trans state)
> consumers_sum'' trans n =
> (fst,fst) $^$
> sfoldlfs sf st False (0,0) sf st False (0,0) (st_p,st_q) trans (0,0)
> where
> sf x (_,count) = count >= n
> st (sum,count) val = (val+sum,count+1)
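The two consumers need not be the same. As a sketch, the following
standalone copy of sfoldlfs runs one consumer that sums the first three
items of the filtered first stream and another that collects the first
four items of the second stream. The names example and stP and the
local consumer functions are hypothetical. The type signatures are
added for readability.

```haskell
ss :: (x -> z -> Bool) -> (z -> x -> z) -> (x -> Bool) -> Bool -> z -> x -> z
ss sf st p done z x
  | done      = z
  | p x       = if sf x z then z else st z x
  | otherwise = z

-- The deforested producer with two consumers, as derived in the text.
sfoldlfs
  :: (x -> z1 -> Bool) -> (z1 -> x -> z1) -> Bool -> z1
  -> (y -> z2 -> Bool) -> (z2 -> y -> z2) -> Bool -> z2
  -> (x -> Bool, y -> Bool) -> ((x, y) -> (x, y)) -> (x, y) -> (z1, z2)
sfoldlfs sf1 st1 done1 z1 sf2 st2 done2 z2 (p, q) trans state =
  aux done1 z1 done2 z2 state
  where
    aux True a True b _ = (a, b)
    aux d1 a d2 b s@(x, y) =
      aux (d1 || (p x && sf1 x a)) (ss sf1 st1 p d1 a x)
          (d2 || (q y && sf2 y b)) (ss sf2 st2 q d2 b y)
          (trans s)

trans1 :: (Integer, Integer) -> (Integer, Integer)
trans1 (x, y) = (y + 1, x)

stP :: Integer -> Bool
stP x = x `mod` 1997 == 0

-- Consumer 1 sums three items; consumer 2 collects four items.
example :: (Integer, [Integer])
example = (fst r1, reverse r2)
  where
    (r1, r2) = sfoldlfs sf1 st1 False (0, 0 :: Int)
                        sf2 st2 False []
                        (stP, const True) trans1 (0, 0)
    sf1 _ (_, count)    = count >= 3
    st1 (acc, count) v  = (acc + v, count + 1)
    sf2 _ items         = length items >= 4
    st2 items v         = v : items
```

The second consumer finishes after a handful of steps, while the first
one keeps driving the transition function until it has seen enough
multiples of 1997. No list of stream items is built at any point.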
Let us now compare the forested version consumers_sum' with the
deforested version consumers_sum'' in performance, using GHCi:
*Main> consumers_sum' stream 8
(31952,12)
(0.26 secs, 11934436 bytes)
*Main> consumers_sum'' trans1 8
(31952,12)
(0.18 secs, 7695084 bytes)
*Main> consumers_sum' stream 16
(127808,56)
(0.86 secs, 32660328 bytes)
*Main> consumers_sum'' trans1 16
(127808,56)
(0.32 secs, 14354768 bytes)
*Main> consumers_sum'' trans1 16
(127808,56)
(0.32 secs, 14579336 bytes)
*Main> consumers_sum' stream 32
(511232,240)
(3.32 secs, 97786900 bytes)
*Main> consumers_sum'' trans1 32
(511232,240)
(0.69 secs, 28490268 bytes)
It appears that the deforestation was worth it. The resulting code
needs less memory and executes notably faster.
In another test, we compile the code with optimization and profiling enabled:
> main = do
> print $ consumers_sum' stream 32;
> print $ consumers_sum'' trans1 32
The results indicate that consumers_sum'' is responsible for only 15.4%
of the time and 13.7% of the allocations (most of which are due to the
transition function). What's more exhilarating, the retainer profile
of "print $ consumers_sum'' trans1 32" shows nothing. Nothing is
retained. In contrast, the retainer profile of "print $
consumers_sum' stream 32;" shows a large space leak of 12 MB of
retained memory.
But what about the original question by Jorge Adriano? Can we write
two streams into two different files, and _safely_ interleave the i/o?
Yes, we can. We need a version of sfoldlfs that shares a single seed
between the two consumers:
> sfoldlfs1 sf1 st1 done1 sf2 st2 done2 flt@(p,q) trans z state =
> aux done1 done2 z state
> where
> aux True True z _ = z
> aux done1 done2 z state@(x,y) =
> aux (done1 || (p x && sf1 x z)) (done2 || (q y && sf2 y z))
> (ss sf2 st2 q done2 (ss sf1 st1 p done1 z x) y)
> (trans state)
> write_in_two_files file1 file2 trans max =
> (\(h1,h2) -> hClose h1 >> hClose h2) =<<
> sfoldlfs1 sf (st fst) False sf (st snd) False (st_p,st_q) trans
> (do h1 <- openFile file1 WriteMode;
> h2 <- openFile file2 WriteMode;
> return (h1,h2)) (0,0)
> where
> sf x _ = x >= max
> st selector two_handles val =
> do
> handles <- two_handles
> hPrint (selector handles) val;
> return handles
*> main = write_in_two_files "/tmp/a1" "/tmp/a2" trans1 80000
The profile shows that the i/o is interleaved indeed.
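The interleaving can also be observed without touching the file system.
The following sketch replaces the pair of file handles with a pure log:
the shared seed is a list, and each consumer prepends a tagged entry
(Left for the first "file", Right for the second). The names
interleavedLog and the small filter p are hypothetical. The filter uses
multiples of 3 instead of 1997 only to keep the log short.

```haskell
ss :: (x -> z -> Bool) -> (z -> x -> z) -> (x -> Bool) -> Bool -> z -> x -> z
ss sf st p done z x
  | done      = z
  | p x       = if sf x z then z else st z x
  | otherwise = z

-- Shared-seed version, as in the text: both consumers update the same z.
sfoldlfs1
  :: (x -> z -> Bool) -> (z -> x -> z) -> Bool
  -> (y -> z -> Bool) -> (z -> y -> z) -> Bool
  -> (x -> Bool, y -> Bool) -> ((x, y) -> (x, y)) -> z -> (x, y) -> z
sfoldlfs1 sf1 st1 done1 sf2 st2 done2 (p, q) trans z state =
  aux done1 done2 z state
  where
    aux True True acc _ = acc
    aux d1 d2 acc s@(x, y) =
      aux (d1 || (p x && sf1 x acc)) (d2 || (q y && sf2 y acc))
          (ss sf2 st2 q d2 (ss sf1 st1 p d1 acc x) y)
          (trans s)

trans1 :: (Integer, Integer) -> (Integer, Integer)
trans1 (x, y) = (y + 1, x)

-- The "writes" in the order in which they happen; each stream stops at 4.
interleavedLog :: [Either Integer Integer]
interleavedLog =
  reverse (sfoldlfs1 sf st1 False sf st2 False (p, const True) trans1 [] (0, 0))
  where
    sf v _    = v >= 4               -- stop once a value reaches the limit
    st1 acc v = Left v : acc         -- a "write" to the first file
    st2 acc v = Right v : acc        -- a "write" to the second file
    p x       = x `mod` 3 == 0
```

In the resulting log the Left and Right entries alternate according to
the production rates: it starts with Left 0, Right 0, Right 0, Right 1
and the first stream's entries for 3 appear in between the second
stream's entries for 2 and 3.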