Looking at Java 22: Stream Gatherers
The Stream API provides a rich and versatile set of intermediate operations. Even so, certain ops are still missing or are hard to implement with the given ones. Whereas terminal ops are quite customizable by writing our own `Collector`, an equivalent for intermediate ops wasn’t available until now, as a preview feature: Stream Gatherers.
Custom Intermediate Operations (before 22)
Let’s look at a non-customizable intermediate operation to understand better what Gatherers can provide and how to use them to fill in the gaps of missing functionality in any Stream pipeline.
A good example of a limited op is `distinct()`. It’s a simple operation that collects all unique elements according to `equals(Object)`, without an option to provide our own criteria for distinctiveness.
From an implementation point of view, this operation requires two things:
- Internal state to hold the unique elements found
- A way to compare a passing element with the already found ones
As there’s only a single criterion available, the OpenJDK delegates the necessary work to a `LinkedHashSet`.
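To illustrate the limitation, here’s a minimal sketch (the element values are my own) showing that `distinct()` de-duplicates only by `equals(Object)`:

```java
import java.util.List;
import java.util.stream.Stream;

class DistinctDemo {
    public static void main(String[] args) {
        // distinct() keeps the first occurrence of each equal element;
        // there is no hook to supply a custom notion of "equal"
        List<String> result = Stream.of("apple", "grape", "apple")
                                    .distinct()
                                    .toList();
        System.out.println(result); // [apple, grape]
    }
}
```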
To implement a customizable distinct op without using Stream Gatherers, we can choose between two ways to do so:
- Either create a wrapper type handling the custom criterion in its `equals`/`hashCode` methods…
- … or use a `filter` op that captures an outside state
Creating a Custom Type for distinct
Instead of creating a fully generic solution, I want to do a more specific one as it still illustrates the point I want to make and is more straightforward.
Let’s say we want to distinguish `String` elements by their length. To do so, we’d need to map the Stream’s elements to a `distinct()`-compatible type, call `distinct()`, and map the custom type back to a `String`. The implementation of the custom type itself isn’t that relevant, but in this case, it is quite straightforward:
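A minimal sketch of both the wrapper type and the resulting pipeline; `StringByLength` is a hypothetical helper whose `equals`/`hashCode` only consider the String’s length:

```java
import java.util.List;
import java.util.stream.Stream;

class WrapperDemo {
    // Wrapper type whose equality is defined by the wrapped String's length
    record StringByLength(String value) {
        @Override
        public boolean equals(Object o) {
            return o instanceof StringByLength other
                && value.length() == other.value.length();
        }

        @Override
        public int hashCode() {
            return value.length();
        }
    }

    public static void main(String[] args) {
        List<String> result = Stream.of("apple", "grape", "banana")
                                    .map(StringByLength::new)   // map to the wrapper
                                    .distinct()                 // distinct by length
                                    .map(StringByLength::value) // and back to String
                                    .toList();
        System.out.println(result); // [apple, banana]
    }
}
```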
This approach works but greatly complicates a simple task by introducing an additional type and requiring two mapping ops plus a `distinct()` call.
Capturing Outside State
The second option uses a `filter` op in combination with an outside state to do its job:
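A sketch of this approach, with a hypothetical `seenLengths` Set captured from outside the pipeline:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Stream;

class FilterDemo {
    public static void main(String[] args) {
        // The state lives outside the pipeline and is captured by the lambda
        Set<Integer> seenLengths = new HashSet<>();

        List<String> result = Stream.of("apple", "grape", "banana")
                                    // add(...) returns false if the length was seen before
                                    .filter(element -> seenLengths.add(element.length()))
                                    .toList();
        System.out.println(result); // [apple, banana]
    }
}
```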
This approach is also undesirable. It must keep track of all seen elements, making the operation reliant on outside captured state. This is problematic because capturing external state affects a Stream’s capabilities, such as efficiency and parallelizability.
A better solution is to use a Gatherer and the new `gather` intermediate operation.
Gatherer<T, A, R>
The `gather` operation transforms a Stream of input elements into a Stream of output elements and is, therefore, a “mapper”-like operation. Unlike `map`, which maps one-to-one, and `flatMap`, which maps one-to-many, the `gather` operation is more versatile and supports all variations:
- one-to-one
- one-to-many
- many-to-one
- many-to-many
Thanks to their flexibility, Gatherers enable a variety of tasks:
- Grouping elements into batches mid-Stream
- Custom de-duplication/distinct operations
- Incremental accumulation or reordering of elements
- Transforming infinite Streams into finite ones
Behind the Scenes
The concept of Gatherers is heavily influenced by Collectors, and they share the same generic signature: `<T, A, R>`
- `T` – the type of input elements to the gather operation
- `A` – the type of the mutable state used to keep track of previously seen elements
- `R` – the type of output elements
Even though both share a common modus operandi, they use slightly different terminology and method names, and the involved types differ, too:

| `Collector<T, A, R>` | `Gatherer<T, A, R>` |
| --- | --- |
| Creating a result container: `Supplier<A> supplier()` | Creating a potentially mutable state: `Supplier<A> initializer()` |
| Incorporating a new element into the result container: `BiConsumer<A, T> accumulator()` | Integrating a new input element: `Gatherer.Integrator<A, T, R> integrator()` |
| Combining two result containers: `BinaryOperator<A> combiner()` | Combining two states: `BinaryOperator<A> combiner()` |
| Performing an optional final transform on the container: `Function<A, R> finisher()` | Performing an optional final operation: `BiConsumer<A, Gatherer.Downstream<R>> finisher()` |
The initializer, a `Supplier<A>`, creates the Gatherer’s private state, which is accessible by the other methods. The integrator, a `Gatherer.Integrator<A, T, R>`, accepts three arguments: the state (`A`), the current element (`T`), and a `Gatherer.Downstream<R>`. It returns a `boolean` and can short-circuit the processing by returning `false` to indicate that no further elements need to be processed.
The combiner, a `BinaryOperator<A>`, is used for parallel gathering.
There are ways to simplify creating a Gatherer if it’s only supposed to run sequentially.
And finally, the finisher, a `BiConsumer<A, Gatherer.Downstream<R>>`, is an optional operation run after all elements have been processed.
Creating a Gatherer
To create the `Gatherer<T, A, R>`, the involved types must be determined first. For our example of “uniqueness by length,” it needs to handle `String` elements. The state only needs to keep track of the seen elements’ lengths, as the integrator has access to the state and the current element, making a `Set<Integer>` the apparent choice. Finally, the resulting elements are still of type `String`. These types lead to the signature `Gatherer<String, Set<Integer>, String>` for our Gatherer.
Once the Gatherer is implemented, the previous Stream pipeline gets simplified to the code below:
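A sketch of the simplified pipeline, assuming a hypothetical `distinctByLength()` helper that returns the custom Gatherer (the implementation is inlined so the example is self-contained; requires Java 22 with preview features enabled, or a later release where Gatherers are final):

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class GatherPipeline {
    // The custom Gatherer, inlined for completeness
    static Gatherer<String, Set<Integer>, String> distinctByLength() {
        return Gatherer.ofSequential(
            HashSet::new,
            (state, element, downstream) ->
                // push only if the length wasn't seen before; otherwise keep going
                !state.add(element.length()) || downstream.push(element));
    }

    public static void main(String[] args) {
        // One gather(...) call instead of map/distinct/map
        List<String> result = Stream.of("apple", "grape", "banana")
                                    .gather(distinctByLength())
                                    .toList();
        System.out.println(result); // [apple, banana]
    }
}
```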
We still need the custom logic, but it’s quite an improvement over a custom type plus multiple additional ops, or capturing outside state.
We could implement the `Gatherer<T, A, R>` interface ourselves, but that is quite tedious. Instead, we can choose the more convenient approach of using one of the type’s static `of...` factory methods:
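A sketch of the “distinct by length” Gatherer built with the `ofSequential` factory; the method name `distinctByLength` is my own choice (requires Java 22 with preview features enabled, or a later release where Gatherers are final):

```java
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Gatherer;

class DistinctByLength {
    static Gatherer<String, Set<Integer>, String> distinctByLength() {
        return Gatherer.ofSequential(
            // initializer: creates the mutable state holding the seen lengths
            HashSet::new,

            // integrator: called for each element; pushes it downstream
            // only if its length wasn't seen before
            Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
                if (state.add(element.length())) {
                    return downstream.push(element);
                }
                return true; // duplicate length: skip, but keep processing
            }),

            // finisher: nothing left to do after the last element
            Gatherer.defaultFinisher()
        );
    }
}
```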
The `ofSequential` factory method creates a stateful but non-parallelizable Gatherer. There are multiple overloads and parallelizable variants available, too.
First, the initializer must create a state instance of type `A`. Next, the integrator, a functional interface of type `Gatherer.Integrator<A, T, R>`, defines the logic called for each element; it has access to the state, the current element, and the downstream. And finally, the finisher: in our case, no additional actions are needed, so the no-op `defaultFinisher` will do.
As you can see, it’s pretty similar to creating a `Collector`. Unlike a `Collector`, though, the Stream isn’t terminated, and the operation does not immediately trigger data processing. Instead, elements are pushed downstream as required. We can push elements into the `Gatherer.Downstream<T>` interface but should test whether the downstream still accepts new elements by checking `isRejecting()`.
Built-in Gatherers
Another thing Gatherers share with Collectors is that you don’t have to start from scratch. Like the `Collectors` utility type, there’s a `Gatherers` type, too. However, only five Gatherers are available:
- `fold(Supplier<R> initial, BiFunction<R, T, R> folder)`:
A many-to-one Gatherer used when no combiner function can be implemented or when the operation depends on the intrinsic order of elements.
- `mapConcurrent(int maxConcurrency, Function<T, R> mapper)`:
A one-to-one Gatherer that maps the elements concurrently using Virtual Threads, even when used in non-parallel Streams.
- `scan(Supplier<R> initial, BiFunction<R, T, R> scanner)`:
Another one-to-one Gatherer for prefix scans, an incremental accumulation that starts with the value returned by `initial` and computes subsequent values by applying the current value to the current element.
- `windowFixed(int windowSize)`:
The first of two available `window...` Gatherers that gather Stream elements many-to-many into encounter-ordered element groups, so-called windows. Each window has a maximum length of `windowSize`, and the elements are split up accordingly.
- `windowSliding(int windowSize)`:
The second `window...` Gatherer also creates windows, but with a twist. Instead of using each element only once, each subsequent window contains all except the first element of the previous one, plus the next element from the Stream.
Check out the documentation for more details and examples.
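A few quick usage sketches of the built-in window and scan Gatherers (the element values are my own; requires Java 22 with preview features enabled, or a later release where Gatherers are final):

```java
import java.util.stream.Gatherers;
import java.util.stream.Stream;

class BuiltInGatherers {
    public static void main(String[] args) {
        // windowFixed: non-overlapping groups of up to 3 elements;
        // the last window may be shorter
        var fixed = Stream.of(1, 2, 3, 4, 5, 6, 7)
                          .gather(Gatherers.windowFixed(3))
                          .toList();
        System.out.println(fixed); // [[1, 2, 3], [4, 5, 6], [7]]

        // windowSliding: each window shifts by one element
        var sliding = Stream.of(1, 2, 3, 4)
                            .gather(Gatherers.windowSliding(2))
                            .toList();
        System.out.println(sliding); // [[1, 2], [2, 3], [3, 4]]

        // scan: incremental accumulation (prefix sums starting at 0)
        var scanned = Stream.of(1, 2, 3)
                            .gather(Gatherers.scan(() -> 0, Integer::sum))
                            .toList();
        System.out.println(scanned); // [1, 3, 6]
    }
}
```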
Conclusion
The introduction of Stream pipelines with Java 8 marked a new age for sophisticated data processing tasks in Java. However, even though the JDK included a lot of operations, there were always particular tasks that couldn’t be implemented in a straightforward way or even at all!
With Gatherers, there’s now a (preview) way to create versatile and flexible intermediate operations that open up new data processing and analysis possibilities for more effective and purposeful Stream pipelines.
A few ready-to-use gatherers are already available, but we can easily create our own toolbelt for commonly used operations. And I’m sure the Java ecosystem will provide a plethora of capabilities over time!
Resources
- JEP 461: Stream Gatherers (Preview)
- Java 22 Core Library Documentation (Oracle)
- Using Preview Features (Oracle)
- Write Your Own Stream Operations (HappyCoder.eu)