Looking at Java 22: Stream Gatherers


The Stream API provides a rich and versatile set of intermediate operations, even though certain ops are still missing or are hard to implement with the given ones.

Terminal ops have long been customizable by writing our own Collector, but intermediate ops had no equivalent until now. Java 22 introduces one as a preview feature: Stream Gatherers.


Custom Intermediate Operations (before 22)

Let’s look at a non-customizable intermediate operation to understand better what Gatherers can provide and how to use them to fill in the gaps of missing functionality in any Stream pipeline.

A good example of a limited op is distinct. It’s a simple operation that collects all unique elements according to equals(Object) without an option to provide our own criteria for distinctiveness.

From an implementation point of view, this operation requires two things:

  • Internal state to hold the unique elements found
  • A way to compare a passing element with the already found ones

As there’s only a single criterion available, the OpenJDK uses a LinkedHashSet to delegate the necessary work.

To implement a customizable distinct op without using Stream Gatherers, we can choose between two ways to do so:

  • Either create a wrapper type handling the custom criterion in its equals/hashCode methods…
  • … or use a filter op that captures an outside state

Creating a Custom Type for distinct

Instead of creating a fully generic solution, I want to do a more specific one as it still illustrates the point I want to make and is more straightforward.

Let’s say we want to distinguish String elements by their length. To do so, we’d need to map the Stream’s element to a distinct()-compatible type, call distinct(), and map the custom type back to a String:

java
Stream.of("apple", "orange", "banana", "melon", "pineapple")
      .map(DistinctByLength::new) // MAP TO CUSTOM TYPE
      .distinct()                 // PERFORM DISTINCT
      .map(DistinctByLength::str) // MAP BACK TO STRING
      ...

The implementation of the custom type itself isn’t that relevant, but in this case, it is quite straightforward:

java
public record DistinctByLength(String str) {

  @Override
  public boolean equals(Object obj) {
    return obj instanceof DistinctByLength(String other)
           && str.length() == other.length();
  }

  @Override
  public int hashCode() {
    return str == null ? 0 : Integer.hashCode(str.length());
  }
}

This approach works but greatly complicates a simple task by introducing an additional type and requiring two mapping ops plus a distinct() call.

Capturing Outside State

The second option uses a filter op in combination with an outside state to do its job:

java
Set<Integer> state = new HashSet<>();

Stream.of("apple", "orange", "banana", "melon", "pineapple")
      .filter(str -> state.add(str.length()))
      ...;

This approach is undesirable, too. The filter must keep track of all seen lengths, which makes the operation reliant on mutable state captured from outside the pipeline. That’s problematic because the state outlives the Stream, and a plain HashSet isn’t thread-safe, so the pipeline loses capabilities such as safe parallel execution.
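To make the drawback concrete, here is a minimal sketch that runs the filter-based pipeline twice against the same captured Set. The class and method names are made up for illustration; only the filter lambda comes from the example above.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Stream;

class OutsideStateDemo {

  // Runs the filter-based "distinct by length" pipeline against the given state.
  static List<String> distinctByLength(Set<Integer> state, String... values) {
    return Stream.of(values)
                 .filter(str -> state.add(str.length()))
                 .toList();
  }

  public static void main(String[] args) {
    Set<Integer> state = new HashSet<>();

    // First run: the state is empty, so the first element of each length passes.
    System.out.println(
        distinctByLength(state, "apple", "orange", "banana", "melon", "pineapple"));
    // prints [apple, orange, pineapple]

    // Second run with the SAME state: lengths 5 and 6 are already recorded,
    // so nothing passes even though this is a fresh Stream.
    System.out.println(distinctByLength(state, "melon", "banana"));
    // prints []
  }
}
```

The second pipeline is silently affected by the first one, which is exactly the kind of coupling a self-contained operation avoids.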

A better solution is to use a Gatherer and the new gather intermediate operation.


Gatherer<T, A, R>

The operation gather transforms a Stream of input elements into a Stream of output elements and is, therefore, a “mapper”-like operation.

Unlike map, which maps one-to-one, and flatMap, which maps one-to-many, the gather operation is more versatile and supports all variations:

  • one-to-one
  • one-to-many
  • many-to-one
  • many-to-many

Thanks to their flexibility, Gatherers enable a variety of tasks:

  • Grouping elements into batches mid-Stream
  • Custom de-duplication/distinct operations
  • Incremental accumulation or reordering of elements
  • Transforming infinite streams into finite ones
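
As a rough sketch of the last item, the following Gatherer stops an infinite Stream as soon as the running sum of its elements would exceed a limit. The helper name whileSumAtMost and the int[] state holder are assumptions for illustration; the factory method Gatherer.ofSequential it relies on is covered later in this article. Note that Gatherers are a preview API in Java 22 and 23 (requiring --enable-preview) and final as of Java 24.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class RunningSumLimit {

  // Hypothetical helper: passes elements on while their running sum stays
  // at or below the limit, then stops the whole pipeline.
  static Gatherer<Integer, int[], Integer> whileSumAtMost(int limit) {
    return Gatherer.ofSequential(
        // State: a one-element array holding the running sum
        () -> new int[1],
        (sum, element, downstream) -> {
          sum[0] += element;
          if (sum[0] > limit) {
            return false; // signal that no further elements are needed
          }
          return downstream.push(element);
        });
  }

  static List<Integer> demo() {
    return Stream.iterate(1, i -> i + 1) // infinite: 1, 2, 3, ...
                 .gather(whileSumAtMost(10))
                 .toList();
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints [1, 2, 3, 4]
  }
}
```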

Behind the Scenes

The concept of Gatherers is heavily influenced by Collectors, and they share the same generic signature: <T, A, R>

T – the type of input elements to the gather operation
A – the mutable state to keep track of previously seen elements
R – the result type of output elements

Even though both share a common modus operandi, they still use slightly different terminology, method names, and the involved types differ, too:

Collector<T, A, R> | Gatherer<T, A, R>
--- | ---
Creating a results container: Supplier<A> supplier() | Creating a potentially mutable state: Supplier<A> initializer()
Incorporating a new element into the result container: BiConsumer<A, T> accumulator() | Integrating a new input element: Gatherer.Integrator<A, T, R> integrator()
Combining two result containers: BinaryOperator<A> combiner() | Combining two states: BinaryOperator<A> combiner()
Performing an optional final transform on the container: Function<A, R> finisher() | Performing an optional final operation: BiConsumer<A, Gatherer.Downstream<R>> finisher()

The initializer, a Supplier<A>, creates the Gatherer’s private state, which is accessible by the other methods.

The integrator, a Gatherer.Integrator<A,T,R>, accepts three arguments: the state (A), the current element (T), and a Gatherer.Downstream<R>. It returns a boolean, which can short-circuit the processing by returning false to indicate that no further elements need to be processed.

The combiner, a BinaryOperator<A>, is used for parallel gathering. There are ways to simplify creating a Gatherer if it’s only supposed to run sequentially.

And finally, the finisher, a BiConsumer<A, Gatherer.Downstream<R>>, is an optional operation that runs after all elements have been processed. It receives the final state and can still push elements downstream.
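
To see how the four parts could fit together, here is a hedged sketch of a many-to-one Gatherer that emits only the longest String and also works on a parallel Stream. The name longest and the String[] holder are illustrative choices, not JDK API; the sketch uses the static factory Gatherer.of, which accepts all four functions.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

// Requires JDK 22/23 with --enable-preview, or JDK 24+
class LongestElement {

  // Hypothetical many-to-one Gatherer: emits only the longest String.
  // On ties, the earlier element wins.
  static Gatherer<String, String[], String> longest() {
    return Gatherer.of(
        // INITIALIZER: a one-element array as mutable holder
        () -> new String[1],

        // INTEGRATOR: remember the longest element so far, push nothing yet
        (state, element, downstream) -> {
          if (state[0] == null || element.length() > state[0].length()) {
            state[0] = element;
          }
          return true;
        },

        // COMBINER: merge the states of two parallel segments
        (left, right) -> {
          if (left[0] == null) return right;
          if (right[0] == null) return left;
          return right[0].length() > left[0].length() ? right : left;
        },

        // FINISHER: emit the single result, if there is one
        (state, downstream) -> {
          if (state[0] != null) {
            downstream.push(state[0]);
          }
        });
  }

  static List<String> demo() {
    return Stream.of("apple", "orange", "banana", "melon", "pineapple")
                 .parallel()
                 .gather(longest())
                 .toList();
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints [pineapple]
  }
}
```

Because nothing is pushed before the finisher runs, the integrator simply returns true to keep consuming elements.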

Creating a Gatherer

To create the Gatherer<T, A, R>, first, the involved types must be determined.

For our example of “uniqueness by length”, it needs to handle String elements. The state only has to keep track of the lengths seen so far, as the integrator has access to both the state and the current element, making a Set<Integer> the obvious choice. Finally, the resulting elements are still of type String.

These types lead to the following signature for our Gatherer:

java
Gatherer<String,       // T: Incoming Stream's elements
         Set<Integer>, // A: State
         String>       // R: Element type of the returned Stream

Once the Gatherer is implemented, the previous Stream pipeline gets simplified to the code below:

java
Stream.of("apple", "orange", "banana", "melon", "pineapple")
      .gather(distinctByLength)
      ....;

We still need the custom logic, but it’s quite an improvement over a custom type plus two mapping ops, or a filter op that captures outside state.

We could implement the Gatherer<T, A, R> interface ourselves, but that’s quite tedious. Instead, we can choose the more convenient approach of using one of the type’s static of... methods:

java
Gatherer<String, Set<Integer>, String> distinctByLength =
  Gatherer.ofSequential( 
    // INITIALIZER
    HashSet::new,

    // INTEGRATOR
    (state, element, downstream) -> {
      var added = state.add(element.length());
      return added ? downstream.push(element)
                   : !downstream.isRejecting();
    },

    // FINISHER
    Gatherer.defaultFinisher()
  );

The ofSequential factory method creates a stateful but non-parallelizable Gatherer. But there are multiple overloads and parallelizable variants available.

First, the initializer, a Supplier<A>, must create a state instance of type A.

Next, the integrator, a functional interface of type Gatherer.Integrator<A,T,R>, defines the logic that is called for each element. It has access to the state, the current element, and the downstream, and returns a boolean.

And finally, the finisher. In our case, no additional actions are needed, so the no-op defaultFinisher will do.
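
The same shape generalizes beyond String lengths. As a hedged sketch, a hypothetical distinctBy helper (not part of the JDK) could accept a key-extracting Function and keep the first element for each key. It uses the two-argument ofSequential overload, which omits the finisher entirely:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

// Requires JDK 22/23 with --enable-preview, or JDK 24+
class DistinctBy {

  // Hypothetical generic variant: keeps the first element for each extracted key.
  static <T, K> Gatherer<T, Set<K>, T> distinctBy(
      Function<? super T, ? extends K> keyExtractor) {
    return Gatherer.ofSequential(
        // INITIALIZER: the keys seen so far
        HashSet::new,

        // INTEGRATOR: push the element only if its key is new
        (seenKeys, element, downstream) -> {
          boolean added = seenKeys.add(keyExtractor.apply(element));
          return added ? downstream.push(element)
                       : !downstream.isRejecting();
        });
  }

  static List<String> demo() {
    Gatherer<String, Set<Integer>, String> byLength = distinctBy(String::length);

    return Stream.of("apple", "orange", "banana", "melon", "pineapple")
                 .gather(byLength)
                 .toList();
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints [apple, orange, pineapple]
  }
}
```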

As you can see, it’s pretty similar to creating a Collector. Unlike a Collector, though, the Stream isn’t terminated, and the operation does not immediately trigger data processing.

Instead, elements are pushed downstream as required. We can push elements into the interface Gatherer.Downstream<T> but should test if the downstream is still accepting new elements by checking isRejecting().
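
The following sketch illustrates that interplay with a stateless one-to-many Gatherer that splits lines into words. The name words is an assumption for illustration. It checks isRejecting() before doing any work and stops as soon as push reports that the downstream wants no more elements:

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

// Requires JDK 22/23 with --enable-preview, or JDK 24+
class WordSplitter {

  // Hypothetical stateless one-to-many Gatherer: splits each line into words.
  static Gatherer<String, Void, String> words() {
    return Gatherer.ofSequential((unused, line, downstream) -> {
      // Skip the splitting work if nobody wants further elements
      if (downstream.isRejecting()) {
        return false;
      }
      for (String word : line.split(" ")) {
        // push returns false once the downstream wants no more elements
        if (!downstream.push(word)) {
          return false;
        }
      }
      return true;
    });
  }

  static List<String> demo() {
    return Stream.of("to be or", "not to be")
                 .gather(words())
                 .limit(4)
                 .toList();
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints [to, be, or, not]
  }
}
```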


Built-in Gatherers

Another thing Gatherers share with Collectors is that you don’t have to start from scratch.

Like the Collectors utility type, there’s a Gatherers type, too. However, there are only five Gatherers available:

fold(Supplier<R> initial, BiFunction<R, T, R> folder):
A many-to-one Gatherer used when no combiner-function can be implemented or when it depends on the intrinsic order.

mapConcurrent(int maxConcurrency, Function<T, R> mapper):
A one-to-one Gatherer that maps the elements concurrently using Virtual Threads, even when used in non-parallel Streams.

scan(Supplier<R> initial, BiFunction<R, T, R> scanner):
Another one-to-one Gatherer for prefix scans, an incremental accumulation that starts with the value returned by initial and computes each subsequent value by applying scanner to the current value and the current element.

windowFixed(int windowSize):
The first of two available window... Gatherers that gather Stream elements many-to-many into encounter-ordered groups of elements, so-called windows. Each window contains at most windowSize elements; only the last one may be smaller.

windowSliding(int windowSize):
The second window... Gatherer also creates windows, but with a twist. Instead of using each element only once, each subsequent window contains all except the first element of the previous one, plus the next element from the Stream.
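
As a quick, non-exhaustive illustration, the sketch below applies four of the built-in Gatherers to the numbers 1 to 5. The class and method names are made up for this example; mapConcurrent is left out to keep the output easy to follow.

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

// Requires JDK 22/23 with --enable-preview, or JDK 24+
class BuiltInGatherersDemo {

  static List<List<Integer>> fixedWindows() {
    return Stream.of(1, 2, 3, 4, 5)
                 .gather(Gatherers.windowFixed(2))
                 .toList();
  }

  static List<List<Integer>> slidingWindows() {
    return Stream.of(1, 2, 3, 4, 5)
                 .gather(Gatherers.windowSliding(2))
                 .toList();
  }

  static List<Integer> runningSums() {
    return Stream.of(1, 2, 3, 4, 5)
                 .gather(Gatherers.scan(() -> 0, (sum, n) -> sum + n))
                 .toList();
  }

  static List<Integer> total() {
    return Stream.of(1, 2, 3, 4, 5)
                 .gather(Gatherers.fold(() -> 0, (sum, n) -> sum + n))
                 .toList();
  }

  public static void main(String[] args) {
    System.out.println(fixedWindows());   // prints [[1, 2], [3, 4], [5]]
    System.out.println(slidingWindows()); // prints [[1, 2], [2, 3], [3, 4], [4, 5]]
    System.out.println(runningSums());    // prints [1, 3, 6, 10, 15]
    System.out.println(total());          // prints [15]
  }
}
```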

Check out the documentation for more details and examples.


Conclusion

The introduction of Stream pipelines with Java 8 marked a new age for sophisticated data processing tasks in Java. However, even though the JDK included a lot of operations, there were always particular tasks that couldn’t be implemented in a straightforward way or even at all!

With Gatherers, there’s now a (preview) way to create versatile and flexible intermediate operations that open up new data processing and analysis possibilities for more effective and purposeful Stream pipelines.

A few ready-to-use gatherers are already available, but we can easily create our own toolbelt for commonly used operations. And I’m sure the Java ecosystem will provide a plethora of capabilities over time!


