Threads (Nebenläufigkeit)

Threads (Nebenläufigkeit)

Duke in Threads

Einleitung

Java bietet als Programmiersprache von Beginn an eine aktive Unterstützung für das Programmieren mit Threads (engl der Faden, das Fädchen). Hierdurch ist es in Java relativ einfach nebenläufige Programme zu implementieren.
Das Threadkonzept erlaubt es in der gleichen Javaanwendungen Dinge parallel abzuarbeiten und trotzdem auf die gleichen Daten zuzugreifen.

 

Hiermit ist die Implementierung von Javaanwendungen mit den folgenden Vorteilen möglich:

  • man kann sehr viel mehr Aufgaben abarbeiten, da man es gleichzeitig tun kann.
  • man kann Aufgaben schneller zu erledigen, da man es parallel tun kann.
  • man kann Aufgaben asynchron, parallel im Hintergrund abarbeiten ohne auf andere Aufgaben warten zu lassen.

Prozesse und Threads im Betriebssystem

Mit Java-Threads kann man nebenläufige Programme programmieren die es erlauben mehrere Dinge gleichzeitig zu tun. Ein sehr einfaches Beispiel ist eine Programm mit einer graphische Benutzeroberfläche. Dieses Programm lädt typischerweise in einem Thread eine Datei aus dem Internet während gleichzeitig ein anderer Thread auf dem Bildschirm einen Fortschrittsbalken vergrößert.

Betriebssysteme verwalten die Ressourcen eines Rechners. Beim Programmieren mit nebenläufigen Java-Threads ist es wichtig zu verstehen, wie die beiden wichtigen Resourcen Hauptspeicher und Prozessoren vom Javalaufzeitsystem und dem Betriebsystem verwaltet werden. Bei der Betrachtung des Speichers spielt nur der virtuelle Speicher des Betriebsystems eine Rolle, da der Javaentwickler in der Regel keinen direkten Einfluss auf dem physischen Speicher nehmen kann. Moderne Betriebsysteme sind in der Lage Programme gleichzeitig bzw. nebenläufig auszuführen.

Definition
Prozess(Informatik)
Ein Prozess bezeichnet in der Informatik ein im Ablauf befindliches Computerprogramm (siehe Wikipedia). Zum Prozess gehört das Programm samt seiner Daten und dem Prozesskontext (siehe Wikipedia).

 

Das Javalaufzeitsystem ist aus der Sicht des Betriebssystems während seiner Ausführung ein Prozess.

Prozesse besitzen während ihrer Ausführung typischerweise:

  • ein Programm welches sie ausführen. Mehrere Prozesse können durchaus das gleiche Programm ausführen
  • einen eigenen Speicherbereich zur Verwaltung der Daten. Das Betriebsystem verwaltet diesen Speicher und sorgt dafür das alle Prozesse ihren eigenen Speicher benutzen können ohne sich gegenseitig zu beinflussen. Bei Java ist der Heap der bekannteste gemeinsame Speicherbereich.
  • Zumindest einen Programmstack (Stapel) zur Verwaltung der Daten von Methoden und einen dazugehörige Befehlszähler/zeiger
  • Zugriff auf Betriebssystemressourcen (Bildschirm, Tastatur, Massenspeicher, Netzwerk etc.)

Threads (engl. der Faden, das Fädchen) sind leichtgewichtige Ausführungseinheiten eines Prozesses:

Definition
Thread (Informatik)
Ein Thread (auch: Aktivitätsträger oder leichtgewichtiger Prozess) bezeichnet in der Informatik einen Ausführungsstrang oder eine Ausführungsreihenfolge in der Abarbeitung eines Programms. Ein Thread ist Teil eines Prozesses. (siehe Wikipedia)

Das Javalaufzeitsystem ist typischerweise ein Prozess des Betriebssystems. Die Java-Threads werden normalerweise auf Betriebssystem-Threads abgebildet. Dies war in frühen Javaimplementierungen (1.1) nicht der Fall. Hier wurden die Threads von Java selbst verwaltet (siehe Green Threads) .

Ein Thread besitzt typischerweise

  • keinen eigenen Speicher (Heap). Er und alle anderen Threads des Prozesses haben Zugriff auf den gemeinsamen Hauptspeicher seines Prozesses. Durch das gemeinsame, gleichzeitige Arbeiten auf den gemeinsamen Daten ist der Datenaustausch zwischen Threads sehr einfach. Die Konsistenz der Daten muss jedoch aktiv verwaltet werden.
  • einen eigenen Programmstack. Er dient der Verwaltung der aktuell aufgerufenen Methodenvariablen.
  • einen eigenen Befehlszähler

Prozesse bestehen aus mindestens einem Thread (dem Haupt-Thread, Main-Thread) und eventuell zusätzlichen Threads die eigene Programmstacks zur parallelen Ausführung besitzen. Die Lebensdauer von Threads ist durch die Lebensdauer des dazugehörigen Prozesses beschränkt. Sie enden mit dem Beenden des Pozesses.

 

Nebenläufige Ausführung im Betriebssystem

Betriebsysteme weisen den Prozessen die Prozessoren zur Ausführung zu. Hierfür verwendet man die englischen Begriffe des "scheduling" oder "dispatching". Ziel des Betriebssystems ist es die Prozessoren möglichst gut auszunutzen und eine faire, vorteilhafte Abarbeitung aller Programme in Prozessen zu gewährleisten.

Die (historisch) einfachste Art der Verwaltung von Prozessen durch das Betriebsystem ist der Batchbetrieb (Stapelbetrieb). Ein Rechner hat typischerweise nur einen Prozessor. Das Betriebssystem kann nur ein Programm gleichzeitig als Prozess ausführen. Es weist dem Prozessor eine Programm A zu dieses läuft bis es beendet wird. Anschließend wird das nächste Programm ausgeführt: 

Multi tasking: Um interaktive Benutzer zu bedienen ist es geschickter laufende Prozesse zu unterbrechen und andere Prozesse teilweise abzuarbeiten. Aufgrund der hohen Prozessorgeschwindigkeit hat der menschliche Betrachter den Eindruck, die Prozesse laufen gleichzeitig. Alle moderne Betriebsysteme arbeiten nach diesem Prinzip. Das Unterbrechen der Prozesse ist oft problemlos möglich, da sie sich oft selbst blockieren. Sie müssen da si relativ lange auf Daten von Benutzern, Festplatten, dem Netzwerk warten müssen. In diesen vielen Zwangpausen kann das Betriebssyteme andere, lauffähige Prozesse abarbeiten. 

Die Prozesse laufen jetzt verschränkt und sie werden in vielen einzelnen Blöcken abgearbeitet. Sie werden quasi-parallel abgearbeitet.

Multi tasking-Multiprozessor: Da alle modernen Prozessoren mehrere Ausführungseinheiten besitzen können die Betriebsysteme Prozesse parallel abarbeiten. Die Prozesse müssen nicht zwangsweise auf dem gleichen Prozessor ausgeführt werden (siehe Beispiel). Die Gesamtausführungszeiten können bei mehren Prozessoren entsprecht verkürzen.

Multithreaded-Multiprozessor: Da Threads leichtgewichtige Prozesse mit einem Programmstack und einem eigenen Programmablauf sind, werden sie bei der Prozessorvergabe wie Prozesse behandelt.  Ein Prozess kann jetzt mehrere Prozessoren gleichzeitig verwenden wenn er nur über mehrere Theads verfügt. Anwendungen können jetzt innerhalb eines Prozesses skalieren. Dies bedeutet, sie können (threoretisch) beliebig viele Prozessoren benutzen und damit beliebig viele Aufgaben in einer bestimmten Zeit abarbeiten. Der Durchsatz eines Prozesses, bzw. die Abarbeitungsgeschwindigkeit ist nicht mehr direkt an die Geschwindigkeit eines einzelnen Prozessors gebunden.

 

 

Stefan Schneider Sat, 05/28/2011 - 10:08

Thread Zustandsübergänge

Thread Zustandsübergänge

Threads und Prozesse haben unterschiedliche Zustände, die von den verfügbaren Betriebsmitteln abhängen:

  • blocked: ein Thread der auf Daten von einem Gerät wie Festplatte, Tastatur, Maus oder Netzwerk wartet ist blockiert. Das Betriebssystem wird ihm keinen Prozessor zuweisen da er ihn nicht nutzen kann solange ihm die notwendigen Daten fehlen.
  • ready-to-run: Der Thread hat alle Betriebsmittel, mit Ausnahme des Prozessors, die er zum Laufen benötigt. Er wartet bis der Dispatcher einen Prozessor zuweist.
  • running: Der Thread hat alle nötigen Betriebsmittel inklusive eines Prozessors. Er führt sein Programm aus bis er eine nicht vorhandene Ressource benötigt oder bis er vom Dispatcher den Prozessor entzogen bekommt.

Die Übergänge zwischen den vereinfachten Zuständen eines Threads sind im folgenden Diagramm dargestellt:

Threads haben ähnlich wie Prozesse eine Reihe von Zuständen. Sie besitzen jedoch mehr Zustände, da ihre Lebensdauer kürzer als die des Prozesses ist und sie sich miteinandere synchronisieren müssen. Threads haben die folgenden fünf Zustände:

  • new: Der Thread wurde mit dem new Operator erzeugt. Er befindet sich im Anfangszustand. Auf seine Daten kann man zugreifen. Er ist noch nicht ablauffähig.
  • ready-to-run: Der Thread ist lauffähig und wartet auf eine Prozessorzuweisung
  • running: Der Thread hat einen Prozessor und führt das Programm aus
  • blocked: Der Thread wartet auf Ressourcen
  • dead: Der Thread kann nicht wieder gestartet werden

Eine Reihe dieser Zustände kann durch Methodenaufrufe vom Entwickler beeinflusst werden:

  • start(): Ein Thread wechselt vom Zustand "new" zu "ready-to-run" 
  • sleep(): Ein laufender Thread wird für eine bestimmte Zeit blockiert
  • join(): Ein Thread blockiert sich selbst bis der Thread dessen join() Methode aufgerufen wurde sich beendet hat
  • yield(): Ein Thread gibt freiwillig den Prozessor auf und erlaubt der Ablaufsteuerung den Prozessor einem anderen Thread zuzuweisen
  • interrupt(): Erlaubt es Threads die wegen eines sleep() oder join() blockiert sind wieder in den Zustand "ready-to-run" zu versetzen

Stefan Schneider Sat, 05/28/2011 - 10:16

Programmieren mit Threads

Programmieren mit Threads

Java erlaubt das Erzeugen und Verwalten von Threads mit Hilfe der Systemklasse Thread. Beim Starten einer Javaanwendung bekommt die Methode main() automatisch einen Thread erzeugt und zugewiesen der sie ausführt. Mit Hilfe der Klasse Thread kann man selbst zusätzliche Threads erzeugen und starten.

Erzeugen und Start mit der Klasse Thread

Man kann einen neuen Thread starten indem man ein Objekt von Thread erzeugt. Hiermit wird parallel im Hintergrund ein Javathread erzeugt. Das Aufrufen der Methode start() startet dann den neuen Thread. Dies geschieht wie im folgenden Beispiel gezeigt:

public static void main(String[] args {
   ...
   Thread t1= new Thread(...);
   t1.start();
   ...
}

Im Laufzeitsystem wird hierdurch zuerst ein neuer Thread erzeugt und dann gestartet:

 

Es verbleibt die Frage welchen Programmcode der neue Thread ausführt.

Der ausgeführte Programmcode steht in einer Methode mit den Namen run() und muss von einer Klasse nach den Vorgaben der Schnittstelle Runnable implementiert werden.

Hierfür muss beim Erzeugen des Thread-objekts eine Referenz auf ein Objekt mitgegeben werden welches diese Schnittstelle implementiert. Geschieht dies nicht wird die Methode run() des Tread-objekts aufgerufen. Hierdurch ergeben sich zwei Möglichkeiten einen eigenen Thread mit einem bestimmten Programm zu starten:

Starten eines Threads durch Erweitern der Klasse Thread

Die erste Möglichkeit besteht im Erweitern der Klasse Thread und im überschreiben der Methode run().
Die Klasse Thread implementiert schon selbst die Schnittstelle Runnable. Ruft man die Methode start() ohne einen Parameter auf, so wird bei einer angeleiteten Klasse die überschriebene Methode run() in einem eigenen neuen Thread aufgerufen.
Im unten aufgeführten Beispiel wurde die Klasse myThread aus der Klasse Thread abgeleitet:

"Klassenhierachie Java Thread"

Die Klasse myThread verwaltet die Threads in ihrer main() Methode. Das Erzeugen und Starten der Threads der Klasse myThread könnte auch aus jeder anderen beliebigen Klasse erfolgen.

package s2.thread;

/**
*
* @author s@scalingbits.com
*/
public class MyThread extends Thread {
   @Override
   public void run() {
      System.out.println("Hurra ich bin myThread in einem Thread mit der Id: "
      + Thread.currentThread().getId());
   }

   public static void main(String[] args) {
      System.out.println("Start MyThread.main() Methode im Thread mit der Id: "
         + Thread.currentThread().getId());
      MyThread t1 = new MyThread();
      t1.start();
      System.out.println("Ende MyThread.main() Methode im Thread mit der Id: "
         + Thread.currentThread().getId());
      MyThread t2 = new MyThread();
      // t2 ist zwar ein Threadobjekt und repräsentiert einen Thread
      // da das Objekt nicht mit start() aufgerufen läuft es im gleichen
      // Thread wie die main() Routine!
      t2.run();
   }
} // Ende der Klasse

Das Programm erzeugt die folgende Konsolenausgabe:

Start MyThread.main() Methode im Thread mit der Id: 1
Ende MyThread.main() Methode im Thread mit der Id: 1
Hurra ich bin myThread in einem Thread mit der Id: 1
Hurra ich bin myThread in einem Thread mit der Id: 9

Im Beispielprogramm wird für die Referenz t1 ein neues Threadobjekt erzeugt. Anschliesend wird es durch seine start() Methode in einem eigenen Thread gestartet. Die run() Methode wird nach dem Starten im eigenen Thread ausgeführt.

Das Objekt mit der Referenz t2 ist zwar auch ein Thread. Das es aber direkt mir der run() Methode aufgerufen wird, läuft es im gleichen Thread wie die main() Methode.

Im UML Sequenzdiagramm ergibt sich der folgende Ablauf:

Starten eines Threads durch Implementieren der Schnittstelle Runnable

Das Erweitern einer Klasse aus der Klasse Thread ist nicht immer möglich. Man kann einen Thread auch starten indem man ein Thread-objekt erzeugt und ihm die Referenz auf eine Instanz der Schnittstelle Runnable mitgibt. Die Programmierabfolge ist dann:

  • Erzeugen eine Threadobjekts mit Referenz auf eine Instanz von Runnable.
  • Aufrufen der start() Methode des Threadobjekts
    • die Methode run() der Runnable-objekts wird automatisch aufgerufen

Die Klasse myRunnable:

package s2.thread;
/**
 *
 * @author s@scalingbits.com
 */
public class MyRunnable implements Runnable {
@Override
public void run() {
        System.out.println("Hurra ich bin myRunnable in einem Thread mit der Id: "
                + Thread.currentThread().getId());
    }
}

 

Die Klasse ThreadStarter die als Hauptprogramm dient:  

package s2.thread;
/**
*
* @author s@scalingbits.com
*/
public class ThreadStarter{
   public static void main(String[] args) {
      System.out.println("Start ThreadStarter.main() Methode im Thread mit der Id: "
         + Thread.currentThread().getId());
      MyRunnable r1 = new MyRunnable();
      MyRunnable r2 = new MyRunnable();
      Thread t1 = new Thread(r1);
      Thread t2 = new Thread(r2);
      t1.start();
      System.out.println("Ende ThreadStarter.main() Methode im Thread mit der Id: "
         + Thread.currentThread().getId());
      // r2 ist zwar ein Runnableobjekt , da das Objekt aber nicht von einem
      // Threadobjekt indirekt aufgerufen wirdläuft es im gleichen
      // Thread wie die main() Routine!
      r2.run();
   }
}

Das Programm erzeugt die gleichen Ausgaben wie das vorherige Programm:

Start ThreadStarter.main() Methode im Thread mit der Id: 1
Ende ThreadStarter.main() Methode im Thread mit der Id: 1
Hurra ich bin myRunnable in einem Thread mit der Id: 1
Hurra ich bin myRunnable in einem Thread mit der Id: 9

Das Aufrufen von r2.run() startet keinen eigenen Thread. Der Vorteil der Benutzung der Schnittstelle Runnable liegt darin, dass man die Methode run() in jeder beliebigen Klasse implementieren kann.

Die wichtigsten Methoden der Klasse Thread

  • Konstruktor: Thread(Runnable target) : Erzeugt einen neuen Thread und übergibt ein Objekt dessen run() Methode beim Starten des Threads aufgerufen (anstatt die eigene run() Methode aufzurufen.
  • static Thread currentThread(); liefert den aktuellen Thread der ein Codestück gerade ausführt
  • long getId(): Liefert die interne Nummer des Threads
  • join(): Hält den aktuellen Thread an bis der referenzierte Thread beendet ist.
  • static void sleep(long millis): Lässt den aktuellen Thread eine Anzahl von Millisekunden schlafen.
  • start(): Lässt die VM den referenzierten Thread starten. Dieser ruft dann die run() Methode auf.

 

Stefan Schneider Sat, 05/28/2011 - 10:19

Synchronisation

Synchronisation

Da Threads auf die gleichen Objekte auf im Heap zugreifen, können sie so sehr effizient Daten austauschen. Es besteht jedoch das Risiko der Datenkorruption, da man oft mehrere Daten gleichzeitig verändern muss um sie von einem konsistenten Zustand in den nächsten konsistenten Zustand zu überführen. 

Die Sitzplatzreservierung in einem Flugzeug ist hierfür ein typisches Beispiel:

Mehrere Reisebüros prüfen ein Flugzeug auf die Verfügbarkeit von 10 Plätzen für eine Reisegruppe. Ergibt das Lesen der Belegungsvariable 20 freie Plätze, so fährt Reisebüro 1 fort und liest weitere Daten um die Buchung vorzubereiten. Verzögert sich die endgültige Buchung so kann es vorkommen, dass ein zweites Reisebüro die Verfügbarkeit von 15 Plätzen abfragt und die 15 Plätze bucht. Das zweite Reisebüro erhöht den Belegungszähler also um 15 Plätze.

Kommt das erste Reisebüro nun endlich mit seiner Buchung vorran und erhöht die ursprünglich ausgelesene Variable um 10 ergibt sich ein inkonsistenter Zustand.

Man muss also in Systemen mit parallelen Zugriff auf Daten die Möglichkeit schaffen nur einen Thread über eine gewisse Zeit auf einem Datensatz (Objekt) arbeiten zu lassen um wieder einen konsistenten Zustand herzuführen.

Darf nur ein Thread gleichzeitig auf einer Variablen arbeiten, so nennt man diese eine kritische Variable. Die Zeit die ein Thread mit der Bearbeitung einer solchen Varriablen verbringt nennt man den kritischen Abschnitt oder auch den kritischen Pfad.

Das oben geschilderte Problem beim gleichzeitigen Zugriff nennt man auch "Reader/Writer" Problem, da das Lesen und Schreiben auf dem Datum atomar erfolgen muss. Da in in nebenläufigen Systemen diese Datenkorruption ausschlieslich von der Geschwindigkeit und dem zufälligen paralleln Zugriff abhängt nennt man ein solches Problem auch eine "Race Condition". Die Datenkorruption tritt zufällig und abhängig von der Ausführungsgeschwindigkeit auf.

Beispiel: Nichtatomares Inkrement in Java

Der ++ Operator ist Java ist nicht atomar. Dies bedeutet, dass zwei Threads einen bestimmten Wert auslesen können und der erste Thread schreibt den um 1 erhöhten Wert zurück während eventuell der zweite Thread noch etwas Zeit benötigt. Schreibt der zweite Thread dann den gleichen inkrementierten Wert zurück wurde die Zahl nur einmal inkrementiert. Es liegt eine Datenkorruption vor.

Im Programm ParaIncrement wird eine gemeinsame Variable zaehler von zwei Threads gleichzeitig inkrementiert. Der Wert der Variablen sollte immer doppelt so groß wie die Anzahl der Durchläufe (Konstante MAX) eines einzelnen Threads sein.Die Korruption im Programm ParaIncrement findet relativ selten selten statt. Man muss die Konstante K für die Anzahl der Durchläufe eventuell abhängig vom Rechner und der Java virtuellen Maschine anpassen:

package s2.thread;

/**
*
* @author s@scalingbits.com
*/

public class ParaIncrement extends Thread {
   public static int zaehler=0;
   public static final int MAX= Integer.MAX_VALUE/10;

   public static void increment() {
      zaehler++;
   }

   /**
   * Starten des Threads
   */
   public void run() {
      for (int i=0; i < MAX; i++) {
         increment();
      }
   }

   public static void main(String[] args) {
      ParaIncrement thread1 = new ParaIncrement();
      ParaIncrement thread2 = new ParaIncrement();
      long time = System.nanoTime();
      thread1.start();
      thread2.start();
      try {
         thread1.join();
         thread2.join();
      } catch (InterruptedException e) {
         }
      time = (System.nanoTime() -time)/1000000L; // time in milliseconds
      if ((2* ParaIncrement.MAX) == ParaIncrement.zaehler)
         System.out.println("Korrekte Ausführung: " +
         + ParaIncrement.zaehler + " (" + time + "ms)");
      else
         System.out.println("Fehler! Soll: " + (2* ParaIncrement.MAX) +
            "; Ist: " +ParaIncrement.zaehler + " (" + time + "ms)");
   }
}

Quellen bei github.

Wechselseitiger Ausschluss

Zur Vermeidung der oben genannten Korruption ist es wichtig sicher zustellen, dass nur ein Thread gleichzeitig Zugriff auf diese Daten hat.

Duke thinking Das oben gezeigte Programm zeigt die Millisekunden an die es benötigt. Wieviel sind es?
Man kann das Schlüsselwort synchronized als Modifizierer in die Methode increment() einpflegen. Läuft das Programm jetzt langsamer oder schneller? Läuft es korrekt oder inkorrekt? 
Definition
Kritischer Abschnitt/ Kritischer Pfad
Ein kritischer Abschnitt ist eine Folge von Befehlen, die ein Thread nacheinander vollständig abarbeiten muss, auch wenn er vorübergehend die CPU and einen anderen Thread abgibt. Kein anderer Thread darf einen kritischen Abschnitt betreten, der auf die gleichen Variablen zugreift, solange der erstgenannte Thread mit der Abarbeitung der Befehlsfolge noch nicht fertig ist. (siehe: Goll, Seite 744)

Einfachste Lösung: Verwendung von Typen die den kritischen Pfad selbst schützen

Das Java Concurrency Paket bietet reichhaltige Möglichkeiten und Klassen. Das oben gezeigte Beispiel kann mit Hilfe der Klasse AtomicInteger sicher implementiert werden. Die Klasse AtomicInteger erlaubt immer nur einem Thread Zugriff auf das Datum. Die entsprechende Implementierung ist:

package s2.thread;
import java.util.concurrent.atomic.AtomicInteger;
/**
 *
 * @author s@scalingbits.com
 */
 public  class ParaIncrementAtomicInt extends Thread {
    public static AtomicInteger zaehler;
    public static final int  MAX= Integer.MAX_VALUE/100;
    public static void increment() {
        zaehler.getAndIncrement();
    }
    /**
     * Starten des Threads
     */
    public void run() {
        for (int i=0; i < MAX; i++) {
            increment();
        }
    }
    public static void main(String[] args) {
        zaehler = new AtomicInteger(0);
        ParaIncrementAtomicInt thread1 = new ParaIncrementAtomicInt();
        ParaIncrementAtomicInt thread2 = new ParaIncrementAtomicInt();
        long time = System.nanoTime();
        thread1.start();
        thread2.start();
        try {
            thread1.join();
            thread2.join();
        } catch (InterruptedException e) {
        }
        time = (System.nanoTime() -time)/1000000L; // time in milliseconds
        if ((2* ParaIncrementAtomicInt.MAX) == zaehler.get())
            System.out.println("Korrekte Ausführung: " +
                    + ParaIncrementAtomicInt.zaehler.get() + " (" + time + "ms)");
        else
            System.out.println("Fehler! Soll: " + (2* ParaIncrementAtomicInt.MAX) +
                    "; Ist: " +ParaIncrementAtomicInt.zaehler.get() + " (" + time + "ms)");
    }
}

Quellen bei github.

Vergleichen Sie die Laufzeiten beider Anwendungen! Die Anwendung bei der immer nur ein Thread auf das Objekt zugreifen kann ist erheblich langsamer, jedoch korrekt.

Sperren durch Monitore mit dem Schlüsselwort synchronized

Um die oben gezeigten Möglichkeiten von Korruptionen zu vermeiden verfügen Javaobjekte über Sperren die Monitore genannt werden. Jedes Javaobjekt besitzt einen Monitor der gesetzt ist oder nicht gesetzt ist.

  • Ein Monitor wird gesetzt wenn eine Instanzmethode des Objekts aufgerufen wird, die mit dem Schlüsselwort synchronized versehen ist.
  • Kein anderer Thread kann eine synchronisierte Instanzmethode des gleichen Objekts aufrufen, solange der Thread der den Monitor erworben hat noch in der synchronisierten Methode arbeitet.
    • Alle anderen Threads werden beim Aufruf einer synchronisierten Instanzmethode des gleichen Objekts blockiert und müssen warten bis der erste Thread die synchronisierte Methode verlassen hat.
  • Nach dem der erste Thread die synchronisierten Methode verlassen hat, wird der Monitor wieder freigegeben.
  • Der nächste Thread, der eventuell schon wartet kann den Monitor erwerben.

Es ist wichtig zu verstehen, dass in Java immer die individuellen Objekte mit einem Monitor geschützt sind. Sind zum Beispiel die Sitzplätze eines Flugzeuges durch Java-Objekte implementiert, so kann man mit der gleichen synchronisierten Methode auf untrschiedlichen Objekte parallel arbeiten.

Am Beispiel der Klasse Sitzplatz kann man sehen wie man den Monitor für einen bestimmten Sitzplatz setzen kann:

package s2.thread;

/**
*
* @author s@scalingbits.com
*/

public class Sitzplatz {
   private boolean frei = true;
   private String reisender;

   boolean istFrei() {return frei;}

   /**
   * Buche einen Sitzplatz für einen Kunden falls er frei ist
   * @param kunde Name des Reisenden
   * @return erfolg der Buchung
   */
   synchronized boolean belegeWennFrei(String kunde) {
      boolean erfolg = frei; // Kein Erfolg wenn nicht frei
      if (frei) {
         reisender = kunde;
         frei = false;
      }
   return erfolg;
   }
}

Quellen bei github

Die Methode belegeWennFrei() kann jetzt nur noch von einem Thread auf einem Objekt gleichzeitig aufgerufen werden. Die Methode istFrei() ist nicht synchronisiert und in einer parallelen Umgebung nicht sehr relevant. Man kann sich nicht darauf verlassen, dass bei der nächsten Operation der freie Zustand noch gilt.

Wichtig
Monitore und Schutz von Daten

Monitore schützen nur die synchronsierten Methoden eines Objekts. Dies bedeutet

  • Nicht synchronisierte Methoden der Klasse können weiterhin parallel aufgerufen werden
  • Die Attribute einer Objektinstanz sind nicht selbst geschützt. Man schützt Sie indirekt mit dem Schlüsselwort private. Hierdurch ist der Zugriff auf die Attribute auf die eigenen Methoden beschränkt. Die Methoden der Klasse können wiederum mit synchronized geschützt werden.

 

Statische synchronisierte Methoden

Das Schlüsselwort synchronized kann auch verwendet werden um einen Monitor für die Klasse zu setzen. Dieser Monitor hat aber keinen Einfluss auf den Zugriff auf die Objekte einer Klasse! Er schützt nur statische Methoden der Klasse.

Beispiel: Man kann das Korruptionsproblem in der Klasse ParaIncrement beheben in dem man die statische  Methode increment() synchronsiert:

public static synchronized void increment() {
   zaehler++;
}

Die Variable zaehler ist in diesem Beispiel eine statische Variable. Sie gehört nicht zu einem der beiden erzeugten Objekten.

Synchronisierte Blöcke

Man muss nicht notwendigerweise eine gesamt Methode synchroniseren. Java bietet auch die Möglichkeit einzelne Blöcke zu synchronisieren. Das Synchronsieren eines Blocks erfolgt ebenfalls mit Hilfe des Schlüsselworts synchronized. Hier muss man jedoch das Objekt angeben für welches man einen Monitor erwerben will.

Man kann die Sitzplatzreservierung auch mit Hilfe eines synchronisierten Blocks implementieren:

boolean belegeWennFrei(String kunde) {
        boolean erfolg;
        synchronized (this) {
            erfolg =  frei; // Kein Erfolg wenn nicht frei
            if (frei) {
                reisender = kunde;
                frei = false;
            }
        }
        return erfolg;
    }

In dieser Implementierung kann die Methode belegeWennFrei() parallel aufgerufen werden. Beim Schlüsselwort synchronized muss jedoch für das aktuelle Objekt der Monitor erworben werden bevor der Thread fortfahren kann. 

Referenzen

  • Goll,Heinisch, Müller-Hoffman: Java als erste Programiersprache, Teubner Verlag
  • IBM Developerworks: Going Atomic: Gute Erklärung zu synchronisierten Zählern (AtomicInteger)

 

Stefan Schneider Sat, 05/28/2011 - 17:38

Beispiel: Kritischer Pfad

Beispiel: Kritischer Pfad

Das hier benutzte Beispielprogramm visualiert 15 Threads die sich auf einem synchronisierten Objekt serialisieren.

Threads in grüner Farbe befinden sich nicht im kritischen Pfad. Threads in roter Farbe befinden sich im kritischen Pfad.

Der kritische Pfad in in der Klasse EinMonitor in der Methode buchen() implementiert.

Die einzige Instanz von EinMonitor verfügt über zwei Variablen a und b die Konten darstellen sollen.

Die Methode buchen() "verschiebt" mehrfach einen Betrag zwischen den beiden Konten. Die Summe der beiden Variablen sollte am Ende der Methode stets gleich sein.

Die Methode buchen() enthält etwas Overhead zur Visualierung des Konzepts:

  • am Anfang und am Ende  muss das GUI Subsystemen über den Eintritt und das Verlassen des kritischen Pfads informiert werden
  • zwischen allen Buchungen werden kleine Schlafpausen eingelegt um die Zeit im kritischen Pfad künstlich zu verlängern

Die Methode buchen() enthält eine Konsistenzprüfung am Ende der Methode die bei einem Fehler in der Buchführung eine Konsolenmeldung ausdruckt. Sie kommt in der auf dieser Seite gezeigten Variante nicht zum Zuge!

Die Klasse MainTest dient zum Starten des Programms. Sie erzeugt und startet die 15 Threads. Jeder Thread führt nur eine gewisse Anzahl von Buchungen durch und beendet sich dann.

Aufgaben

  • Übersetzen Sie die Klassen und starten Sie das Hauptprogramm
  • Der Schieberegler erlaubt das Einstellen der Schlafpausen im kritischen Pfad. Was geschieht wenn der kritische Pfad verkürzt wird?
  • Entfernen die das Schlüsselwort synchronized in der Methode buchen(). Was geschieht?
  • Was würde geschehen geschehen wenn die künstlichen sleep Aufrufe entfernt werden?
    • Hinweis: Sie müssen dann auch die Anzahl der Durchläufe pro Thread stark erhöhen. Da die Zeit im kritischen Pfad sehr kurz wird.
  • Was geschieht wenn man den yield() Aufruf für in der run() Methode von MainTest entfernt

Kommentar

Da die aktuelle Ausführungsgeschwindigkeit, 4-5 Zehnerpotenzen, jenseits der menschlichen Wahrnehmungsfähigkeit liegt ist, es sehr schwer die echten Abläufe im Zeitlupentempo zu visualisieren. Ein künstlicher sleep() Aufruf blockiert den Prozess und gibt den Prozessor an das Betriebssystem zurück. Der Scheduler des Betriebssystems trifft bei dieser künstlichen Verlangsamung eventuell andere Entscheidungen in Bezug auf den Thread den er ausführt. Das gleiche Problem besteht beim Debuggen von Javaprogrammen. Durch das Bremsen bestimmter Threads können existierende Fehler nicht mehr reproduzierbar sein oder bisher nicht aufgetretene Fehler in der Synchronsiation können sichtbar werden.

Starten des Programms

Die benötigten Klassen sind in Threading.jar zusammen gefaßt.

Man kann diese jar Datei mit

java -jar Threading.jar

von der Kommandozeile nach dem Runterladen starten. Vielleicht reicht auch ein Doppelklick auf die Datei im Download-Ordner...

Nach dem Übersetzen der Dateien oder nach dem Starten der jar-Datei erscheint ein GUI wie es im folgenden Bild zu sehen ist:

Klasse MainTest

Hauptprogramm der Anwendung.

package s2.thread;

/**
*
* @author s@scalingbits.com
*/
public class MainTest extends Thread {

public static final int INCRITICALPATH = 0;
public static final int NOTINCRITICALPATH = 1;
public static final int ENDED = 2;
public static int anzahlThreads = 15;
public static MainTest[] mt;
public int threadStatus = NOTINCRITICALPATH;
private static EinMonitor myMonitor;
public static int sleepPeriod = 500;
public int meineID;
public static ThreadingPanel tp;
public static ThreadFenster tg;
public boolean stop = false;
public boolean synchron = true;

   public MainTest(int id) {
      meineID = id;
   }
   @Override
   public void run() {
      long anfangszeit = System.nanoTime();
      System.out.println("Thread [" + meineID + "] gestartet");
      //GUIupdate(NOTINCRITICALPATH);
      for (long i = 0; i < 200; i++) {
         Thread t = Thread.currentThread();
         // Erlaube anderen Threads die CPU zu holen
         t.yield();
      if (tg.synchron)
         myMonitor.buchen(10);
      else
          myMonitor.parallelbuchen(10);
      }
      threadStatus = ENDED;
      System.out.println("Thread [" + meineID + "] beendet...");
   }
 
   public static void main(String[] args) {
      // Anlegen des Monitorobjekts
      myMonitor = new EinMonitor(1000000L);
      mt = new MainTest[anzahlThreads];
      tg = new ThreadFenster();
      tp = tg.tp;
      // Erzeuge die Threads
      for (int i = 0; i < anzahlThreads; i++) {
         mt[i] = new MainTest(i);
      }
      // Starte die Threads
      for (int i = 0; i < anzahlThreads; i++) {
         mt[i].start();
      }
   }
}

Klasse EinMonitor

package s2.thread;

/**
*
* @author s@scalingbits.com
*/
public class EinMonitor {
   long invariante;
   long a;
   long b;

   public EinMonitor(long para) {
      invariante = para;
      a = para;
      b = 0L;
   }

   synchronized public void buchen(long wert) {
      GUIupdate(MainTest.INCRITICALPATH);
      sleepABit(MainTest.sleepPeriod/5);
      this.a = this.a - wert;
      sleepABit(MainTest.sleepPeriod/5);
      this.b = this.b + wert;
      sleepABit(MainTest.sleepPeriod/5);
      this.a = this.a + wert;
      sleepABit(MainTest.sleepPeriod/5);
      this.b = this.b - wert;
      sleepABit(MainTest.sleepPeriod/5);
      GUIupdate(MainTest.NOTINCRITICALPATH);

      if ((a+b) != invariante)
         System.out.println("Inkonsistenter Zustand");
      }

   public void parallelbuchen(long wert) {
      GUIupdate(MainTest.INCRITICALPATH);
      sleepABit(MainTest.sleepPeriod/5);
      this.a = this.a - wert;
      sleepABit(MainTest.sleepPeriod/5);
      this.b = this.b + wert;
      sleepABit(MainTest.sleepPeriod/5);
      this.a = this.a + wert;
      sleepABit(MainTest.sleepPeriod/5);
      this.b = this.b - wert;
      sleepABit(MainTest.sleepPeriod/5);
      GUIupdate(MainTest.NOTINCRITICALPATH);

      if ((a+b) != invariante)
      System.out.println("Inkonsistenter Zustand");
   }

   private void sleepABit(int sleep) {
      try {
          Thread.sleep(sleep);
      } catch (InterruptedException e) {}
   }

   private void GUIupdate(int status) {
      MainTest t = (MainTest) Thread.currentThread();
      t.threadStatus = status;
      t.tp.repaint();
   }
}

Klasse ThreadFenster

package s2.thread;

import java.awt.BorderLayout;
import java.awt.Container;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;
import javax.swing.BoxLayout;
import javax.swing.ButtonGroup;
import javax.swing.JButton;
import javax.swing.JFrame;
import javax.swing.JLabel;
import javax.swing.JMenu;
import javax.swing.JMenuBar;
import javax.swing.JMenuItem;
import javax.swing.JPanel;
import javax.swing.JRadioButton;
import javax.swing.JSlider;
import javax.swing.JTextField;
import javax.swing.event.ChangeEvent;
import javax.swing.event.ChangeListener;
/**
*
* @author s@scalingbits.com
*
*/
public class ThreadFenster {
   final private JFrame hf;
   private JButton okButton;
   final private JButton exitButton;
   JTextField threadDisplay;
   private final static int SLEEPMIN = 1;
   private final static int SLEEPMAX = 2000;
   private final static int SLEEPINIT = 500;
   private final int threadCurrent = 10;
   public ThreadingPanel tp;
   public boolean synchron = true;
   JRadioButton syncButton;
   JRadioButton nosyncButton;

   public class exitActionListener implements ActionListener {
      @Override
      public void actionPerformed(ActionEvent e) {
         System.exit(0);
      }
   }
   /**
   * Aufbau des Fensters zur Ausnahmebehandlung
   *
   */
   public ThreadFenster() {
      JPanel buttonPanel;
      // Erzeugen einer neuen Instanz eines Swingfensters
      hf = new JFrame("Thread Monitor");
     //Nicht Beenden bei Schliesen des Fenster
     hf.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
     // Anlegen der Buttons
     exitButton = new JButton("Beenden");
     JLabel threadsLabel = new JLabel("sleep(ms):");
     JSlider threadSlider = new JSlider
        (JSlider.HORIZONTAL, SLEEPMIN, SLEEPMAX, SLEEPINIT);
     threadDisplay = new JTextField();
     threadDisplay.setText(Integer.toString(threadCurrent));
     threadDisplay.setColumns(4);
     threadDisplay.setEditable(false);
     threadSlider.addChangeListener(new ChangeListener() {
        @Override
        public void stateChanged(ChangeEvent e) {
           JSlider source = (JSlider) e.getSource();
           if (!source.getValueIsAdjusting()) {
               MainTest.sleepPeriod = source.getValue();
               threadDisplay.setText(Integer.toString(MainTest.sleepPeriod));
           }
        }
      });
      exitButton.addActionListener(new exitActionListener());
      syncButton = new JRadioButton("Synchronisiert");
      syncButton.addActionListener(new ActionListener() {
         @Override
         public void actionPerformed(ActionEvent e) {
            synchron= true;
            System.out.println("Synchronisiert");
         }
      } );
      syncButton.setSelected(true);
      nosyncButton = new JRadioButton(" Nicht Sync.");
      nosyncButton.addActionListener(new ActionListener() {
         @Override
         public void actionPerformed(ActionEvent e) {
             synchron= false;
             System.out.println("Nicht synchronisiert");
        }
      } );
      ButtonGroup group = new ButtonGroup();
      group.add(syncButton);
      group.add(nosyncButton);
      JPanel syncPanel = new JPanel();
      BoxLayout bl = new BoxLayout(syncPanel, BoxLayout.Y_AXIS);
      syncPanel.setLayout(bl);
      syncPanel.add(syncButton);
      syncPanel.add(nosyncButton);
     //Aufbau des Panels
     //buttonPanel = new JPanel(new GridLayout(1, 0));
     buttonPanel = new JPanel();
     buttonPanel.add(threadsLabel);
     buttonPanel.add(threadSlider);
     buttonPanel.add(threadDisplay);
     //buttonPanel.add(okButton);
     buttonPanel.add(syncPanel);
     buttonPanel.add(exitButton);
     tp = new ThreadingPanel();
     // Aubau des ContentPanes
     Container myPane = hf.getContentPane();
     myPane.add(buttonPanel, BorderLayout.SOUTH);
     myPane.add(tp, BorderLayout.CENTER);  
     JMenuBar jmb = new JMenuBar();
     JMenu jm = new JMenu("Ablage");
     jmb.add(jm);
     JMenuItem jmi = new JMenuItem("Beenden");
     jmi.addActionListener(new exitActionListener());
     jmi.setEnabled(true);
     jm.add(jmi);
     hf.setJMenuBar(jmb);
     //Das JFrame sichtbar machen
     hf.pack();
     hf.setVisible(true);
     hf.setAlwaysOnTop(true);
   }
}

Klasse ThreadingPanel

package s2.thread;

import java.awt.Color;
import java.awt.Dimension;
import java.awt.Graphics;
import javax.swing.JPanel;
/**
*
* @author s@scalingbits.com
*/
public class ThreadingPanel extends JPanel {

   private final int ziffernBreite = 10; // Breite einer Ziffer in Pixel
   private final int ziffernHoehe = 20; // Hoehe einer Ziffer in Pixel

   public ThreadingPanel() {
      setPreferredSize(new Dimension(200, 100));
      setDoubleBuffered(true);
   }
   /**
   * Methode die das Panel überlädt mit der Implementierung
   * der Treads
   * @param g
   */
   @Override
   public void paintComponent(Graphics g) {
      super.paintComponent(g);
      int maxWidth = getWidth();
      int maxHeight = getHeight();
      g.setColor(Color.black);
      g.drawString("Anzahl threads: " + MainTest.anzahlThreads, 10, 20);
      for (int i = 0; i < MainTest.anzahlThreads; i++) {
         paintThread(g, i, 20 + 25 * i, 30);
      }
   }
   /**
   * Malen eines Threads und seines Zustands
   * @param g Graphicshandle
   * @param id Identifier
   * @param x X Koordinate des Thread
   * @param y Y Koordinate des Thread
   */
   public void paintThread(Graphics g, int id, int x, int y) {
      int xOffset = 1; // offset Box zu Text
      int yOffset = 7; // offset Box zu Text
      //String wertThread = k.toString(); // Wert als Text

     if (MainTest.mt[id] != null) {
        switch(MainTest.mt[id].threadStatus) {
           case MainTest.ENDED:             g.setColor(Color.LIGHT_GRAY); break;
           case MainTest.NOTINCRITICALPATH: g.setColor(Color.GREEN); break;
           case MainTest.INCRITICALPATH:    g.setColor(Color.RED); break;
           default: assert(true):"Hier laeuft etwas falsch";
       }
   }
   int breite = 2 * ziffernBreite;
   int xNextNodeOffset = 20;
   int yNextNodeOffset = ziffernHoehe * 6 / 5; // Vertikaler Offset zur naechsten Kn.ebene
   //g.setColor(Color.); // Farbe des Rechtecks im Hintergrund
   g.fillRoundRect(x - xOffset, y - yOffset, breite, ziffernHoehe, 3, 3);
   g.setColor(Color.black); // Schriftfarbe
   g.drawString(Integer.toString(id), x + xOffset, y + yOffset);
   }
}
Stefan Schneider Wed, 06/01/2011 - 08:58

Java Concurrency Paket

Java Concurrency Paket

Mit JDK 5.0 wurde das Java Concurrency Paket, welches maßgeblich von Doug Lea mitentwickelt wurde (siehe Wikipedia), Teil der Laufzeitumgebung (JRE). Das Paket java.util.concurrent fügt komplexe Klassen zur Laufzeitumgebung die das Steuern der Parallelität auf einem höheren Niveau als das grundlegenden Konstrukte von Java erlauben. 

In diesem Abschnitt werden einige, wenige Klassen vorgestellt. Schauen Sie sich dieses Paket an bevor Sie etwas Nebenläufiges implementieren. Hier gibt es sehr viele, sehr mächtige Lösungen, die die Vorlesung bei weitem übersteigen

Referenzen

Stefan Schneider Mon, 01/28/2019 - 09:10

Atomare Basistypen

Atomare Basistypen

 In java.util.concurrent.atomic werden Basistypen zur Verfügung gestellt die atomar sind.

Atomar bedeutet, dass das Lesen des Datums und das anschließende Modifizieren nicht von anderen Operationen unterbrochen werden kann. Das Lesen und Modifizieren erfolgt in einem kritischen Pfad.

Ein Beispiel: Die Klasse AtomicInteger implementiert Number und hat die zum Beispiel Methoden wie:

Anbei ein Beispiel zum nebenläufigen Addieren:

package s2.thread;
import java.util.concurrent.atomic.AtomicInteger;
/**
*
* @author s@scalingbits.com
*/

public class ParaIncrementAtomicInt extends Thread {
public static AtomicInteger zaehler;
public static final int MAX= Integer.MAX_VALUE/100;
public static void increment() {
zaehler.getAndIncrement();
}
/**
* Starten des Threads
*/
public void run() {
for (int i=0; i < MAX; i++) {
increment();
}
}
public static void main(String[] args) {
zaehler = new AtomicInteger(0);
ParaIncrementAtomicInt thread1 = new ParaIncrementAtomicInt();
ParaIncrementAtomicInt thread2 = new ParaIncrementAtomicInt();
long time = System.nanoTime();
thread1.start();
thread2.start();
try {
thread1.join();
thread2.join();
} catch (InterruptedException e) {
}
time = (System.nanoTime() -time)/1000000L; // time in milliseconds
if ((2* ParaIncrementAtomicInt.MAX) == zaehler.get())
System.out.println("Korrekte Ausführung: " +
+ ParaIncrementAtomicInt.zaehler.get() + " (" + time + "ms)");
else
System.out.println("Fehler! Soll: " + (2* ParaIncrementAtomicInt.MAX) +
"; Ist: " +ParaIncrementAtomicInt.zaehler.get() + " (" + time + "ms)");
}
}

 

Stefan Schneider Tue, 03/05/2019 - 18:55

Das "Fork-Join-Framework"

Das "Fork-Join-Framework"

Es gibt zwei Möglichkeiten das Threading in Java zu nutzen:

  • Erledigen unterschiedlicher Aufgaben parallel
    • Bsp: Swing: Oberfläche neu zeichen, Eingaben abarbeiten, eigentliche Programmaufgaben ausführen
  • Paralleles Abarbeiten der gleichen Aufgabe
    • Bsp: Differentialgleichungen berechnen, Such & Sortieralgorithmen, Bearbeitung von Massendaten (Batch).

Das Fork & Join Framework erleichert das parallele Programmieren durch eine Optimierung der Parallelität und komfortable Klassen.

Im Rahmen der Vorlesung werden wir die Möglichkeiten einer Parallelisierung ohne und mit Rückgaben diskutieren. Die Unterschiede zum Einsatz der einfachen Threadingkonstrukte und der des Frameworks sind im Diagramm unten dargestellt:

3 Möglichkeiten der Parallelisierung

Steuerung des Grads der Parallelisieren

Java erlaubt es recht einfach viele Ausführungseinheiten (Threads) zu erzeugen. Das Betriebssystem wird diese Threads auf Hardwarethreads zuordnen. Das macht das Betriebssystem auch fast immer sehr gut. Erzeugt man aber zufiele ausführbare Threads kann der Rechner überlastet werden und ist kaum noch ansprechbar. Das bedeutet, dass Prozesse die mit dem Benutzer interagieren sollen nicht mehr unbedingt die CPU Zeit bekommen die sie benötigen.

Hat man viel, zu viele Softwarethreads und Prozesse wird das Betriebsystem auch noch ineffizient. Es muss die ganzen Zustandänderungen managen. Hier gibt es kritische Pfade und im Überlastungsfall zuwenig CPU-Zeit.

Das Fork & Join Framework erleichtert das professionelle Skalieren in dem es eine Threadpool anlegt und alle Tasks an freie Prozessoren im Threadpool vergibt. Hiermit übernimmt es eine Aufgaben des Betriebsystem und vermeidet eine Überlastung. Gibt man keinen Wert für den Pool an, wird die Anzahl aller Prozessoren im Rechner als Defaultwert verwendet.

Dies ist nur ein kleiner Ausschnitt aus dem Fork & Join Framwork. Wir betrachten hierzu die folgenden Klassen:

Fork Join Klassen

Die Klasse ForkJoinPool

Die Klasse java.util.concurrent.ForkJoinPool wurde im JDK 7 eingeführt.

In dieser Klasse werden eine Menge von Threads verwaltet und zur Ausführung von Task (Aufgaben) verwendet. Dies macht es dem Betriebsystem leichter die Threads auf die Prozessoren zu verteilen, da nicht beliebig viele Threads von Java erzeugt werden. Die Klasse ForkJoinPool verwaltet die Zuordnung der Task auf die konfigurierten (Software)threads.

Tasks können auf 3 Arten dem Framework übergeben werden:

  Aufrufe von ausserhalb Aufruf innerhalb einer Berechnung
asynchrone Ausführung ForkJoinPool.execute(ForkJoinTask) ForkJoinTask.fork()
warte auf Ergebnis ForkJoinPool.invoke(ForkJoinTask) ForkJoinTask.invoke()
asynchrone Ausführung mit Ergebnis ForkJoinPool.submit(ForkJoinTask)
ForkJoinPool.submit(Callable)
ForkJoinTask.fork(ForkJoinTasks sind Futures)

Die Implementierung wird zu einer RejectedExecutionException Ausnhame führen falls der Threadpool runtergefahren wird oder falls es keine Threads mehr in der VM gibt. 

Interessante Konstruktoren:

Paralleles Abarbeiten ohne Rückgabewerte

Ein Beispiel hierfür ist der Quicksort. Man kann parallel das linke und das rechte Teilintervall sortieren nachdem die Vorarbeit (seriell) erledigt wurde.

Hierzu nutzt man die abstrakte Klasse RecursiveAction die aus der abstrakten Klasse ForkJoinTask spezialisiert wird. Die Klasse ForkJoinPool kann Tasks die die Bedingungen der Klasse ForkJoinTask erfüllen, parallel ausführen. Hierzu gibt es in der Klasse die Methoden:

Es verbleibt das Problem, dass man einen bestimmten Code ausführen möchte. Hierzu dient die abstrakte Klasse RecursiveAction. Sie besitzt eine abstrakte Methode:

  • compute(): führt den Code aus. Diese Methode ist abstrakt!

Wie implementiert man so etwas?

  1. Man implementiert den Algorithmus in einer Unterklasse von RecursiveAction.
    1. Alle Eingabeparameter werden als Attribute im Objekt gespeichert
    2. Man implementiert einen Konstruktor der die Parameter entgegennimmt und abspeichert
    3. Der Algorithmus wird in compute() implementiert
  2. Man bettet seine Klasse als innere Klasse ein
    1. Man hat dann Zugriff auf alle Attribute der äusseren Klasse!
  3. Man legt sich einen Threadpool an
    1. Wahrscheinlich als globales, statisches Objekt
  4. Man startet seinen Algorithmus aus der äusseren Klasse
    1. Anlegen eines Objekts mit den Parametern
    2. Aufruf von invoke() im Treadpool

 Hilfe! Ich muß das am Beispiel sehen...

Paralleles Abarbeiten mit Rückgabewerte

Die Aufgabe parallelisieren und als Aufgabe starten reicht oft nicht. Oft müssen die Ergebnisse entgegen genommen werden und dann rekursiv zu neuen Ergebnissen aggregiert werden.

Hierzu benötigt man einen Mechanismus der

  • die Eingabeparameter für jede Aufgabe (Task) entgegennimmt
  • die Aufgaben (Tasks) ausführt
  • und die Ergebnisse erst zurückgibt wenn die Ergebnisse vorliegen

Die Schnittstelle Callable

Die Schnittstelle java.util.concurrent.Callable muss implementiert werden um die Ausgabeparameter einer Aufgabe (Task) entgegenzunehmen. Hierfür muss man

  • einen generischen Typen wählen der das Ergebnis zur Verfügung stellt
  • einen Konstruktor implementieren der alle Eingabeparameter entgegen nimmt.
  • alle Parameter als Attribute implementiert
  • die Methode V call() die das Ergebnis zurückliefert

Ein Beispiel auf der Programmierübung Ariadnefaden

public class SearchCallable implements Callable<List<Position>> {
        Position von;
        Position nach;
        Ariadne4Parallel a;
         /**
         * 
         * @param a Instanz von Ariadne
         * @param von Start
         * @param nach Ziel
         */
        SearchCallable(Ariadne4Parallel a,Position von, Position nach) {
            this.a =a;
            this.von = von;
            this.nach = nach;
        }
        /**
         * Führe Task in eigenem Thread aus und nutze Instanzvariablen
         * als Parameter um Aufgabe auszuführen.
         * @throws java.lang.Exception
         */
        @Override
        public List<Position> call() throws Exception {
            return a.suche(von, nach);
        }
    }

In diesem Beispiel ist das Ergebnis eine Liste von Positionen die den Web aus dem Labyrinth weisen.

Die Eingabeparameter sind ein Labyrinth, die Startposition und die Position mit dem Ausgang ais dem Labyrinth.

Die Schnittstelle Future

Die Schnittstelle Future erlaubt es das Ergebnis einer Aufgabe zu analysieren. Die Schnittstelle ist generisch und erlaubt es daher nur einen bestimmten Type einer Klasse auszulesen.

Die beiden wichtigsten Methoden der Schnittstelle sind:

  • V get(): wartet falls notwendig auf das Beenden der Aufgabe und liefert das Ergebnis ab
  • V get(long timeout, TimeUnit unit): wartet eine bestimmte Zeit auf auf ein bestimmtes Ergebnis

In der Schnittstellendefinition sind noch weitere Methoden zum Kontrollieren und Beenden von Aufgaben (Tasks) vorhanden.

Abschicken von Aufgaben (Tasks)

Die Klasse  java.util.concurrent.ForkJoinPool verfügt über eine Methode

  • public <T> ForkJoinTask<T> submit(Callable<T> task)

Diese Methode nimmt die Eingaben für die Aufgabe (Task) an solange es eine Spezialierung der Klasse Callable<T> ist. Das Ergebnis ist ein Objekt vom Typ ForkJoinTask. Die Klasse ForkJoinTask implementiert die Schnittstelle Future. Das Ergebnis der Aufgabe (Task) wird hier abgeliefert sobald die Aufgabe abgearbeitet ist.

Wie implementiert man so etwas?

  1. Man implementiert den Algorithmus in einer Unterklasse von RecursiveAction.
    1. Alle Eingabeparameter werden als Attribute in einem Objekt von Callable gespeichert
    2. Man implementiert einen Konstruktor der die Parameter entgegennimmt und abspeichert
    3. Der Algorithmus wird in compute() implementiert
      1. Der Algorithmus führt rekursive Aufrufe durch und nimmt die Ergebnisse vom Typ Future an.
  2. Man bettet seine Klasse die Callable spezialisiert als innere Klasse ein
    1. Man hat dann Zugriff auf alle Attribute der äusseren Klasse!
  3. Man legt sich einen Threadpool an
    1. Wahrscheinlich als globales, statisches Objekt
  4. Man startet seinen Algorithmus aus der äusseren Klasse
    1. Anlegen eines Objekts mit den Parametern
    2. Aufruf von invoke() im Treadpool

 Hilfe! Ich muß das am Beispiel sehen...

Stefan Schneider Wed, 03/06/2019 - 18:39

Aufgaben (Threads)

Aufgaben (Threads)

Programmieraufgabe JoinAndSleep

Ziel der Aufgabe ist es drei Threads zu programmieren die auf das Beenden des anderen Warten und dann eine Zeit schlafen:

  1. Sie drucken jeden neuen Zustand auf der Konsole aus
  2. Als erstes nach ihrem Start warten sie bis ein anderer Thread auf den sie zeigen sich beendet hat. Zeigen sie auf keinen anderen Thread so gehen sie sofort über zum nächstens Schritt.
  3. Die Threads schlafen für eine vorgegebene Zeit in ms
  4. Die Threads beenden sich

Die geforderte Aufgabe soll in einer Klasse implementiert werden

  1. Erweitern Sie die Klasse JoinAndSleep aus der Klasse Thread
  2. Attribute: Die Klasse hat ein Ganzzahlattribut sleep zur Verwaltung der Schlafzeit
    1. Die Klasse hat ein Ganzzahlattribut sleep zur Verwaltung der Schlafzeit
    2. Die Klasse hat eine Referenz auf ein Objekt der Klasse JoinAndSleep
  3. Konstruktor
    1. Der Konstruktor der Klasse erlaubt es die Schlafzeit zu übergeben und eine Referenz auf einen anderen Thread. Dies ist der Thread auf den gewartet werden soll.
  4. run() Methode: Diese Methode implementiert die oben genannte Semantik zum Warten und Schlafen
    1. Falls ein Thread gegeben ist soll auf sein Ende gewartet werden
    2. Anschliesend soll eine bestimmte Zeit geschlafen werden
    3. Fügen Sie zwischen allen Schritten Konsolenausgaben ein um den Fortschritt zu kontrollieren. Geben Sie hier immer auch den aktuellen Thread aus!
  5. main() Methode
    1. Erzeuge Thread 3: Er soll auf keinen Thread warten und dann 4000ms schlafen
    2. Erzeuge Thread 2: Er soll auf Thread 3 warten und dann 3000ms schlafen
    3. Erzeuge Thread 1: Er soll auf Thread 2 warten und dann 2000ms schlafen
    4. Starten Sie Thread 1
    5. Starten Sie Thread 2
    6. Starten Sie Thread 3

Hinweise:

Ich welchen Thread bin ich gerade?

  • Die statische Methode Thread.currentThread() liefert einen Zeiger auf den aktuellen Thread
  • Diesen kann man direkt ausdrucken

Wie warte ich auf einen anderen Thread?

Die Methode Thread.join() erlaubt auf das Beenden eines anderen Threads zu warten. Man muss auf eine InterruptedException vorbereitet sein, da man aufgeweckt werden kann:

Thread aThread;
...
try {
    aThread.join();
   } catch (InterruptedException e) {}

Wie lasse ich einen Thread schlafen?

Die Methode Thread.sleep() ist eine statische Methode. Man muss seinen eigenen Thread nicht kennen um ihn ruhen zulassen! Auch diese Methode kann eine InterruptedException werfen und muss mit einer Ausnahmebehandlung versehen werden:

try {
    Thread.sleep(schlafen);
   } catch (InterruptedException e) {}

Verständnisfragen

  • Wer wartet hier auf wen?
  • Ist dies an den Konsolenaufgaben zu erkennen?
  • Woran erkenne ich bei den Konsolenausgaben, das der Code in einem eigenen Thread läuft?

 

 

 

Stefan Schneider Sun, 05/29/2011 - 14:29

Lösungen (Threads)

Lösungen (Threads)

Programmieraufgabe JoinAndSleep

package s2.thread;

public class JoinAndSleep extends Thread{
/**
* Der Thread auf dessen Beendigung gewartet wird
*/
Thread joinIt;
/**
* Zeit in Millisekunden zum Schlafen
*/
int schlafen;
/**
* Der Konstruktor
* @param sleeptime Die Zeit in msdie der Thread schlafen soll
* @param toJoin der Thread auf den gewartet werden solll bis er beendet ist
*/
public JoinAndSleep(int sleeptime, Thread toJoin) {
joinIt = toJoin;
schlafen = sleeptime;
System.out.println("Thread: " + this + " erzeugt");
}
/**
* Die überschriebene Methode run() sie führt den Code aus
*/
public void run() {
System.out.println("Thread: " +Thread.currentThread() + " gestartet");
try {
if (joinIt!=null) {
joinIt.join();
System.out.println("Thread: " +Thread.currentThread()
+ " join auf " + joinIt + " fertig");
}
} catch (InterruptedException e) {}
System.out.println("Thread: " +Thread.currentThread()
+ " schlaeft jetzt fuer " + schlafen + "ms");
try {
Thread.sleep(schlafen);
} catch (InterruptedException e) {}
System.out.println("Thread: " +Thread.currentThread() + " endet");
}
/**
* Das Hauptprogramm
* @param args keine Parameter benoetigt!
*/
public static void main (String[] args) {
JoinAndSleep s3= new JoinAndSleep(2003, null);
JoinAndSleep s2= new JoinAndSleep(2002, s3);
JoinAndSleep s1= new JoinAndSleep(2001, s2);
s1.start();
s2.start();
s3.start();
}
}

 

Stefan Schneider Sun, 05/29/2011 - 14:31

Lernziele (Threading)

Lernziele (Threading)

Am Ende dieses Blocks können Sie:

  • ... zwischen einem Thread und einem Prozess unterscheiden
  • ... die Zustände eines Threads nennen und wissen wie man Threads von einem Zustand in einen anderen Zustand überführt
  • ... mit dem Schlüsselwort synchronized Objekte in kritischen Abschnitten sperren um Threads zu synchronisieren
  • die wichtigsten Methoden der Javaklasse Thread anwenden. Hierzu gehören
    • start()
    • run()
    • sleep()
    • join()
    • getId()
    • currentThread()
    • Konstruktor

Lernzielkontrolle

Sie sind in der Lage die folgenden Fragen zu beantworten: Fragen zur Nebenläufigkeit ( Multithreading)

Feedback

Zur Umfrage

QR Code für Umfrage

Stefan Schneider Wed, 01/09/2013 - 09:25