# Nested Grouping-Aggregation

## Introduction

Grouping and aggregation are quite common in data analysis -- and in job interview questions and contest problems. One such interview question was the subject of the article "Grasping 'all-the-apples-at-once'", which examined many solutions in many programming languages. Some people realized that the question was about grouping and aggregation -- and hence were able to give clear, obviously correct solutions, with no doubts about edge cases or iteration ranges. Many programming languages and their libraries have good facilities for grouping and aggregation. Once we know they are needed, using them is relatively easy for a competent programmer. The hardest part is figuring out what the question is all about: finding the key words.

If the needed vocabulary -- the key words -- is not immediately clear, we can look for it: building a solution step-by-step, recognizing patterns, generalizing and naming them. Stream processing especially fits this stepwise approach. It lets us build pipelines one segment at a time, by "coupling programs like garden hose -- screw in another segment when it becomes necessary to massage data in another way" (Doug McIlroy, 1964). We have seen such a program development in the cited article. The present article gives another illustration of the technique, on a problem twice as complicated.

The solution we develop is clear not just to our eyes but to the computer as well. The data are accessed strictly sequentially; each input character is processed immediately, without buffering. The entire pipeline may hence run in truly constant memory. We indeed obtain -- mechanically generate -- C code for such efficient processing, which worked correctly on the first try. Clarity and performance do not have to be in opposition.

## Problem

Our running example is a rather typical data analysis problem, originally posed in Advent of Code 2022. There it was explained in a long-winded, literate way. Here is a short summary (taken from Abeysinghe and Rompf):
The input is a sequence of numbers separated by pipes into chunks where each chunk contains multiple comma-separated numbers. The task is to compute the sum of each chunk and find the maximum across all chunks.

A sample input is as follows:
`100,200,300|400|500,600|700,800,900|1000`

The original problem used newlines and empty lines as separators. We return to it later.

Looking ahead, the eventual solution is

```ocaml
of_string ~terminator:'\000'
>> parse_ints
>> group_by_aggregate ',' {unit=0;op=(+)}
>> group_by_aggregate '|' {unit=Int.min_int;op=max}
>> iter (fun (n,_) -> print_int n)
```
It is a pipeline of (stream) operations connected by the left-to-right functional composition `>>`. The pipeline is typed. The types are not shown since they are all inferred -- one may still attach type annotations if desired, e.g., for documentation purposes.
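
The composition operator `>>` is not part of the OCaml standard library; a minimal definition (an assumption about the accompanying code) is:

```ocaml
(* Left-to-right function composition: (f >> g) x means g (f x). *)
let (>>) f g = fun x -> g (f x)
```

With it, the pipeline reads in the order of processing: construct the stream, parse, group twice, print.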

Our solution is incremental: the data are read as a stream and processed one character at a time, with no buffering. With a good implementation of streams (see later for the C code), the processing runs in truly constant memory, without creating any garbage along the way.

The rest of the page shows how it was developed.

References

- Advent of Code 2022, Day 1 problem
- S. Abeysinghe and T. Rompf. Rhyme: a data-centric expressive query language for nested data structures. PADL 2024.
- Complete code for the article

## Stream vocabulary

A problem is easier to solve if it can be decomposed into smaller steps: for example, represented as a streaming pipeline. Our problem, luckily, fits stream processing. Indeed, adding new characters to the input does not affect the parsing and interpretation of the earlier portion of the input. Although extending the input may change the global maximum, `max` is associative and can hence be computed incrementally, by folding from the left.
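
For instance, the maximum of a growing sequence can be maintained by a left fold, one element at a time -- a minimal sketch:

```ocaml
(* Running maximum by left fold: Int.min_int is the neutral element of
   max over ints, so an empty input yields Int.min_int. *)
let running_max xs = List.fold_left max Int.min_int xs
```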

To solve our problem using streams we need an API for streams (and an implementation of that API). At the very least, we should be able to construct and observe a stream. The initial API would hence look like:

```ocaml
module type simple_seq = sig
  type 'a seq
  val of_string : ?terminator:char -> string -> char seq
  val iter : ('a -> unit) -> 'a seq -> unit
  val to_list : 'a seq -> 'a list
end
```

Here `'a seq` is the type of a stream, whose implementation we do not want to expose. Therefore, the type is abstract. In the running example the input is given as a string, which we then have to convert to (or, view as) a stream. It is often convenient (cf. file I/O in C) to have an explicit terminator as an element of the stream itself -- so we do not have to look for an out-of-band mechanism to detect termination. Therefore, the stream constructor `of_string` accepts, as an optional argument, the character to add to the stream at the very end.

The natural stream observation functions are `to_list` (to convert a stream to an OCaml list, which can be easily displayed) and `iter`, to apply a given action -- such as printing -- to each stream element.
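
As a sketch, the `simple_seq` signature can be implemented with the standard `Seq` module (OCaml 4.11 or later assumed; the accompanying code may differ):

```ocaml
module Simple_seq : sig
  type 'a seq
  val of_string : ?terminator:char -> string -> char seq
  val iter : ('a -> unit) -> 'a seq -> unit
  val to_list : 'a seq -> 'a list
end = struct
  type 'a seq = 'a Seq.t
  (* View the string as a character stream, appending the optional
     terminator as the very last element. *)
  let of_string ?terminator s =
    let base = String.to_seq s in
    match terminator with
    | None   -> base
    | Some c -> Seq.append base (Seq.return c)
  let iter = Seq.iter
  let to_list = List.of_seq
end
```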

Here is our first pipeline, to build a stream and observe it, as a list:

```ocaml
let terminator = '\000'
let r = of_string ~terminator >> to_list
```
To run it, we need an implementation of `simple_seq`. OCaml sequences from the standard library fit well (see the accompanying source code). The result of running the initial pipeline is then
```
- : char list =
['1'; '0'; '0'; ','; '2'; '0'; '0'; ','; '3'; '0'; '0'; '|'; '4'; '0'; '0';
 '|'; '5'; '0'; '0'; ','; '6'; '0'; '0'; '|'; '7'; '0'; '0'; ','; '8'; '0';
 '0'; ','; '9'; '0'; '0'; '|'; '1'; '0'; '0'; '0'; '\000']
```

## Parsing

The obvious next step is to parse the sequences of digits as integers. We need a parser. Parsers are commonly finite- or infinite-state (e.g., push-down) machines. Therefore, we extend the `simple_seq` API with an operation to build machines.

One of the most general is the Mealy machine: a state machine that reads an input, perhaps outputs something, and changes the state in any case. It can be described by the following API, with a unified transition/output function. The name `map_accum_filter` is historical: the Mealy machine under this name has already appeared in an earlier article.

```ocaml
module type simple_seq = sig
  include simple_seq
  val map_accum_filter :
    'state -> ('state -> 'a -> 'b option * 'state) -> 'a seq -> 'b seq
end
```
As for implementation, the OCaml standard library provides `filter_map` on sequences, which is easy to make stateful (accumulating): see the accompanying code.
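
One way to do so (a sketch, not necessarily the accompanying code) is to thread the state through `Seq.filter_map` via a reference. The resulting sequence is then single-shot and should be consumed only once:

```ocaml
(* Mealy machine over sequences: the transition function consumes the
   current state and element, returning an optional output together with
   the next state. *)
let map_accum_filter : 'state -> ('state -> 'a -> 'b option * 'state) ->
                       'a Seq.t -> 'b Seq.t =
  fun init f xs ->
    let state = ref init in
    xs |> Seq.filter_map (fun x ->
      let (out, s') = f !state x in
      state := s';
      out)
```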

The parser for sequences of digits is an instance of Mealy machine. Its state is the accumulator for the parsed number, and is, strictly speaking, unbounded. In practice, the commonly used integer data types are bounded.

```ocaml
let parse_ints : char seq -> (int * char) seq =
  map_accum_filter 0 @@ begin fun s ->
    function
    | '0' .. '9' as c -> (None, 10*s + (Char.code c - Char.code '0'))
    | c               -> (Some (s,c), 0)
  end
```
The parser hence converts a sequence of characters -- stretches of digits terminated by a non-digit -- into a stream of numbers paired with the delimiting character. In short, we collapse the sequences of digits into the corresponding integers.

The pipeline of the previous section can now be extended to:

```ocaml
of_string ~terminator >> parse_ints >> to_list
```
which produces
```
- : (int * char) list =
[(100, ','); (200, ','); (300, '|'); (400, '|'); (500, ','); (600, '|');
 (700, ','); (800, ','); (900, '|'); (1000, '\000')]
```
We are ready for grouping and aggregation.

## Monoid aggregation

Looking at the stream with parsed digit sequences suggests the next step: summing up the comma-separated integers. The summing up is also a Mealy machine:
```ocaml
of_string ~terminator >> parse_ints >>
map_accum_filter 0 (fun s -> function
  | (n,',') -> (None, n + s)
  | (n,c)   -> (Some (n+s,c), 0)
) >> to_list
```
whose output is
```
- : (int * char) list =
[(600, '|'); (400, '|'); (1100, '|'); (2400, '|'); (1000, '\000')]
```

The accumulation code we just wrote is not particularly tied to summation: it could equally apply, after adjustments, to aggregating (or, reducing) any sequence of items separated by a given delimiter. In other words, the code can perform reduction over any monoid:

```ocaml
type 'a monoid = {unit: 'a; op: 'a -> 'a -> 'a}
```
that is: a set with an (associative) binary operation that has a neutral (unit) element. We ought to extract and name this general pattern:
```ocaml
let group_by_aggregate : 'c -> 'a monoid -> ('a*'c) seq -> ('a*'c) seq =
  fun sep m ->
    map_accum_filter m.unit (fun s (x,c) ->
      let s' = m.op s x in
      if c = sep then (None,s') else (Some (s',c), m.unit))
```

The pipeline then becomes
```ocaml
of_string ~terminator >> parse_ints
>> group_by_aggregate ',' {unit=0;op=(+)}
>> to_list
```
producing, as before
```
- : (int * char) list =
[(600, '|'); (400, '|'); (1100, '|'); (2400, '|'); (1000, '\000')]
```

We have indeed summed up the comma-separated integers. The next task is to find the maximum of the resulting numbers. It is also a monoid reduction, over a different monoid:

```ocaml
of_string ~terminator >> parse_ints
>> group_by_aggregate ',' {unit=0;op=(+)}
>> group_by_aggregate '|' {unit=Int.min_int;op=max}
>> to_list
```
```
- : (int * char) list = [(2400, '\000')]
```

The original Advent of Code 2022 problem used a different input format, similar to gnuplot's data format: one number per line, with groups separated by empty lines. This format is easily accommodated, with only minor changes to the number parser: see the accompanying code for details.
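
For illustration only, here is one way the parser could be adapted (a sketch over plain `Seq.t`; the name `parse_ints'` and the details are assumptions, and the accompanying code may differ). The extra boolean in the state records whether any digit has been seen since the last separator: a newline after digits ends a number (emitting ','), while a newline with no preceding digits -- a blank line -- ends a group, emitting the sum-monoid unit 0 with separator '|', which the downstream aggregation absorbs:

```ocaml
(* Parser for the newline format: one number per line, groups separated
   by blank lines. State: (accumulator, seen a digit since last separator?). *)
let parse_ints' : char Seq.t -> (int * char) Seq.t = fun xs ->
  let state = ref (0, false) in
  xs |> Seq.filter_map (fun c ->
    let (s, seen) = !state in
    match c with
    | '0' .. '9' ->
        state := (10*s + (Char.code c - Char.code '0'), true); None
    | '\n' when seen -> state := (0, false); Some (s, ',')  (* end of number *)
    | '\n'           -> state := (0, false); Some (0, '|')  (* blank line: end of group *)
    | c              -> state := (0, false); Some (s, c)    (* e.g., the terminator *)
  )
```

The rest of the pipeline is unchanged: the ',' and '|' groupings apply exactly as before.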


## Conclusions

As before, once we have recognized what the problem is all about -- reduction over a monoid -- the clear and efficient solution is immediate. Incidentally, the number parsing can also be presented as a monoid reduction. Do you see it?
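
For the curious, here is one possible answer -- a sketch of our own, not necessarily the intended one. A digit maps to the pair (10, its value); such (scale, value) pairs form a monoid, whose reduction yields the parsed number:

```ocaml
type 'a monoid = {unit: 'a; op: 'a -> 'a -> 'a}   (* as in the article *)

(* Digit parsing as monoid reduction: op is associative and (1, 0) is
   its unit, so the fold may be regrouped at will. *)
let digit_monoid = {
  unit = (1, 0);
  op   = (fun (d1, n1) (d2, n2) -> (d1 * d2, n1 * d2 + n2));
}
let parse_digits s =
  String.to_seq s
  |> Seq.map (fun c -> (10, Char.code c - Char.code '0'))
  |> Seq.fold_left digit_monoid.op digit_monoid.unit
  |> snd
```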

The article relied on the standard OCaml sequences to implement the stream API. If we use strymonas instead, with an appropriate backend, we obtain, mechanically, the C code for our pipeline:

```c
void main(){
  int x_1 = -2147483648;
  int x_2 = 0;
  int x_3 = 0;
  bool x_4 = true;
  while (x_4)
  {
    int const t_5 = getc_term(0);
    if (t_5 == 0)
      x_4 = false;
    int const t_6 = x_3;
    if ((t_5 >= 48) && (t_5 <= 57))
      x_3 = (10 * t_6) + (t_5 - 48);
    else {
      x_3 = 0;
      int const t_7 = x_2;
      int const t_8 = t_7 + t_6;
      if (t_5 == 44)
        x_2 = t_8;
      else {
        x_2 = 0;
        int const t_9 = x_1;
        int const t_10 = (t_9 < t_8 ? t_8 : t_9);
        if (t_5 == 124)
          x_1 = t_10;
        else {
          x_1 = -2147483648;
          printf("%d\n",t_10);
        }
      }
    }
  }
}
```
An input character is processed immediately without any buffering; the processing loop indeed runs in constant memory.
References

- strymonas: Highest-performance Stream Processing