- Grouping and aggregation queries/processing are quite common in data
analysis -- and in job interview questions and contest problems. One
such job interview question was the subject of an article, Grasping
`all-the-apples-at-once', that looked at many solutions in many
programming languages. Some people realized that the question was
about grouping and aggregation -- and hence were able to give clear,
obviously correct solutions, with no doubts about edge cases or
iteration ranges. Many programming languages and their libraries have
good facilities for grouping and aggregation. Once we know they are
needed, using them is relatively easy, for a competent programmer.
The hardest part is figuring out what the question is all about: the
key words.

If the needed vocabulary -- the key words -- is not immediately clear, we can look for it: building a solution step by step, recognizing patterns, generalizing and naming them. Stream processing fits this stepwise approach especially well. It lets us build pipelines one segment at a time, by ``coupling programs like garden hose -- screw in another segment when it becomes necessary to massage data in another way.'' (Doug McIlroy, 1964). We have seen such program development in the cited article. The present article gives another illustration of the technique, on a problem twice as complicated.

The solution we develop is clear not just to the eyes -- to the computer as well. The data are accessed strictly sequentially; each input character is processed immediately without buffering. The entire pipeline may hence run in truly constant memory. We indeed obtain -- mechanically generate -- C code for such efficient processing, which worked, correctly, on first try. Clarity and performance do not have to be in opposition.

- Our running example is a rather typical data analysis problem,
originally posed in Advent of Code 2022. It was explained there in a rather
long-winded, literate way. Here is a short summary (taken from
Abeysinghe and Rompf):

The input is a sequence of numbers separated by pipes into chunks where each chunk contains multiple comma-separated numbers. The task is to compute the sum of each chunk and find the maximum across all chunks.

A sample input is as follows:

`100,200,300|400|500,600|700,800,900|1000`

The original problem used newlines and empty lines as separators. We return to it later.

Looking ahead, the eventual solution is

    of_string ~terminator:'\000'
    >> parse_ints
    >> group_by_aggregate ',' {unit=0;op=(+)}
    >> group_by_aggregate '|' {unit=Int.min_int;op=max}
    >> iter (fun (n,_) -> print_int n)

It is a pipeline of (stream) operations connected by the left-to-right functional composition `>>`. The pipeline is typed. The types are not shown since they are all inferred -- one may still attach type annotations if desired, e.g., for documentation purposes.

Our solution is incremental: the data are read as a stream and processed one character at a time, with no buffering. With a good implementation of streams (see later for the C code), the processing is done in truly constant memory, without creating any garbage along the way.

The rest of the page shows how it was developed.

**References**

- <https://adventofcode.com/2022/day/1>
  Advent of Code 2022, Day 1 problem
- S. Abeysinghe and T. Rompf. Rhyme: a data-centric expressive query language for nested data structures. PADL 2024.
- adv22.ml [12K]
  Complete code for the article

- A problem is easier to solve if it can be decomposed into smaller
steps: for example, represented as a streaming pipeline. Our problem,
luckily, fits streaming processing. Indeed, adding new characters to
the input does not affect the parsing and interpretation of the
earlier portion of the input. Although extending the input may change
the global maximum, `max` is associative, can be associated to the left and hence computed incrementally.

To solve our problem using streams we need an API for streams (and an implementation of that API). At the very least, we should be able to construct and observe a stream. The initial API would hence look like:

    module type simple_seq = sig
      type 'a seq
      val of_string : ?terminator:char -> string -> char seq
      val iter      : ('a -> unit) -> 'a seq -> unit
      val to_list   : 'a seq -> 'a list
    end

Here `'a seq` is the type for a stream implementation, which we do not want to expose. Therefore, the type is abstract. In the running example the input is given as a string, which we then have to convert to (or, view as) a stream. It is often convenient (see the FILE IO in C) to have an explicit terminator as an element of the stream itself -- so we do not have to look for an out-of-band mechanism to detect termination. Therefore, the stream constructor `of_string` accepts, as an optional argument, the character to add to the stream at the very end.

The natural stream observation functions are `to_list` (to convert a stream to an OCaml list, which can be easily shown), and `iter`, to print out the stream elements.

Here is our first pipeline, to build a stream and observe it, as a list:

    let terminator = '\000'
    let r = of_string ~terminator >> to_list

To run it, we need an implementation of `simple_seq`. OCaml sequences from the standard library fit well (see the accompanying source code). The result of running the initial pipeline is then

    - : char list =
    ['1'; '0'; '0'; ','; '2'; '0'; '0'; ','; '3'; '0'; '0'; '|';
     '4'; '0'; '0'; '|'; '5'; '0'; '0'; ','; '6'; '0'; '0'; '|';
     '7'; '0'; '0'; ','; '8'; '0'; '0'; ','; '9'; '0'; '0'; '|';
     '1'; '0'; '0'; '0'; '\000']
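A minimal sketch of such an implementation over the standard `Seq` module might look as follows. It is an illustration written for this text, not an excerpt from adv22.ml: the module name `StdSeq`, the definition of `>>` and the name `sample` are assumptions made here.

```ocaml
(* The stream API from the text *)
module type simple_seq = sig
  type 'a seq
  val of_string : ?terminator:char -> string -> char seq
  val iter      : ('a -> unit) -> 'a seq -> unit
  val to_list   : 'a seq -> 'a list
end

(* Hypothetical implementation: a stream is a standard-library sequence *)
module StdSeq : simple_seq = struct
  type 'a seq = 'a Seq.t

  (* View the string as a stream; append the terminator, if one is given *)
  let of_string ?terminator s =
    let chars = String.to_seq s in
    match terminator with
    | None   -> chars
    | Some c -> Seq.append chars (Seq.return c)

  let iter    = Seq.iter
  let to_list = List.of_seq
end

(* Left-to-right function composition (assumed definition) *)
let ( >> ) f g x = g (f x)

let sample = "100,200,300|400|500,600|700,800,900|1000"

(* The first pipeline, applied to the sample input *)
let r : char list = StdSeq.(of_string ~terminator:'\000' >> to_list) sample
```

Sealing the module with the `simple_seq` signature keeps `'a seq` abstract, so the rest of the pipeline cannot depend on the choice of `Seq`.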


- The obvious next step is to parse the sequences of digits as
integers. We need a parser. Parsers are commonly finite- or
infinite-state (e.g., push-down) machines. Therefore, we
extend the `simple_seq` API with an operation to build machines.

One of the most general is the Mealy machine: a state machine that reads an input, perhaps outputs something, and changes the state in any case. It can be described by the following API, with a unified transition/output function. The name `map_accum_filter` is historical: the Mealy machine under this name has already appeared in an earlier article.

    module type simple_seq = sig
      include simple_seq
      val map_accum_filter :
        'state -> ('state -> 'a -> 'b option * 'state) -> 'a seq -> 'b seq
    end

As for implementation, the OCaml standard library provides `filter_map` on sequences, which is easy to make stateful (accumulating): see the accompanying code.

The parser for sequences of digits is an instance of the Mealy machine. Its state is the accumulator for the parsed number, and is, strictly speaking, unbounded. In practice, the commonly used integer data types are bounded.

    let parse_ints : char seq -> (int * char) seq =
      map_accum_filter 0 @@ begin fun s -> function
        | '0' .. '9' as c -> (None,10*s + (Char.code c - Char.code '0'))
        | c               -> (Some (s,c),0)
      end

The parser hence converts a sequence of characters -- stretches of digits terminated by a non-digit -- into a stream of numbers paired with the delimiting character. In short, we collapse the sequences of digits into the corresponding integers.

The pipeline of the previous section can now be extended to:

    of_string ~terminator >> parse_ints >> to_list

which produces

    - : (int * char) list =
    [(100, ','); (200, ','); (300, '|'); (400, '|'); (500, ',');
     (600, '|'); (700, ','); (800, ','); (900, '|'); (1000, '\000')]
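The Mealy-machine operation itself can be sketched directly over the standard `Seq` type. The following is one possible implementation, assuming that `'a seq` is `'a Seq.t`; the accompanying code may be organized differently (for instance, around `filter_map`). The name `parsed` and its short test input are made up for the illustration.

```ocaml
(* Assumption: streams are standard-library sequences *)
type 'a seq = 'a Seq.t

(* Mealy machine: thread the state through the stream; each input element
   yields a new state and, optionally, one output element *)
let rec map_accum_filter :
  'state -> ('state -> 'a -> 'b option * 'state) -> 'a seq -> 'b seq =
  fun st f xs () ->
    match xs () with
    | Seq.Nil -> Seq.Nil
    | Seq.Cons (x, rest) ->
      (match f st x with
       | (None, st')   -> map_accum_filter st' f rest ()   (* no output: go on *)
       | (Some y, st') -> Seq.Cons (y, map_accum_filter st' f rest))

(* The parser from the text, unchanged *)
let parse_ints : char seq -> (int * char) seq =
  map_accum_filter 0 @@ begin fun s -> function
    | '0' .. '9' as c -> (None, 10*s + (Char.code c - Char.code '0'))
    | c               -> (Some (s,c), 0)
  end

(* A small input with the '\000' terminator written into the string *)
let parsed : (int * char) list =
  String.to_seq "100,200|3\000" |> parse_ints |> List.of_seq
```

The sequence is consumed lazily: nothing is parsed until an observer such as `List.of_seq` demands elements, and only the current state is kept between elements.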

We are ready for grouping and aggregation.

- Looking at the stream with parsed digit sequences suggests the next
step: summing up the comma-separated integers. The summing up is also
a Mealy machine:

    of_string ~terminator
    >> parse_ints
    >> map_accum_filter 0 (fun s -> function
         | (n,',') -> (None, n + s)
         | (n,c)   -> (Some (n+s,c),0) )
    >> to_list

whose output is

    - : (int * char) list =
    [(600, '|'); (400, '|'); (1100, '|'); (2400, '|'); (1000, '\000')]

The accumulation code we just wrote is not particularly tied to summation: it could equally apply, after adjustments, to aggregating (or, reducing) any sequences of items separated by a given delimiter. In other words, the code can perform reduction over any monoid:

    type 'a monoid = {unit: 'a; op: 'a -> 'a -> 'a}

that is: a set with an (associative) binary operation that has a neutral (unit) element. We ought to extract and name this general pattern:

    let group_by_aggregate : 'c -> 'a monoid -> ('a*'c) seq -> ('a*'c) seq =
      fun sep m ->
        map_accum_filter m.unit (fun s (x,c) ->
          let s' = m.op s x in
          if c = sep then (None,s') else (Some (s',c), m.unit))

The pipeline now reads:

    of_string ~terminator
    >> parse_ints
    >> group_by_aggregate ',' {unit=0;op=(+)}
    >> to_list

producing, as before

    - : (int * char) list =
    [(600, '|'); (400, '|'); (1100, '|'); (2400, '|'); (1000, '\000')]

We have indeed summed up the comma-separated integers. The next task is to find the maximum of the resulting numbers. It is also a monoid reduction, over a different monoid:

    of_string ~terminator
    >> parse_ints
    >> group_by_aggregate ',' {unit=0;op=(+)}
    >> group_by_aggregate '|' {unit=Int.min_int;op=max}
    >> to_list

It produces the final answer:

    - : (int * char) list = [(2400, '\000')]
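For readers who want to run the whole pipeline without the accompanying file, here is a self-contained sketch over the standard `Seq` module. The implementations of `of_string`, `>>` and `map_accum_filter` are assumptions made for this illustration (adv22.ml may differ); `parse_ints`, `group_by_aggregate` and the pipeline itself follow the text. The name `solve` is introduced here.

```ocaml
let ( >> ) f g x = g (f x)              (* left-to-right composition *)

type 'a monoid = {unit: 'a; op: 'a -> 'a -> 'a}

(* Assumed stream implementation: standard-library sequences *)
let of_string ?terminator s =
  let chars = String.to_seq s in
  match terminator with
  | None   -> chars
  | Some c -> Seq.append chars (Seq.return c)

(* Mealy machine over Seq (one possible implementation) *)
let rec map_accum_filter st f xs () =
  match xs () with
  | Seq.Nil -> Seq.Nil
  | Seq.Cons (x, rest) ->
    (match f st x with
     | (None, st')   -> map_accum_filter st' f rest ()
     | (Some y, st') -> Seq.Cons (y, map_accum_filter st' f rest))

let parse_ints : char Seq.t -> (int * char) Seq.t =
  map_accum_filter 0 (fun s -> function
    | '0' .. '9' as c -> (None, 10*s + (Char.code c - Char.code '0'))
    | c               -> (Some (s,c), 0))

(* Reduce, over the monoid m, each run of items delimited by sep *)
let group_by_aggregate sep m =
  map_accum_filter m.unit (fun s (x,c) ->
    let s' = m.op s x in
    if c = sep then (None, s') else (Some (s',c), m.unit))

let solve : string -> (int * char) list =
  of_string ~terminator:'\000'
  >> parse_ints
  >> group_by_aggregate ',' {unit=0; op=(+)}
  >> group_by_aggregate '|' {unit=Int.min_int; op=max}
  >> List.of_seq

let () =
  solve "100,200,300|400|500,600|700,800,900|1000"
  |> List.iter (fun (n,_) -> Printf.printf "%d\n" n)
```

The two aggregation stages differ only in the delimiter and the monoid, which is the point of naming the pattern.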

The original Advent of Code 2022 problem used a different input format, similar to gnuplot's data format: one number per line, with groups separated by empty lines. This format is easily accommodated, with only minor changes to the number parser: see the accompanying code for details.
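One way such a parser change could look is sketched below. It is a guess at the idea, not the parser from adv22.ml: the names `parse_lines` and `solve_lines` are hypothetical, and so is the design choice of reporting a line end as the delimiter `','` and an empty line as a zero item delimited by `'|'`, so that the aggregation stages stay untouched. The stream helpers are the same assumed `Seq`-based ones as before.

```ocaml
let ( >> ) f g x = g (f x)

type 'a monoid = {unit: 'a; op: 'a -> 'a -> 'a}

let of_string ?terminator s =
  let chars = String.to_seq s in
  match terminator with
  | None   -> chars
  | Some c -> Seq.append chars (Seq.return c)

let rec map_accum_filter st f xs () =
  match xs () with
  | Seq.Nil -> Seq.Nil
  | Seq.Cons (x, rest) ->
    (match f st x with
     | (None, st')   -> map_accum_filter st' f rest ()
     | (Some y, st') -> Seq.Cons (y, map_accum_filter st' f rest))

let group_by_aggregate sep m =
  map_accum_filter m.unit (fun s (x,c) ->
    let s' = m.op s x in
    if c = sep then (None, s') else (Some (s',c), m.unit))

(* Hypothetical parser for the line-oriented format.
   State: the number accumulated so far, and whether the current line
   has contained a digit. *)
let parse_lines : char Seq.t -> (int * char) Seq.t =
  map_accum_filter (0, false) (fun (n, seen) -> function
    | '0' .. '9' as c ->
        (None, (10*n + (Char.code c - Char.code '0'), true))
    | '\n' when seen -> (Some (n, ','), (0, false))  (* end of a number line *)
    | '\n'           -> (Some (0, '|'), (0, false))  (* empty line: group ends *)
    | c              -> (Some (n, c),   (0, false))) (* terminator *)

let solve_lines : string -> (int * char) list =
  of_string ~terminator:'\000'
  >> parse_lines
  >> group_by_aggregate ',' {unit=0; op=(+)}
  >> group_by_aggregate '|' {unit=Int.min_int; op=max}
  >> List.of_seq
```

Under this encoding an empty line contributes a zero to the group it closes, which does not change the sum. Several empty lines in a row would produce groups with sum 0, which is harmless for the maximum as long as the inputs are non-negative.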


- As before, once we have recognized what the problem is all about --
the reduction over a monoid -- the clear *and* efficient solution is immediate. Incidentally, the number parsing can also be presented as a monoid reduction. Do you see that?

The article relied on the standard OCaml sequences to implement the stream API. If we use strymonas instead, with an appropriate backend, we obtain, mechanically, the C code for our pipeline:

    void main(){
      int x_1 = -2147483648;
      int x_2 = 0;
      int x_3 = 0;
      bool x_4 = true;
      while (x_4)
      {
        int const t_5 = getc_term(0);
        if (t_5 == 0)
          x_4 = false;
        int const t_6 = x_3;
        if ((t_5 >= 48) && (t_5 <= 57))
          x_3 = (10 * t_6) + (t_5 - 48);
        else {
          x_3 = 0;
          int const t_7 = x_2;
          int const t_8 = t_7 + t_6;
          if (t_5 == 44)
            x_2 = t_8;
          else {
            x_2 = 0;
            int const t_9 = x_1;
            int const t_10 = (t_9 < t_8 ? t_8 : t_9);
            if (t_5 == 124)
              x_1 = t_10;
            else {
              x_1 = -2147483648;
              printf("%d\n",t_10);
            }
          }
        }
      }
    }

An input character is processed immediately without any buffering; the processing loop indeed runs in constant memory.

**References**

- strymonas: Highest-performance Stream Processing
- adv22.c [<1K]
  Complete generated C code