Upload
truongthuy
View
217
Download
0
Embed Size (px)
Citation preview
Verteilte Algorithmen TI5005Th. Letschert
TH Mittelhessen Gießen
University of Applied Sciences
Netze asynchroner Prozesse– Netze aus aktiven Knoten und Kanälen– dynamische und statische Prozessnetze– Topologiedefinition und Netzkonstruktion
Seite 2
Prozessnetze
Prozesse
– sind aktive Einheiten die in blockierenden Anweisungensuspendiertwerden können (eigener Stack!)
– haben ausschließlich Zugriff auf lokale Ressourcen (Speicher, ...)
– können diskrete Aktionen ausführenLokale Aktionen: interne BerechnungenNachrichten senden Nachrichten empfangen
Prozessnetze– sind verteilte Systeme
mit statischer Topologie: fixe Mengen von Prozessen mit festen Beziehungen und aktiven Knoten: Prozesse / Threads
– Prozesse können sich Nachrichten über ein (irgendwie geartetes) Kommunikationssystem senden
– Topologie: Welcher Prozess kann welchem anderen Nachrichten senden
– Beispiel: Kanal-basiertes Kommunikationssystem:
Die Prozesse sind durch uni- oder bidirektionale Punkt-zu-Punkt Kanäle verbunden
P1
P4
P2
P3 P5
Kanal
Prozess
Seite 3
Prozessnetze
Beispiel: Schreiber und Leser Pseudocode
channel k; reader = process { var v; v = k.receive do { case (v != EOS) > println(v); v = k.receive } }
writer = process { var v = v0; v = k.receive for ( v < (v1, v2 …) { k.send(v) } out.send(EOS) }
readerk
EOS : End of Stream
writer
Seite 4
Prozessnetze
Beispiel Mergesort Verteiltes Mergesort
Sortieren mit einem Netz aus mischenden Prozessen
2
5
6
7
1
4
3
8
5,4
7 1
8 6
3 2
8 6 3 2
7 5 4 1
8 7 6 5 4 3 2 1
Seite 5
Prozessnetze
Beispiel Mergesort Misch-ProzessPseudocode
channel in1, in2, out; Merge = process { var v1, v2; v1 = in1.receive v2 = in2.receive do { case (v1 != EOS and v2 != EOS) > case { v1 <= v2 > out.send(v1); v1 = in1.receive; v2 <= v1 > out.send(v2); v2 = in2.receive; } case (v1 != EOS and v2 == EOS) > out.send(v1); v1 = in1.receive; case (v1 == EOS and v2 != EOS) > out.send(v2); v2 = in2.receive; } out.send(EOS) }
in1
in2
out
EOS : End of Stream
Seite 6
Kanäle und Ports
Kanal: virtuell oder real Virtuell: Bekanntschaft von Prozessen
Ein Kanal definiert / ist eine Bekanntschaft von Knoten:
Knoten können Knoten die sie „kennen“ Nachrichten senden.
Real: KommunikationsmediumEin Kanal kann als reales physisches Medium der Kommunikation verstanden werden.
Beide Sichten sind prinzipiell äquivalent
– im ersten Fall haben wir ein zentrales Kommunikationsmedium
– im zweiten Fall ist das Kommunikationsmedium verteilt:
Das Kommunikationsmedium besteht aus der Menge der Kanäle
receiverk
sender
receiversender
Bekanntschaft
Nachrichtentransport
k
Bekanntschaft
Nachrichtentransport
Seite 7
Kanäle und Ports
Kanal-Kapazität Kapazität 0
Kanal puffert nichtKommunikation ist synchronKanäle mit Kapazität 0: „Leitungen“
Kapazität >0 Kanal puffertKommunikation ist asynchronVirtuelle Kanäle mit Kapazität >0 sind möglich:Der empfangende (oder der sendende) Prozess puffert
Puffer
Puffer
Virtueller Kanal, Empfänger puffert:asynchrone Kommunikation
Virtueller Kanal, Empfänger puffert nicht: synchrone Kommunikation
Kanal puffert: asynchrone Kommunikation
Kanal puffert nicht: synchrone Kommunikation
Seite 8
Kanäle und Ports
PortsProzess: Definition und InstanzNetze enthalten oft viele gleichartige Prozesse, es ist darum sinnvoll zwischen der Definition eines Prozesses und der einer Prozessinstanz zu unterscheiden.
Problem: generische Partnerwahl Wie kann man sich in einer Prozessdefinition auf Kanäle / Kommunikationspartner beziehen, die in sich in Prozessinstanzen unterscheiden?
Port-KonzeptIn Prozessdefinitionen beziehen sich Kommunikationsanweisungen auf Ports, in Prozessinstanzen sind Ports zu Kanälen / Kommunikationspartner aufgelöst / gebunden.
Intuitiv: Knoten werden instantiiert und dann an Ports verbunden.
Definition
Instantiierung und Verbindung durch Kanäle
Seite 9
Kanäle und Ports
Port: real oder virtuellVirtuell Ports haben keine physische Existenz in Instantiierungen der Knoten. (D.h. es gibt sie nicht zur „Laufzeit“)
Abhängig vom Kanalkonzept werden sie realisiert als
– weitere (Adress-) Bestandteile der Nachrichten (zentrales Kommunikationsmedium)
– Endpunkte von Kanälen (verteiltes Kommunikationsmedium)
RealPorts sind reale Bestandteile von instantiierten Knoten.
Insgesamt gibt es eine breite Palette an Implementierungsmöglichkeiten für Prozesse und Netzen aus miteinander verbundenen Prozessen.
Seite 10
Prozessnetze
Statische und dynamische Prozessnetze
Statische ProzessnetzeZur Laufzeit ist die Topologie des Netzes fix:
– Keine Erzeugung neuer Knoten
– Keine Erzeugung neuer Kanäle / Prozessbekanntschaften
Statische Prozessnetze werden in drei Phasen erzeugt und ausgeführt:
– Definition von Prozessen (Knoten-Typen)
– Definition der Topologie: Knoten (Prozessinstanzen) und ihre Verbindung
– Ausführung der Prozesse
Dynamische ProzessnetzeDie Topologie des Netzes ist während der Laufzeit nicht fix:
– Neue Knoten können erzeugt werden
– Kanäle / Prozessbekanntschaften können erzeugt und modifiziert werden
Die Ausführung dynamischer Prozessnetze kann nicht in Phasen unterteilt werden
Seite 11
Dynamische Prozessnetze
Dynamische ProzessnetzeDie Topologie des Netzes kann zur Laufzeit konstruiert / verändert werden
Beispiel: einfaches „selbst-entfaltendes“ Prozess-Netz für Mergesort
object MergeNode { val EOF = -1}
class MergeNode(out: Channel[Int], lst: List[Int]) extends Runnable { import MergeNode.EOF override def run(): Unit = {
if (lst.length == 1) { out.send(lst(0)) out.send(EOF) out.close()
} else {
val in_1 = new Channel[Int] val in_2 = new Channel[Int] val n = lst.length val pred_1 = new MergeNode(in_1, lst.take(n/2)) val pred_2 = new MergeNode(in_2, lst.drop(n/2)) new Thread(pred_1).start() new Thread(pred_2).start()
. . .
}
}
object Printer extends Thread { val in = new Channel[Int] override def run(): Unit = { while (!in.eof()) { val v = in.receive() if (v!= MergeNode.EOF) println("\t Printer received " +v) } println("Printer finished") }}
object Merge_App extends App { val lst = List(1,3,5,7,2,4,6,8) val merger = new MergeNode(Printer.in, lst) Printer.start() new Thread(merger).start() }
Seite 12
Statische Prozessnetze / Topologie
Definition der Netztopologie: Prozesse erzeugen zu Netzen verbinden Das Port-Konzept erlaubt es die Bindung
Prozess ~> Kommunikationsendpunkt
von der Zeit der Definition des Prozesses zu verschieben auf die Zeit der Topologiedefinition.
Mit später können verschiedene Zeitpunkte gemeint sein
– statische Topologiedefinition: Topologie im Quellcode festlegen:
Bindung zum Zeitpunkt der Definition des Netzes im Quellcode
– dynamische Topologiedefinition: Netzerzeugung als Aktion zur Laufzeit
Übergabe von Referenzen, z.B. als Konstrutor-Parameter
Dependency-Injection zur Laufzeit (vie Refection oder DI-Framework)
Seite 13
Statische Prozessnetze / Topologie
Beispiel: dynamische Netzkonstruktion 1
Prozesse mit Kanälen als Parameter
Channel
out in
class Producer(out: Channel[Int]) extends Runnable { val a = List(1,2,3,4,5,6,7,8,9,0) override def run(): Unit = {
for (v <- a) {out.send(v);Thread.sleep(100);
}out.close
}}
class Consumer(in: Channel[Int]) extends Runnable { override def run(): Unit = {
while(!in.eof()) { try {println(in.receive()); } catch { case e : NoSuchElementException => }}
}}
object SimpleNet_App extends App { val c = new Channel[Int] val producer = new Producer(c) val consumer = new Consumer(c) new Thread(producer).start() new Thread(consumer).start()}
producer consumer
Parameterübergabe
Seite 14
Statische Prozessnetze / Topologie
Beispiel: dynamische Netzkonstruktion – 2a
Prozesse mit Kanal-Endpunkten als Parameter
An den Konstruktor der Prozesse werden Kanal-Referenzen übergeben.
Damit sind die Ports nicht auf Eingabe oder Ausgabe spezialisiert, eine falsche Nutzung ist möglich (Lesen von einen Ausgabe-Port): Kein echtes Port-Konzept.
Channel
out in
producer consumer
Channel
DataSourceDataSink
producer consumer
Besser: An den Konstruktor der Prozesse werden Referenzen auf Kanal-Endpunkte übergeben.Damit sind die Ports auf Eingabe oder Ausgabe spezialisiert. Eine falsche Nutzung der wird bereits zur Übersetzungszeit entdeckt.
besser
Parameterübergabe
Seite 15
Statische Prozessnetze / Topologie
Beispiel: dynamische Netzkonstruktion – 2b
Prozesse mit Kanal-Endpunkten als ParameterChannel
DataSourceDataSink
OutPort InPorttrait InPort[T] { def receive: T def eof: Boolean}
trait OutPort[T] { def send(v:T) def close: Unit}
class ChannelWithEndPoints[T](implicit classTagOfT: ClassTag[T]) extends DataSourceProvider[T] with DataSinkProvider[T] {
val channel: Channel[T] = new Channel[T]
object inport extends InPort[T] { def receive: T = channel.receive def eof: Boolean = channel.eof }
object outport extends OutPort[T] { def send(v:T) : Unit = channel.send(v) def close: Unit = channel.close() }
def dataSource : InPort[T] = inport def dataSink : OutPort[T] = outport}
trait DataSourceProvider[T] { def dataSource : InPort[T]}
trait DataSinkProvider[T] { def dataSink : OutPort[T]}
class Producer(out: OutPort[Int]) extends Runnable { val a = List(1,2,3,4,5,6,7,8,9,0) override def run(): Unit = {
for (v <- a) {out.send(v);Thread.sleep(100);
}out.close
}}
class Consumer(in: InPort[Int]) extends Runnable { override def run(): Unit = {
while(!in.eof) { try {
println(in.receive); } catch { case e : NoSuchElementException => }}
}}
object SimpleNet_App extends App { val c = new ChannelWithEndPoints[Int] val producer = new Producer(c.dataSink) val consumer = new Consumer(c.dataSource) new Thread(producer).start() new Thread(consumer).start()}
Seite 16
Statische Prozessnetze / Topologie
Beispiel statische Netzkonstruktion
Bindung als Modulbindung
Knoten (Prozesse) enthalten Ports als abstrakte Objekt-Variablen (abstract fields)
Im Kontext eines Netzes sind die Kommunikations-Endpunkte bekannt mit denen sie belegt werden.
Werden Knoten zu Objekten instantiiert, dann werden Ports die Endpunkte eines Kanals als Wert zugewiesen.
Ports sind abstrakte Klassenmitglieder die in Instanzen (Objekten) durch die Kommunikations-Endpunkte eines blockierenden Puffers ersetzt werden.
Die Module (Quellcode-Komponenten) - Writer-Objekt- Reader-Objekt- SimpleNet_App-Objektwerden statisch im SimpleNet_App-Objekt gebunden.
abstract class Producer extends Runnable { val out: OutPort[Int] val a = List(1,2,3,4,5,6,7,8,9,0) override def run(): Unit = {
for (v <- a) {out.send(v);Thread.sleep(100);
}out.close
}}
abstract class Consumer extends Runnable { val in: InPort[Int] override def run(): Unit = {
while(!in.eof) { try {
println(in.receive); } catch { case e : NoSuchElementException => }}
}}
object SimpleNet_App extends App {
val c = new ChannelWithEndPoints[Int]
object producer extends Producer{ val out = c.dataSink } object consumer extends Consumer{ val in = c.dataSource } new Thread(producer).start() new Thread(consumer).start()}
Seite 17
Statische Prozessnetze / Topologie
Beispiel: dynamische Netzkonstruktion via Dependency Injection – 1
Reflection erlaubt die programmatische Manipulation von Code zur Laufzeit
Sinnvoll, beispielsweise wenn Prozess-Netze mit einer DSL beschrieben und dynamisch konstruiert werden sollen.
class Producer extends Runnable {
val out: OutPort[Int] = null
val a = List(1,2,3,4,5,6,7,8,9,0)
override def run(): Unit = {for (v <- a) {
out.send(v);Thread.sleep(100);
}out.close
}}
class Consumer extends Runnable {
val in: InPort[Int] = null
override def run(): Unit = {
while(!in.eof) { try {
println(in.receive); } catch { case e : NoSuchElementException => }}
}}
object Simple_ReflectNet_App extends App {
val channel = new ChannelWithEndPoints[Int]
val producerClz = Class.forName("my_package.Producer") val consumerClz = Class.forName("my_package.Consumer") val producer: Producer = producerClz.newInstance().asInstanceOf[Producer] val consumer: Consumer = consumerClz.newInstance().asInstanceOf[Consumer] val ru = scala.reflect.runtime.universe val mirror = ru.runtimeMirror(getClass.getClassLoader) val producerMirror = mirror.reflect(producer) val outField = ru.typeOf[Producer].decl(ru.TermName("out")).asTerm.accessed.asTerm val producerOutField = producerMirror.reflectField(outField) producerOutField.set(channel.dataSink) val consumerMirror = mirror.reflect(consumer) val inField = ru.typeOf[Consumer].decl(ru.TermName("in")).asTerm.accessed.asTerm val consumerInField = consumerMirror.reflectField(inField) consumerInField.set(channel.dataSource) new Thread(producer).start() new Thread(consumer).start()}
Die Namen der Klassen und Ports könnten aus einer Konfigurationsdatei (Prozess-Netz-DSL) entnommen werden.
Seite 18
Statische Prozessnetze / Topologie
Beispiel: dynamische Netzkonstruktion via Dependency Injection – 2
Annotationen können verwendet werden, um die Laufzeit-Manipulation des Codes (die Injection) zu steuern
Mit Annotationen können beispielsweise Ports markiert werden, die dann mit Kanal-Enden besetzt werden.
Annotationen + Reflection-Mechanismen
werden sinnvollerweise in einer Basisklasse für Prozess-Knoten konzentriert
NodeBaisklasse der Knoten mit annotierten Ports+ (Reflection -) Mechansimen zur Modifikation
Producer Consumer
Seite 19
class INPORT_ANO extends scala.annotation.StaticAnnotationclass OUTPORT_ANO extends scala.annotation.StaticAnnotation
abstract class Node extends Runnable { import scala.reflect.runtime.universe._ val typeMirror = runtimeMirror(this.getClass.getClassLoader) val instanceMirror = typeMirror.reflect(this) val members = instanceMirror.symbol.typeSignature.members def fieldMirror(symbol: Symbol) = instanceMirror.reflectField(symbol.asTerm) def annotatedfields = members.filter(f => f.annotations.length > 0) def outportfields = annotatedfields.filter(f => { val a = f.annotations.toList(0).tree.tpe val b = typeOf[OUTPORT_ANO] (a == b)} ) def inportfields = annotatedfields.filter(f => { val a = f.annotations.toList(0).tree.tpe val b = typeOf[INPORT_ANO] (a == b)} ) val myInportFields = inportfields.foldLeft(Map[String, reflect.runtime.universe.FieldMirror]())( (m, f) => m + (f.name.toString().trim -> instanceMirror.reflectField(f.asTerm) ) ) val myOutportFields = outportfields.foldLeft(Map[String, reflect.runtime.universe.FieldMirror]())( (m, f) => m + (f.name.toString().trim -> instanceMirror.reflectField(f.asTerm) ) ) def setInPort(name: String, value: Any) { myInportFields(name).set(value) } def setOutPort(name: String, value: Any) { myOutportFields(name).set(value) } }
Statische Prozessnetze / Topologie
Beispiel: dynamische Netzkonstruktion via Dependency Injection – 2a
Node
Definition von Annotationen für Ports
Basisklasse für Knoten
Annotierten InputPort setzen
Annotierten OutputPort setzen
mit INPORT_ANO annotierte Felder
mit OUTPORT_ANO annotierte Felder
Seite 20
Statische Prozessnetze / Topologie
Beispiel: dynamische Netzkonstruktion via Dependency Injection – 2b
class Producer extends Node { @OUTPORT_ANO val out: OutPort[Int] = null val a = List(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15) override def run(): Unit = {
for (v <- a) {out.send(v);Thread.sleep(100);
}out.close
}}
class Consumer extends Node { @INPORT_ANO val in: InPort[Int] = null override def run(): Unit = {
while(!in.eof) { try {
println(in.receive); } catch { case e : NoSuchElementException => }}
}}
object Simple_ReflectNet_App extends App { val channel = new ChannelWithEndPoints[Int]
val producerClz = Class.forName("processNet.refectAnnoBinding.Producer") val consumerClz = Class.forName("processNet.refectAnnoBinding.Consumer") val producer: Producer = producerClz.newInstance().asInstanceOf[Producer] val consumer: Consumer = consumerClz.newInstance().asInstanceOf[Consumer] producer.setOutPort("out", channel.dataSink) consumer.setInPort("in", channel.dataSource)
new Thread(producer).start() new Thread(consumer).start()
}
Ports werden annotiert
Annotierte Ports können zu Laufzeit ersetzt werden
Seite 21
Statische Prozessnetze / Topologie
Beispiel: dynamische Netzkonstruktion via Dependency Injection – 3
Typesafe Config Library
– basiert auf HOCON Human-Optimized Config Object Notation
JSON-artige Notation (JSON-Erweiterung)
siehe: https://github.com/typesafehub/config/blob/master/HOCON.md
– API
Package com.typesafe.config
Bestandteil vonAkka (http://akka.io/downloads/)
Doku. siehe: http://typesafehub.github.io/config/latest/api/
Beispiel
import com.typesafe.config.ConfigFactory
val conf = ConfigFactory.parseString( """net{ channels : [ channel1 ] nodes : [ { name : node1, type : Producer }, { name : node2, type : Consumer } ] connections : [ { connect: node1.outPort, to: channel1.sink } { connect: node2.inPort, to: channel1.source }, ] }""")
Netz-Beschreibung in HOCON: Produzent und
Konsument erzeugen und über einen Kanal verbinden.
Ziel: Aus derartigen Beschreibungen ein aktives
Prozessnetz erzeugen!
node2node1 channel1
Seite 22
Statische Prozessnetze / Topologie
Beispiel: dynamische Netzkonstruktion via Dependency Injection – 4a
import com.typesafe.config._import scala.collection.JavaConversions._ object Simple_HoconRefectNet_App extends App {
val conf = ConfigFactory.parseString( """net{ channels : [ channel1 ] nodes : [ { name : node1, type : Producer }, { name : node2, type : Consumer } ] connections : [ { connect: node1.out, to: channel1.sink } { connect: node2.in, to: channel1.source }, ] }""")
Konfiguration: Beschreibung des Prozessnetzes
val channels : Map[String, ChannelWithEndPoints[Int]] = conf.getList("net.channels").foldLeft( Map[String, ChannelWithEndPoints[Int]]() )( (m, c) => m + (c.unwrapped().toString().trim -> new ChannelWithEndPoints[Int]) )
Kanäle entsprechend der Konfiguration erzeugen
Seite 23
Statische Prozessnetze / Topologie
Beispiel: dynamische Netzkonstruktion via Dependency Injection – 4b
Knoten und Kanäle entsprechend der Konfiguration verbinden,und Knoten aktivieren.
val nodes : Map[String, Node] = conf.getList("net.nodes") .foldLeft( Map[String, Node]() )( (m, n) => { val nodeInfo = n.unwrapped().asInstanceOf[java.util.Map[String,String]] val nodeName = nodeInfo.get("name"); val className = nodeInfo.get("type"); val fullClassname = "processNet.refectAnnoBinding." + className val nodeClass = Class.forName(fullClassname) m + (nodeName -> nodeClass.newInstance.asInstanceOf[Node]) } )
for (con <- conf.getList("net.connections")) { val c = con.unwrapped().asInstanceOf[java.util.Map[String,String]] val nodeName = c("connect").trim.split("\\.")(0).trim val nodePort = c("connect").trim.split("\\.")(1).trim val channelName = c("to").trim.split("\\.")(0).trim val channelEnd = c("to").trim.split("\\.")(1).trim val nodeInstance = nodes(nodeName) val channelInstance = channels(channelName).asInstanceOf[ChannelWithEndPoints[Int]]
channelEnd match { case "sink" => nodeInstance.setOutPort(nodePort, channelInstance.dataSink) case "source" => nodeInstance.setInPort(nodePort, channelInstance.dataSource) } } nodes.foreach( nameNode => { new Thread(nameNode._2).start } )}
Knoten entsprechend der Konfiguration erzeugen.