Java Concurrency Paket

Java Concurrency Paket

Mit JDK 5.0 wurde das Java Concurrency Paket, welches maßgeblich von Doug Lea mitentwickelt wurde (siehe Wikipedia), Teil der Laufzeitumgebung (JRE). Das Paket java.util.concurrent fügt komplexe Klassen zur Laufzeitumgebung die das Steuern der Parallelität auf einem höheren Niveau als das grundlegenden Konstrukte von Java erlauben. 

In diesem Abschnitt werden einige, wenige Klassen vorgestellt. Schauen Sie sich dieses Paket an bevor Sie etwas Nebenläufiges implementieren. Hier gibt es sehr viele, sehr mächtige Lösungen, die die Vorlesung bei weitem übersteigen

Referenzen

Stefan Schneider Mon, 01/28/2019 - 09:10

Atomare Basistypen

Atomare Basistypen

 In java.util.concurrent.atomic werden Basistypen zur Verfügung gestellt die atomar sind.

Atomar bedeutet, dass das Lesen des Datums und das anschließende Modifizieren nicht von anderen Operationen unterbrochen werden kann. Das Lesen und Modifizieren erfolgt in einem kritischen Pfad.

Ein Beispiel: Die Klasse AtomicInteger implementiert Number und hat die zum Beispiel Methoden wie:

Anbei ein Beispiel zum nebenläufigen Addieren:

package s2.thread;
import java.util.concurrent.atomic.AtomicInteger;
/**
*
* @author s@scalingbits.com
*/

public class ParaIncrementAtomicInt extends Thread {
public static AtomicInteger zaehler;
public static final int MAX= Integer.MAX_VALUE/100;
public static void increment() {
zaehler.getAndIncrement();
}
/**
* Starten des Threads
*/
public void run() {
for (int i=0; i < MAX; i++) {
increment();
}
}
public static void main(String[] args) {
zaehler = new AtomicInteger(0);
ParaIncrementAtomicInt thread1 = new ParaIncrementAtomicInt();
ParaIncrementAtomicInt thread2 = new ParaIncrementAtomicInt();
long time = System.nanoTime();
thread1.start();
thread2.start();
try {
thread1.join();
thread2.join();
} catch (InterruptedException e) {
}
time = (System.nanoTime() -time)/1000000L; // time in milliseconds
if ((2* ParaIncrementAtomicInt.MAX) == zaehler.get())
System.out.println("Korrekte Ausführung: " +
+ ParaIncrementAtomicInt.zaehler.get() + " (" + time + "ms)");
else
System.out.println("Fehler! Soll: " + (2* ParaIncrementAtomicInt.MAX) +
"; Ist: " +ParaIncrementAtomicInt.zaehler.get() + " (" + time + "ms)");
}
}

 

Stefan Schneider Tue, 03/05/2019 - 18:55

Das "Fork-Join-Framework"

Das "Fork-Join-Framework"

Es gibt zwei Möglichkeiten das Threading in Java zu nutzen:

  • Erledigen unterschiedlicher Aufgaben parallel
    • Bsp: Swing: Oberfläche neu zeichen, Eingaben abarbeiten, eigentliche Programmaufgaben ausführen
  • Paralleles Abarbeiten der gleichen Aufgabe
    • Bsp: Differentialgleichungen berechnen, Such & Sortieralgorithmen, Bearbeitung von Massendaten (Batch).

Das Fork & Join Framework erleichert das parallele Programmieren durch eine Optimierung der Parallelität und komfortable Klassen.

Im Rahmen der Vorlesung werden wir die Möglichkeiten einer Parallelisierung ohne und mit Rückgaben diskutieren. Die Unterschiede zum Einsatz der einfachen Threadingkonstrukte und der des Frameworks sind im Diagramm unten dargestellt:

3 Möglichkeiten der Parallelisierung

Steuerung des Grads der Parallelisieren

Java erlaubt es recht einfach viele Ausführungseinheiten (Threads) zu erzeugen. Das Betriebssystem wird diese Threads auf Hardwarethreads zuordnen. Das macht das Betriebssystem auch fast immer sehr gut. Erzeugt man aber zufiele ausführbare Threads kann der Rechner überlastet werden und ist kaum noch ansprechbar. Das bedeutet, dass Prozesse die mit dem Benutzer interagieren sollen nicht mehr unbedingt die CPU Zeit bekommen die sie benötigen.

Hat man viel, zu viele Softwarethreads und Prozesse wird das Betriebsystem auch noch ineffizient. Es muss die ganzen Zustandänderungen managen. Hier gibt es kritische Pfade und im Überlastungsfall zuwenig CPU-Zeit.

Das Fork & Join Framework erleichtert das professionelle Skalieren in dem es eine Threadpool anlegt und alle Tasks an freie Prozessoren im Threadpool vergibt. Hiermit übernimmt es eine Aufgaben des Betriebsystem und vermeidet eine Überlastung. Gibt man keinen Wert für den Pool an, wird die Anzahl aller Prozessoren im Rechner als Defaultwert verwendet.

Dies ist nur ein kleiner Ausschnitt aus dem Fork & Join Framwork. Wir betrachten hierzu die folgenden Klassen:

Fork Join Klassen

Die Klasse ForkJoinPool

Die Klasse java.util.concurrent.ForkJoinPool wurde im JDK 7 eingeführt.

In dieser Klasse werden eine Menge von Threads verwaltet und zur Ausführung von Task (Aufgaben) verwendet. Dies macht es dem Betriebsystem leichter die Threads auf die Prozessoren zu verteilen, da nicht beliebig viele Threads von Java erzeugt werden. Die Klasse ForkJoinPool verwaltet die Zuordnung der Task auf die konfigurierten (Software)threads.

Tasks können auf 3 Arten dem Framework übergeben werden:

  Aufrufe von ausserhalb Aufruf innerhalb einer Berechnung
asynchrone Ausführung ForkJoinPool.execute(ForkJoinTask) ForkJoinTask.fork()
warte auf Ergebnis ForkJoinPool.invoke(ForkJoinTask) ForkJoinTask.invoke()
asynchrone Ausführung mit Ergebnis ForkJoinPool.submit(ForkJoinTask)
ForkJoinPool.submit(Callable)
ForkJoinTask.fork(ForkJoinTasks sind Futures)

Die Implementierung wird zu einer RejectedExecutionException Ausnhame führen falls der Threadpool runtergefahren wird oder falls es keine Threads mehr in der VM gibt. 

Interessante Konstruktoren:

Paralleles Abarbeiten ohne Rückgabewerte

Ein Beispiel hierfür ist der Quicksort. Man kann parallel das linke und das rechte Teilintervall sortieren nachdem die Vorarbeit (seriell) erledigt wurde.

Hierzu nutzt man die abstrakte Klasse RecursiveAction die aus der abstrakten Klasse ForkJoinTask spezialisiert wird. Die Klasse ForkJoinPool kann Tasks die die Bedingungen der Klasse ForkJoinTask erfüllen, parallel ausführen. Hierzu gibt es in der Klasse die Methoden:

Es verbleibt das Problem, dass man einen bestimmten Code ausführen möchte. Hierzu dient die abstrakte Klasse RecursiveAction. Sie besitzt eine abstrakte Methode:

  • compute(): führt den Code aus. Diese Methode ist abstrakt!

Wie implementiert man so etwas?

  1. Man implementiert den Algorithmus in einer Unterklasse von RecursiveAction.
    1. Alle Eingabeparameter werden als Attribute im Objekt gespeichert
    2. Man implementiert einen Konstruktor der die Parameter entgegennimmt und abspeichert
    3. Der Algorithmus wird in compute() implementiert
  2. Man bettet seine Klasse als innere Klasse ein
    1. Man hat dann Zugriff auf alle Attribute der äusseren Klasse!
  3. Man legt sich einen Threadpool an
    1. Wahrscheinlich als globales, statisches Objekt
  4. Man startet seinen Algorithmus aus der äusseren Klasse
    1. Anlegen eines Objekts mit den Parametern
    2. Aufruf von invoke() im Treadpool

 Hilfe! Ich muß das am Beispiel sehen...

Paralleles Abarbeiten mit Rückgabewerte

Die Aufgabe parallelisieren und als Aufgabe starten reicht oft nicht. Oft müssen die Ergebnisse entgegen genommen werden und dann rekursiv zu neuen Ergebnissen aggregiert werden.

Hierzu benötigt man einen Mechanismus der

  • die Eingabeparameter für jede Aufgabe (Task) entgegennimmt
  • die Aufgaben (Tasks) ausführt
  • und die Ergebnisse erst zurückgibt wenn die Ergebnisse vorliegen

Die Schnittstelle Callable

Die Schnittstelle java.util.concurrent.Callable muss implementiert werden um die Ausgabeparameter einer Aufgabe (Task) entgegenzunehmen. Hierfür muss man

  • einen generischen Typen wählen der das Ergebnis zur Verfügung stellt
  • einen Konstruktor implementieren der alle Eingabeparameter entgegen nimmt.
  • alle Parameter als Attribute implementiert
  • die Methode V call() die das Ergebnis zurückliefert

Ein Beispiel auf der Programmierübung Ariadnefaden

public class SearchCallable implements Callable<List<Position>> {
        Position von;
        Position nach;
        Ariadne4Parallel a;
         /**
         * 
         * @param a Instanz von Ariadne
         * @param von Start
         * @param nach Ziel
         */
        SearchCallable(Ariadne4Parallel a,Position von, Position nach) {
            this.a =a;
            this.von = von;
            this.nach = nach;
        }
        /**
         * Führe Task in eigenem Thread aus und nutze Instanzvariablen
         * als Parameter um Aufgabe auszuführen.
         * @throws java.lang.Exception
         */
        @Override
        public List<Position> call() throws Exception {
            return a.suche(von, nach);
        }
    }

In diesem Beispiel ist das Ergebnis eine Liste von Positionen die den Web aus dem Labyrinth weisen.

Die Eingabeparameter sind ein Labyrinth, die Startposition und die Position mit dem Ausgang ais dem Labyrinth.

Die Schnittstelle Future

Die Schnittstelle Future erlaubt es das Ergebnis einer Aufgabe zu analysieren. Die Schnittstelle ist generisch und erlaubt es daher nur einen bestimmten Type einer Klasse auszulesen.

Die beiden wichtigsten Methoden der Schnittstelle sind:

  • V get(): wartet falls notwendig auf das Beenden der Aufgabe und liefert das Ergebnis ab
  • V get(long timeout, TimeUnit unit): wartet eine bestimmte Zeit auf auf ein bestimmtes Ergebnis

In der Schnittstellendefinition sind noch weitere Methoden zum Kontrollieren und Beenden von Aufgaben (Tasks) vorhanden.

Abschicken von Aufgaben (Tasks)

Die Klasse  java.util.concurrent.ForkJoinPool verfügt über eine Methode

  • public <T> ForkJoinTask<T> submit(Callable<T> task)

Diese Methode nimmt die Eingaben für die Aufgabe (Task) an solange es eine Spezialierung der Klasse Callable<T> ist. Das Ergebnis ist ein Objekt vom Typ ForkJoinTask. Die Klasse ForkJoinTask implementiert die Schnittstelle Future. Das Ergebnis der Aufgabe (Task) wird hier abgeliefert sobald die Aufgabe abgearbeitet ist.

Wie implementiert man so etwas?

  1. Man implementiert den Algorithmus in einer Unterklasse von RecursiveAction.
    1. Alle Eingabeparameter werden als Attribute in einem Objekt von Callable gespeichert
    2. Man implementiert einen Konstruktor der die Parameter entgegennimmt und abspeichert
    3. Der Algorithmus wird in compute() implementiert
      1. Der Algorithmus führt rekursive Aufrufe durch und nimmt die Ergebnisse vom Typ Future an.
  2. Man bettet seine Klasse die Callable spezialisiert als innere Klasse ein
    1. Man hat dann Zugriff auf alle Attribute der äusseren Klasse!
  3. Man legt sich einen Threadpool an
    1. Wahrscheinlich als globales, statisches Objekt
  4. Man startet seinen Algorithmus aus der äusseren Klasse
    1. Anlegen eines Objekts mit den Parametern
    2. Aufruf von invoke() im Treadpool

 Hilfe! Ich muß das am Beispiel sehen...

Stefan Schneider Wed, 03/06/2019 - 18:39