Java Concurrency Paket

Mit JDK 5.0 wurde das Java Concurrency Paket, welches maßgeblich von Doug Lea mitentwickelt wurde (siehe Wikipedia). Das Paket java.util.concurrent fügt komplexe Klassen zur Laufzeitumgebung die das Steuern der Parallelität auf einem höheren Niveau als das grundlegenden Konstrukte von Java erlauben. 

In diesem Abschnitt werden einige wenige Klassen vorgestellt. Schauen Sie sich dieses Paket an bevor Sie etwas nebenläufiges implementieren. Hier gibt es sehr viele, sehr mächtige Lösungen, die die Vorlesung bei weitem übersteigen

Referenzen

Atomare Basistypen

 In java.util.concurrent.atomic werden Basistypen zur Verfügung gestellt die atomar sind.

Atomar bedeutet, dass das Lesen des Datums und das anschließende Modifizieren nicht von anderen Operationen unterbrochen werden kann. Das Lesen und Modifizieren erfolgt in einem kritischen Pfad.

Ein Beispiel: Die Klasse AtomicInteger implementiert Number und hat die zum Beispiel Methoden wie:

Anbei ein Beispiel zum nebenläufigen Addieren:

package s2.thread;
import java.util.concurrent.atomic.AtomicInteger;
/**
*
* @author s@scalingbits.com
*/

public class ParaIncrementAtomicInt extends Thread {
public static AtomicInteger zaehler;
public static final int MAX= Integer.MAX_VALUE/100;
public static void increment() {
zaehler.getAndIncrement();
}
/**
* Starten des Threads
*/
public void run() {
for (int i=0; i < MAX; i++) {
increment();
}
}
public static void main(String[] args) {
zaehler = new AtomicInteger(0);
ParaIncrementAtomicInt thread1 = new ParaIncrementAtomicInt();
ParaIncrementAtomicInt thread2 = new ParaIncrementAtomicInt();
long time = System.nanoTime();
thread1.start();
thread2.start();
try {
thread1.join();
thread2.join();
} catch (InterruptedException e) {
}
time = (System.nanoTime() -time)/1000000L; // time in milliseconds
if ((2* ParaIncrementAtomicInt.MAX) == zaehler.get())
System.out.println("Korrekte Ausführung: " +
+ ParaIncrementAtomicInt.zaehler.get() + " (" + time + "ms)");
else
System.out.println("Fehler! Soll: " + (2* ParaIncrementAtomicInt.MAX) +
"; Ist: " +ParaIncrementAtomicInt.zaehler.get() + " (" + time + "ms)");
}
}

 

Das "Fork-Join-Framework"

Fork Join Klassen

Die Klasse ForkJoinPool

Die Klasse java.util.concurrent.ForkJoinPool wurde im JDK 7 eingeführt.

In diser Klasse werden eine Menge von Threads verwaltet und zur Ausführung von Task (Aufgaben) verwendet. Dies macht es dem Betriebsystem leichter die Threads auf die Prozessoren zu verteilen, da nicht beliebig viele Threads von Java erzeugt werden. Die Klasse ForkJoinPool verwaltet die Zuordnung der Task auf die konfigurierten (Software)threads.

Tasks können auf 3 Arten dem Framework übergeben werden:

  Aufrufe von ausserhalb Aufruf innerhalb einer Berechnung
asynchrone Ausführung execute(ForkJoinTask) ForkJoinTask.fork()
warte auf Ergebnis invoke(ForkJoinTask) ForkJoinTask.invoke()
asynchrone Ausführung mit Ergebnis submit(ForkJoinTask)
submit(Callable)
ForkJoinTask.fork(ForkJoinTasks sind Futures)

Die Implementierung wird eine RejectedExecutionException werden falls der Threadpool runtergefahren wird oder falls es keine Threads mehr in der VM gibt. 

Interessante Konstruktoren:

Paralleles Abarbeiten ohne Rückgabewerte

Ein Beispiel hierfür ist der Quicksort. Man kann parallel das linke und das rechte Teilintervall sortieren nachdem die Vorarbeit (seriell) erledigt wurde.

Hierzu nutzt man die abstrakte Klasse RecursiveAction die aus der abstrakten Klasse ForkJoinTask spezialisiert wird. Die Klasse ForkJoinPool kann Tasks die die Bedingungen der Klasse ForkJoinTask erfüllen, parallel ausführen. Hierzu gibt es in der Klasse die Methoden:

Es verbleibt das Problem, dass man einen bestimmten Code ausführen möchte. Hierzu dient die abstrakte Klasse RecursiveAction. Sie besitzt eine abstrakte Methode:

  • compute(): führt den Code aus. Diese Methode ist abstrakt!

Wie implementiert man so etwas?

  1. Man implementiert den Algorithmus in einer Unterklasse von RecursiveAction.
    1. Alle Eingabeparameter werden als Attribute im Objekt gespeichert
    2. Man implementiert einen Konstruktor der die Parameter entgegennimmt und abspeichert
    3. Der Algorithmus wird in compute() implementiert
  2. Man bettet seine Klasse als innere Klasse ein
    1. Man hat dann Zugriff auf alle Attribute der äusseren Klasse!
  3. Man legt sich einen Threadpool an
    1. Wahrscheinlich als globales, statisches Objekt
  4. Man startet seinen Algorithmus aus der äusseren Klasse
    1. Anlegen eines Objekts mit den Parametern
    2. Aufruf von invoke() im Treadpool

 Hilfe! Ich muß das am Beispiel sehen...

Paralleles Abarbeiten mit Rückgabewerte

Die Aufgabe parallelisieren und als Aufgabe starten reicht oft nicht. Oft müssen die Ergebnisse entgegen genommen werden und dann rekursiv zu neuen Ergebnissen aggregiert werden.

Hierzu benötigt man einen Mechanismus der

  • die Eingabeparameter für jede Aufgabe (Task) entgegennimmt
  • die Aufgaben (Tasks) ausführt
  • und die Ergebnisse erst zurückgibt wenn die Ergebnisse vorliegen

Die Schnittstelle Callable

Die Schnittstelle java.util.concurrent.Callable muss implementiert werden um die Eingabeparameter einer Aufgabe (Task) entgegenzunehmen. Hierfür muss man

  • einen generischen Typen wählen der das Ergebnis zur Verfügung stellt
  • einen Konstruktor implementieren der alle Eingabeparameter entgegen nimmt.
  • alle Parameter als Attribute implementiert
  • die Methode V call() die das Ergebnis zurückliefert

Ein Beispiel auf der Programmierübung Ariadnefaden

public class SearchCallable implements Callable<List<Position>> {
Position von;
Position nach;
Ariadne4Parallel a; /** * * @param a Instanz von Ariadne * @param von Start * @param nach Ziel */ SearchCallable(Ariadne4Parallel a,Position von, Position nach) { this.a =a; this.von = von; this.nach = nach; } /** * Führe Task in eigenem Thread aus und nutze Instanzvariablen * als Parameter um Aufgabe auszuführen. * @throws java.lang.Exception */ @Override public List<Position> call() throws Exception { return a.suche(von, nach); } }

In diesem Beispiel ist das Ergebnis eine Liste von Positionen die den Web aus dem Labyrint weisen.

Die Eingabeparameter sind ein Labyrinth, die Startposition und die Position mit dem Ausgang ais dem Labyrinth.

Die Schnittstelle Future

Die Schnittstelle Future erlaubt es das Ergebnis einer Aufgabe zu analysieren. Die Schnittstelle ist generisch und erlaubt es daher nur einen bestimmten Type einer Klasse auszulesen.

Die beiden wichtigsten Methoden der Schnittstelle sind:

  • V get(): wartet falls notwendig auf das Beenden der Aufgabe und liefert das Ergebnis ab
  • V get(long timeout, TimeUnit unit): wartet eine bestimmte Zeit auf auf ein bestimmtes Ergebnis

In der Schnittstellendefinition sind noch weitere Methoden zum Kontrollieren und Beenden von Aufgaben (Tasks) vorhanden.

Abschicken von Aufgaben (Tasks)

Die Klasse  java.util.concurrent.ForkJoinPool verfügt über eine Methode

  • public <T> ForkJoinTask<T> submit(Callable<T> task)

Diese Methode nimmt die Eingaben für die Aufgabe (Task) an solange es eine Spezialierung der Klasse Callable<T> ist. Das Ergebnis ist ein Objekt vom Typ ForkJoinTask. Die Klasse ForkJoinTask implementiert die Schnittstelle Future. Das Ergebnis der Aufgabe (Task) wird hier abgeliefert sobald die Aufgabe abgearbeitet ist.

Wie implementiert man so etwas?

  1. Man implementiert den Algorithmus in einer Unterklasse von RecursiveAction.
    1. Alle Eingabeparameter werden als Attribute in einem Objekt von Callable gespeichert
    2. Man implementiert einen Konstruktor der die Parameter entgegennimmt und abspeichert
    3. Der Algorithmus wird in compute() implementiert
      1. Der Algorithmus führt rekursive Aufrufe durch und nimmt die Ergebnisse vom Typ Future an.
  2. Man bettet seine Klasse die Callable spezialisiert als innere Klasse ein
    1. Man hat dann Zugriff auf alle Attribute der äusseren Klasse!
  3. Man legt sich einen Threadpool an
    1. Wahrscheinlich als globales, statisches Objekt
  4. Man startet seinen Algorithmus aus der äusseren Klasse
    1. Anlegen eines Objekts mit den Parametern
    2. Aufruf von invoke() im Treadpool

 Hilfe! Ich muß das am Beispiel sehen...