Streams and Incremental Processing

 
Stream processing defines a pipeline of operators that transform, combine, or reduce (even to a single scalar) large amounts of data. Characteristically, the data is accessed strictly linearly rather than randomly and repeatedly -- and processed uniformly. The upside of the limited expressiveness is the opportunity to process large amounts of data efficiently, in constant and small space.
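For concreteness, such a pipeline can be sketched with OCaml's standard Seq module (an illustrative sketch, not part of any library discussed below): the data flows linearly through each operator and is reduced to a single scalar, with no intermediate list ever built.

```ocaml
(* A minimal stream pipeline: sum the squares of the even numbers
   among the first n naturals. Each operator sees the data strictly
   linearly; the whole pipeline runs in constant space. *)
let sum_even_squares n =
  Seq.ints 0                           (* infinite source 0, 1, 2, ... *)
  |> Seq.take n                        (* restrict to a finite prefix *)
  |> Seq.filter (fun x -> x mod 2 = 0) (* keep the evens *)
  |> Seq.map (fun x -> x * x)          (* transform *)
  |> Seq.fold_left (+) 0               (* reduce to a single scalar *)

let () = Printf.printf "%d\n" (sum_even_squares 10)
```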

 

Introduction

Stream-wise data processing, already present in one of the first programming languages (COBOL), has been coming to the forefront with the popularity of big data and MapReduce. Widespread automated trading is one example of extremely high-performance stream processing; software-defined radio is another. The uniformity and predictability of data access help efficiently handle vast amounts of data, far more than fits within the memory of a single processor.

Stream processing also exhibits the painful abstraction vs. performance trade-off. Manually written loops and state machines offer the highest performance and the least memory overhead, but are not reusable or extensible. Libraries of freely composable stream components let programmers quickly assemble an obviously correct stream application, but suffer from the high overhead of abstractions, mainly due to the repeated creation and disposal of intermediate data structures such as closures, captured continuations, objects and collections -- let alone intermediate streams. Eliminating such intermediate structures is broadly known as stream fusion.
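The trade-off can be made concrete with a small OCaml sketch (illustrative, not taken from any of the libraries below): the composed pipeline allocates two intermediate lists, while the hand-fused loop computes the same result in a single pass with no intermediate allocation. Fusion aims to obtain the second form from the first, automatically.

```ocaml
(* Composed pipeline: readable and reusable, but each stage builds
   an intermediate list. *)
let composed xs =
  xs
  |> List.filter (fun x -> x mod 2 = 0)
  |> List.map (fun x -> x * x)
  |> List.fold_left (+) 0

(* Hand-fused version: one loop, one accumulator, no intermediate
   structures -- the code a fusing library should produce. *)
let fused xs =
  List.fold_left
    (fun acc x -> if x mod 2 = 0 then acc + x * x else acc)
    0 xs
```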

According to a survey of stream processing, ``Within computer science the term stream has been attributed to P.J. Landin, formulated during the development of operational constructs presented as part of his work on the correspondence between ALGOL 60 and the lambda-calculus. Indeed, we note that P.J. Landin's original use for streams was to model the histories of loop variables, but he also observed that streams could have been used as a model for I/O in ALGOL 60.''

References
Melvin E. Conway: Design of a Separable Transition-diagram Compiler
Commun. ACM, July 1963, 396--408 doi:10.1145/366663.366704
This is the paper that first introduced streams, coroutines and stream fusion -- in the context of building a one-pass COBOL compiler in assembly

R. Stephens: A Survey Of Stream Processing
Acta Informatica, July 1997, Volume 34, Issue 7, pp 491–541

All Things Flow: Unfolding the History of Streams

Even Better Stream Fusion

Stream Seminar
<http://www.win.tue.nl/~hzantema/strsem.html>

Stream Fusion, to Completeness
The Related Work section of the paper

 

Highest-performance Stream Processing

Strymonas is a streaming library for fast, bulk, single-thread in-memory processing -- of the sort epitomized by Software Defined Radio or Java Streams. It attains the speed and memory efficiency of hand-written state machines, yet provides the familiar declarative interface for finite and infinite streams, supporting any combination of map, filter, take(while), drop(while), zip, flatmap combinators and tupling. Experienced users may use the lower-level interface of stateful streams and implement accumulating maps, compression and windowing.

The library is based on assured code generation (at present, to OCaml, C and Scala/Java) and guarantees complete fusion in all cases: if each operation in a pipeline individually runs without any function calls and memory allocations, the entire streaming pipeline runs without calls and allocations. Strymonas itself thus introduces no intermediary data structures, not even constant-size ones. The main processing loop may hence run in constant memory and stack space.

In essence, strymonas is a DSL that generates high-performance single-core stream processing code from declarative descriptions of stream pipelines and user actions -- something like Yacc. Unlike (ocaml)yacc, strymonas is an embedded DSL. Therefore, it integrates as is with existing OCaml/Scala code and tools. Any typing or pipeline mis-assembly errors are reported immediately (even during editing).

There are two flavors of the library, with OCaml and Scala 3 as the host languages. The C generation back-end needs no other dependencies; the OCaml back-end relies on BER MetaOCaml; the Scala back-end uses Scala 3's native metaprogramming facilities.

The present strymonas is a completely rewritten, much extended and improved version of the library described in the POPL 2017 paper.

Joint work with Aggelos Biboudis, Tomoaki Kobayashi and Nick Palladinos.

Version
The current version is September 2023
References
strymonasv2-intro.pdf [428K]
Introduction to strymonas v2: the paper presented at the ACM SIGPLAN OCaml 2022 Workshop (Ljubljana, Slovenia, September 16, 2022) doi:10.48550/arXiv.2211.13461

<http://strymonas.github.io/>
The official, user-facing repository (not the development repository)

<https://github.com/strymonas/strymonas-ocaml/tree/main/examples/TryFirst>
Step-by-step explanation of the main facilities of the library

<https://github.com/strymonas/strymonas-ocaml/tree/main/examples>
Further, realistic examples: sliding-window, run-length-encoding

<https://github.com/strymonas/strymonas-ocaml/tree/main/benchmarks>
Extensive benchmarks, comparing strymonas with hand-written code and other (OCaml) libraries

Complete Stream Fusion for Software-Defined Radio

Stream Fusion, to Completeness
The earlier version of the library

 

Stream Fusion, to Completeness

Stream processing is mainstream (again): Widely-used stream libraries are now available for virtually all modern OO and functional languages, from Java to C# to Scala to OCaml to Haskell. Yet expressivity and performance are still lacking. For instance, the popular, well-optimized Java 8 streams do not support the zip operator and are still an order of magnitude slower than hand-written loops.

We present the first approach that represents the full generality of stream processing and eliminates overheads, via the use of staging. It is based on an unusually rich semantic model of stream interaction. We support any combination of zipping, nesting (or flat-mapping), sub-ranging, filtering, mapping -- of finite or infinite streams. Our model captures idiosyncrasies that a programmer uses in optimizing stream pipelines, such as rate differences and the choice of ``for'' vs. ``while'' loops. Our approach delivers hand-written-like code, but automatically. It explicitly avoids the reliance on black-box optimizers and sufficiently-smart compilers, offering the highest, guaranteed and portable performance.
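The rate-difference point can be illustrated with a hand-fused zip (an OCaml sketch of ours, not code from the paper): when one of the zipped streams is filtered, its elements arrive at a different rate, so the fused code is no longer a simple lockstep loop but a state machine with an inner search -- the kind of code the staged approach generates automatically.

```ocaml
(* Zip a filtered stream with a plain one: pair each element of ys
   with the next even element of xs. The different rates of the two
   sides force a nested while loop. *)
let zip_filtered xs ys =
  let n = Array.length xs and m = Array.length ys in
  let res = ref [] and i = ref 0 and j = ref 0 in
  while !j < m && !i < n do
    (* advance i to the next even element of xs, if any *)
    while !i < n && xs.(!i) mod 2 <> 0 do incr i done;
    if !i < n then begin
      res := (xs.(!i), ys.(!j)) :: !res;
      incr i; incr j
    end
  done;
  List.rev !res
```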

Our approach relies on high-level concepts that are then readily mapped into an implementation. Accordingly, we have two distinct implementations: an OCaml stream library, staged via MetaOCaml, and a Scala library for the JVM, staged via LMS. In both cases, we derive libraries richer and simultaneously many tens of times faster than past work. We greatly exceed in performance the standard stream libraries available in Java, Scala and OCaml, including the well-optimized Java 8 streams.

Joint work with Aggelos Biboudis, Nick Palladinos and Yannis Smaragdakis.

Version
The current version is January 2017
References
Highest-performance Stream Processing
The new version of the library

strymonas.pdf [614K]
<https://arxiv.org/abs/1612.06668>
The complete paper, whose shorter version (without Appendices) is published in the Proceedings of POPL 2017 doi:10.1145/3009837

<http://strymonas.github.io/>
The complete code for both MetaOCaml and Scala/Java versions of the strymonas library, and the complete code for all benchmarks. The code received the Artifact Evaluated badge from the POPL 2017 artifact evaluation committee.

Even Better Stream Fusion
This talk explains at the end the essence of complete fusion: eliminating even constant-size intermediary data structures

 

All Things Flow: Unfolding the History of Streams

Heraclitus observed that all things flow and nothing remains still; ``you cannot step into the same river twice''. So what is a stream in computer science, and where did this notion come from? We divide streaming abstractions into four categories. Following these four axes, we unfold the history of streams and give an overview of how this abstraction came into existence as a mainstream programming language facility. Our goal is to present the related concepts briefly through a literature review, drawing connections between programming language features and technologies. This discussion will be of interest to the young computer science researcher, the curious software engineer, and the grizzled database query optimization specialist.

Joint work with Aggelos Biboudis and Jeremy Gibbons.

Version
The current version is October 2021
References
streams-hapoc2021.pdf [363K]
The extended abstract published in the Proceedings of HAPOC 21: 6th International Conference on the History and Philosophy of Computing (October 27-29, Zuerich, Switzerland)

 

Even Better Stream Fusion

Stream processing is one of the key data processing modes, related to dataflow programming. It was dominant in the punch-card era, and is becoming prevalent again in the era of huge data, ubiquitous sensors and distributed computing. Its characteristic is incremental, sequential processing with bounded buffering, which lets one handle a possibly unbounded amount of data in limited space. Another characteristic is the ease of specifying it as a Xmas-lights diagram: if some further processing is needed, just plug in another segment.

Although the diagrams are easy to draw, they are difficult to implement with low latency and in low memory. This talk is about the key optimization: stream fusion, that is, combining several simple processing steps into one complex step, reducing the amount of intermediary data and communication overhead. Specifically, we will talk about complete fusion: not just reduction but complete elimination. This is hard, especially for diagrams with "fat pipes" (flatmap) and "joins" (zip).
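To see what the complete fusion of a "fat pipe" amounts to, here is a hand-fused flat_map pipeline in plain OCaml (our sketch for illustration, not strymonas output): the inner stream produced by flat_map is inlined into the outer loop, so no intermediate list or stream ever exists.

```ocaml
(* The pipeline
     xs |> flat_map (fun x -> [x; x*10]) |> filter even |> sum
   fused by hand: the two-element inner stream is unrolled directly
   into the outer fold, eliminating all intermediate structures. *)
let fused_flat_map xs =
  List.fold_left
    (fun acc x ->
      (* first element of the inner stream: x *)
      let acc = if x mod 2 = 0 then acc + x else acc in
      (* second element of the inner stream: x * 10 *)
      let y = x * 10 in
      if y mod 2 = 0 then acc + y else acc)
    0 xs
```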

This talk introduces the ongoing work on strymonas: a high-performance code generation library (DSL) that converts a diagram-like specification into hand-written-like code -- with assured complete fusion. We describe the main ideas behind the complete fusion of diagrams with joins, and illustrate them with the example of software FM radio.

References
oxford-seminar.pdf [923K]
Slides of the talk presented at the Seminar on Tensor Computation, University of Oxford Department of Computer Science. February 18, 2022.

<http://www.cs.ox.ac.uk/seminars/2447.html>
The web page of the seminar talk, with the pointer to the YouTube recording

 

Complete Stream Fusion for Software-Defined Radio

Software-Defined Radio (SDR) is widely used not only as a practical application but also as a fitting benchmark of high-performance signal processing. We report using the SDR benchmark -- specifically, FM Radio reception -- to evaluate the recently developed single-thread stream processing library strymonas, contrasting it with the synchronous dataflow system StreamIt. Despite the absence of parallel processing or windowing as a core primitive, strymonas turns out to easily support SDR, offering high expressiveness and performance, approaching the peak single-core floating-point performance, sufficient for real-time FM reception.

Joint work with Tomoaki Kobayashi.

Version
The current version is August 2022
References
sdr-bench.pdf [437K]
Preprint (ArXiv, cs.PL) doi:10.48550/arXiv.2208.08732

 

Functional Stream Libraries and Fusion: What's Next?

This meeting on the state of the art and challenges of (functional) stream processing took place at the Shonan Village Center, Kanagawa, Japan on October 22--25, 2018.

Functional stream libraries let us easily build stream processing pipelines, by composing sequences of simple transformers such as map or filter with producers (backed by an array, a file, or a generating function) and consumers (reducers). The purely applicative approach of building a complex pipeline from simple immutable pieces simplifies programming and reasoning: the assembled pipeline is an executable specification. To be practical, however, a library has to be efficient: at the very least, it should avoid creating intermediate structures -- especially structures like files and lists whose size grows with the length of the stream. Even bounded-size intermediate structures slow down the processing significantly, by up to two orders of magnitude. Eliminating the intermediate structures is the central problem in stream processing: so-called stream fusion.

Stream fusion has been the subject of intensive research since the late 1950s. By now, the low-hanging fruit in stream processing has all been picked up -- although some of it quite recently (POPL 2017). Stream fusion made it to the front page of CACM (May 2017). Java 8 Streams and Haskell compilers, among others, have implemented some of the earlier research results. We have attained a milestone. What are the further challenges?

That was the topic of many discussions at the workshop. Several main questions came up over and over again.

As a tangible outcome, the meeting identified a set of problems -- challenges -- to help drive and evaluate further research: filterMax, sorted merge, multiple appends, parallel merge. The report of the meeting discusses them in detail.
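The sorted merge challenge, for instance, has a simple hand-fused form; the difficulty lies in getting a stream library to produce such code automatically from two composable stream values. An OCaml sketch of the target code (ours, for illustration):

```ocaml
(* Sorted merge of two sorted arrays, hand-fused: a single loop with
   two cursors and no intermediate structures -- the state machine a
   complete-fusion library would ideally generate. *)
let merge_sorted a b =
  let n = Array.length a and m = Array.length b in
  let out = Array.make (n + m) 0 in
  let i = ref 0 and j = ref 0 in
  for k = 0 to n + m - 1 do
    if !j >= m || (!i < n && a.(!i) <= b.(!j))
    then begin out.(k) <- a.(!i); incr i end
    else begin out.(k) <- b.(!j); incr j end
  done;
  out
```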

The workshop was organized together with Aggelos Biboudis and Martin Odersky. The Shonan seminar series is sponsored by Japan's National Institute of Informatics (NII).

References
<http://shonan.nii.ac.jp/seminars/136/>
The web page of the seminar

shonan-streams.pdf [280K]
The final report

 

Streams in Linear Algebra

The linear algebra library LinAlg is built upon matrix streams, which access the whole matrix or its column, row, diagonal, etc. as a linear, possibly mutable sequence. Many linear algebra algorithms turn out to require only sequential access to a matrix or its parts, which is simpler and faster than random access. All streams in LinAlg are lightweight: they use no heap storage and leave no garbage.

Stream-wise access to a collection is an important access method, which may even be supported by hardware. For example, the Pentium III floating-point extension (Internet Streaming SIMD Extensions) lets programmers designate arrays as streams and provides instructions to handle such data efficiently (``Internet Streaming SIMD Extensions'', Shreekant (Ticky) Thakkar and Tom Huff, Computer, Vol. 32, No. 12, December 1999, pp. 26-34). Streaming is a typical memory access model of DSPs: that is why DSPs almost never incorporate a data cache (see ``DSP Processors Hit the Mainstream'', Jennifer Eyre and Jeff Bier, Computer, Vol. 31, No. 8, August 1998, pp. 51-59). The memory architecture described in ``Smarter Memory: Improving Bandwidth for Streamed References'' (IEEE Computer, July 1998, pp. 54-63) achieves low overall latencies because the compiler tells the CPU that a stream operation is to follow. LinAlg offers this streaming access model to the application programmer.

Matrix streams may stride a matrix by an arbitrary amount. This lets us traverse a matrix along the diagonal, by columns, by rows, etc. Streams can be constructed of a Matrix itself, or from other matrix views (MatrixColumn, MatrixRow, MatrixDiag). In the latter case, the streams are confined only to the specific portions of the matrix.
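The stride idea can be sketched in a few lines. The following is an OCaml illustration of the concept only (LinAlg itself is a C++ library, and these names are ours): a matrix is a flat row-major array, and a stream over its diagonal is just a start offset, a stride, and a count.

```ocaml
(* A strided stream over a flat row-major array. *)
type stream = { data : float array; start : int; stride : int; count : int }

(* The diagonal of an n x n row-major matrix: start 0, stride n+1. *)
let diagonal data n = { data; start = 0; stride = n + 1; count = n }

(* Sequentially fold over a stream: no random access, no allocation. *)
let fold_stream f init { data; start; stride; count } =
  let acc = ref init in
  for k = 0 to count - 1 do
    acc := f !acc data.(start + k * stride)
  done;
  !acc

(* The trace of a matrix is a reduction over its diagonal stream. *)
let trace data n = fold_stream (+.) 0. (diagonal data n)
```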

Many functions of LinAlg are written in terms of streams: for example, the computation of vector norms, the addition of a vector to the diagonal or the anti-diagonal of a matrix, and Aitken-Lagrange interpolation. Singular value decomposition (SVD) demonstrates many applications of streams: e.g., multiplying a matrix by a rotation matrix avoids random access to matrix elements and the corresponding range checks and offset calculations. The stream code is also more lucid. One may create a stream that spans a part of another stream. We use substreams, for example, to efficiently reflect the upper triangle of a square matrix onto the lower one, yielding a symmetric matrix. The SVD computation uses subranging extensively, e.g., for left Householder transformations.

LinAlg's streams may span an arbitrary rectangular block of a matrix, including the whole matrix, a single matrix element, a matrix row or a column, or a part thereof. Assigning a block of one matrix to a block of another takes only one line -- which, due to inlining, is just as efficient as the direct loop with pointer manipulation.

References
LinAlg: Basic Linear Algebra and Optimization classlib

 

How to zip folds: A library of fold transformers (streams)

We show how to merge two folds `elementwise' into the resulting fold. Furthermore, we present a library of potentially infinite ``lists'' represented as folds -- or streams, or success-failure-continuation-based generators. Whereas the standard Haskell Prelude functions such as map and take transform lists, we transform folds. We implement a range of progressively more complex transformers -- from map, filter, takeWhile to take, drop, dropWhile, and finally zip and zipWith. The standard list API is also provided.

Emphatically, we never convert a stream to a list, and so we use neither recursion nor recursive types. All iterative processing is driven by the fold itself. Therefore, all operations of the library are assuredly terminating. We only need higher-ranked types, because lists cannot be fully implemented in the simply typed lambda-calculus.

The implementation of zip also solves the problem of ``parallel loops''. One can think of a fold as an accumulating loop and realize a nested loop as a nested fold. Representing a parallel loop as a fold is a challenge, answered at the end of the article. This becomes especially interesting in the case of general backtracking computations, or backtracking computations in direct style, with delimited continuations modeling `lists'.
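The fold representation itself is easy to render in OCaml (the article's code is Haskell; this sketch of ours uses a record with a polymorphic field, reflecting the remark above that higher-ranked types are needed). A ``list'' is the function that folds over it; map transforms the fold without ever building a list, and all iteration is driven by the fold itself.

```ocaml
(* A stream as its own fold: given a cons and a nil, it folds itself.
   The universally quantified field gives the required higher rank. *)
type 'a fold_stream = { run : 'b. ('a -> 'b -> 'b) -> 'b -> 'b }

let of_list xs =
  { run = fun cons nil -> List.fold_right cons xs nil }

(* map transforms the fold directly: no intermediate list. *)
let map f s =
  { run = fun cons nil -> s.run (fun x acc -> cons (f x) acc) nil }

(* Observations: instantiate the fold at a concrete answer type. *)
let to_list s = s.run (fun x acc -> x :: acc) []
let sum s = s.run (+) 0
```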

Version
The current version is 1.6, Jun 16, 2008
References
zip-folds.lhs [13K]
Complete literate Haskell code
An earlier version was posted on the Haskell mailing list on Tue, 11 Oct 2005 17:25:24 -0700 (PDT). That version implemented zip with the help of a recursive type. The present version, inspired by a question from Chung-chieh Shan, introduces no extra data types, no recursion, and rather relies on the already defined functions to deconstruct a fold. Version 1.6 adds two examples of surprisingly simple expressions of list intersperse and Fibonacci in terms of fold.

Beyond Church encoding: Boehm-Berarducci isomorphism of algebraic data types and polymorphic lambda-terms

LogicT: backtracking monad transformer with fair operations and pruning
which illustrates the close connection with foldr/build list-fusion, aka ``short-cut deforestation''.

Predecessor and lists are not representable in simply-typed lambda calculus
Therefore, higher-rank or recursive/inductive types are necessary for lists

Parallel composition of streams: several sources to one sink
Folding over multiple streams using monad transformers