PriorityBlockingQueue - Feature ImagePriorityBlockingQueue - Feature Image
HappyCoders Glasses

PriorityBlockingQueue in Java
(mit Beispiel)

Sven Woltmann
Sven Woltmann
Aktualisiert: 27. November 2024

In diesem Artikel erfährst du wie die PriorityBlockingQueue funktioniert und welche Eigenschaften sie hat. Anhand eines Beispiels siehst du, wie man sie einsetzt.

Hier befinden wir uns in der Klassenhierarchie:

PriorityBlockingQueue in der Klassenhierarchie
PriorityBlockingQueue in der Klassenhierarchie

PriorityBlockingQueue Eigenschaften

Bei der java.util.concurrent.PriorityBlockingQueue handelt es sich um eine threadsichere und blockierende Variante der PriorityQueue. In dem verlinkten Artikel erfährst du auch, was eine Priotity Queue ist.

Wie bei der PriorityQueue werden die Elemente in einem Array gespeichert, das einen Min-Heap repräsentiert; der Iterator durchläuft die Elemente in entsprechender Reihenfolge.

Threadsicherheit wird durch ein einzelnes ReentrantLock sichergestellt.

Die PriorityBlockingQueue ist nicht bounded, sie hat also keine Kapazitätsgrenze. Das bedeutet, dass put(e) und offer(e, time, unit) niemals blockieren. Nur die Dequeue-Operationen take() und poll(time, unit) blockieren, wenn die Queue leer ist.

Die Eigenschaften im Detail:

Unterliegende DatenstrukturThread-safe?Blocking/
Non-blocking
Fairness
Policy
Bounded/
Unbounded
Iterator Type
Min-Heap
(gespeichert in einem Array)
Ja
(pessimistisches Locking mit einem Lock)
Blocking
(nur dequeue)
Nicht verfügbarUnboundedWeakly consistent¹

¹ Weakly consistent: Alle Elemente, die zum Zeitpunkt der Erzeugung des Interators in der Queue liegen, werden vom Iterator genau einmal durchlaufen. Änderungen, die danach erfolgen, können – müssen aber nicht – durch den Iterator berücksichtigt werden.

Einsatzempfehlung

Die PriorityBlockingQueue wird im JDK nicht genutzt. Es ist daher nicht auszuschließen, dass sie Bugs enthält. Wenn du eine Queue mit entsprechenden Eigenschaften benötigst und die PriorityBlockingQueue verwendest, solltest du deine Anwendung intensiv testen.

PriorityBlockingQueue Beispiel

Das folgenden Beispiel zeigt, wie eine PriorityBlockingQueue angelegt wird und wie mehrere Threads lesend und schreibend darauf zugreifen (→ Code auf GitHub).

Lesende Threads starten alle 3 Sekunden, beginnend sofort nach dem Erstellen der Queue.

Schreibende Threads starten nach 3,5 Sekunden (so dass bereits zwei lesende Threads warten) und schreiben sekündlich einen Zufallswert in die Queue.

public class PriorityBlockingQueueExample {
  private static final long startTime = System.currentTimeMillis();

  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
    ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);

    // Start reading from the queue immediately, every 3 seconds
    for (int i = 0; i < 8; i++) {
      int delaySeconds = i * 3;
      pool.schedule(() -> dequeue(queue), delaySeconds, TimeUnit.SECONDS);
    }

    // Start writing to the queue after 3.5 seconds (so there are already 2 threads
    // waiting), every 1 seconds (so that the queue fills faster than it's emptied,
    // so that we see some more elements and their order in the queue)
    for (int i = 0; i < 8; i++) {
      int delayMillis = 3500 + i * 1000;
      pool.schedule(() -> enqueue(queue), delayMillis, TimeUnit.MILLISECONDS);
    }

    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.MINUTES);
  }

  private static void enqueue(BlockingQueue<Integer> queue) {
    int element = ThreadLocalRandom.current().nextInt(10, 100);
    log("Calling queue.put(%d) (queue = %s)...", element, queue);
    try {
      queue.put(element);
      log("queue.put(%d) returned (queue = %s)", element, queue);
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
    }
  }

  private static void dequeue(BlockingQueue<Integer> queue) {
    log("    Calling queue.take() (queue = %s)...", queue);
    try {
      Integer element = queue.take();
      log("    queue.take() returned %d (queue = %s)", element, queue);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  private static void log(String format, Object... args) {
    System.out.printf(
        Locale.US,
        "[%4.1fs] [%-16s] %s%n",
        (System.currentTimeMillis() - startTime) / 1000.0,
        Thread.currentThread().getName(),
        String.format(format, args));
  }
}Code-Sprache: Java (java)

Im folgenden siehst du eine beispielhafte Ausgabe des Programms:

[ 0.0s] [pool-1-thread-1 ]     Calling queue.take() (queue = [])...
[ 3.0s] [pool-1-thread-2 ]     Calling queue.take() (queue = [])...
[ 3.5s] [pool-1-thread-6 ] Calling queue.put(87) (queue = [])...
[ 3.5s] [pool-1-thread-6 ] queue.put(87) returned (queue = [])
[ 3.5s] [pool-1-thread-1 ]     queue.take() returned 87 (queue = [])
[ 4.5s] [pool-1-thread-9 ] Calling queue.put(89) (queue = [])...
[ 4.5s] [pool-1-thread-9 ] queue.put(89) returned (queue = [])
[ 4.5s] [pool-1-thread-2 ]     queue.take() returned 89 (queue = [])
[ 5.5s] [pool-1-thread-7 ] Calling queue.put(31) (queue = [])...
[ 5.5s] [pool-1-thread-7 ] queue.put(31) returned (queue = [31])
[ 6.0s] [pool-1-thread-4 ]     Calling queue.take() (queue = [31])...
[ 6.0s] [pool-1-thread-4 ]     queue.take() returned 31 (queue = [])
[ 6.5s] [pool-1-thread-5 ] Calling queue.put(71) (queue = [])...
[ 6.5s] [pool-1-thread-5 ] queue.put(71) returned (queue = [71])
[ 7.5s] [pool-1-thread-8 ] Calling queue.put(15) (queue = [71])...
[ 7.5s] [pool-1-thread-8 ] queue.put(15) returned (queue = [15, 71])
[ 8.5s] [pool-1-thread-10] Calling queue.put(33) (queue = [15, 71])...
[ 8.5s] [pool-1-thread-10] queue.put(33) returned (queue = [15, 71, 33])
[ 9.0s] [pool-1-thread-3 ]     Calling queue.take() (queue = [15, 71, 33])...
[ 9.0s] [pool-1-thread-3 ]     queue.take() returned 15 (queue = [33, 71])
[ 9.5s] [pool-1-thread-6 ] Calling queue.put(58) (queue = [33, 71])...
[ 9.5s] [pool-1-thread-6 ] queue.put(58) returned (queue = [33, 71, 58])
[10.5s] [pool-1-thread-1 ] Calling queue.put(19) (queue = [33, 71, 58])...
[10.5s] [pool-1-thread-1 ] queue.put(19) returned (queue = [19, 33, 58, 71])
[12.0s] [pool-1-thread-9 ]     Calling queue.take() (queue = [19, 33, 58, 71])...
[12.0s] [pool-1-thread-9 ]     queue.take() returned 19 (queue = [33, 71, 58])
[15.0s] [pool-1-thread-2 ]     Calling queue.take() (queue = [33, 71, 58])...
[15.0s] [pool-1-thread-2 ]     queue.take() returned 33 (queue = [58, 71])
[18.0s] [pool-1-thread-7 ]     Calling queue.take() (queue = [58, 71])...
[18.0s] [pool-1-thread-7 ]     queue.take() returned 58 (queue = [71])
[21.0s] [pool-1-thread-4 ]     Calling queue.take() (queue = [71])...
[21.0s] [pool-1-thread-4 ]     queue.take() returned 71 (queue = [])Code-Sprache: Klartext (plaintext)

Was kann man in dieser Beispielausgabe erkennen?

Zunächst einmal siehst du, wie nach 0,0 s und 3,0 s die Threads 1 und 2 beim Aufruf von take() blockieren, da die Queue leer ist.

Nach 3,5 s schreibt Thread 6 die 87 in die Queue. Sofort im Anschluss wacht der zuvor blockierte Thread 1 auf und entnimmt die 87 wieder.

Nach 4,5 s schreibt Thread 9 die 89 in die Queue, die sofort von Thread 2 wieder entnommen wird.

Nach 5,5 s wird die 31 in die Queue geschrieben, die nach 6,0 s wieder entnommen wird.

Nach 6,5 s, 7,5 s und 8,5 s werden die 71, die 15 und die 33 in die Queue geschrieben. Du siehst, wie jeweils das kleinste Element vorne (links) in der Queue steht.

Nach 9,0 s wird das kleinste Elemente, die 15, entnommen. Daraufhin steht das nächstkleinere Element, die 33 am Kopf der Queue.

Nach 9,5 s und 10,5 s werden zwei weitere Elemente, 58 und 19, in die Queue geschrieben. Du siehst wieder gut, wie jeweils das kleinste Element am Kopf der Queue steht.

Die Queue enthält nun vier Elemente. Es werden keine weiteren Elemente in die Queue geschrieben und die existierenden Elemente entsprechend ihrer Priorität entnommen.

Zusammenfassung und Ausblick

In diesem Artikel hast du erfahren, welche Eigenschaften die PriorityBlockingQueue hat und wie diese eingesetzt wird.

Ab dem nächsten Teil der Tutorial-Serie stelle ich dir einige Queue-Implementierungen für Sonderfälle vor, beginnend mit der DelayQueue.

Wenn du noch Fragen hast, stelle sie gerne über die Kommentar-Funktion. Möchtest du über neue Tutorials und Artikel informiert werden? Dann klicke hier, um dich für den HappyCoders.eu-Newsletter anzumelden.