Looking at Java 21: Structured Concurrency


Project Loom, with its aim to deliver “easy-to-use, high-throughput, lightweight concurrency”, will most likely change how we approach concurrency in the future. I’ve written about Scoped Values before, but so far, only Virtual Threads (JEP 444) have made it out of preview. Today, we will look at the “glue” that can hold it all together: Structured Concurrency (JEP 453).


What is Structured Concurrency Anyway?

Structured Concurrency is a paradigm that approaches concurrent workloads by encapsulating threads into specialized control flow structures, treating groups of related tasks running in different threads as a single unit of work. This allows for thread hierarchies, leading to simpler thread and error management. As a concept, that’s nothing new, and many languages, like Go, Kotlin, or Swift, support this paradigm, too.

To better understand this approach’s benefits, let’s take a look at its antagonist first: Unstructured Concurrency.

Unstructured Concurrency

Threads are usually “unstructured”: they are created and then have to be managed and observed (mainly) manually. There are control flow structures available for thread handling, though, like the (Completable)Future APIs for asynchronous tasks or an ExecutorService running tasks independently of each other.

Think of an API to gather traffic data that works with an ExecutorService:

java
TrafficResponse retrieveData() throws ExecutionException, InterruptedException {

  // CREATE INDEPENDENT TASKS VIA THE EXECUTORSERVICE
  Future<TrafficData> highwayTask =
    this.executorService.submit(this::retrieveHighwayData);
  Future<TrafficData> localTask =
    this.executorService.submit(this::retrieveLocalData);

  // WAIT FOR THE TASKS TO FINISH
  TrafficData highwayTraffic = highwayTask.get();
  TrafficData localTraffic = localTask.get();

  // RETURN COMBINED RESPONSE
  return new TrafficResponse(highwayTraffic, localTraffic);
}

Either or both of the tasks can fail independently of each other, as there is no hierarchy or relationship between the tasks, even though they’re logically related. Without such a relationship, a task can leak: if retrieveData() fails or its thread is interrupted, neither the failure nor the interruption is propagated to the other task, which keeps running in the background.

The pre-existing concurrency APIs provide a certain amount of organization and thread management, but they’re not easy to use and can easily lead to hard-to-debug code.
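
To make the leak tangible, here’s a minimal sketch that only uses the pre-existing ExecutorService API. The class name, the delay, and the error message are made up for illustration; the point is that nothing cancels the slow task after its sibling has failed:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class UnstructuredLeakDemo {

  // Returns true if the slow task ran to completion although its sibling failed.
  static boolean slowTaskOutlivesFailure() throws InterruptedException {
    ExecutorService executor = Executors.newFixedThreadPool(2);
    AtomicBoolean slowTaskFinished = new AtomicBoolean(false);

    // HYPOTHETICAL TASKS: ONE FAILS IMMEDIATELY, ONE TAKES A WHILE
    Callable<String> failingTask = () -> {
      throw new IllegalStateException("highway sensor offline");
    };
    Callable<String> slowTask = () -> {
      Thread.sleep(200);
      slowTaskFinished.set(true);
      return "local data";
    };

    try {
      Future<String> failing = executor.submit(failingTask);
      Future<String> slow = executor.submit(slowTask);

      try {
        failing.get();
      } catch (ExecutionException e) {
        // THE CALLER GIVES UP HERE, BUT NOTHING CANCELS THE SIBLING
        System.out.println("Failed: " + e.getCause().getMessage());
      }
      System.out.println("Sibling cancelled? " + slow.isCancelled());
    } finally {
      executor.shutdown(); // LETS ALREADY SUBMITTED TASKS FINISH
      executor.awaitTermination(2, TimeUnit.SECONDS);
    }
    return slowTaskFinished.get();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("Slow task still finished? " + slowTaskOutlivesFailure());
  }
}
```

The slow task is neither cancelled nor interrupted, so it keeps occupying a thread although nobody is interested in its result anymore. Cancelling it would be our job, for example by calling cancel(true) on its Future in the catch block.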

Figure: Unstructured Concurrency

This unrestricted, unstructured approach is versatile and highly flexible, but it also breeds complexity. To untangle threads running wild, some form of “structure” is needed.

Structured Concurrency to the Rescue

Introduced in Java 19 as an incubator feature (JEP 428) with a second incubation round in Java 20 (JEP 437), Structured Concurrency was promoted to a preview feature in version 21. It gives us a new concept and API to create straightforward and easier-to-maintain concurrent code by treating tasks and their related sub-tasks as a single unit of work.

The new control structure java.util.concurrent.StructuredTaskScope organizes multiple tasks running on different threads as an atomic operation. This gives us back control over error handling and cancellation and simplifies dealing with concurrent code immensely.

Figure: Structured Concurrency

It’s an addition to the concurrency tool set already available and is not supposed to be a replacement.


Working with StructuredTaskScope

To treat a group of related concurrent tasks as a single unit, we need to give them a well-defined scope. The key class to do so is the newly added StructuredTaskScope in the java.util.concurrent package.

The general workflow is as follows:

  1. Create a new scope. The current thread is its owner. If we use a try-with-resources block, we don’t have to handle closing the scope when we’re finished.

  2. Use scope.fork(Callable) to create subtasks in the scope, as you would do with an ExecutorService and its submit(Callable) method. Each call creates a new (virtual) thread for the subtask and returns a StructuredTaskScope.Subtask<T> instance, which is also a Supplier<T>.

  3. Join the subtasks by calling join() to wait until either (a) all subtasks are completed (successfully or failed) or (b) the subtasks have been canceled via shutdown().

  4. Handle any subtask errors and process the results.

  5. Close the scope explicitly if it wasn’t created in a try-with-resources block.

Figure: StructuredTaskScope Workflow

In code form, it looks something like this:

java
TrafficResponse retrieveData() throws ExecutionException, InterruptedException {

  // CREATE A NEW SCOPE
  try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {

    // FORK RELATED SUBTASKS
    Supplier<TrafficData> highwaySubtask = scope.fork(this::retrieveHighwayData);
    Supplier<TrafficData> localSubtask = scope.fork(this::retrieveLocalData);

    // JOIN ALL TASKS IN THE SCOPE
    scope.join()
         .throwIfFailed();  // AND PROPAGATE ERRORS

    // GET THE RESULTS
    return new TrafficResponse(highwaySubtask.get(),
                               localSubtask.get());
  }
}

Let’s dissect the example and the general workflow a little further.

Scope Policies

To create a scope, we need to create a new StructuredTaskScope<T> instance, either by using it directly or one of its subclasses:

  • StructuredTaskScope<T>()
  • StructuredTaskScope.ShutdownOnSuccess<T>()
  • StructuredTaskScope.ShutdownOnFailure()

The two subclasses implement a policy on how subtask completion affects the scope.

StructuredTaskScope.ShutdownOnSuccess captures the result of the first successfully completed subtask and shuts down the scope afterward. This will interrupt any unfinished threads and wake up the scope’s owner. Choose this policy if you only need the result of a single subtask (“invoke any”).
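
Here’s a minimal sketch of the “invoke any” case. The two sources and their delays are hypothetical, and the code assumes JDK 21 with --enable-preview, as the preview API may still change in later releases:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.StructuredTaskScope;

// REQUIRES JDK 21 WITH --enable-preview
class FirstResultDemo {

  // HYPOTHETICAL LOOKUPS: BOTH WOULD ANSWER, BUT ONE IS MUCH FASTER
  static String slowSource() throws InterruptedException {
    Thread.sleep(2_000);
    return "slow source";
  }

  static String fastSource() {
    return "fast source";
  }

  static String firstResult() throws InterruptedException, ExecutionException {
    try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
      scope.fork(FirstResultDemo::slowSource);
      scope.fork(FirstResultDemo::fastSource);

      // join() RETURNS AS SOON AS ONE SUBTASK SUCCEEDS,
      // THE REMAINING SUBTASK GETS INTERRUPTED
      return scope.join().result();
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(firstResult()); // prints "fast source"
  }
}
```

The result() call throws an ExecutionException if none of the subtasks completed successfully.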

StructuredTaskScope.ShutdownOnFailure is the opposite, as it shuts down the scope on the first failed subtask. If you require all results (“invoke all”), this policy ensures that other subtasks get discarded if any of them fails.
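
The following sketch shows the effect of this policy on a sibling subtask. The task bodies, the delay, and the error message are assumptions for illustration, and the code requires JDK 21 with --enable-preview:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.atomic.AtomicBoolean;

// REQUIRES JDK 21 WITH --enable-preview
class FailFastDemo {

  // Returns true if the slow subtask got interrupted after its sibling failed.
  static boolean siblingWasInterrupted() throws InterruptedException {
    AtomicBoolean interrupted = new AtomicBoolean(false);

    // HYPOTHETICAL SUBTASKS: ONE TAKES A WHILE, ONE FAILS IMMEDIATELY
    Callable<String> slowTask = () -> {
      try {
        Thread.sleep(5_000);
        return "local data";
      } catch (InterruptedException e) {
        interrupted.set(true); // THE SCOPE SHUT DOWN AND INTERRUPTED US
        throw e;
      }
    };
    Callable<String> failingTask = () -> {
      throw new IllegalStateException("highway sensor offline");
    };

    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
      scope.fork(slowTask);
      scope.fork(failingTask);

      scope.join(); // RETURNS EARLY BECAUSE OF THE FAILURE
      try {
        scope.throwIfFailed();
      } catch (ExecutionException e) {
        System.out.println("Failed: " + e.getCause().getMessage());
      }
    } // close() WAITS FOR THE INTERRUPTED SUBTASK TO FINISH

    return interrupted.get();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("Sibling interrupted? " + siblingWasInterrupted());
  }
}
```

Compared to a plain ExecutorService, we don’t have to cancel the remaining subtask ourselves; the scope takes care of it.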

Of course, we can create our own policy if needed by extending StructuredTaskScope<T> and overriding some of its methods, like handleComplete(Subtask). Here’s a simple scope that gathers successfully completed subtasks in the order they complete:

java
import java.util.Queue;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.StructuredTaskScope;
import java.util.stream.Stream;

public class OrderedSuccessfulScope<T> extends StructuredTaskScope<T> {

  // THREAD-SAFE QUEUE, AS SUBTASKS COMPLETE ON DIFFERENT THREADS
  private final Queue<Subtask<? extends T>> subtasks = new LinkedTransferQueue<>();

  @Override
  protected void handleComplete(Subtask<? extends T> subtask) {
    // IGNORE FAILED SUBTASKS
    if (subtask.state() != Subtask.State.SUCCESS) {
      return;
    }

    subtasks.add(subtask);
  }

  // ADDITIONAL FUNCTIONALITY PROVIDED BY THE CUSTOM SCOPE
  public Stream<Subtask<? extends T>> completedSuccessfully() {
    super.ensureOwnerAndJoined();
    return subtasks.stream();
  }
}

The super.ensureOwnerAndJoined() call verifies that the current thread is the owner of the scope and that it has joined after forking. Otherwise, it throws an exception: a WrongThreadException in the first case or an IllegalStateException in the second.

As you can see, there’s no “magic” or ultra-complex code behind the scenes, and creating a custom policy is quite straightforward.
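
Using such a custom scope could look like the following sketch. It repeats a condensed variant of the scope as a nested class to stay self-contained; the results() helper, the task bodies, and the delays are assumptions for illustration. It requires JDK 21 with --enable-preview:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.StructuredTaskScope;

// REQUIRES JDK 21 WITH --enable-preview
class CustomScopeDemo {

  // CONDENSED VARIANT OF THE CUSTOM SCOPE
  static class OrderedSuccessfulScope<T> extends StructuredTaskScope<T> {

    private final Queue<Subtask<? extends T>> subtasks = new LinkedTransferQueue<>();

    @Override
    protected void handleComplete(Subtask<? extends T> subtask) {
      if (subtask.state() == Subtask.State.SUCCESS) {
        subtasks.add(subtask);
      }
    }

    // HYPOTHETICAL HELPER: THE RESULTS IN COMPLETION ORDER
    List<T> results() {
      super.ensureOwnerAndJoined();
      List<T> results = new ArrayList<>();
      for (Subtask<? extends T> subtask : subtasks) {
        results.add(subtask.get());
      }
      return results;
    }
  }

  static List<String> resultsInCompletionOrder() throws InterruptedException {
    Callable<String> slow = () -> { Thread.sleep(300); return "slow"; };
    Callable<String> fast = () -> { Thread.sleep(50); return "fast"; };
    Callable<String> failing = () -> { throw new IllegalStateException("failed"); };

    try (var scope = new OrderedSuccessfulScope<String>()) {
      scope.fork(slow);
      scope.fork(fast);
      scope.fork(failing);

      scope.join(); // WAITS FOR ALL SUBTASKS, THERE'S NO SHUTDOWN POLICY
      return scope.results();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(resultsInCompletionOrder()); // prints [fast, slow]
  }
}
```

The failed subtask is silently dropped, and the remaining results show up in the order they completed, not in the order they were forked.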

Tree Structure

Scopes can be nested, which creates a tree-like relationship. A thread started in a scope can create a nested scope of its own, with a parent-child relationship between them.

This tree structure supports another preview feature in Java 21, Scoped Values (JEP 446). Any ScopedValue bound in the owner thread is inherited by the threads forked in the scope:

java
private static final ScopedValue<String> HOMETOWN = ScopedValue.newInstance();

ScopedValue.runWhere(HOMETOWN, "Heidelberg", () -> {
  try (var scope = new StructuredTaskScope<String>()) {
    scope.fork(() -> doStuff());
    // ...
  }
});

String doStuff() {
  String hometown = HOMETOWN.get();
  // => "Heidelberg"
  return hometown;
}

The parent-child relationship in the tree structure is also required for confinement checks. Threads are contained in their respective task scopes and are, therefore, easier to observe and manage.


Structured Vs. Unstructured Concurrency

Now that we’ve looked at the new API, it’s time to compare it to the already available tools to manage concurrent tasks.

ExecutorService and thread pools are high-level APIs to create and manage threads. Depending on your requirements, their unrestrictedness can be a curse or a blessing.

The CompletableFuture API gives us an easier way to handle asynchronous computation, including combining tasks and error handling. It gives concurrency a certain structure but doesn’t inherently enforce “Structured Concurrency”. Asynchronous code also has a major drawback: you have to rewrite your code to fit the overall paradigm, which often results in complex code that’s hard to understand and difficult to debug.
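
To illustrate what adapting to the asynchronous paradigm means, here’s a sketch of the traffic example written with CompletableFuture. TrafficData and TrafficResponse are modeled as assumed records, and the retrieval methods return hard-coded values for illustration:

```java
import java.util.concurrent.CompletableFuture;

class AsyncTrafficDemo {

  // ASSUMED SHAPES OF THE TYPES USED THROUGHOUT THE ARTICLE
  record TrafficData(String source, int vehicles) {}
  record TrafficResponse(TrafficData highway, TrafficData local) {}

  // HYPOTHETICAL DATA SOURCES
  static TrafficData retrieveHighwayData() {
    return new TrafficData("highway", 120);
  }

  static TrafficData retrieveLocalData() {
    return new TrafficData("local", 45);
  }

  // THE RETURN TYPE CHANGES: CALLERS HAVE TO DEAL WITH A FUTURE NOW
  static CompletableFuture<TrafficResponse> retrieveData() {
    CompletableFuture<TrafficData> highway =
      CompletableFuture.supplyAsync(AsyncTrafficDemo::retrieveHighwayData);
    CompletableFuture<TrafficData> local =
      CompletableFuture.supplyAsync(AsyncTrafficDemo::retrieveLocalData);

    // COMBINE BOTH RESULTS ONCE THEY ARE AVAILABLE
    return highway.thenCombine(local, TrafficResponse::new);
  }

  public static void main(String[] args) {
    TrafficResponse response = retrieveData().join();
    System.out.println(response.highway().vehicles() + response.local().vehicles()); // prints 165
  }
}
```

The logic is expressed as a pipeline of callbacks instead of sequential statements. And if one of the two futures fails, the other one still isn’t cancelled automatically.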

The new Structured Concurrency API, on the other hand, enforces a parent-child relationship between tasks, creating an effective and easier-to-manage scope for tasks and their related subtasks. This reduces resource leaks, as all resources are cleaned up when the whole hierarchy is completed. The structured approach also simplifies error handling and propagation.


Enabling the Structured Concurrency Preview

As the feature is still in preview, the API is disabled by default. If you want to check it out, you have to enable it explicitly:

shell
# COMPILING & RUNNING (separate steps)
javac --release 21 --enable-preview Main.java
java --enable-preview Main

# COMPILING & RUNNING (using the source code launcher)
java --source 21 --enable-preview Main.java

# JSHELL
jshell --enable-preview

Conclusion

The new API surrounding StructuredTaskScope, lightweight virtual threads, and scoped values make it easier for us to write straightforward, safer, maintainable, and more efficient concurrent code. Many of the “usual” pitfalls and common risks, like error handling and cancellation, are taken care of or become more manageable. Subtask policies provide a versatile and reusable way to deal with success and failure. Resources are utilized better, reducing leaks and orphaned threads.

Project Loom, in general, will definitely have a lasting effect on how we deal with concurrent Java code. Having more lightweight threads that are easier and safer to manage in well-defined scopes might even trigger a paradigm shift! However, it will also take some time for the ecosystem to adapt to and fully utilize these new possibilities, especially since only Virtual Threads are out of preview at the time of writing.

I’m really looking forward to Structured Concurrency as an addition to the available concurrency models, not a replacement. Java is evolving faster than ever, and the often perceived “gap” to other “more modern” languages is shrinking with each release.

