If the needed vocabulary -- the key words -- is not immediately clear, we can look for it: building a solution step-by-step, recognizing patterns, generalizing and naming them. Stream processing especially fits this stepwise approach. It lets us build pipelines one segment at a time, by "coupling programs like garden hose -- screw in another segment when it becomes necessary to massage data in another way." (Doug McIlroy, 1964). We have seen such a program development in the cited article. The present article gives another illustration of the technique, on a problem twice as complicated.
The solution we develop is clear not just to the eyes -- to the computer as well. The data are accessed strictly sequentially; each input character is processed immediately without buffering. The entire pipeline may hence run in truly constant memory. We indeed obtain -- mechanically generate -- C code for such efficient processing, which worked, correctly, on first try. Clarity and performance do not have to be in opposition.
The input is a sequence of numbers divided by pipe characters into chunks, each chunk containing one or more comma-separated numbers. The task is to compute the sum of each chunk and find the maximum across all chunks. A sample input is as follows:
100,200,300|400|500,600|700,800,900|1000
The original problem used newlines and empty lines as separators. We return to it later.
Looking ahead, the eventual solution is
    of_string ~terminator:'\000'
    >> parse_ints
    >> group_by_aggregate ',' {unit=0;op=(+)}
    >> group_by_aggregate '|' {unit=Int.min_int;op=max}
    >> iter (fun (n,_) -> print_int n)
It is a pipeline of (stream) operations connected by the left-to-right functional composition >>.
The pipeline is typed. The types are not shown since they are all
inferred -- one may still attach type annotations if desired,
e.g., for documentation purposes.
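For readers who want to experiment, here is a minimal sketch of the composition operator and of an optional type annotation. The definition of `>>` below is an assumption (the accompanying code may define it differently), and `count_commas` is a hypothetical example built only from standard-library functions.

```ocaml
(* Left-to-right function composition: (f >> g) x = g (f x).
   Assumed definition; the article does not show its own. *)
let ( >> ) f g = fun x -> g (f x)

(* A small pipeline with an optional annotation, added purely as
   documentation; the type would be inferred without it. *)
let count_commas : string -> int =
  String.to_seq
  >> Seq.filter (fun c -> c = ',')
  >> Seq.fold_left (fun n _ -> n + 1) 0
```

Function application binds tighter than `>>`, so each pipeline stage can be written with its arguments and without extra parentheses.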
Our solution is incremental: the data are read as a stream and processed one character at a time, with no buffering. With a good implementation for streams (see later for the C code), the processing is done in truly constant memory, without creating any garbage along the way.
The rest of the page shows how it was developed.
S. Abeysinghe and T. Rompf. Rhyme: a data-centric expressive query language for nested data structures. PADL 2024.
adv22.ml [12K]
Complete code for the article
max is associative, can be associated to the left and hence computed incrementally.
To solve our problem using streams we need an API for streams (and an implementation of that API). At the very least, we should be able to construct and observe a stream. The initial API would hence look like:
    module type simple_seq = sig
      type 'a seq
      val of_string : ?terminator:char -> string -> char seq
      val iter      : ('a -> unit) -> 'a seq -> unit
      val to_list   : 'a seq -> 'a list
    end
Here 'a seq is the type for a stream implementation, which we do not want to expose. Therefore, the type is abstract. In the running example the input is given as a string, which we then have to convert to (or, view as) a stream. It is often convenient (compare FILE I/O in C) to have an explicit terminator as an element of the stream itself -- so we do not have to look for an out-of-band mechanism to detect termination. Therefore, the stream constructor of_string accepts, as an optional argument, the character to add to the stream at the very end.
The natural stream observation functions are to_list (to convert a stream to an OCaml list, which can be easily shown), and iter, e.g., to print out the stream elements.
Here is our first pipeline, to build a stream and observe it, as a list:
    let terminator = '\000'
    let r = of_string ~terminator >> to_list
To run it, we need an implementation of simple_seq. OCaml sequences from the standard library fit well (see the accompanying source code).
The result of running the initial pipeline is then
- : char list = ['1'; '0'; '0'; ','; '2'; '0'; '0'; ','; '3'; '0'; '0'; '|'; '4'; '0'; '0'; '|'; '5'; '0'; '0'; ','; '6'; '0'; '0'; '|'; '7'; '0'; '0'; ','; '8'; '0'; '0'; ','; '9'; '0'; '0'; '|'; '1'; '0'; '0'; '0'; '\000']
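One way the signature could be implemented with the standard `Seq` module is sketched below. The module name `Seq_impl` is hypothetical and the accompanying code may well differ; the sketch assumes OCaml 4.11 or later (for `Seq.append`).

```ocaml
(* The signature from the article, repeated so the sketch is self-contained *)
module type simple_seq = sig
  type 'a seq
  val of_string : ?terminator:char -> string -> char seq
  val iter      : ('a -> unit) -> 'a seq -> unit
  val to_list   : 'a seq -> 'a list
end

(* Hypothetical implementation: a stream is a standard-library sequence,
   and the abstract type hides that choice from clients. *)
module Seq_impl : simple_seq = struct
  type 'a seq = 'a Seq.t

  (* View the string as a stream of characters; if a terminator is given,
     it becomes the last element of the stream. *)
  let of_string ?terminator s =
    let chars = String.to_seq s in
    match terminator with
    | None   -> chars
    | Some c -> Seq.append chars (Seq.return c)

  let iter    = Seq.iter
  let to_list = List.of_seq
end

let sample = Seq_impl.(to_list (of_string ~terminator:'\000' "10|2"))
```

Because `'a seq` is abstract, swapping in another stream library later should not require changes to the pipelines written against this signature.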
We now extend the simple_seq API with an operation to build state machines.
One of the most general is the Mealy machine: a state machine that reads an input, perhaps outputs something, and changes the state in any case. It can be described by the following API, with a unified transition/output function. The name map_accum_filter is historical: the Mealy machine under this name has already appeared in an earlier article.
    module type simple_seq = sig
      include simple_seq
      val map_accum_filter :
        'state -> ('state -> 'a -> 'b option * 'state) -> 'a seq -> 'b seq
    end
As for implementation, the OCaml standard library provides filter_map on sequences, which is easy to make stateful (accumulating): see the accompanying code.
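As an illustration, here is one possible way to write such an accumulating filter_map directly over `Seq.t`. It is a sketch, not necessarily what the accompanying code does: it threads the state through a local recursive function (the hypothetical helper `go`) instead of keeping it in a mutable cell, so the resulting sequence can be traversed more than once.

```ocaml
(* A Mealy machine over standard sequences: for each input element, `step`
   returns an optional output and the next state. *)
let map_accum_filter :
  'state -> ('state -> 'a -> 'b option * 'state) -> 'a Seq.t -> 'b Seq.t =
  fun init step xs ->
    let rec go s xs () =
      match xs () with
      | Seq.Nil -> Seq.Nil
      | Seq.Cons (x, rest) ->
        (match step s x with
         | (None, s')   -> go s' rest ()               (* no output: keep scanning *)
         | (Some y, s') -> Seq.Cons (y, go s' rest))   (* emit y, continue lazily *)
    in
    go init xs

(* Usage: sum runs of integers, with 0 acting as the run delimiter *)
let group_sums =
  [1; 2; 0; 3; 0]
  |> List.to_seq
  |> map_accum_filter 0 (fun s x -> if x = 0 then (Some s, 0) else (None, s + x))
  |> List.of_seq
```

The `None` branch is a tail call, so a long stretch of inputs that produce no output does not grow the stack.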
The parser for sequences of digits is an instance of a Mealy machine. Its state is the accumulator for the parsed number, and is, strictly speaking, unbounded. In practice, the commonly used integer data types are bounded.
    let parse_ints : char seq -> (int * char) seq =
      map_accum_filter 0 @@ begin fun s -> function
        | '0' .. '9' as c -> (None, 10*s + (Char.code c - Char.code '0'))
        | c               -> (Some (s,c), 0)
      end
The parser hence converts a sequence of characters -- stretches of digits terminated by a non-digit -- into a stream of numbers paired with the delimiting character. In short, we collapse the sequences of digits into the corresponding integers.
The pipeline of the previous section can now be extended to:
    of_string ~terminator >> parse_ints >> to_list
which produces
- : (int * char) list = [(100, ','); (200, ','); (300, '|'); (400, '|'); (500, ','); (600, '|'); (700, ','); (800, ','); (900, '|'); (1000, '\000')]
We are ready for grouping and aggregation.
    of_string ~terminator
    >> parse_ints
    >> map_accum_filter 0 (fun s -> function
         | (n,',') -> (None, n + s)
         | (n,c)   -> (Some (n+s,c), 0))
    >> to_list
whose output is
- : (int * char) list = [(600, '|'); (400, '|'); (1100, '|'); (2400, '|'); (1000, '\000')]
The accumulation code we just wrote is not particularly tied to summation: it could equally apply, after adjustments, to aggregating (or, reducing) any sequences of items separated by a given delimiter. In other words, the code can perform reduction over any monoid:
    type 'a monoid = {unit: 'a; op: 'a -> 'a -> 'a}
that is: a set with an (associative) binary operation that has a neutral (unit) element. We ought to extract and name this general pattern:
    let group_by_aggregate : 'c -> 'a monoid -> ('a*'c) seq -> ('a*'c) seq =
      fun sep m ->
        map_accum_filter m.unit (fun s (x,c) ->
          let s' = m.op s x in
          if c = sep then (None,s') else (Some (s',c), m.unit))
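To see that the pattern is not tied to summation, the sketch below applies the same definition to two other monoids: integers under multiplication and strings under concatenation. To keep the example short and self-contained, `map_accum_filter` is replaced by a stand-in over plain OCaml lists; that stand-in is an assumption made for illustration, not the article's stream implementation.

```ocaml
type 'a monoid = {unit: 'a; op: 'a -> 'a -> 'a}

(* List-based stand-in for the stream operation (illustration only) *)
let map_accum_filter init step xs =
  let (_, out) =
    List.fold_left
      (fun (s, out) x ->
         match step s x with
         | (None, s')   -> (s', out)
         | (Some y, s') -> (s', y :: out))
      (init, []) xs
  in
  List.rev out

(* Same body as in the article, here operating on lists *)
let group_by_aggregate sep m =
  map_accum_filter m.unit (fun s (x, c) ->
      let s' = m.op s x in
      if c = sep then (None, s') else (Some (s', c), m.unit))

(* Products of comma-separated integers *)
let products =
  group_by_aggregate ',' {unit = 1; op = ( * )}
    [(2, ','); (3, ','); (4, '|'); (5, '|')]

(* Concatenation of space-separated strings *)
let words =
  group_by_aggregate ' ' {unit = ""; op = ( ^ )}
    [("ab", ' '); ("cd", ';'); ("e", ';')]
```

In both cases the delimiter that closes a group is passed along with the aggregate, which is what allows a second aggregation stage to be stacked on top of the first.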
The pipeline now reads:
    of_string ~terminator
    >> parse_ints
    >> group_by_aggregate ',' {unit=0;op=(+)}
    >> to_list
producing, as before
- : (int * char) list = [(600, '|'); (400, '|'); (1100, '|'); (2400, '|'); (1000, '\000')]
We have indeed summed up the comma-separated integers. The next task is to find the maximum of the resulting numbers. It is also a monoid reduction, over a different monoid:
    of_string ~terminator
    >> parse_ints
    >> group_by_aggregate ',' {unit=0;op=(+)}
    >> group_by_aggregate '|' {unit=Int.min_int;op=max}
    >> to_list
It produces the final answer:
- : (int * char) list = [(2400, '\000')]
The original Advent of Code 2022 problem used a different input format, similar to that of gnuplot data files: one number per line, with groups separated by empty lines. This format is easily accommodated, with only minor changes to the number parser: see the accompanying code for details.
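The following sketch shows one way such a parser change might look; it is a guess at the approach, not the code from the accompanying file. The hypothetical parser `parse_lines` remembers whether any digit has been seen since the last delimiter: a newline after digits is reported as a ',' (end of a number), while a newline on an empty line is reported as the number 0 followed by '|' (end of a group). The helper definitions, including the stateful `filter_map`, are likewise assumptions made to keep the example self-contained (OCaml 4.11 or later).

```ocaml
let ( >> ) f g = fun x -> g (f x)

type 'a monoid = {unit: 'a; op: 'a -> 'a -> 'a}

let of_string ~terminator s =
  Seq.append (String.to_seq s) (Seq.return terminator)

(* Seq.filter_map made stateful; fresh state for each traversal from the head *)
let map_accum_filter init step xs = fun () ->
  let state = ref init in
  Seq.filter_map
    (fun x -> let (y, s') = step !state x in state := s'; y)
    xs ()

let group_by_aggregate sep m =
  map_accum_filter m.unit (fun s (x, c) ->
      let s' = m.op s x in
      if c = sep then (None, s') else (Some (s', c), m.unit))

(* State: the number accumulated so far, and whether a digit was seen *)
let parse_lines : char Seq.t -> (int * char) Seq.t =
  map_accum_filter (0, false) (fun (n, seen) -> function
    | '0' .. '9' as c ->
      (None, (10 * n + (Char.code c - Char.code '0'), true))
    | '\n' when seen -> (Some (n, ','), (0, false))   (* end of a number *)
    | '\n'           -> (Some (0, '|'), (0, false))   (* empty line: end of a group *)
    | c              -> (Some (n, c), (0, false)))    (* terminator or other delimiter *)

(* The rest of the pipeline is the same as for the pipe-separated input *)
let max_group_sum : string -> (int * char) list =
  of_string ~terminator:'\000'
  >> parse_lines
  >> group_by_aggregate ',' {unit = 0; op = ( + )}
  >> group_by_aggregate '|' {unit = Int.min_int; op = max}
  >> List.of_seq
```

Only the parser differs from the pipeline developed above; the two aggregation stages are reused unchanged, since adding 0 to a group sum is harmless.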
The article relied on the standard OCaml sequences to implement the stream API. If we use strymonas instead, with an appropriate backend, we obtain, mechanically, the C code for our pipeline:
    void main(){
      int x_1 = -2147483648;
      int x_2 = 0;
      int x_3 = 0;
      bool x_4 = true;
      while (x_4)
      {
        int const t_5 = getc_term(0);
        if (t_5 == 0)
          x_4 = false;
        int const t_6 = x_3;
        if ((t_5 >= 48) && (t_5 <= 57))
          x_3 = (10 * t_6) + (t_5 - 48);
        else {
          x_3 = 0;
          int const t_7 = x_2;
          int const t_8 = t_7 + t_6;
          if (t_5 == 44)
            x_2 = t_8;
          else {
            x_2 = 0;
            int const t_9 = x_1;
            int const t_10 = (t_9 < t_8 ? t_8 : t_9);
            if (t_5 == 124)
              x_1 = t_10;
            else {
              x_1 = -2147483648;
              printf("%d\n",t_10);
            }
          }
        }
      }
    }
An input character is processed immediately without any buffering; the processing loop indeed runs in constant memory.
adv22.c [<1K]
Complete generated C code