Upload
others
View
0
Download
0
Embed Size (px)
Citation preview
Kapitel 7: ParallelprogrammierungVL Programmierparadigmen – SS 2019
Kai-Uwe Sattler | TU Ilmenau | [email protected]
Inhalt
10.05.2019
Motivation Architekturen und
Programmiermodelle Parallelisierungsarten Taskparallelität mit Erlang Datenparallelität und MapReduce MapReduce mit Erlang Parallelprogrammierung mit Java
VL Programmierparadigmen | Kapitel 7 | K. Sattler1 of 107
Lernziele
10.05.2019
Programmierung paralleler Algorithmen und Verfahren als Paradigma
Verständnis grundlegender Architekturen und Modelle Praktische Erfahrungen mit Erlang und Java
VL Programmierparadigmen | Kapitel 7 | K. Sattler2 of 107
Disclaimer
10.05.2019
Material basierend auf: Gutknecht: VL Parallel Programming, ETHZ 2011 Armstrong: Programming Erlang: Software for a Concurrent
World Herlihy, Shavit: The Art of Multiprocessor Programming, Morgan
Kaufmann, 2008. Dean, Ghemawat: MapReduce: Simplified Data Processing on
Large Clusters, OSDI 2004.
VL Programmierparadigmen | Kapitel 7 | K. Sattler3 of 107
Motivation: The free lunch is over
10.05.2019
Herb Sutter: Dr. Dobb‘s Journal 30(3), 2005.
VL Programmierparadigmen | Kapitel 7 | K. Sattler4 of 107
The free lunch is over /2
10.05.2019
Taktfrequenz wächst nur noch langsam Physikalische Gründe: Wärmeentwicklung, Energiebedarf,
Kriechströme, ... 10 GHz-CPU????
Auswege Hyperthreading:
Abarbeitung mehrere Threads auf einer CPU (5..15% Performanzgewinn) Einfache Hardwareunterstützung (einige Register)
Multicore: Mehrere CPUs auf einem Chip Billiger als echte Mehrprozessorsysteme
Caching: Vergrößerung L1, L2, L3-Cache Speicherzugriff 10..50x teurer als Cache
VL Programmierparadigmen | Kapitel 7 | K. Sattler5 of 107
Motivation: Konsequenzen und Trends
10.05.2019
Applikationen müssen nebenläufig programmiert werden, um CPU auszunutzen → Many-Core-Systeme
CPU-Boundness von Applikationen Effizienz und Performanzoptimierung werden immer
wichtiger Unterstützung von Nebenläufigkeit/Parallelität durch
Programmiersprachen
VL Programmierparadigmen | Kapitel 7 | K. Sattler6 of 107
Einordnung
10.05.2019
Zerlegung der Berechnung in Teilaufgaben
Ausführung
zur gleichen Zeitunter Nutzung verschiedener
(räumlich) verteilter Ressourcen
Parallelprogrammierung Verteilte Programmierung
VL Programmierparadigmen | Kapitel 7 | K. Sattler7 of 107
Architekturen
10.05.2019
SIMD SMP NUMA Cluster Grid
VL Programmierparadigmen | Kapitel 7 | K. Sattler8 of 107
Multiprozessorsysteme
10.05.2019
cache
BusBus
memory
cachecache
Abb. aus
VL Programmierparadigmen | Kapitel 7 | K. Sattler9 of 107
Hauptspeicherzugriffe sind teuer ...
processor
memory
interconnect
10.05.2019VL Programmierparadigmen | Kapitel 7 | K. Sattler10 of 107
Cache-Zugriffe sind schneller …
cacheHit!
10.05.2019VL Programmierparadigmen | Kapitel 7 | K. Sattler11 of 107
Multicore-Systeme
10.05.2019
cache
BusBus
memory
cachecachecache
VL Programmierparadigmen | Kapitel 7 | K. Sattler12 of 107
Symmetrisch vs. Nicht-symmetrisch
10.05.2019
SMP: Symmetric Multi Processing NUMA: Non-Uniform Memory Access
SMP
memory
NUMA
LokalerZugriff
LokalerZugriff
EntfernterZugriff
VL Programmierparadigmen | Kapitel 7 | K. Sattler13 of 107
CPU vs. GPU
10.05.2019
GPU = Graphics Processing Units Hochparallele Prozessorarchitekturen (nicht nur) für
Grafikrendering, z.B. von NVIDIA
VL Programmierparadigmen | Kapitel 7 | K. Sattler14 of 107
Flynn‘s Architekturklassifikation
10.05.2019
SISD: von Neumann
MISD: Fehlertoleranz
SIMD: Vektorprozessor
MIMD: Supercomputer
Folie nach Gutknecht: VL Parallel Programming, ETHZ 2011
Wikipedia
VL Programmierparadigmen | Kapitel 7 | K. Sattler15
Maße zur Leistungsbewertung
10.05.2019
Maße für den Laufzeitgewinn durch Parallelisierung Tn = Laufzeit des Programms mit n Prozessoren/Kernen Speedup
Effizienz
VL Programmierparadigmen | Kapitel 7 | K. Sattler16 of 107
Amdahlsches Gesetz
10.05.2019
Berücksichtigung parallelisierbarer und serieller Anteile im Programmablauf n Prozessoren p = paralleler Anteil s = serieller Anteil
Maximaler Speedup
p + s = 1
seriell
seriell
parallel parallel
VL Programmierparadigmen | Kapitel 7 | K. Sattler17 of 107
Amdahlsches Gesetz
10.05.2019
Quelle: WikipediaVL Programmierparadigmen | Kapitel 7 | K. Sattler18 of 107
Programmiermodelle
10.05.2019
Abstraktion (niedrig → hoch)
Spei
cher
(ge
mei
nsam
→ve
rtei
lt)
Datenparallelität
DirektivenbasierteProgrammierungThreadprogrammierung
Nachrichten-austausch (MPI)
VL Programmierparadigmen | Kapitel 7 | K. Sattler19 of 107
Parallelisierungsarten
10.05.2019
Task-Parallelität Ausnutzung inhärenter Parallelität durch simultane Ausführung
unabhängiger Aufgaben
Datenparallelität Gemeinsame Operation auf homogener Datenmenge Zerlegung eines Datensatzes in kleinere Abschnitte
VL Programmierparadigmen | Kapitel 7 | K. Sattler20 of 107
Taskparallelität
10.05.2019
Unabhängigkeit von Teilprozessen →„Desequenzialisierung“
Beispiel: Quicksort
Pivotelementbestimmen,
Feld aufteilenlinke Teilfolge sortieren
rechteTeilfolge sortieren
Pivotelementbestimmen,
Feld aufteilen
linke Teilfolge sortieren
rechteTeilfolgesortieren
Para
llelis
ieru
ng
VL Programmierparadigmen | Kapitel 7 | K. Sattler21 of 107
Datenparallelität
10.05.2019
Homogene Datenmenge: Felder, Listen, Dokumentmenge, ...
Verteilung der Daten Alle Prozessoren führen gleiches Programm auf jeweils
eigenen Daten aus Beispiel: Matrixaddition S = A + B
for(i=0; i < Z; i++)for(j=0; j < S; j++) S[i][j] = A[i][j] +
B[i][j];
A, B verteilen: [i,j] auf Prozessor [i][j]
S=A+B (parallel)
S zusammensetzen
VL Programmierparadigmen | Kapitel 7 | K. Sattler22 of 107
Parallele Programmierung in Erlang
10.05.2019
Unterstützung paralleler Programmierung in Erlang Leichtgewichtige Prozesse und Message Passing SMP-Support
Ziele für effiziente Parallelisierung: Problem in viele Prozesse zerlegen (aber nicht zu viele ...) Seiteneffekte vermeiden (würde Synchronisation erfordern ...) Sequentiellen Flaschenhals vermeiden (Zugriff auf gemeinsame
Ressourcen: IO, Registrierung von Prozessen, ...)
Small Messages, Big Computation!
VL Programmierparadigmen | Kapitel 7 | K. Sattler23 of 107
SMP-Erlang
10.05.2019
smp:2:2 = 2 scheduler auf 2 Kernen kann mit –smp [disable|enable|auto] beeinflusst werden +S [Anzahl] bestimmt Anzahl der Scheduler Sollte nicht größer als Anzahl der Kerne/Prozessoren sein
$ erlErlang R13B04 (erts-5.7.5) [source] [smp:2:2] [rq:2] [async-threads:0] [hipe] [kernel-poll:false]
Eshell V5.7.5 (abort with ^G)1>
VL Programmierparadigmen | Kapitel 7 | K. Sattler24 of 107
Scheduler in Erlang
10.05.2019
Scheduler
Run queue
Erlang VM – Single Processor
Scheduler #1
Run queue
Scheduler #n
Erlang VM – SMP
VL Programmierparadigmen | Kapitel 7 | K. Sattler25 of 107
Ansätze zur Parallelisierung
10.05.2019
Beispiel: Berechnung einer (zufällig generierten) Liste von Fibonacci-Zahlen Sequentielle Lösung: über list:map/2
% Berechnung der Fibonacci-Zahl für Ffibo(0) -> 0;fibo(1) -> 1;fibo(F) when F > 0 -> fibo(F - 1) + fibo(F - 2).
% Liste von Num Fibonacci-Zahlenrun(Num) ->
Seq = lists:seq(1, Num),% Zufallszahlen erzeugenData = lists:map(fun(_) -> random:uniform(20) end,
Seq),lists:map(fun fibo/1, Data).
VL Programmierparadigmen | Kapitel 7 | K. Sattler26 of 107
pmap: Parallele Funktionen höherer Ordnung
10.05.2019
Parallele Variante von lists:map(Fun, List) Für jedes Listenelement einen Prozess erzeugen Ergebnisse einsammeln
Armstrong: Programming Erlang: Software for a Concurrent Worldwww.erlang-dach.org/blog/
pmap(F, L) ->S = self(), % Berechnung der Fibonacci-Zahl für F
Pids = lists:map(fun(I) ->% Prozess erzeugen
spawn(fun() -> do_fun(S, F, I) end) end, L),
% Ergebnisse einsammelngather(Pids).
VL Programmierparadigmen | Kapitel 7 | K. Sattler27 of 107
pmap: Hilfsfunktionen
10.05.2019
Eigentliche Verarbeitungsfunktion ausführen
do_fun(Parent, F, I) ->% catch sorgt für korrekte Behandlung von Fehlern in F% Parent ist der ElternprozessParent ! { self(), (catch F(I))}.
% rekursive Implementierunggather([Pid | T]) -> receive % Ordnung der Ergeb. entspricht Ordnung der Argumente
{ Pid, Ret } -> [Ret | gather(T)] end;
gather([]) -> [].
Einsammeln der Ergebnisse
VL Programmierparadigmen | Kapitel 7 | K. Sattler28 of 107
Parallele Berechnung der Fibonacci-Zahlen
10.05.2019
% Liste von Num Fibonacci-Zahlenrun(Num) ->
Seq = lists:seq(1, Num),% Zufallszahlen erzeugenData = lists:map(fun(_) ->
random:uniform(20) end, Seq),% Berechnung parallel ausführenpmap(fun fibo/1, Data).
VL Programmierparadigmen | Kapitel 7 | K. Sattler29 of 107
Diskussion
10.05.2019
Passende Abstraktion wählen Ist Ordnung der Ergebnisse notwendig? Werden Ergebnisse benötigt?
Anzahl der parallelen Prozesse Abhängig von Berechnungsmodell, Hardware etc. evtl. pmap mit max. Anzahl gleichzeitiger Prozesse
Berechnungsaufwand der Prozesse Berechnung vs. Daten/Ergebnisse senden
VL Programmierparadigmen | Kapitel 7 | K. Sattler30 of 107
pmap: Alternative Implementierung
10.05.2019
ohne Berücksichtigung der Ordnung der Ergebnismenge
pmap(F, L) -> …gather2(length(L), Ref, []) .
gather2(N, Ref, L) ->receive{Ref, Ret } -> gather2(N-1, Ref, [Ret | L])end;
gather2(0,_, L) -> L.
VL Programmierparadigmen | Kapitel 7 | K. Sattler31 of 107
Diskussion /2
10.05.2019
Speedup im Vergleich: Fibonacci-Zahlen vs. Listen sortieren (Armstrong: Programming Erlang: Software for a Concurrent World)
VL Programmierparadigmen | Kapitel 7 | K. Sattler32 of 107
Datenparallelität: MapReduce
10.05.2019
Parallelisierungsmuster inspiriert von Konzepten funktionaler Programmiersprachen (map, reduce/fold)
Grundidee: map(f, seq) = wende Funktion f (als Argument
übergeben) auf alle Elemente einer Folge seq an, z.B. multipliziere jedes Element mit 2
reduce(f, seq) = wende eine Funktion f schrittweise auf die Element einer Folge seq an und produziere einen einzelnen Wert, z.B. die Summe aller Elemente der Folge
VL Programmierparadigmen | Kapitel 7 | K. Sattler33 of 107
map in Erlang
10.05.2019VL Programmierparadigmen | Kapitel 7 | K. Sattler
Definition von map (auch als lists:map/2)
Definition der Multiplikation
Anwendung
map(_, []) -> [];map(F, [H|T]) -> [F(H)|map(F,T)].
mult(X) -> X * 2.
1> S=[1,2,3,4]. [1,2,3,4]2> mr:map(fun mr:mult/1, S).[2,4,6,8]
34 of 107
reduce in Erlang
10.05.2019VL Programmierparadigmen | Kapitel 7 | K. Sattler
Definition von reduce ((auch als lists:foldl/3 bzw. lists:foldr/3)
Definition der Addition
Anwendung
reduce(_, Init, []) -> Init;reduce(F, Init, [H|T]) -> reduce(F, F(H,Init), T).
add(X, Y) -> X + Y.
1> S=[1,2,3,4]. [1,2,3,4]2> mr:reduce(fun mr:add/2, 0, S).10
35 of 107
Parallelisierung von map und reduce
10.05.2019VL Programmierparadigmen | Kapitel 7 | K. Sattler
map: Funktion f kann unabhängig (=parallel) auf jedes Element
angewendet werden Partitionieren und Verteilen der Elemente der Folge
reduce: prinzipiell ähnlich, d.h. Funktion f kann auf Paare unabhängig
angewendet werden
3 2 4 8 5 1 7 6
5 12 6 13
17 18
35
36 of 107
Motivation
10.05.2019
Google-Beispiel (Kleber, Google Inc.) 20+ Mrd. Webseiten a 20 KB = 400+ TB 1 Computer mit 30..35 MB/s Disk-IO
4 Monate um alle Webdaten zu lesen
1000 Festplatten für Speicherung der Daten
Noch mehr, wenn Daten auch noch verarbeitet werden sollen ...
Aber: 1000 Maschinen können die gleiche Arbeit in <3 Stunden leisten!
VL Programmierparadigmen | Kapitel 7 | K. Sattler37 of 107
Motivation /2
10.05.2019
Hoher Programmieraufwand Kommunikation und Koordination Umgang mit Ausfällen
1 Server →Ausfälle alle 3 Jahre (1000 Tage)
10.000 Server → 10 Ausfälle pro Tag!
Statusüberwachung Debugging und Optimierung ...
Muss für jedes zu lösende Problem neu implementiert werden!
VL Programmierparadigmen | Kapitel 7 | K. Sattler38 of 107
Idee
10.05.2019
Programmiermodell, -muster, -Framework das alle diese Details übernimmt
Funktionales Paradigma: Funktionen höherer Ordnung ähnlich wie map, pmap, ...
J. Dean, S. Ghemawat: MapReduce: Simplified Data Processing on Large Clusters, OSDI 2004.
VL Programmierparadigmen | Kapitel 7 | K. Sattler39 of 107
Prinzip
10.05.2019
1. Lese Daten (eine Menge davon...)2. Map: Extrahiere die für das Problem relevanten
Ausschnitte aus einem Datensatz3. Shuffle & Sort4. Reduce: Aggregiere, Fasse zusammen, Filtere oder
Transformiere den Datensatz5. Schreibe Ergebnis
Ablauf ist immer gleich, nur Map und Reduce sind problemspezifisch
VL Programmierparadigmen | Kapitel 7 | K. Sattler40 of 107
MapReduce als Programmiermodell
10.05.2019
Datenstruktur: Schlüssel-Wert-Paar (k, v) Beispiel:
Schlüssel = URL
Wert = Dokumenttext
Funktionen map(k1, v1) ➙list of (k2, v2) reduce(k2, list of (v2)) ➙ list of (v2)
Datenparallelität: map, reduce – jeweils gleiche Funktion auf verschiedenen Daten
VL Programmierparadigmen | Kapitel 7 | K. Sattler41 of 107
Ablauf
10.05.2019
in1
in2
in3
in4
map
map
map
map
reduce
reduce
reduce
out1
out2
out3
shuffle, sort, ...
VL Programmierparadigmen | Kapitel 7 | K. Sattler42 of 107
Standardbeispiel Wortzählung
10.05.2019
Bestimme die Häufigkeit des Auftretens von Wörtern in einer großen Dokumentenmenge
map(String key, String value): // key: Dokumentname// value: Inhalt des
Dokumentesforeach word w in value:EmitIntermediate(w, “1”);
reduce(String key, Iterator values): // key: ein Wort// values: Liste von Häufigkeitenint result = 0;foreach v in values: result += ParseInt(v);
Emit(key, AsString(result));
VL Programmierparadigmen | Kapitel 7 | K. Sattler43 of 107
Wortzählung: map
10.05.2019
Über allen Wipfeln ist
Ruh!
doc1
Key Value
Auf allen Wipfeln sitzen Zwerge mit
Zipfeln.
doc2
Key Value
Map
{Über, 1}{allen, 1}{Wipfeln, 1}{ist, 1}{Ruh, 1}
{Auf, 1}{allen, 1}{Wipfeln, 1}{sitzen, 1}{Zwerge, 1}...
{ Wort, Häufigkeit }
VL Programmierparadigmen | Kapitel 7 | K. Sattler44 of 107
Wortzählung: reduce
10.05.2019
Reduce
{ Über, [1] } { allen, [1, 1] }{ Auf, [1] } { Wipfeln, [1, 1]
{ Wort, Liste von Häufigkeiten }
{ Über, [1] } { Auf, [1] } { allen, [2] } { Wipfeln, [2] }
{ Wort, Gesamthäufigkeiten }
VL Programmierparadigmen | Kapitel 7 | K. Sattler45 of 107
shuffle/sort
10.05.2019
MapReduce-Framework sammelt alle Paare mit dem gleichen Schlüssel ...
empfangen aller Paare (combine)
Sowie sortieren nach Schlüssel k2 (sort)
und teilt diese einem Reducer zu partitionieren anhand des Schlüssels (partition)
sowie verteilen (shuffle)
VL Programmierparadigmen | Kapitel 7 | K. Sattler46 of 107
Weitere Anwendungsfälle
10.05.2019
Verteilte Textsuche
Map: verarbeite Dokument/Dokumentausschnitt + gib gefundene Zeile aus
Reduce: -
> grep Erlang *.html
194.145.89.65 - - [19/Oct/2010:14:57:21 +0200] "GET /index.html/
[ / / ]/impressum.html HTTP/1.0"
194.145.89.65 - - [19/Oct/2010:14:57:21 +0200] "GET /index.htmlHTTP/1.0"194.145.89.65 - - [19/Oct/2010:14:57:21 +0200] "GET /about.htmlHTTP/1.0"195.36.75.26 - - [19/Oct/2010:14:58:54 +0200] "GET /impressum.html HTTP/1.0"
Häufigkeit von Zugriffen auf URLs (z.B. aus Logdateien)
Map: Verarbeite Logdatei, gib { URL, 1 } aus Reduce: wie bei Worthäufigkeit
VL Programmierparadigmen | Kapitel 7 | K. Sattler47 of 107
Weitere Anwendungsfälle /2
10.05.2019
Aufbau eines invertierten Index, z.B. für eine Suchmaschine Map: Extrahieren von Wörtern aus Dokumenten und Ausgabe
von {Wort, DokumentID}-Paaren Reduce: Erstellen von {Wort, list of DokumentID}-Paaren
Verteiltes Sortieren Map: Ausgabe der einzelnen Sätze (Schlüssel-Wert-Paare) Reduce: -
VL Programmierparadigmen | Kapitel 7 | K. Sattler48 of 107
MapReduce in Erlang
10.05.2019
Einfache Beispielimplementierung basierend auf: Armstrong: Programming Erlang: Software for a Concurrent World
mapreduce(Map, Reduce, L)
Map = fun(Pid, X) -> voidReduce = fun(Key, [Value], Acc0) -> AccL = [X]
VL Programmierparadigmen | Kapitel 7 | K. Sattler49 of 107
MapReduce in Erlang /2
10.05.2019
Map: Mapper
Sendet Folge von { Key, Value }-Nachrichten an Prozess Pid
Für jeden Wert X in Liste L wird neuer Prozess erzeugt
Reduce: Reducer
Alle Werte Value für einen Key werden zusammengefasst
Für jedes Paar { Key, [Value] } wird Reduce aufgerufen mit initialen Wert Acc0
L: Liste der X-Werte
VL Programmierparadigmen | Kapitel 7 | K. Sattler50 of 107
Erlang-Implementierung
10.05.2019
Start-Prozess Startet eigentliche Verarbeitung Wartet auf Ergebnis
mapreduce(Map, Reduce, L) ->S = self(),Pid = spawn(fun() -> master(S, Map, Reduce, L) end),receive
{ Pid, Result } -> Resultend.
VL Programmierparadigmen | Kapitel 7 | K. Sattler51 of 107
Erlang-Implementierung: Master
10.05.2019
master(Parent, Map, Reduce, L) ->process_flag(trap_exit, true),ReducePid = self(),% Erzeuge für jedes Listenelement einen Map-Prozesslists:foreach(fun(X) -> spawn_link(fun() ->
worker(ReducePid, Map, X) end) end, L),% Warte auf length(L) Prozesse, die in Dict. SchreibenDict = collect_replies(length(L), dict:new()),% Wende Reduce-Funktion F2 anAcc = dict:fold(Reduce, [], Dict),Parent ! { self(), Acc }.
Startet Map-Prozesse für jedes Element der Liste L Verwaltet gemeinsame Datenstruktur für Zuordnung aller Werte zu einem
gemeinsamen Schlüssel Wartet auf Terminierung aller Map-Prozesse und startet Reduce-Funktion
VL Programmierparadigmen | Kapitel 7 | K. Sattler52 of 107
Erlang-Implementierung /3
10.05.2019
Sammelt und mischt { Key, Value }-Nachrichten von N Prozessen
collect_replies(0, Dict) -> Dict;collect_replies(N, Dict) ->receive{ Key, Val } ->case dict:is_key(Key, Dict) oftrue -> % Schlüssel bereits vorhandenDict1 = dict:append(Key, Val, Dict),collect_replies(N, Dict1);
false -> % neuer SchlüsselDict1 = dict:store(Key, [Val], Dict),collect_replies(N, Dict1)
end;{ ‘EXIT’, _, _} -> % Map-Prozess terminiertcollect_replies(N-1, Dict)
end.
VL Programmierparadigmen | Kapitel 7 | K. Sattler53 of 107
Erlang-Implementierung /4
10.05.2019
Aufruf der Map-Funktion Führt F(Pid, X) aus F muss { Key, Value } an Pid senden und anschließend
terminieren
worker(ReducePid, F, X) ->F(ReducePid, X).
VL Programmierparadigmen | Kapitel 7 | K. Sattler54 of 107
Einschub: Akkumulatoren in Erlang
10.05.2019
Akkumulator = zusätzlicher Parameter einer Funktion zum Einsammeln von Informationen in einer Variablen
Beispiel: Mittelwert( ) ( )avg(X) -> avg(X, 0, 0).
avg([H|T], Len, Sum) -> avg(T, Len + 1, Sum + H);
avg([], Len, Sum) ->Sum / Len.
VL Programmierparadigmen | Kapitel 7 | K. Sattler55 of 107
Einschub: Erlang-Modul dict
10.05.2019
Tabelle von Schlüssel-Wert-Paaren Implementierung z.B. als Hashtabelle, Baum, ... dict:append(Key, Value, Dict) – neuen Wert zu Schlüssel
hinzufügen dict:store(Key, Value, Dict) – speichere Schlüssel-Wert-Paar dict:is_key(Key, Dict) – ist Schlüssel vorhanden? dict:fold(Fun, Acc0, Dict) – ruft Fun auf Schlüssel-Wert-Paaren
mit Akkumulator Acc0 auf, Fun muss Akkumulator liefern
VL Programmierparadigmen | Kapitel 7 | K. Sattler56 of 107
Prinzip der Erlang-Implementierung
10.05.2019
worker
worker
worker
worker
worker
Map
masterReduce
{ Key, Value }
Dict
map_reduce
spawn
spawn
Map
Map
Map
Map
VL Programmierparadigmen | Kapitel 7 | K. Sattler57 of 107
Wortzählung in Erlang
10.05.2019
Map-Funktion: Wortweises Lesen einer Datei (genauer: termweises Lesen)
Reduce-Funktion: Anzahl der Einträge (Vals) pro Schlüssel
generate_words(Pid, FileName) ->{ ok, [Words] } = file:consult(FileName),lists:foreach(fun(Word) -> Pid ! { Word, 1 } end, Words).
count_words(Key, Vals, A) ->[{ length(Vals), Key }|A].
VL Programmierparadigmen | Kapitel 7 | K. Sattler58 of 107
Wortzählung in Erlang
10.05.2019
Nutzung der mapreduce-Funktion
Dokumente:
run() ->% Map-FunktionF1 = fun generate_words/2,% Reduce-FunktionF2 = fun count_words/3,% Liste der zu verarbeitenden DateienFiles = [“f1.txt”, “f2.txt”, “f3.txt”],% VerarbeitungL = map_reduce(F1, F2, Files),% Ausgabe des Ergebnisseslists:reverse(lists:sort(L)).
[eins,zwei,drei,vier].
VL Programmierparadigmen | Kapitel 7 | K. Sattler59 of 107
Einschränkungen der Implementierung
10.05.2019
Nur ein Reduce-Prozess Keine echte Parallelisierung des Reduce-Schrittes Gemeinsame (zentrale) Datenstruktur (hier Dict)
Beschränkte Fehlertoleranz Kein Neustart von Jobs im Fehlerfall ...
VL Programmierparadigmen | Kapitel 7 | K. Sattler60 of 107
Verbesserung: mehrere Reducer
10.05.2019
master(Parent, Map, Reduce, L) ->process_flag(trap_exit, true),MasterPid = self(),
% Erzeuge für jedes Listenelement einen Map-Prozesslists:foreach(fun(X) -> spawn_link(fun() ->
Map(MasterPid, X) end) end, L),% Warte auf length(L) Prozesse, die in Dictionary schreibenIDict = collect_replies(length(L), dict:new()),R = dict:to_list(IDict),% Erzeuge für jedes R-Element einen Reduce-Prozesslists:foreach(fun({K,V}) -> spawn_link(fun() ->
Reduce(MasterPid, {K, V}) end) end, R),% Warte auf length(R) Prozesse, die in Dictionary schreibenODict = collect_replies(length(R), dict:new()),
% Sende ErgebnisParent ! { self(), dict:to_list(ODict)}.
VL Programmierparadigmen | Kapitel 7 | K. Sattler61 of 107
Verbesserung /2
10.05.2019
Modifizierte Reduce-Funktion
count_frequency(Pid, {Word, Vals}) ->[{Freq, _}] = count_words(Word, Vals, []),Pid ! { Word, Freq }.
VL Programmierparadigmen | Kapitel 7 | K. Sattler62 of 107
MapReduce mit Hadoop
10.05.2019
Framework für skalierbare, verteilte/parallele Verarbeitung Java-Implementierung von MapReduce
für Cluster-Umgebungen Einsatz bei Facebook, Yahoo,
Baidu, ... Mehrere Komponenten HDFS: verteiltes Dateisystem MapReduce: ;-) Pig: Programmiersprache für
Datenanalyse Hbase: einfache Datenbank ....
Foto: Yahoo!
VL Programmierparadigmen | Kapitel 7 | K. Sattler63 of 107
HDFS
10.05.2019
Dateisystem für die Verwaltung sehr großer Datenmengen (TB ... PB) in einem Cluster
Redundante Speicherung der Daten (Blockgröße: 64 MB)
2, 1, 4 4, 5, 3 1, 2, 5, 3
/dir/file1 → 1,2,3/dir/file2 → 4.5
NameNode
DataNode
VL Programmierparadigmen | Kapitel 7 | K. Sattler64 of 107
HDFS: Nutzung
10.05.2019
Dateien anzeigen
Verzeichnis anlegen
Datei in HDFS kopieren
Datei aus HDFS kopieren
> bin/hadoop dfs –ls /
> bin/hadoop dfs –mkdir /mydir
> bin/hadoop dfs –put myfile /mydir
> bin/hadoop dfs –get myfile meine-lokale-datei
VL Programmierparadigmen | Kapitel 7 | K. Sattler65 of 107
WordCount mit Hadoop: Map
10.05.2019
public static class Mapextends Mapper<LongWritable, Text, Text, IntWritable> {final static IntWritable one = new IntWritable(1);Text word = new Text();
public void map(LongWritable key, Text value,Context context) throws Exception {
String line = value.toString();StringTokenizer tokenizer = new StringTokenizer(line);while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());context.write(word, one);
}}
}
VL Programmierparadigmen | Kapitel 7 | K. Sattler66 of 107
WordCount mit Hadoop: Reduce
10.05.2019
public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {public void reduce(Text key,
Iterator<IntWritable> values,Context context) throws Exception {
int sum = 0;while (values.hasNext()) {
sum += values.next().get();}context.write(key, new IntWritable(sum));
}}
VL Programmierparadigmen | Kapitel 7 | K. Sattler67 of 107
WordCount mit Hadoop: main
10.05.2019
public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf, "word count");job.setJarByClass(WordCount.class);// Struktur des Ergebnisses festlegenjob.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);// Mapper und Reducer setzenjob.setMapperClass(Map.class);job.setCombinerClass(Reduce.class);job.setReducerClass(Reduce.class);// Pfade als ProgrammparameterFileInputFormat.setInputPath(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));System.exit(job.waitForCompletion(true) ? 0 : 1);// los geht’s
}
VL Programmierparadigmen | Kapitel 7 | K. Sattler68 of 107
WordCount: Ausführung
10.05.2019
1. Ablage der Dokumente im HDFS2. Starten des Programms
Weitere Details unter:http://hadoop.apache.org/common/docs/current/mapred_tutorial.html
> bin/hadoop jar /users/mr/wordcount.jar tuil.pp.WordCount \/users/mr/wc/input /users/mr/wc/output
VL Programmierparadigmen | Kapitel 7 | K. Sattler69 of 107
Job und Task Tracker
10.05.2019
Job Tracker: Koordinator für MR-Jobs Clients senden MR-Job an Tracker Kennt Datenverteilung und Knoten Startet ggf. Tasks neu
Task Tracker: läuft auf jedem Knoten Verantwortlich für einzelnen Map- bzw. Reduce-Jobs
Task Tracker Task Tracker Task Tracker
Job TrackerMR-Job
VL Programmierparadigmen | Kapitel 7 | K. Sattler70 of 107
Diskussion
10.05.2019
Textdateien als Ein- und Ausgabe Erweiterungen: Kompression, Caching, Records
Job-Kontrolle Verschiedene Scheduling-Strategien Überwachung evtl. Ausfälle, Job-Queues, ....
...
VL Programmierparadigmen | Kapitel 7 | K. Sattler71 of 107
Hadoop: Anwendung
10.05.2019
http://wiki.apache.org/hadoop/PoweredBy Ebay: 532 Knoten (8 * 532 cores, 5.3PB), Suchoptimierung Facebook: 1100 Knoten (8800 cores 12 PB), Analyse und
Reporting LinkedIn: 520 Knoten, „People You May Know“ Yahoo!: > 100,000 CPUs in >40,000 Compute, Ad Systems
und Web Search ...
VL Programmierparadigmen | Kapitel 7 | K. Sattler72 of 107
MapReduce: Fazit
10.05.2019
Programmierparadigma und Software-Framework Datenparallele Tasks in Large-scale Clustern: Analyse
großer Datenmengen Praktischer Einsatz Aktives Forschungsgebiet
Aber: nicht Allheilmittel für alle Parallelisierungsaufgaben!
VL Programmierparadigmen | Kapitel 7 | K. Sattler73 of 107
Parallelprogrammierung in Java
10.05.2019
Unterstützung durch Thread-Konzept eingebaute Mechanismen zur Synchronisation nebenläufiger
Prozesse spezielle High-Level-Klassen im Package java.util.concurrency
VL Programmierparadigmen | Kapitel 7 | K. Sattler74 of 107
Threads
10.05.2019
„leichtgewichtig“ im Vergleich zu Betriebssystemprozess
Threads eines Prozesses teilen sich Adressraum
Thread kann von CPU oder Core ausgeführt werden
Thread („Faden“) = leichtgewichtige Ausführungseinheit oder Kontrollfluss (Folge von Anweisungen) innerhalb eines sich in Ausführung befindlichen Programms
Adressraum
VL Programmierparadigmen | Kapitel 7 | K. Sattler75 of 107
Threads in Java
10.05.2019
Repräsentiert durch Klasse java.lang.Thread Implementierung eines eigenen Kontrollflusses Implementierung der Schnittstelle java.lang.Runnable
keine weitere Beeinflussung des Threads über zusätzliche Methoden notwendig
soll von anderer Klasse als Thread abgeleitet werden
Subklasse von java.lang.Thread zusätzliche Methoden zur Steuerung des Ablaufs benötigt
keine andere Superklasse notwendig
VL Programmierparadigmen | Kapitel 7 | K. Sattler76 of 107
Threads: Runnable-Schnittstelle
10.05.2019
Eigene Klasse muss Runnable implementieren Methode public void run() – wird beim Start des Threads
aufgerufen
public class Heartbeat implements Runnable {int pulse;public Heartbeat(int p) { pulse = p * 1000; }public void run() {while(true) {try {Thread.sleep(pulse);
} catch(InterruptedException e) {}System.out.println(”poch");
}}
VL Programmierparadigmen | Kapitel 7 | K. Sattler77 of 107
Thread-Erzeugung
10.05.2019
Thread-Objekt mit Runnable-Objekt erzeugen Methode start() aufrufen Ruft run() auf
public static void main(String[] args) {Thread t = new Thread(new Heartbeat(2));t.start();
}
VL Programmierparadigmen | Kapitel 7 | K. Sattler78 of 107
Threads: Subklasse von Thread
10.05.2019
Klasse muss von Thread abgeleitet werden Methode run() muss überschriebenwerden
public class Heartbeat2 extends Thread { int pulse = 1000; public Heartbeat2() {} public void setPulse(int p) { pulse = p * 1000; } public void run() { while(true) { try { Thread.sleep(pulse);
} catch(InterruptedException e) {} System.out.println(”poch");
}}
VL Programmierparadigmen | Kapitel 7 | K. Sattler79 of 107
Thread-Erzeugung
10.05.2019
Objekt der eigenen Thread-Klasse erzeugen Methode start() aufrufen Ruft run() auf
Spätere Beeinflussung durch andere Threads möglich
public static void main(String[] args) {Heartbeat2 t = new Heartbeat2(2);t.start();
}
…t.setPulse(2);
VL Programmierparadigmen | Kapitel 7 | K. Sattler80 of 107
Threads: Wichtige Methoden
10.05.2019
void start() initiiert Ausführung des Threads durch Aufruf der Methode run
void run() die eigentliche Arbeitsmethode
static void sleep(int millis) hält die Ausführung des aktuellen Threads für millis
Millisekunden an Keinen Einfluss auf andere Threads!
void join() blockiert den aufrufenden Thread so lange, bis der aufgerufene
Thread beendet ist
VL Programmierparadigmen | Kapitel 7 | K. Sattler81 of 107
Parallele Berechnung von Fibonacci-Zahlen
10.05.2019
Idee: Pro Fibonacci-Zahl einen Berechnungsthread Warten auf das Ende aller Threads mit join
Fibonacci-Zahl in Javapublic class Fibonacci implements Runnable {int fi;public Fibonacci(int f) { fi = f; }
int fibo(int f) { if (f < 2) return 1; else return fibo(f-1) + fibo(f-2); }
public void run() {int res = fibo(fi); System.out.println(“Fibonacci(“ + fi + “) = “ + res);
}}
VL Programmierparadigmen | Kapitel 7 | K. Sattler82 of 107
Parallele Berechnung von Fibonacci-Zahlen
10.05.2019
Thread-Erzeugung und Ausführung
public static void main(String[] args) { Thread[] threads = new Thread[10];
for (int i = 0; i < 10; i++) { threads[i] = new Thread(new Fibonacci(40 + i)); threads[i].start();
}}
VL Programmierparadigmen | Kapitel 7 | K. Sattler83 of 107
Probleme nebenläufiger Ausführungen
10.05.2019
public class Jawsmith extends Thread {int time;String message;
public Jawsmith(int tm, String msg) {time = tm; message = msg; }
public void run() {while(true) {try {Thread.sleep(time);
} catch (InterruptedException e) { return; }
for (int i = 0; i < message.length(); i++)System.out.print(message.charAt(i));
}}
}
VL Programmierparadigmen | Kapitel 7 | K. Sattler84 of 107
Probleme nebenläufiger Ausführungen
10.05.2019
public static void main(String[] args) {Thread t1 = new Jawsmith(1, "DASISTEINELANGENACHRICHT");Thread t2 = new Jawsmith(1, "dieistaberauchnichtkurz");t1.start();t2.start();
}
Ausgabe:
..NACHRICHTdieistabDASISTEINELANGENACHRICHTerauchnichtkurz…
Race Condition (Wettlaufsituation): Ergebnis nebenläufiger Ausführung auf gemeinsamen Zustand (hier:
Ausgabekanal) hängt vom zeitlichen Verhalten der Einzeloperationen ab
VL Programmierparadigmen | Kapitel 7 | K. Sattler85 of 107
Wechselseitiger Ausschluss
10.05.2019
kritischer Abschnitt: Programmabschnitt in einem Thread, in dem auf eine gemeinsame Ressource (Speicher etc.) zugegriffen wird und der nicht parallel (oder zeitlich verzahnt) zu einem anderen Thread ausgeführt werden darf, in den
Lösung durch wechselseitigen Ausschluss (engl. mutual exclusion = mutex)
VL Programmierparadigmen | Kapitel 7 | K. Sattler86 of 107
Wechselseitiger Ausschluss in Java
10.05.2019
Schlüsselwort synchronized Implementierung von sogenannten Monitoren bzw. locks
(exklusiven Sperren) nur ein Thread darf den kritischen Abschnitt betreten
alle anderen Threads, die darauf zugreifen wollen, müssen auf warten
für Methoden: public synchronized void doSomething() nur ein Thread darf diese Methode auf einem Objekt zur gleichen Zeit
ausführen
für Anweisungen: synchronized(anObject) { ... } nur ein Thread darf den Block betreten Sperre wird durch das Objekt anObject verwaltet (jedem Java-
Objekt ist ein Sperre zugeordnet)
VL Programmierparadigmen | Kapitel 7 | K. Sattler87 of 107
Kritische Abschnitte mit synchronized
10.05.2019
synchronized(myObj) {// kritischer Abschnitt…
}
1. Sperre setzen
2. Abschnitt ausführen
3. Sperre freigeben
1. Sperre anfragen
2. warten
3. Sperre setzen
...
VL Programmierparadigmen | Kapitel 7 | K. Sattler88 of 107
wait, notify
10.05.2019
Signalisierung zwischen Threads in Java Basismethoden der Klasse java.lang.Object wait(): der aktive Thread wartet an diesem Objekt,
Sperren werden ggf. freigegeben! notify(): weckt an diesem Objekt wartenden Thread
auf notifyAll(): weckt alle an diesem Objekt wartenden
Threads auf wait() & notify() dürfen nur in einem synchronized-Block aufgerufen werden
VL Programmierparadigmen | Kapitel 7 | K. Sattler89 of 107
Die 5 speisenden Philosophen
10.05.2019
fünf Philosophen teilen sich eine Schussel Spaghetti
fünf Gabeln, je eine zwischen zwei Philosophen
Philosoph kann nur mit zwei (benachbarten) Gabeln essen
Gabeln werden nur nach dem Essen zurückgelegt
Philosoph durchläuft Zyklus von Zuständen: denken → hungrig → essen → denken etc.
VL Programmierparadigmen | Kapitel 7 | K. Sattler90 of 107
Das Problem mit den Philosophen...
10.05.2019
1. Jeder greift die linkeGabel
2. und wartet auf die rechteGabel
3. ... und wartet ...
Verklemmung
VL Programmierparadigmen | Kapitel 7 | K. Sattler91 of 107
Lösungsidee
10.05.2019
immer beide Gabeln aufnehmen, d.h. wenn nur eine Gabel verfügbar ist: liegen lassen und warten
synchronisierter Zugriff auf Gabeln, d.h. in einem kritischen Abschnitt unter gegenseitigem Ausschluss
Wecken von wartenden Philosophen
VL Programmierparadigmen | Kapitel 7 | K. Sattler92 of 107
Rahmen für Philosophen
10.05.2019
class Philosopher extends Thread { private Forks forks; // Gabelnint leftFork, rightFork; // die linke und rechte Gabelprivate int id;
// Konstruktor: initialisiert alle// Attribute und startet den Threadpublic Philosopher(int i, Forks f, int left, int right)
{ id = i;leftFork = left; rightFork = right;forks = f;start();
}…
VL Programmierparadigmen | Kapitel 7 | K. Sattler93 of 107
Leben eines Philosophen
10.05.2019
public void run() {// Anfangszustand: “Denkend”while (true) { // Warten, bis beide Gabeln verfügbar sind// und schließlich aufnehmen...// eine Weile essentry {
sleep((int) (Math.random () * 3000.0));} catch (InterruptedException exc) { }
// Gabeln niederlegen...// eine Weile nachdenkentry {
sleep((int) (Math.random () * 3000.0));} catch (InterruptedException exc) { }
}}
VL Programmierparadigmen | Kapitel 7 | K. Sattler94 of 107
Gabeln aufnehmen
10.05.2019
synchronized(forks) {// Warten, bis beide Gabeln verfügbar sindwhile (!forks.isAvailable(leftFork) ||
!forks.isAvailable(rightFork)) {// Gabeln sind belegt: Zustand ist// "Hungrig" --> wir müssen wartenSystem.out.println("Philosoph #" + id +
": HUNGRIG");try {
forks.wait();} catch (InterruptedException exc) { }
}// beide Gabeln aufnehmenforks.take(leftFork);forks.take(rightFork);System.out.println("Philosoph #" + id + ": ESSEN");
}VL Programmierparadigmen | Kapitel 7 | K. Sattler95 of 107
Gabeln ablegen
10.05.2019
synchronized(forks) {// beide Gabeln ablegenforks.put(leftFork);forks.put(rightFork);
System.out.println("Philosoph #" + id + ": DENKEN");
// alle wartenden Philosophen aufweckenforks.notifyAll();
}
VL Programmierparadigmen | Kapitel 7 | K. Sattler96 of 107
Gabeln
10.05.2019
class Forks {// 5 Gabeln: true -> frei, false -> belegt// Beginn: alle Gabeln sind verfügbarprivate boolean forks[] = { true, true, true, true,
true };
// Testet, ob Gabel verfügbar istpublic boolean isAvailable(int f) { return forks[f]; }
// Aufnehmen der Gabelvoid take(int f) { forks[f] = false; }
// Ablegen der Gabelvoid put(int f) { forks[f] = true; }
}
VL Programmierparadigmen | Kapitel 7 | K. Sattler97 of 107
Initialisierung
10.05.2019
Gabeln erzeugen
Philosophen erzeugen und Gabeln zuweisen
Forks forks = new Forks();
Philisopher[] philosophers = new Philosophers[5];for (int i = 0; i < 5; i++)
philosophers[i] = new Philosophers(i, forks, i, (i+1) % 5);
VL Programmierparadigmen | Kapitel 7 | K. Sattler98 of 107
Java: High-Level-Klassen
10.05.2019VL Programmierparadigmen | Kapitel 7 | K. Sattler
Abstraktionsschicht versteckt Details über Thread-Erzeugung
Übernimmt Erstellung und Überwachung von parallelenTasks ExecutorService zum Erzeugen asynchroner Tasks Future: Referenz auf diesen Task bzw. dessen Ergebnis ForkJoinPool & RecursiveAction: rekursives Aufteilen
eines großen Problems
Seit Java Version 1.5
99 of 107
Tasks und Futures
10.05.2019VL Programmierparadigmen | Kapitel 7 | K. Sattler
Task = logische Ausführungseinheit Thread = Mechanismus zur asynchronen/parallelen
Ausführung von Tasks
Runnable task = () -> { String me = Thread.currentThread().getName(); System.out.println("Hallo " + me);};
task.run();
Thread thread = new Thread(task);thread.start();
100 of 107
Tasks und Futures
10.05.2019VL Programmierparadigmen | Kapitel 7 | K. Sattler
Future = Resultat einer asynchronen Berechnung, d.h. einer Berechnung die erst noch stattfindet Methoden zum
Prüfen, ob Task fertig ist Prüfen, ob Task erfolgreich beendet oder abgebrochen wurde Ergebnis lesen
101 of 107
Zeit
Task starten
Future erzeugen
Wert setzen
Fertig?
Wert lesen
TaskFutureMain
Future & ExecutorService
10.05.2019VL Programmierparadigmen | Kapitel 7 | K. Sattler
ExecutorService stellt Methoden zum Starten/Beenden/Steuern von parallelen Aufgaben bereit
implementiert Executor Interface definiert Methode void execute(Runnable r)
Starten einer Aufgabe mit submit Future<T> submit(Callable c) Future<?> submit(Runnable r)
Zugriff auf das Ergebnis mit get T get(long timeout, TimeUnit unit) T get()
102 of 107
Future & ExecutorService - Beispiel
10.05.2019VL Programmierparadigmen | Kapitel 7 | K. Sattler
class App { ExecutorService executor = Executors.newFixedThreadPool(4);void search(final String w) throws InterruptedException{ Future<String> future
= executor.submit(new Callable<String>() { public String call() {
return searcher.search(target); }});
displayOtherThings(); // do other things try {
displayText(future.get()); // get is blocking } catch (ExecutionException ex) { cleanup();return; }
} }
103 of 107
RecursiveAction & Fork/Join
10.05.2019VL Programmierparadigmen | Kapitel 7 | K. Sattler
Rekursives Zerlegen eines großen Problems in kleinereProbleme
Solange bis Problem klein genug um direkt ausgeführtwerden zu können
Task erstellt zwei oder mehr Teiltasks von sich selbst --> Datenparallelität ForkJoinPool zum Ausführen implementiert Executor Interface
104 of 107
Fork/Join Beispiel
10.05.2019VL Programmierparadigmen | Kapitel 7 | K. Sattler
class MyTask extends RecursiveAction {String[] source; int start, length;
public MyTask(String[] src, int s, int l) {source = src; start = s; length = l;
}
void computeDirectly() { … }
@Overridevoid compute() {if(length < THRESHOLD)computeDirectly();
else {int split = length / 2;
invokeAll(new MyTask(source, start, split),new MyTask(source, start + split, length - split));
} } }105 of 107
Fork/Join Beispiel
10.05.2019VL Programmierparadigmen | Kapitel 7 | K. Sattler
String[] src = ...MyTask t = new MyTask(src, 0, src.length);ForkJoinPool pool = new ForkJoinPool();pool.invoke(t);
Starten der Verarbeitung:1. (große) Gesamtaufgabe erstellen2. ForkJoinPool erstellen3. Aufgabe vom Pool ausführen lassen
106 of 107
Zusammenfassung
10.05.2019
Parallelprogrammierung als wichtige Technik zur Nutzung moderner Hardware (Multicore, ...)
verschiedene Programmiermodelle Daten- und Taskparallelität in Erlang und Java MapReduce Thread-Modell und Synchronisation mit vielen weiteren
Konzepten Literatur: Armstrong: Programming Erlang: Software for a Concurrent World,
Pragmatic Bookshelf, 2007 Apache Hadoop Project, http://hadoop.apache.org/ Herlihy, Shavit: The Art of Multiprocessor Programming, Morgan
Kaufmann, 2008. Saake, Sattler: Algorithmen und Datenstrukturen, dpunkt.verlag, 2013.
VL Programmierparadigmen | Kapitel 7 | K. Sattler107 of 107