Structured Concurrency wurde – zusammen mit virtuellen Threads und Scoped Values – in Project Loom entwickelt. Structured Concurrency ist seit Java 19 als Incubator-Feature (JEP 428) und seit Java 21 als Preview Feature (JEP 453) im JDK enthalten.
In diesem Artikel erfährst du:
- Warum benötigen wir Structured Concurrency?
- Was ist Structured Concurrency?
- Wie funktioniert
StructuredTaskScope
? - Welche StructuredTaskScope-Policies gibt es, und wie können wir solch eine Policy selbst schreiben?
- Was ist der Vorteil von Structured Concurrency?
Eine begleitende Demo-Anwendung findest du in diesem GitHub-Repository.
Schauen wir uns zuerst einmal an, wie wir nebenläufige Teilaufgaben bisher implementiert haben.
Warum benötigen wir Structured Concurrency?
Wenn eine Aufgabe aus verschiedenen – vor allem blockierenden – Teilaufgaben besteht, die nebenläufig erledigt werden können (z. B. Zugriff auf Daten aus einer Datenbank oder Aufruf einer Remote API), so konnten wir hierfür bisher das Java-Executor-Framework einsetzen.
Das könnte dann z. B. so aussehen (Klasse InvoiceGenerator3_ThreadPool in der Demo-Anwendung):
Invoice createInvoice(int orderId, int customerId, String language)
throws InterruptedException, ExecutionException {
Future<Order> orderFuture =
executor.submit(() -> orderService.getOrder(orderId));
Future<Customer> customerFuture =
executor.submit(() -> customerService.getCustomer(customerId));
Future<InvoiceTemplate> invoiceTemplateFuture =
executor.submit(() -> invoiceTemplateService.getTemplate(language));
Order order = orderFuture.get();
Customer customer = customerFuture.get();
InvoiceTemplate invoiceTemplate = invoiceTemplateFuture.get();
return Invoice.generate(order, customer, invoiceTemplate);
}
Code-Sprache: Java (java)
Wir übergeben die drei Teilaufgaben an den Executor und warten auf die Teilergebnisse. Der Happy Path ist schnell implementiert. Aber wie behandeln wir Ausnahmen?
- Wenn in einem Subtask ein Fehler auftritt – wie können wir dann die anderen abbrechen? Wenn im Beispiel oben
loadOrderFromOrderService(...)
fehlschlägt, dann wirftorderFuture.get()
eine Exception, diecreateInvoice(...)
-Methode endet, und wir haben evtl. zwei noch weiterlaufende Threads. - Wie können wir die Subtasks abbrechen, wenn der Parent Task („Erstelle eine Rechung”) abgebrochen wird – oder wenn die komplette Anwendung heruntergefahren wird?
- Wie können wir – in einem alternativen Use Case – verbleibende Subtasks abbrechen, wenn lediglich das Ergebnis eines einzigen Subtasks benötigt wird?
Alles ist machbar, erfordert aber äußerst komplexen, schwer wartbaren Code (im GitHub-Repository findest du zwei Beispiele dafür: InvoiceGenerator2b_CompletableFutureCancelling und InvoiceGenerator4b_NewVirtualThreadPerTaskCancelling).
Und was, wenn wir Code dieser Art debuggen möchten? Ein Thread-Dump z. B. würde uns haufenweise Threads mit dem Namen „pool-X-thread-Y” liefern – wir wüssten aber nicht, welcher Pool-Thread zu welchem aufrufenden Thread gehört, da sich alle aufrufenden Threads den Thread-Pool des Executors teilen.
Was ist Unstructured Concurrency?
„Unstructured Concurrency” bedeutet, dass unsere Tasks in einem Netz von Threads ablaufen, deren Start und Ende im Code schwer erkennbar ist. Eine saubere Fehlerbehandlung ist meist nicht vorhanden, und oft kommt es zu verwaisten Threads, wenn eine Kontrollstruktur (im Beispiel oben: die createInvoice(...)
-Methode) endet:
Was ist Structured Concurrency?
Die in Java 19 als Incubator und Java 21 als Preview eingeführte „Structured Concurrency” ist ein Konzept, das die Implementierung, Lesbarkeit und Wartbarkeit von Code für die Aufteilung einer Aufgabe in Teilaufgaben und deren nebenläufige Abarbeitung erheblich verbessert.
Dazu führt sie mit der Klasse StructuredTaskScope
eine Kontrollstruktur ein, die
- einen klaren Scope definiert, an dessen Anfang die Threads der Teilaufgaben starten und an dessen Ende die Threads der Teilaufgaben enden,
- die eine saubere Fehlerbehandlung ermöglicht
- und die einen sauberen Abbruch von Teilaufgaben erlaubt, deren Ergebnisse nicht mehr benötigten werden.
Was das genau bedeutet, zeige ich dir in den folgenden Abschnitten an mehreren Beispielen.
StructuredTaskScope Beispiel
Structured Concurrency wird mit der Klasse StructuredTaskScope
implementiert. Mit dieser Klasse können wir das Beispiel wie folgt umschreiben (Klasse InvoiceGenerator5_StructuredTaskScope.java in der Demo-Anwendung):
Invoice createInvoice(int orderId, int customerId, String language)
throws InterruptedException {
try (var scope = new StructuredTaskScope<>()) {
Subtask<Order> orderSubtask =
scope.fork(() -> orderService.getOrder(orderId));
Subtask<Customer> customerSubtask =
scope.fork(() -> customerService.getCustomer(customerId));
Subtask<InvoiceTemplate> invoiceTemplateSubtask =
scope.fork(() -> invoiceTemplateService.getTemplate(language));
scope.join();
Order order = orderSubtask.get();
Customer customer = customerSubtask.get();
InvoiceTemplate template = invoiceTemplateSubtask.get();
return Invoice.generate(order, customer, template);
}
}
Code-Sprache: Java (java)
Wir ersetzen also den im Scope der Klasse liegenden ExecutorService
durch einen im Scope der Methode liegenden StructuredTaskScope
– und executor.submit()
durch scope.fork()
.
Mit scope.join()
warten wir darauf, dass alle Tasks erledigt sind. Das Risiko verwaister Tasks besteht damit nicht mehr.
Danach können wir über Subtask.get()
die Ergebnisse der drei Tasks auslesen. Sollte es in einem der Tasks zu einer Exception gekommen sein, wirft Subtask.get()
eine IllegalStateException
. Besser ist es daher mit state()
den Status eines Subtasks abzufragen, bevor wir get()
aufrufen:
Order order;
if (orderSubtask.state() == Subtask.State.SUCCESS) {
order = orderSubtask.get();
} else {
// Handle error
}
Code-Sprache: Java (java)
Falls du das Beispiel selbst ausprobieren möchtest: Preview-Features müssen explizit freigeschaltet werden, in diesem Fall mit --enable-preview --source 21
. Eine genaue Anleitung findest du in der README der Demo-Anwendung.
StructuredTaskScope Policies
Im Beispiel oben haben wir mit StructuredTaskScope.join()
darauf gewartet, dass alle Tasks abgeschlossen sind. Wenn aber in einem der Tasks eine Exception auftritt, können wir mit den Ergebnissen der anderen zwei Tasks nichts anfangen – warum also auf sie warten?
Hier kommen die sogenannten „Policies” ins Spiel, mit denen wir unter anderem festlegen können, wann ein Scope beendet wird. StructuredTaskScope
definiert zwei solcher Policies; wir können aber auch eigene implementieren (dazu später mehr).
„Shutdown on Failure”-Policy
Mit der „Shutdown on Failure”-Policy können wir festlegen, dass das Auftreten einer Exception in einem Task dazu führt, dass alle anderen Tasks abgebrochen werden.
Die „Shutdown on Failure”-Policy können wir wie folgt einsetzen (du findest den Code in der Klasse InvoiceGenerator6_ShutdownOnFailure im GitHub-Repo):
Invoice createInvoice(int orderId, int customerId, String language)
throws InterruptedException, ExecutionException {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Subtask<Order> orderSubtask =
scope.fork(() -> orderService.getOrder(orderId));
Subtask<Customer> customerSubtask =
scope.fork(() -> customerService.getCustomer(customerId));
Subtask<InvoiceTemplate> invoiceTemplateSubtask =
scope.fork(() -> invoiceTemplateService.getTemplate(language));
scope.join();
scope.throwIfFailed();
Order order = orderSubtask.get();
Customer customer = customerSubtask.get();
InvoiceTemplate template = invoiceTemplateSubtask.get();
return Invoice.generate(order, customer, template);
}
}
Code-Sprache: Java (java)
Verglichen mit dem vorherigen Beispiel musste ich nur zwei Dinge ändern:
- Ich habe in der dritten Zeile
new StructuredTaskScope<>()
durchnew StructuredTaskScope.ShutdownOnFailure()
ersetzt. - Ich habe nach
scope.join()
das Kommandoscope.throwIfFailed()
eingefügt.
Sollte nun in einem der drei Tasks eine Exception auftreten, werden alle anderen Subtasks sofort abgebrochen, scope.join()
kehrt zurück und scope.throwIfFailed()
wirft die Exception des fehlgeschlagenen Subtasks, eingebettet in eine ExecutionException
.
Im Beispielcode werfen die drei Subtasks mit einer gewissen Wahrscheinlichkeit eine Exception. Wenn du das Programm ein paar mal startest, wirst du sehen, wie eine Exception in einem Task zu einer Interruption in den anderen Tasks und einer Beendigung des Programms führt:
$ java -cp target/classes --enable-preview eu.happycoders.structuredconcurrency/demo1_invoice/InvoiceGenerator6_ShutdownOnFailure
[Thread[#1,main,5,main]] Forking tasks
[Thread[#1,main,5,main]] Waiting for all tasks to finish or one to fail
[VirtualThread[#31]/runnable@ForkJoinPool-1-worker-2] Loading customer
[VirtualThread[#29]/runnable@ForkJoinPool-1-worker-3] Loading order
[VirtualThread[#35]/runnable@ForkJoinPool-1-worker-1] Loading template
[VirtualThread[#31]/runnable@ForkJoinPool-1-worker-1] Finished loading customer
[VirtualThread[#29]/runnable@ForkJoinPool-1-worker-2] Error loading order
[VirtualThread[#35]/runnable@ForkJoinPool-1-worker-1] Template loading was interrupted
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error loading order
[...]
Code-Sprache: Klartext (plaintext)
An dieser Ausgabe siehst du übrigens auch, dass alle Tasks in virtuellen Threads ausgeführt werden.
„Shutdown on Success”-Policy
Alternativ kann durch new StructuredTaskScope.ShutdownOnSuccess()
ein Scope mit der „Shutdown on Success”-Policy erzeugt werden. Bei dieser Policy wird der Scope beendet, sobald ein Subtask erfolgreich war. Die anderen Subtasks werden abgebrochen, und die Methode scope.result()
liefert das Ergebnis des erfolgreichen Subtasks zurück.
Hier ein Beispiel dazu – mit einem anderen Use Case: Wir wollen eine Kundenadresse über mehrere externe APIs gleichzeitig verifizieren und wollen nur das erste Ergebnis verwenden (du findest den Code in der Klasse AddressVerification2_ShutdownOnSuccess im GitHub-Repo):
AddressVerificationResponse verifyAddress(Address address)
throws InterruptedException, ExecutionException {
try (var scope = new ShutdownOnSuccess<AddressVerificationResponse>()) {
scope.fork(() -> verificationService.verifyViaServiceA(address));
scope.fork(() -> verificationService.verifyViaServiceB(address));
scope.fork(() -> verificationService.verifyViaServiceC(address));
scope.join();
return scope.result();
}
}
Code-Sprache: Java (java)
Sollten wider Erwarten alle drei Aufrufe der verifyViaServiceX()
-Methoden eine Exception geworfen haben, wirft scope.result()
die erste davon, eingebettet in eine ExecutionException
.
Wenn du den Beispielcode ausführst, siehst du, wie der erste erfolgreiche Subtask zu einem Ergebnis führt und die anderen Tasks abgebrochen werden:
$ java -cp target/classes --enable-preview eu.happycoders.structuredconcurrency/demo2_address/AddressVerification2_ShutdownOnSuccess
[Thread[#1,main,5,main]] Forking tasks
[Thread[#1,main,5,main]] Waiting for one task to finish
[VirtualThread[#31]/runnable@ForkJoinPool-1-worker-2] Verifying address via service B
[VirtualThread[#29]/runnable@ForkJoinPool-1-worker-3] Verifying address via service A
[VirtualThread[#34]/runnable@ForkJoinPool-1-worker-1] Verifying address via service C
[VirtualThread[#34]/runnable@ForkJoinPool-1-worker-1] Finished loading address via service C
[Thread[#1,main,5,main]] Retrieving result
[VirtualThread[#31]/runnable@ForkJoinPool-1-worker-3] Verifying address via service B was interrupted
[VirtualThread[#29]/runnable@ForkJoinPool-1-worker-2] Verifying address via service A was interrupted
Code-Sprache: Klartext (plaintext)
Beachte, dass die result()
-Methode nur bei ShutdownOnSuccess
verfügbar ist und die throwIfFailed()
-Methode nur bei ShutdownOnFailure
.
Benutzerdefinierte StructuredTaskScope-Policy
Sollte keine der beiden Standard-Policies für deinen Einsatzzweck geeignet sein, kannst du mit relativ geringem Aufwand eine eigene Policy schreiben.
Nehmen wir an, wir wollen die Verfügbarkeit eines Produkts bei mehreren Lieferanten prüfen, und wir wollen nicht das erste Ergebnis verwenden, sondern das mit der schnellsten Verfügbarkeit. Gleichzeitig wollen wir fehlgeschlagene Anfragen nur dann propagieren, wenn die Anfragen bei allen Lieferanten fehlgeschlagen sind.
Das lässt sich überraschend einfach – und zugleich für andere Einsatzszenarien wiederverwendbar – realisieren. Hier zunächst die Policy (Klasse BestResultScope im GitHub-Repo). Diese nimmt als Konstruktor-Parameter einen Comparator
entgegen, über den wir später definieren werden, dass wir als best result – also als bestes Ergebnis – die schnellste Verfügbarkeit erwarten.
BestResultScope
erweitert zudem die Klasse StructuredTaskScope
, überschreibt deren handleComplete(…)
-Methode und fügt eine resultOrElseThrow()
-Methode hinzu:
public class BestResultScope<T> extends StructuredTaskScope<T> {
private final Comparator<T> comparator;
private T bestResult;
private final List<Throwable> exceptions =
Collections.synchronizedList(new ArrayList<>());
public BestResultScope(Comparator<T> comparator) {
this.comparator = comparator;
}
@Override
protected void handleComplete(Subtask<? extends T> subtask) {
switch (subtask.state()) {
case UNAVAILABLE -> {
// Ignore
}
case SUCCESS -> {
T result = subtask.get();
synchronized (this) {
if (bestResult == null || comparator.compare(result, bestResult) > 0) {
bestResult = result;
}
}
}
case FAILED -> exceptions.add(subtask.exception());
}
}
public <X extends Throwable> T resultOrElseThrow(
Supplier<? extends X> exceptionSupplier) throws X {
ensureOwnerAndJoined();
if (bestResult != null) {
return bestResult;
} else {
X exception = exceptionSupplier.get();
exceptions.forEach(exception::addSuppressed);
throw exception;
}
}
}
Code-Sprache: Java (java)
Die handleComplete(…)
-Methode wird für jeden beendeten Subtask aufgerufen – sowohl für erfolgreiche als auch für solche, die eine Exception geworfen haben. Welcher Fall eingetreten ist, prüfen wir mit subtask.state()
.
Im Erfolgsfall holen wir mit subtask.get()
das Resultat und schreiben dieses – sofern es besser ist als das bisher beste – threadsicher in das Feld bestResult
.
Im Fall einer Exception sammeln wir diese threadsicher in einer Liste.
Die resultOrElseThrow()
-Methode stellt zunächst durch den Aufruf von ensureOwnerAndJoined()
sicher, dass sie aus demjenigen Thread aufgerufen wurde, der den StructuredTaskScope
erzeugt hat, und dass dieser Thread zuvor join()
oder joinUntil(…)
aufgerufen hat.
Daraufhin prüft resultOrElseThrow()
, ob ein erfolgreiches Ergebnis vorliegt, und gibt dieses zurück. Andernfalls wirft es die spezifizierte Exception, an die es die gesammelten Exceptions als „suppressed exceptions” anhängt.
Die benutzerdefinierte Policy können wir wie folgt einsetzen (Klasse SupplierDeliveryTimeCheck2_StructuredTaskScope im GitHub-Repo):
SupplierDeliveryTime getSupplierDeliveryTime(String productId, List<String> supplierIds)
throws SupplierDeliveryTimeCheckException, InterruptedException {
try (var scope =
new BestResultScope<>(
Comparator.comparing(SupplierDeliveryTime::deliveryTimeHours).reversed())) {
for (String supplierId : supplierIds) {
scope.fork(() -> service.getDeliveryTime(productId, supplierId));
}
scope.join();
return scope.result();
}
}
Code-Sprache: Java (java)
Die Ausgabe des Beispielprogramms könnte z. B. so aussehen:
$ java -cp target/classes --enable-preview eu.happycoders.structuredconcurrency/demo3_suppliers/SupplierDeliveryTimeCheck2_StructuredTaskScope
[VirtualThread[#31]/runnable@ForkJoinPool-1-worker-2] Retrieving delivery time from supplier B
[VirtualThread[#33]/runnable@ForkJoinPool-1-worker-4] Retrieving delivery time from supplier D
[VirtualThread[#34]/runnable@ForkJoinPool-1-worker-3] Retrieving delivery time from supplier E
[VirtualThread[#32]/runnable@ForkJoinPool-1-worker-5] Retrieving delivery time from supplier C
[VirtualThread[#29]/runnable@ForkJoinPool-1-worker-3] Retrieving delivery time from supplier A
[VirtualThread[#31]/runnable@ForkJoinPool-1-worker-3] Error retrieving delivery time from supplier B
[VirtualThread[#29]/runnable@ForkJoinPool-1-worker-5] Finished retrieving delivery time from supplier A: 110 hours
[VirtualThread[#32]/runnable@ForkJoinPool-1-worker-3] Finished retrieving delivery time from supplier C: 104 hours
[VirtualThread[#34]/runnable@ForkJoinPool-1-worker-3] Error retrieving delivery time from supplier E
[VirtualThread[#33]/runnable@ForkJoinPool-1-worker-3] Finished retrieving delivery time from supplier D: 51 hours
[Thread[#1,main,5,main]] Response: SupplierDeliveryTime[supplier=D, deliveryTimeHours=51]
Code-Sprache: Klartext (plaintext)
Es ist schön zu sehen, wie zwar der Aufruf für die Lieferanten B und E fehlerhaft war, die restlichen Lieferanten aber Ergebnisse geliefert haben und schließlich das beste Ergebnis – Lieferant D mit 51 Stunden Lieferzeit – zurückgeliefert wird.
Verschachtelte StructuredTaskScopes
Falls wir nicht nur die Lieferanten für ein Produkt gleichzeitig abfragen möchten, sondern die Lieferanten für mehrere Produkte, so können wir das ganz einfach z. B. mit folgender Methode lösen (Klasse SupplierDeliveryTimeCheck3_NestedStructuredTaskScope im GitHub-Repo):
List<SupplierDeliveryTime> getSupplierDeliveryTimes(
List<String> productIds, List<String> supplierIds) throws InterruptedException {
try (var scope = new StructuredTaskScope<SupplierDeliveryTime>()) {
List<Subtask<SupplierDeliveryTime>> subtasks =
productIds.stream()
.map(productId ->
scope.fork(() -> getSupplierDeliveryTime(productId, supplierIds)))
.toList();
scope.join();
return subtasks.stream()
.filter(subtask -> subtask.state() == State.SUCCESS)
.map(Subtask::get)
.toList();
}
}
Code-Sprache: Java (java)
Hier erzeugen wir einen StructuredTaskScope
– und innerhalb dieses Scopes forken wir Subtasks, die wiederum die im vorherigen Abschnitt gezeigte Methode getSupplierDeliveryTime(...)
aufrufen, welche damit also innerhalb des Scopes von getSupplierDeliveryTimes(...)
verschachtelte Scopes öffnen.
Die folgende Grafik zeigt diese Scopes als gestrichelte Linien:
Vorteile von Structured Concurrency
Structured Concurrency zeichnet sich durch klar im Code ersichtliche Start- und Endpunkte nebenläufiger Subtasks aus. Fehler in den Subtasks werden an den Eltern-Scope propagiert. Das macht den Code besser les- und wartbar und stellt sicher, dass zum Ende eines Scopes alle gestarteten Threads beendet sind.
Die folgende Grafik zeigt Unstructured und Structured Concurrency gegenübergestellt:
Vorteile von StructuredTaskScope
Mit StructuredTaskScope
haben wir ein Sprachkonstrukt für Structured Concurrency:
- Task und Subtasks bilden im Code eine abgeschlossene Einheit – es gibt keinen
ExecutorService
in einem höheren Scope, wie z. B. der Klasse. Die Threads kommen nicht aus einem Threadpool; stattdessen wird jeder Subtask in einem neuen virtuellen Thread ausgeführt. - Durch den mittels try-with-resources-Block aufgespannten Scope ergeben sich klare Start- und Endpunkte aller Threads.
- Am Ende des Scopes sind alle Threads beendet.
- Fehler innerhalb der Subtasks werden sauber an den Eltern-Scope propagiert.
- Je nach Policy werden die verbleibenden Subtasks abgebrochen, wenn ein Subtasks erfolgreich war oder wenn in einem Subtask ein Fehler auftrat.
- Wenn der aufrufende Thread abgebrochen wird, werden auch die Subtasks abgebrochen.
Darüber hinaus hilft StructuredTaskScope
beim Debuggen: Wenn wir einen Thread-Dump im neuen JSON-Format ausgeben (jcmd <pid> Thread.dump_to_file -format=json <file>
), dann ist darin die Aufrufhierarchie zwischen Eltern- und Kind-Threads ersichtlich.
StructuredTaskScope und Scoped Values
Die in Java 20 als Incubator und ebenfalls in Java 21 als Preview eingeführten Scoped Values werden beim Einsatz von StructuredTaskScope
innerhalb eines Scopes automatisch an alle durch StructuredTaskScope.fork(...)
erzeugten Kind-Threads weitervererbt.
Wie das genau funktioniert, zeige ich dir an folgendem Code-Beispiel (Klasse SupplierDeliveryTimeCheck4_NestedStructuredTaskScopeUsingScopedValue in der Demo-Anwendung). Wir erzeugen ein ScopedValue
– im Beispiel für einen API-Key, binden dieses an den API-Key und rufen dann die im Abschnitt „Verschachtelte StructuredTaskScopes” gezeigte Methode getSupplierDeliveryTimes(...)
innerhalb des Scopes per call()
auf:
public static final ScopedValue<String> API_KEY = ScopedValue.newInstance();
List<SupplierDeliveryTime> getSupplierDeliveryTimes(
List<String> productIds, List<String> supplierIds, String apiKey) throws Exception {
return ScopedValue.where(API_KEY, apiKey)
.call(() -> getSupplierDeliveryTimes(productIds, supplierIds));
}
Code-Sprache: Java (java)
Durch die Vererbung des Scoped Values API_KEY
kann auf diesen auch innerhalb der SupplierDeliveryTimeService.getDeliveryTime(...)
-Methode zugegriffen werden ohne ihn über Methodenargumente an diese Methode durchschleifen zu müssen – und das selbst dann, wenn die Methoden nicht in demjenigen Thread ausgeführt werden, der ScopedValue.where(...)
aufruft, sondern eben in den durch StructuredTaskScope.fork(...)
erzeugten Kind- bzw. in diesem Beispiel sogar Enkel-Threads.
Fazit
Strukturierte Concurrency wird – aufbauend auf virtuelle Threads – die Verwaltung von Tasks, die in nebenläufige Subtasks aufgeteilt werden, deutlich vereinfachen. Policies erlauben uns das Verhalten von StructuredTaskScope
zu beeinflussen, z. B. um alle Tasks abzubrechen, sollte einer fehlgeschlagen sein.
Bitte beachte, dass sich Structured Concurrency zum Stand von Java 24 noch im Preview-Stadium befindet und somit noch kleineren Änderungen unterliegen kann.
Wenn du noch Fragen hast, stelle sie gerne über die Kommentar-Funktion. Möchtest du über neue Tutorials und Artikel informiert werden? Dann klicke hier, um dich für den HappyCoders.eu-Newsletter anzumelden.