23
Verteilte Algorithmen TI5005 Th. Letschert TH Mittelhessen Gießen University of Applied Sciences Netze asynchroner Prozesse – Netze aus aktiven Knoten und Kanälen – dynamische und statische Prozessnetze – Topologiedefinition und Netzkonstruktion

Verteilte Algorithmen TI5005 - homepages.thm.dehg51/Veranstaltungen/VA-1415/Folien/va-03.pdf · Seite 2 Prozessnetze Prozesse – sind aktive Einheiten die in blockierenden Anweisungen

Embed Size (px)

Citation preview

Verteilte Algorithmen TI5005Th. Letschert

TH Mittelhessen Gießen

University of Applied Sciences

Netze asynchroner Prozesse– Netze aus aktiven Knoten und Kanälen– dynamische und statische Prozessnetze– Topologiedefinition und Netzkonstruktion

Seite 2

Prozessnetze

Prozesse

– sind aktive Einheiten die in blockierenden Anweisungensuspendiertwerden können (eigener Stack!)

– haben ausschließlich Zugriff auf lokale Ressourcen (Speicher, ...)

– können diskrete Aktionen ausführenLokale Aktionen: interne BerechnungenNachrichten senden Nachrichten empfangen

Prozessnetze– sind verteilte Systeme

mit statischer Topologie: fixe Mengen von Prozessen mit festen Beziehungen und aktiven Knoten: Prozesse / Threads

– Prozesse können sich Nachrichten über ein (irgendwie geartetes) Kommunikationssystem senden

– Topologie: Welcher Prozess kann welchem anderen Nachrichten senden

– Beispiel: Kanal-basiertes Kommunikationssystem:

Die Prozesse sind durch uni- oder bidirektionale Punkt-zu-Punkt Kanäle verbunden

P1

P4

P2

P3 P5

Kanal

Prozess

Seite 3

Prozessnetze

Beispiel: Schreiber und Leser Pseudocode

channel k;    reader = process {    var v;    v = k.receive    do {          case (v != EOS) ­> println(v); v = k.receive    }  }

 writer = process {    var v = v0;    v = k.receive    for ( v <­ (v1, v2 …) {          k.send(v)    }    out.send(EOS)  }

readerk

EOS : End of Stream

writer

Seite 4

Prozessnetze

Beispiel Mergesort Verteiltes Mergesort

Sortieren mit einem Netz aus mischenden Prozessen

2

5

6

7

1

4

3

8

5,4

7 1

8 6

3 2

8 6 3 2

7 5 4 1

8 7 6 5 4 3 2 1

Seite 5

Prozessnetze

Beispiel Mergesort Misch-ProzessPseudocode

channel in1, in2, out;     Merge = process {    var v1, v2;    v1 = in1.receive    v2 = in2.receive    do {         case (v1 != EOS and v2 != EOS) ­>           case {             v1 <= v2 ­> out.send(v1); v1 = in1.receive;             v2 <= v1 ­> out.send(v2); v2 = in2.receive;           }        case (v1 != EOS and v2 == EOS) ­>  out.send(v1); v1 = in1.receive;        case (v1 == EOS and v2 != EOS) ­>  out.send(v2); v2 = in2.receive;    }    out.send(EOS)  }

in1

in2

out

EOS : End of Stream

Seite 6

Kanäle und Ports

Kanal: virtuell oder real Virtuell: Bekanntschaft von Prozessen

Ein Kanal definiert / ist eine Bekanntschaft von Knoten:

Knoten können Knoten die sie „kennen“ Nachrichten senden.

Real: KommunikationsmediumEin Kanal kann als reales physisches Medium der Kommunikation verstanden werden.

Beide Sichten sind prinzipiell äquivalent

– im ersten Fall haben wir ein zentrales Kommunikationsmedium

– im zweiten Fall ist das Kommunikationsmedium verteilt:

Das Kommunikationsmedium besteht aus der Menge der Kanäle

receiverk

sender

receiversender

Bekanntschaft

Nachrichtentransport

k

Bekanntschaft

Nachrichtentransport

Seite 7

Kanäle und Ports

Kanal-Kapazität Kapazität 0

Kanal puffert nichtKommunikation ist synchronKanäle mit Kapazität 0: „Leitungen“

Kapazität >0 Kanal puffertKommunikation ist asynchronVirtuelle Kanäle mit Kapazität >0 sind möglich:Der empfangende (oder der sendende) Prozess puffert

Puffer

Puffer

Virtueller Kanal, Empfänger puffert:asynchrone Kommunikation

Virtueller Kanal, Empfänger puffert nicht: synchrone Kommunikation

Kanal puffert: asynchrone Kommunikation

Kanal puffert nicht: synchrone Kommunikation

Seite 8

Kanäle und Ports

PortsProzess: Definition und InstanzNetze enthalten oft viele gleichartige Prozesse, es ist darum sinnvoll zwischen der Definition eines Prozesses und der einer Prozessinstanz zu unterscheiden.

Problem: generische Partnerwahl Wie kann man sich in einer Prozessdefinition auf Kanäle / Kommunikationspartner beziehen, die in sich in Prozessinstanzen unterscheiden?

Port-KonzeptIn Prozessdefinitionen beziehen sich Kommunikationsanweisungen auf Ports, in Prozessinstanzen sind Ports zu Kanälen / Kommunikationspartner aufgelöst / gebunden.

Intuitiv: Knoten werden instantiiert und dann an Ports verbunden.

Definition

Instantiierung und Verbindung durch Kanäle

Seite 9

Kanäle und Ports

Port: real oder virtuellVirtuell Ports haben keine physische Existenz in Instantiierungen der Knoten. (D.h. es gibt sie nicht zur „Laufzeit“)

Abhängig vom Kanalkonzept werden sie realisiert als

– weitere (Adress-) Bestandteile der Nachrichten (zentrales Kommunikationsmedium)

– Endpunkte von Kanälen (verteiltes Kommunikationsmedium)

RealPorts sind reale Bestandteile von instantiierten Knoten.

Insgesamt gibt es eine breite Palette an Implementierungsmöglichkeiten für Prozesse und Netzen aus miteinander verbundenen Prozessen.

Seite 10

Prozessnetze

Statische und dynamische Prozessnetze

Statische ProzessnetzeZur Laufzeit ist die Topologie des Netzes fix:

– Keine Erzeugung neuer Knoten

– Keine Erzeugung neuer Kanäle / Prozessbekanntschaften

Statische Prozessnetze werden in drei Phasen erzeugt und ausgeführt:

– Definition von Prozessen (Knoten-Typen)

– Definition der Topologie: Knoten (Prozessinstanzen) und ihre Verbindung

– Ausführung der Prozesse

Dynamische ProzessnetzeDie Topologie des Netzes ist während der Laufzeit nicht fix:

– Neue Knoten können erzeugt werden

– Kanäle / Prozessbekanntschaften können erzeugt und modifiziert werden

Die Ausführung dynamischer Prozessnetze kann nicht in Phasen unterteilt werden

Seite 11

Dynamische Prozessnetze

Dynamische ProzessnetzeDie Topologie des Netzes kann zur Laufzeit konstruiert / verändert werden

Beispiel: einfaches „selbst-entfaltendes“ Prozess-Netz für Mergesort

object MergeNode { val EOF = -1}

class MergeNode(out: Channel[Int], lst: List[Int]) extends Runnable { import MergeNode.EOF override def run(): Unit = {

if (lst.length == 1) { out.send(lst(0)) out.send(EOF) out.close()

} else {

val in_1 = new Channel[Int] val in_2 = new Channel[Int] val n = lst.length val pred_1 = new MergeNode(in_1, lst.take(n/2)) val pred_2 = new MergeNode(in_2, lst.drop(n/2)) new Thread(pred_1).start() new Thread(pred_2).start()

. . .

}

}

object Printer extends Thread { val in = new Channel[Int] override def run(): Unit = { while (!in.eof()) { val v = in.receive() if (v!= MergeNode.EOF) println("\t Printer received " +v) } println("Printer finished") }}

object Merge_App extends App { val lst = List(1,3,5,7,2,4,6,8) val merger = new MergeNode(Printer.in, lst) Printer.start() new Thread(merger).start() }

Seite 12

Statische Prozessnetze / Topologie

Definition der Netztopologie: Prozesse erzeugen zu Netzen verbinden Das Port-Konzept erlaubt es die Bindung

Prozess ~> Kommunikationsendpunkt

von der Zeit der Definition des Prozesses zu verschieben auf die Zeit der Topologiedefinition.

Mit später können verschiedene Zeitpunkte gemeint sein

– statische Topologiedefinition: Topologie im Quellcode festlegen:

Bindung zum Zeitpunkt der Definition des Netzes im Quellcode

– dynamische Topologiedefinition: Netzerzeugung als Aktion zur Laufzeit

Übergabe von Referenzen, z.B. als Konstrutor-Parameter

Dependency-Injection zur Laufzeit (vie Refection oder DI-Framework)

Seite 13

Statische Prozessnetze / Topologie

Beispiel: dynamische Netzkonstruktion 1

Prozesse mit Kanälen als Parameter

Channel

out in

class Producer(out: Channel[Int]) extends Runnable { val a = List(1,2,3,4,5,6,7,8,9,0) override def run(): Unit = {

for (v <- a) {out.send(v);Thread.sleep(100);

}out.close

}}

class Consumer(in: Channel[Int]) extends Runnable { override def run(): Unit = {

while(!in.eof()) { try {println(in.receive()); } catch { case e : NoSuchElementException => }}

}}

object SimpleNet_App extends App { val c = new Channel[Int] val producer = new Producer(c) val consumer = new Consumer(c) new Thread(producer).start() new Thread(consumer).start()}

producer consumer

Parameterübergabe

Seite 14

Statische Prozessnetze / Topologie

Beispiel: dynamische Netzkonstruktion – 2a

Prozesse mit Kanal-Endpunkten als Parameter

An den Konstruktor der Prozesse werden Kanal-Referenzen übergeben.

Damit sind die Ports nicht auf Eingabe oder Ausgabe spezialisiert, eine falsche Nutzung ist möglich (Lesen von einen Ausgabe-Port): Kein echtes Port-Konzept.

Channel

out in

producer consumer

Channel

DataSourceDataSink

producer consumer

Besser: An den Konstruktor der Prozesse werden Referenzen auf Kanal-Endpunkte übergeben.Damit sind die Ports auf Eingabe oder Ausgabe spezialisiert. Eine falsche Nutzung der wird bereits zur Übersetzungszeit entdeckt.

besser

Parameterübergabe

Seite 15

Statische Prozessnetze / Topologie

Beispiel: dynamische Netzkonstruktion – 2b

Prozesse mit Kanal-Endpunkten als ParameterChannel

DataSourceDataSink

OutPort InPorttrait InPort[T] { def receive: T def eof: Boolean}

trait OutPort[T] { def send(v:T) def close: Unit}

class ChannelWithEndPoints[T](implicit classTagOfT: ClassTag[T]) extends DataSourceProvider[T] with DataSinkProvider[T] {

val channel: Channel[T] = new Channel[T]

object inport extends InPort[T] { def receive: T = channel.receive def eof: Boolean = channel.eof }

object outport extends OutPort[T] { def send(v:T) : Unit = channel.send(v) def close: Unit = channel.close() }

def dataSource : InPort[T] = inport def dataSink : OutPort[T] = outport}

trait DataSourceProvider[T] { def dataSource : InPort[T]}

trait DataSinkProvider[T] { def dataSink : OutPort[T]}

class Producer(out: OutPort[Int]) extends Runnable { val a = List(1,2,3,4,5,6,7,8,9,0) override def run(): Unit = {

for (v <- a) {out.send(v);Thread.sleep(100);

}out.close

}}

class Consumer(in: InPort[Int]) extends Runnable { override def run(): Unit = {

while(!in.eof) { try {

println(in.receive); } catch { case e : NoSuchElementException => }}

}}

object SimpleNet_App extends App { val c = new ChannelWithEndPoints[Int] val producer = new Producer(c.dataSink) val consumer = new Consumer(c.dataSource) new Thread(producer).start() new Thread(consumer).start()}

Seite 16

Statische Prozessnetze / Topologie

Beispiel statische Netzkonstruktion

Bindung als Modulbindung

Knoten (Prozesse) enthalten Ports als abstrakte Objekt-Variablen (abstract fields)

Im Kontext eines Netzes sind die Kommunikations-Endpunkte bekannt mit denen sie belegt werden.

Werden Knoten zu Objekten instantiiert, dann werden Ports die Endpunkte eines Kanals als Wert zugewiesen.

Ports sind abstrakte Klassenmitglieder die in Instanzen (Objekten) durch die Kommunikations-Endpunkte eines blockierenden Puffers ersetzt werden.

Die Module (Quellcode-Komponenten) - Writer-Objekt- Reader-Objekt- SimpleNet_App-Objektwerden statisch im SimpleNet_App-Objekt gebunden.

abstract class Producer extends Runnable { val out: OutPort[Int] val a = List(1,2,3,4,5,6,7,8,9,0) override def run(): Unit = {

for (v <- a) {out.send(v);Thread.sleep(100);

}out.close

}}

abstract class Consumer extends Runnable { val in: InPort[Int] override def run(): Unit = {

while(!in.eof) { try {

println(in.receive); } catch { case e : NoSuchElementException => }}

}}

object SimpleNet_App extends App {

val c = new ChannelWithEndPoints[Int]

object producer extends Producer{ val out = c.dataSink } object consumer extends Consumer{ val in = c.dataSource } new Thread(producer).start() new Thread(consumer).start()}

Seite 17

Statische Prozessnetze / Topologie

Beispiel: dynamische Netzkonstruktion via Dependency Injection – 1

Reflection erlaubt die programmatische Manipulation von Code zur Laufzeit

Sinnvoll, beispielsweise wenn Prozess-Netze mit einer DSL beschrieben und dynamisch konstruiert werden sollen.

class Producer extends Runnable {

val out: OutPort[Int] = null

val a = List(1,2,3,4,5,6,7,8,9,0)

override def run(): Unit = {for (v <- a) {

out.send(v);Thread.sleep(100);

}out.close

}}

class Consumer extends Runnable {

val in: InPort[Int] = null

override def run(): Unit = {

while(!in.eof) { try {

println(in.receive); } catch { case e : NoSuchElementException => }}

}}

object Simple_ReflectNet_App extends App {

val channel = new ChannelWithEndPoints[Int]

val producerClz = Class.forName("my_package.Producer") val consumerClz = Class.forName("my_package.Consumer") val producer: Producer = producerClz.newInstance().asInstanceOf[Producer] val consumer: Consumer = consumerClz.newInstance().asInstanceOf[Consumer] val ru = scala.reflect.runtime.universe val mirror = ru.runtimeMirror(getClass.getClassLoader) val producerMirror = mirror.reflect(producer) val outField = ru.typeOf[Producer].decl(ru.TermName("out")).asTerm.accessed.asTerm val producerOutField = producerMirror.reflectField(outField) producerOutField.set(channel.dataSink) val consumerMirror = mirror.reflect(consumer) val inField = ru.typeOf[Consumer].decl(ru.TermName("in")).asTerm.accessed.asTerm val consumerInField = consumerMirror.reflectField(inField) consumerInField.set(channel.dataSource) new Thread(producer).start() new Thread(consumer).start()}

Die Namen der Klassen und Ports könnten aus einer Konfigurationsdatei (Prozess-Netz-DSL) entnommen werden.

Seite 18

Statische Prozessnetze / Topologie

Beispiel: dynamische Netzkonstruktion via Dependency Injection – 2

Annotationen können verwendet werden, um die Laufzeit-Manipulation des Codes (die Injection) zu steuern

Mit Annotationen können beispielsweise Ports markiert werden, die dann mit Kanal-Enden besetzt werden.

Annotationen + Reflection-Mechanismen

werden sinnvollerweise in einer Basisklasse für Prozess-Knoten konzentriert

NodeBaisklasse der Knoten mit annotierten Ports+ (Reflection -) Mechansimen zur Modifikation

Producer Consumer

Seite 19

class INPORT_ANO extends scala.annotation.StaticAnnotationclass OUTPORT_ANO extends scala.annotation.StaticAnnotation

abstract class Node extends Runnable { import scala.reflect.runtime.universe._ val typeMirror = runtimeMirror(this.getClass.getClassLoader) val instanceMirror = typeMirror.reflect(this) val members = instanceMirror.symbol.typeSignature.members def fieldMirror(symbol: Symbol) = instanceMirror.reflectField(symbol.asTerm) def annotatedfields = members.filter(f => f.annotations.length > 0) def outportfields = annotatedfields.filter(f => { val a = f.annotations.toList(0).tree.tpe val b = typeOf[OUTPORT_ANO] (a == b)} ) def inportfields = annotatedfields.filter(f => { val a = f.annotations.toList(0).tree.tpe val b = typeOf[INPORT_ANO] (a == b)} ) val myInportFields = inportfields.foldLeft(Map[String, reflect.runtime.universe.FieldMirror]())( (m, f) => m + (f.name.toString().trim -> instanceMirror.reflectField(f.asTerm) ) ) val myOutportFields = outportfields.foldLeft(Map[String, reflect.runtime.universe.FieldMirror]())( (m, f) => m + (f.name.toString().trim -> instanceMirror.reflectField(f.asTerm) ) ) def setInPort(name: String, value: Any) { myInportFields(name).set(value) } def setOutPort(name: String, value: Any) { myOutportFields(name).set(value) } }

Statische Prozessnetze / Topologie

Beispiel: dynamische Netzkonstruktion via Dependency Injection – 2a

Node

Definition von Annotationen für Ports

Basisklasse für Knoten

Annotierten InputPort setzen

Annotierten OutputPort setzen

mit INPORT_ANO annotierte Felder

mit OUTPORT_ANO annotierte Felder

Seite 20

Statische Prozessnetze / Topologie

Beispiel: dynamische Netzkonstruktion via Dependency Injection – 2b

class Producer extends Node { @OUTPORT_ANO val out: OutPort[Int] = null val a = List(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15) override def run(): Unit = {

for (v <- a) {out.send(v);Thread.sleep(100);

}out.close

}}

class Consumer extends Node { @INPORT_ANO val in: InPort[Int] = null override def run(): Unit = {

while(!in.eof) { try {

println(in.receive); } catch { case e : NoSuchElementException => }}

}}

object Simple_ReflectNet_App extends App { val channel = new ChannelWithEndPoints[Int]

val producerClz = Class.forName("processNet.refectAnnoBinding.Producer") val consumerClz = Class.forName("processNet.refectAnnoBinding.Consumer") val producer: Producer = producerClz.newInstance().asInstanceOf[Producer] val consumer: Consumer = consumerClz.newInstance().asInstanceOf[Consumer] producer.setOutPort("out", channel.dataSink) consumer.setInPort("in", channel.dataSource)

new Thread(producer).start() new Thread(consumer).start()

}

Ports werden annotiert

Annotierte Ports können zu Laufzeit ersetzt werden

Seite 21

Statische Prozessnetze / Topologie

Beispiel: dynamische Netzkonstruktion via Dependency Injection – 3

Typesafe Config Library

– basiert auf HOCON Human-Optimized Config Object Notation

JSON-artige Notation (JSON-Erweiterung)

siehe: https://github.com/typesafehub/config/blob/master/HOCON.md

– API

Package com.typesafe.config

Bestandteil vonAkka (http://akka.io/downloads/)

Doku. siehe: http://typesafehub.github.io/config/latest/api/

Beispiel

import com.typesafe.config.ConfigFactory

val conf = ConfigFactory.parseString( """net{ channels : [ channel1 ] nodes : [ { name : node1, type : Producer }, { name : node2, type : Consumer } ] connections : [ { connect: node1.outPort, to: channel1.sink } { connect: node2.inPort, to: channel1.source }, ] }""")

Netz-Beschreibung in HOCON: Produzent und

Konsument erzeugen und über einen Kanal verbinden.

Ziel: Aus derartigen Beschreibungen ein aktives

Prozessnetz erzeugen!

node2node1 channel1

Seite 22

Statische Prozessnetze / Topologie

Beispiel: dynamische Netzkonstruktion via Dependency Injection – 4a

import com.typesafe.config._import scala.collection.JavaConversions._ object Simple_HoconRefectNet_App extends App {

val conf = ConfigFactory.parseString( """net{ channels : [ channel1 ] nodes : [ { name : node1, type : Producer }, { name : node2, type : Consumer } ] connections : [ { connect: node1.out, to: channel1.sink } { connect: node2.in, to: channel1.source }, ] }""")

Konfiguration: Beschreibung des Prozessnetzes

val channels : Map[String, ChannelWithEndPoints[Int]] = conf.getList("net.channels").foldLeft( Map[String, ChannelWithEndPoints[Int]]() )( (m, c) => m + (c.unwrapped().toString().trim -> new ChannelWithEndPoints[Int]) )

Kanäle entsprechend der Konfiguration erzeugen

Seite 23

Statische Prozessnetze / Topologie

Beispiel: dynamische Netzkonstruktion via Dependency Injection – 4b

Knoten und Kanäle entsprechend der Konfiguration verbinden,und Knoten aktivieren.

val nodes : Map[String, Node] = conf.getList("net.nodes") .foldLeft( Map[String, Node]() )( (m, n) => { val nodeInfo = n.unwrapped().asInstanceOf[java.util.Map[String,String]] val nodeName = nodeInfo.get("name"); val className = nodeInfo.get("type"); val fullClassname = "processNet.refectAnnoBinding." + className val nodeClass = Class.forName(fullClassname) m + (nodeName -> nodeClass.newInstance.asInstanceOf[Node]) } )

for (con <- conf.getList("net.connections")) { val c = con.unwrapped().asInstanceOf[java.util.Map[String,String]] val nodeName = c("connect").trim.split("\\.")(0).trim val nodePort = c("connect").trim.split("\\.")(1).trim val channelName = c("to").trim.split("\\.")(0).trim val channelEnd = c("to").trim.split("\\.")(1).trim val nodeInstance = nodes(nodeName) val channelInstance = channels(channelName).asInstanceOf[ChannelWithEndPoints[Int]]

channelEnd match { case "sink" => nodeInstance.setOutPort(nodePort, channelInstance.dataSink) case "source" => nodeInstance.setInPort(nodePort, channelInstance.dataSource) } } nodes.foreach( nameNode => { new Thread(nameNode._2).start } )}

Knoten entsprechend der Konfiguration erzeugen.