stream gatherersstream gatherers
HappyCoders Glasses

Stream Gatherers Schreibe deine eigenen Stream-Operationen!

Sven Woltmann
Sven Woltmann
Aktualisiert: 4. Dezember 2024

Die Java-Stream-API wurde mit Java 8 im März 2014 veröffentlicht und hat uns ein grundlegend neues Werkzeug an die Hand gegeben, um Datenströme zu verarbeiten.

Allerdings führt der begrenzte Satz an intermediären Operationen – filter, map, flatMap, mapMulti, distinct, sorted, peak, limit, skip, takeWhile und dropWhile – dazu, dass komplexere Datentransformationen durch die Stream-API nicht ausgedrückt werden können.

Es fehlen z. B. verbreitete intermediäre Operationen wie window und fold und zahlreiche mehr, wenn man sich die Feature-Requests der Java-Community ansieht.

Anstatt nun all diese Operationen im Stream-Interface zu implementieren, entschieden die JDK-Developer eine API zu entwickeln, die zum einen im JDK selbst genutzt werden kann, um heiß begehrte intermediäre Operationen zu implementieren, und die zum anderen von Entwicklern dazu eingesetzt werden kann, um eigene Operationen zu entwickeln.

Diese neue API heißt „Stream Gatherers“ und wurde durch JDK Enhancement Proposal 461 in Java 22 im März 2024, also genau zehn Jahre nach der Einführung der Stream-API, als Preview-Feature veröffentlicht und wird durch JEP 485 in Java 24 finalisiert.

In diesem Artikel erfährst du,

  • was ein Gatherer ist,
  • wie die neue Stream-Gatherers-API funktioniert,
  • wie du mit ihr beliebige intermediäre Stream-Operationen implementierst,
  • welche Gatherer das JDK-Team bereits implementiert hat und wie du diese Gatherer erzeugst.

Beginnen wir mit einer kurzen Zusammenfassung, wie die Stream-API überhaupt funktioniert.

Stufen der Stream-API

Java-Streams bestehen aus drei Stufen:

  1. Stream-Quelle – diese erzeugt einen Stream, z. B. durch IntStream.of(...) oder Collection.stream().
  2. Intermediäre Operationen – diese transformieren die im Stream enthaltenen Elemente, z. B. die Stream-Methoden map(...), filter(...) und limit(...).
  3. Terminale Operationen – diese sammeln z. B. durch toList() die Elemente in einer Liste, durch collect(Collectors.toMap(...)) in einer Map oder zählen die Elemente mit count().

Hier ist ein einfaches Beispiel – eine Methode, die zählt, wie viele Wörter einer bestimmten Länge in einer Liste von Wörtern enthalten sind:

public long countLongWords(List<String> words, int minLength) {
  return words.stream()                       // ⟵ Source
      .map(String::length)                    // ⟵ Intermediate operation
      .filter(length -> length >= minLength)  // ⟵ Intermediate operation
      .count();                               // ⟵ Terminal operation
}Code-Sprache: Java (java)

Terminale Operation: Stream Collector

Und hier ein Beispiel, das die Wörter in Großbuchstaben umwandelt und nach Länge gruppiert in einer Map speichert:

public Map<Integer, List<String>> groupByLength(List<String> words) {
  return words.stream()                                 // ⟵ Source
      .map(String::toUpperCase)                         // ⟵ Intermediate operation
      .collect(Collectors.groupingBy(String::length));  // ⟵ Terminal operation
}Code-Sprache: Java (java)

In diesem zweiten Beispiel wird der terminalen Operation collect(...) ein sogenannter „Collector“ übergeben. Ein Collector ist ein Objekt einer Klasse, die das Collector-Interface implementiert und definiert, was am Ende des Streams mit den Elementen des Streams passieren soll. In diesem Fall sollen sie nach Länge gruppiert in eine Map gespeichert werden.

Intermediäre Operation: Stream Gatherer

Analog dazu definiert die Stream-Gatherers-API die Methode Stream.gather(...) sowie ein Gatherer-Interface. Das folgende Code-Beispiel verwendet die intermediäre Operation „Fixed Window“, welche die Wörter in Listen zu jeweils drei Wörtern gruppiert:

public List<List<String>> groupsOfThree(List<String> words) {
  return words.stream()                  // ⟵ Source
      .gather(Gatherers.windowFixed(3))  // ⟵ Intermediate operation
      .toList();                         // ⟵ Terminal operation
}Code-Sprache: Java (java)

Wenn wir diese Methode z. B. wie folgt aufrufen:

List<String> words = List.of("the", "be", "two", "of", "and", "a", "in", "that");
List<List<String>> groups = groupsOfThree(words);
System.out.println(groups);Code-Sprache: Java (java)

Dann ist die Ausgabe:

[[the, be, two], [of, and, a], [in, that]]Code-Sprache: Klartext (plaintext)

(Da die Stream-Quelle eine Anzahl an Elementen geliefert hat, die nicht ohne Rest durch drei teilbar ist, enthält die letzte Gruppe nur zwei Wörter.)

Wie genau ein Stream-Gatherer aufgebaut ist und wie er funktioniert, erfährst du im nächsten Kapitel.

Aufbau eines Stream Gatherers

Bevor wir uns den Aufbau eines Stream-Gatherers ansehen, ist es wichtig, zwei Eigenschaften eines Gatherers zu kennen:

  • Sie können einen Status haben, so dass Elemente unterschiedlich transformiert werden können, je nachdem, was vorher passiert ist (wofür das relevant ist, zeige ich dir gleich an einem Beispiel).
  • Sie können den Stream vorzeitig terminieren, wie es z. B. limit(...) und takeWhile(...) tun.

Gatherer werden aus bis zu vier Komponenten aufgebaut:

  • Einem optionalen Initializer“, der den eben erwähnten Status initialisiert.
  • Einem „Integrator“, der jedes Element des Streams verarbeitet (ggf. unter Berücksichtigung des aktuellen Status), ggf. den Status aktualisiert, Elemente an die nächste Stufe der Stream-Pipeline weitergibt und ggf. den Stream vorzeitig beendet.
  • Einem optionalen „Finisher“, der nach der Verarbeitung des letzten Elements aufgerufen wird, um ggf. anhand des Status weitere Elemente an die nächste Stufe der Stream-Pipeline zu emittieren.
  • Und einem optionalen Combiner“, der bei der parallelen Verarbeitung eines Streams eingesetzt wird, um die Status von parallel ausgeführten Transformationen zu kombinieren.

In den folgenden Abschnitten schauen wir uns die Komponenten nach und nach und mit vielen Beispielen an.

Integrator

Der Integrator ist die einzige Komponente, die zwingend erforderlich ist. Mit ausschließlich einem Integrator können wir bereits einen einfachen, statuslosen Gatherer entwickeln.

Im Folgenden zeige ich dir, wie sich die Stream.map(...)-Funktion mit einem Gatherer implementieren lässt.

Die gezeigten Interfaces haben zusätzliche statische oder Default-Methoden, die für ein grundlegendes Verständnis nicht wichtig sind. Ich lasse sie daher weg und schreibe anstelle der ausgelassenen Methoden drei Punkte.

Integrator ist ein funktionales Interface mit einer integrate(...)-Methode:

@FunctionalInterface
public interface Integrator<A, T, R> {
  boolean integrate(A state, T element, Downstream<? super R> downstream);
  . . .
}Code-Sprache: Java (java)

Downstream ist ein funktionales Interface mit einer push(...)-Methode, die ein Element an die nächste Stufe der Stream-Pipeline weiterleitet:

@FunctionalInterface
public interface Downstream<T> {
  boolean push(T element);
  . . .
}Code-Sprache: Java (java)

Einen Integrator, der eine Mapping-Funktion aufruft und das Ergebnis der Mapping-Funktion an den Downstream emittiert (also an die nächste Verarbeitungsstufe des Streams weiterleitet), können wir als Lambda-Funktion wie folgt schreiben. Da wir hier keinen Zustand benötigen, verwenden wir Void als Typ für die state-Variable:

Function<T, R> mapper = . . .

Integrator<Void, T, R> integrator = 
    (state, element, downstream) -> {
      R mappedElement = mapper.apply(element);
      return downstream.push(mappedElement);
    };Code-Sprache: Java (java)

In der ersten Zeile des Integrators wird die Mapping-Funktion auf das vom Upstream eingehende Element angewendet. In der zweiten Zeile wird das Ergebnis-Element der Mapping-Funktion an den Downstream emittiert und die Antwort des Downstreams zurückgegeben. Diese Antwort ist ein boolean, das anzeigt, ob der Downstream weitere Elemente akzeptiert. Die Antwort wäre z. B. false, wenn in der Pipeline ein limit() folgen würde, dessen Maximum erreicht ist.

Da der oben gezeigte Integrator niemals von sich aus false zurückliefert, sondern nur dann, wenn dies vom Downstream ausgeht, wird der Integrator als „greedy“ (deutsch: „gierig“) bezeichnet. Um der Stream-Pipeline dies anzuzeigen und ihr damit Optimierungen zu ermöglichen, sollten wir den Integrator daher einmal mit Integrator.ofGreedy(...) wrappen:

Integrator<Void, T, R> integrator =
    Integrator.ofGreedy(
        (state, element, downstream) -> {
          R mappedElement = mapper.apply(element);
          return downstream.push(mappedElement);
        });
Code-Sprache: Java (java)

Um aus dem Integrator schließlich einen Gatherer zu machen, verwenden wir die statische Gatherer.of(...)-Methode:

Gatherer<T, Void, R> gatherer = Gatherer.of(integrator);Code-Sprache: Java (java)

Hier ist ein vollständiges Beispiel mit einer Methode, die einen Gatherer für eine bestimmte Mapping-Funktion erzeugt und einer Methode, die solch einen Gatherer nutzt, um eine Liste von Strings auf deren Längen zu mappen:

public <T, R> Gatherer<T, Void, R> mapping(Function<T, R> mapper) {
  return Gatherer.of(
      Integrator.ofGreedy(
          (state, element, downstream) -> {
            R mappedElement = mapper.apply(element);
            return downstream.push(mappedElement);
          }));
}

public List<Integer> toLengths(List<String> words) {
  return words.stream()
      .gather(mapping(String::length))
      .toList();
}Code-Sprache: Java (java)

Damit ist das grundlegende Konzept eines Gatherers erklärt.

Initializer

Der Gatherer des vorherigen Abschnitts war statuslos, d. h. die Transformation eines Elements war unabhängig von allem, was zuvor passiert ist.

In diesem Abschnitt zeige ich dir, wie sich die Stream.limit(...)-Funktion mit einem Gatherer implementieren lässt. Dazu muss der Gatherer die verarbeiteten Elemente zählen und den Stream nach Erreichen der gewünschten Anzahl von Elementen vorzeitig terminieren.

Der Initializer ist vom Typ Supplier und liefert den initialen Status. Da wir Elemente zählen wollen, eignet sich als Status ein AtomicInteger:

Supplier<AtomicInteger> initializer = AtomicInteger::new;Code-Sprache: Java (java)

Den limitierenden Integrator implementieren wir wie folgt:

int maxSize = . . .

Integrator<AtomicInteger, T, T> integrator = 
    (state, element, downstream) -> {
      if (state.get() < maxSize) {
        boolean result = downstream.push(element);
        state.incrementAndGet();
        return result;
      } else {
        return false;
      }
    };Code-Sprache: Java (java)

Solange unser Status, der Element-Zähler, kleiner ist als maxSize, emittieren wir die Stream-Elemente in den Downstream, erhöhen den Zähler um eins und geben das Antwort-Boolean des Downstreams zurück. Sobald die gewünschte Anzahl Elemente erreicht ist, geben wir false zurück und zeigen damit an, dass der Stream beendet werden soll.

Wir könnten übrigens auch in der if-Anweisung state.getAndIncrement() anstelle von state.get() schreiben und das state.incrementAndGet() weglassen. Doch das wäre etwas komplizierter zu erklären gewesen.

Beachte, dass wir diesen Integrator nicht mit Integrator.ofGreedy(...) wrappen, da dieser Integrator auch von sich aus false zurückliefern kann (also nicht nur, wenn dieser Wert vom Downstream kam).

Um aus dem Initializer und dem Integrator einen Gatherer zu machen, verwenden wir die Gatherer.ofSequential(...)-Methode. Der Name dieser Methode weist darauf hin, dass der zurückgelieferte Gatherer nicht parallel arbeiten kann. Das liegt daran, dass er einen Status hat, aber keinen Combiner (dazu später mehr).

Gatherer<T, AtomicInteger, T> gatherer = Gatherer.ofSequential(initializer, integrator);Code-Sprache: Java (java)

Das folgende Listing zeigt eine Methode, die einen limitierenden Gatherer aus den zuvor gezeigten Bausteinen erzeugt – dieses Mal mit state.getAndIncrement(), und eine Methode, die diesen Gatherer nutzt, um die ersten drei Wörter der Wort-Liste zurückzugeben:

public <T> Gatherer<T, AtomicInteger, T> limiting(int maxSize) {
  return Gatherer.ofSequential(
      // Initializer
      AtomicInteger::new,
 
      // Integrator
      (state, element, downstream) -> {
        if (state.getAndIncrement() < maxSize) {
          return downstream.push(element);
        } else {
          return false;
        }
      });
}

public List<String> firstThreeWords(List<String> words) {
  return words.stream()
      .gather(limiting(3))
      .toList();
}Code-Sprache: Java (java)

Stream Gatherers sind übrigens so mächtig, dass nicht nur map(...) und limit(...), sondern jede existierende intermediäre Operation der Stream-API auch als Gatherer implementiert werden kann.

Finisher

Um zu erklären, wofür wir einen Finisher benötigen, zeige ich dir in diesem Abschnitt, wie sich der im Einführungskapitel gezeigte „Fixed Window“-Gatherer implementieren lässt.

Zur Erinnerung: Dieser soll aus einem Stream von Elementen einen Stream von Listen machen, die jeweils eine bestimmte Anzahl der Elemente enthalten.

Wir beginnen einfach mal mit der Implementierung. An einer Stelle werden wir feststellen, dass wir nicht weiterkommen – und genau dann werde ich den Finisher einführen.

Zunächst einmal benötigt unserer Gatherer einen Status. Da wir Elemente in Listen gruppieren wollen, liegt es nahe, solch eine Liste als Status zu verwenden. Entsprechend implementieren wir den Initializer:

Supplier<List<T>> initializer = ArrayList::new;Code-Sprache: Java (java)

Den Integrator implementieren wir wie folgt:

Integrator<List<T>, T, List<T>> integrator =
    (state, element, downstream) -> {
      state.add(element);
      if (state.size() == windowSize) {
        boolean result = downstream.push(List.copyOf(state));
        state.clear();
        return result;
      } else {
        return true;
      }
    };Code-Sprache: Java (java)

Wir hängen das ankommende Element an die Status-Liste an. Sobald die Liste die gewünschte Größe erreicht hat, senden wir eine Kopie der Liste an den Downstream und leeren die Liste.

Das funktioniert allerdings nur, wenn die Anzahl der Elemente ein Vielfaches der Fenstergröße ist. Wenn wir beispielsweise acht Elemente haben und eine Fenstergröße von drei, dann würden für die ersten sechs Elemente zwei Listen zu je drei Elementen an den Downstream emittiert werden. Das siebte und achte Element würden ebenfalls in einer Liste liegen, doch da diese Liste die gewünschte Größe noch nicht erreicht hat, wurde sie vom Integrator nicht in den Downstream emittiert.

Genau hier kommt der Finisher ins Spiel. Der Finisher bekommt als Input den Status nach der Verarbeitung aller Stream-Elemente sowie den Downstream und kann dann in Abhängigkeit vom Status weitere Elemente in den Downstream emittieren.

Für die Fixed-Window-Operation würde der Finisher wie folgt aussehen:

BiConsumer<List<T>, Downstream<List<T>>> finisher =
    (state, downstream) -> {
      if (!state.isEmpty()) {
        downstream.push(List.copyOf(state));
      }
    };Code-Sprache: Java (java)

Falls die Liste Elemente enthält, wird sie in den Downstream emittiert.

Initializer, Integrator und Finisher kombinieren wir wie folgt zu einem Gatherer:

Gatherer<T, List<T>, List<T>> gatherer =
    Gatherer.ofSequential(initializer, integrator, finisher);Code-Sprache: Java (java)

Das folgende Listing zeigt eine Methode, die aus den zuvor gezeigten Komponenten einen Window-Gatherer erzeugt, sowie eine Methode, die diesen Gatherer nutzt:

public <T> Gatherer<T, List<T>, List<T>> windowing(int windowSize) {
  return Gatherer.ofSequential(
      // Initializer
      ArrayList::new,

      // Gatherer
      (state, element, downstream) -> {
        state.add(element);
        if (state.size() == windowSize) {
          boolean result = downstream.push(List.copyOf(state));
          state.clear();
          return result;
        } else {
          return true;
        }
      },

      // Finisher
      (state, downstream) -> {
        if (!state.isEmpty()) {
          downstream.push(List.copyOf(state));
        }
      });
}

public List<List<String>> groupWords(List<String> words, int groupSize) {
  return words.stream()
      .gather(windowing(groupSize))
      .toList();
}Code-Sprache: Java (java)

Der Einfachheit halber habe ich im vorangegangenen Beispiel als Statusobjekt eine ArrayList verwendet. Das Erzeugen von Kopien und das Leeren der Liste stellt jedoch einen nicht unerheblichen Overhead dar.

Die folgende Lösung verwendet als Status ein Wrapper-Objekt, das eine Liste enthält, welche direkt in den Downstream gepusht und dann neu erzeugt wird. Diese Variante ist ca 20 % schneller:

public <T> Gatherer<T, ?, List<T>> windowing(int windowSize) {
  return Gatherer.ofSequential(
      // Initializer
      () -> new Object() { ArrayList<T> list = new ArrayList<>(); },

      // Gatherer
      (state, element, downstream) -> {
        state.list.add(element);
        if (state.list.size() == windowSize) {
          boolean result = downstream.push(state.list);
          state.list = new ArrayList<>();
          return result;
        } else {
          return true;
        }
      },

      // Finisher
      (state, downstream) -> {
        if (!state.list.isEmpty()) {
          downstream.push(state.list);
        }
      });
}Code-Sprache: Java (java)
Das Statusobjekt new Object() { ArrayList<T> list = new ArrayList<>(); } ist übrigens ein sogenannter „non-denotable type” – ein Typ, der zwar existiert (weshalb wir auf state.list zugreifen können), der aber keinen Namen hat. Wenn wir dieses Objekt einer Variablen zuweisen wollen, funktioniert das nur, wenn wir die Variable mit var deklarieren:

var status = new Object() { ArrayList<String> list = new ArrayList<>(); };
status.list.add("element");

Würdest du in diesem Code var durch Object ersetzen, käme es zu einem Compilerfehler.

Im nächsten Abschnitt kommen wir zum letzten Stream-Gatherer-Baustein, dem Combiner.

Combiner

Um einen statusbehafteten Gatherer parallel auszuführen, benötigt er eine Combiner-Funktion. Ein Combiner kombiniert in der Join-Phase der parallelen Stream-Verarbeitung jeweils zwei Status zu einem.

In diesem Abschnitt wollen wir einen Gatherer implementieren, der entsprechend eines Comparators das größte aller eingehenden Elemente in den Downstream emittiert.

Als Status verwenden wir eine AtomicReference, die entweder kein Element oder das aktuell größte gefundene Element enthält:

Supplier<AtomicReference<T>> initializer = AtomicReference::new;
Code-Sprache: Java (java)

Der Integrator speichert das eingehende Element im Status, wenn der Status leer ist oder wenn das eingehende Element größer ist als das im Status gespeicherte Element. Da der Integrator immer true zurückliefert, kennzeichnen wir ihn als „greedy“:

Integrator<AtomicReference<T>, T, T> integrator =
    Integrator.ofGreedy(
        (state, element, downstream) -> {
          T bestElement = state.get();
          if (bestElement == null || comparator.compare(element, bestElement) > 0) {
            state.set(element);
          }
          return true;
        });
Code-Sprache: Java (java)

Der Finisher sendet das Element, sofern der Status eines enthält, an den Downstream:

BiConsumer<AtomicReference<T>, Downstream<T>> finisher =
    (state, downstream) -> {
      T bestElement = state.get();
      if (bestElement != null) {
        downstream.push(bestElement);
      }
    };
Code-Sprache: Java (java)

Und der Combiner kombiniert zwei Status zu einem:

  • Ist ein Status leer, gibt er den jeweils anderen Status zurück.
  • Enthalten beide Status ein Element, gibt er den Status mit dem größeren Element zurück.

Wenn der Eingangsstream leer ist, wird der Combiner nie aufgerufen, d. h. der Fall, dass beide Status leer sind, kann nicht eintreten.

BinaryOperator<AtomicReference<T>> combiner =
    (state1, state2) -> {
      T bestElement1 = state1.get();
      T bestElement2 = state2.get();

      if (bestElement1 == null) {
        return state2;
      } else if (bestElement2 == null) {
        return state1;
      } else if (comparator.compare(bestElement1, bestElement2) > 0) {
        return state1;
      } else {
        return state2;
      }
    };Code-Sprache: Java (java)

Initializer, Integrator, Combiner und Finisher kombinieren wir wieder mit Gatherer.of(...) zu einem Gatherer:

Gatherer<T, AtomicReference<T>, T> gatherer =
    Gatherer.of(initializer, integrator, combiner, finisher);
Code-Sprache: Java (java)

Und auch für den Combiner folgt ein Listing mit einer Methode, die einen Maximum-Gatherer aus den zuvor gezeigten Bausteinen erzeugt, sowie einer Methode, die diesen in einem parallelen Stream nutzt, um das längste Wort einer Liste zu finden:

public <T> Gatherer<T, AtomicReference<T>, T> maximumBy(Comparator<T> comparator) {
  return Gatherer.of(
      // Initializer
      AtomicReference::new,

      // Integrator
      Integrator.ofGreedy(
          (state, element, downstream) -> {
            T bestElement = state.get();
            if (bestElement == null || comparator.compare(element, bestElement) > 0) {
              state.set(element);
            }
            return true;
          }),

      // Combiner
      (state1, state2) -> {
        T bestElement1 = state1.get();
        T bestElement2 = state2.get();

        if (bestElement1 == null) {
          return state2;
        } else if (bestElement2 == null) {
          return state1;
        } else if (comparator.compare(bestElement1, bestElement2) > 0) {
          return state1;
        } else {
          return state2;
        }
      },

      // Finisher
      (state, downstream) -> {
        T bestElement = state.get();
        if (bestElement != null) {
          downstream.push(bestElement);
        }
      });
}

public Optional<String> getLongest(List<String> words) {
  return words.parallelStream()
      .gather(maximumBy(Comparator.comparing(String::length)))
      .findFirst();
}Code-Sprache: Java (java)

Damit haben wir alle Komponenten eines Stream Gatherers beisammen. Wir brauchen allerdings nicht für jeden Zweck einen eigenen Gatherer zu implementieren. Für einige häufig benötigte intermediäre Transformationen haben das die JDK-Entwickler bereits für uns getan.

Welche Gatherers bereits zur Verfügung stehen, erfährst du im nächsten Kapitel.

Im JDK verfügbare Stream Gatherers

Die im JDK vordefinierten Gatherer kannst du über entsprechende Factory-Methoden der Gatherers-Klasse erzeugen. Einen Gatherer hast du bereits im Einführungskapitel kennengelernt: den „Fixed Window“-Gatherer.

Im Folgenden findest du eine Übersicht der wichtigsten vordefinierten Gatherer:

  • Gatherers.fold(Supplier initial, BiFunction folder)
    kombiniert alle Elemente des Streams zu einem einzigen Element, ähnlich wie ein Collector. Ist dann hilfreich, wenn eine terminale Operation auf einem aus den Stream-Elementen kombinierten Element aufgerufen werden soll.
  • Gatherers.mapConcurrent(int maxConcurrency, Function mapper)
    führt die angegebene Mapping-Funktion in der angegebenen Anzahl virtueller Threads gleichzeitig aus.
  • Gatherers.peek(Consumer effect)
    Gatherers.peekOrdered(Consumer effect)

    senden jedes Element des Streams an den Consumer, bevor es an die nächste Stufe der Stream-Pipeline weitergeleitet wird. peekOrdered(...) stellt bei einem parallelen Stream die Verarbeitung in der richtigen Reihenfolge sicher.
  • Gatherers.scan(Supplier initial, BiFunction scanner)
    führt einen sogenannten Prefix-Scan durch.
  • Gatherers.windowFixed(int windowSize)
    Gatherers.windowSliding(int windowSize)

    gruppieren die Stream-Elemente in Listen der angebenen Größe. Bei der Sliding-Variante überlappen die Listen und sind um jeweils ein Element verschoben, so erzeugt bspw. Stream.of(1, 2, 3, 4, 5).gather(Gatherers.windowSliding(3)).toList() die folgende Liste von Listen: [[1, 2, 3], [2, 3, 4], [3, 4, 5]].

Gatherers kombinieren

So wie du beispielsweise mehrere filter(...)- und map(...)-Operationen hintereinander schalten kannst, kannst du auch mehrere Gatherer hintereinander aufrufen, z. B. so:

var result = source
    .gather(a)
    .gather(b)
    .gather(c)
    .collect(...);Code-Sprache: Java (java)

Wenn du eine bestimmte Abfolge von Gatherern regelmäßig benötigst, kannst du diese – ganz im Sinne von DRY (don't repeat yourself) – auch zu einem einzigen Gatherer kombinieren, z. B. so:

Gatherer abc = a.andThen(b).andThen(c);

var result = source
    .gather(abc)
    .collect(...);Code-Sprache: Java (java)

So kannst du eine beliebig lange Transformationssequenz redundanzfrei auf verschiedene Streams anwenden.

Limitierungen von Stream Gatherern

Stream Gatherer sind leistungsstarke Werkzeuge, haben jedoch zwei wesentliche Einschränkungen:

  • Es gibt sie (genau wie Collectoren) nicht für die primitiven Streams IntStream, LongStream und DoubleStream.
  • Sie haben (genau wie Collectoren) keinen Zugriff auf die (im Spliterator-Interface definierten) Charakteristika des Streams. Das bedeutet, dass sie nicht auf der Grundlage dieser Eigenschaften optimiert werden können (z. B. der Tatsache, dass die Größe bekannt ist oder dass der Stream nur unterschiedliche Elemente enthält).

Fazit

Mit Stream Gatherers können wir beliebige intermediäre Stream-Operationen implementieren, so wie wir mit Kollektoren schon immer beliebige terminale Operationen schreiben konnten. Das erlaubt uns, wesentlich aussagekräftigere Stream-Pipelines zu schreiben als bisher.

Über die Gatherers-Klasse können wir vorgefertigte Gatherer für eine Vielzahl von intermediären Operationen abrufen.

Welcher ist der erste der vorgefertigten Gatherer, den du einsetzen wirst? Welche Funktionalität planst du selbst als Gatherer zu implementieren? Schreibe mir einen Kommentar!

Du willst über alle neue Java-Features auf dem Laufenden sein? Dann klicke hier, um dich für den HappyCoders-Newsletter anzumelden.