Verteilte Algorithmen TI5005Th. Letschert
TH Mittelhessen Gießen
University of Applied Sciences
Netzwerk-Programmierung mit Scala / Akka– Klassische Netzwerk-Programmierung in Scala– akka.io
– Entfernte Aktoren – Entfernte Kommunikation als Pysical- und Link-Layer
Seite 2
Java-Netzwerk-Programmierung
Klassische Java-Netzwerk-I/OIn Scala steht der komplette Satz Java-Klassen zur Verfügung
Dieser kann für Netzwerk-I/O eingesetzt werden.
TCP-KommunikationBeispiel einfache Echo-Anwendung (1: Server)
import java.io.{ BufferedReader, IOException, InputStreamReader, PrintWriter}import java.net.{Socket, ServerSocket}import scala.util.{Try, Success, Failure}
object EchoServer_App extends App { val ECHO_PORT = 4713; val serverSocket : ServerSocket = new ServerSocket(ECHO_PORT); for (i <- 1 to 10) { val sock = serverSocket.accept(); val pw = new PrintWriter(sock.getOutputStream()); val stream = new InputStreamReader(sock.getInputStream()); val reader = new BufferedReader(stream); val msg = reader.readLine(); val response = "Echo: "+ msg; pw.println(response); pw.flush(); reader.close(); pw.close(); sock.close(); } serverSocket.close()}
Seite 3
Java-Netzwerk-Programmierung
TCP-KommunikationBeispiel einfache Echo-Anwendung (2: Client)
import java.net.Socketimport java.io.{BufferedReader, InputStreamReader, PrintWriter}
object EchoClientApp extends App { val ECHO_PORT = 4713;
val socket = new Socket("127.0.0.1", ECHO_PORT) val pw = new PrintWriter(socket.getOutputStream()); val stream = new InputStreamReader(socket.getInputStream()); val reader = new BufferedReader(stream);
pw.println("Hallo"); pw.flush();
println("Client awaits echo"); val response = reader.readLine(); println("Received from Server: "+ response);
reader.close(); pw.close(); socket.close(); }
Seite 4
Java-Netzwerk-Programmierung
UDP-Kommunikation Beispiel einfache Echo-Anwendung
import java.net.{DatagramPacket, DatagramSocket, InetAddress}
object EchoServer_App extends App { val ECHO_PORT = 4713; val dtgrmSocket = new DatagramSocket(ECHO_PORT) val buf = new Array[Byte](256) val rcvpkt = new DatagramPacket(buf, buf.length);
while (true) { dtgrmSocket.receive(rcvpkt); println("Server hat Daten empfangen: " + new String(rcvpkt.getData(), 0, rcvpkt.getLength())); val clientAdr = rcvpkt.getAddress(); val clientPort = rcvpkt.getPort(); println("Von Rechner: " + clientAdr.getHostAddress() + " an Port: "+clientPort); val sndpkt = new DatagramPacket(buf, buf.length, clientAdr, clientPort); dtgrmSocket.send(sndpkt); } }
import java.net.{DatagramPacket, DatagramSocket, InetAddress, SocketException}
object EchoClient_App extends App {
val ECHO_PORT = 4713; val host = "127.0.0.1" val dtgrmSocket = new DatagramSocket() val serverAddress = InetAddress.getByName(host); val msg = "Hallo wer da?"; val buf = msg.getBytes(); val pkt = new DatagramPacket(buf, buf.length, serverAddress, ECHO_PORT); dtgrmSocket.send(pkt); dtgrmSocket.receive(pkt); println("Client hat empfangen: " + new String(pkt.getData())); }
Server
Client
Seite 5
Java-Netzwerk-Programmierung
NIO In Scala können natürlich auch alle modernen Kommunikations-Mittel von Java verwendet werden
– NIO (ab Java 1.4) : Channel, Selector – NIO.2 (ab Java 7) : Asynchrone I/O
NIO / NIO.2 ist schwer direkt nutzbar
meist nutzt man (low level) Frameworks, die NIO-Features zugänglich machen (z.B. Netty [http://netty.io/] )
Seite 6
Aktoren und entfernte Kommunikation
Aktoren und entfernte Kommunikation Aktoren und Netzwerk-Kommunikation können auf verschiedene Arten kombiniert werden:
– Lokal: Aktor; Entfernt: Thread-basierte Kommunikationfür die entfernte Kommunikation werden klassische Thread-basierte Kommunikations-Mechanismen verwendet
Lokal kommen bei Bedarf Aktoren zum Einsatz
Interaktion Aktor <~> Thread muss realisiert werden
– Lokale und Entfernte AktorenEinheitliches Modell mit Kommunikation von lokalen Aktoren und Aktoren auf anderen Knoten
Geeignet für Peer-to-Peer-Anwendungen mit Ortstransparenz der Aktoren
Basis des Akka-Clusterings (in Entwicklung)
wenig geeignet für Client-Server-Anwendungen
– Akka.ioKommunikation im Client-Server-Stil
mit Unterstützung der Interaktion von Aktoren und dem Kommunikationssystem
Seite 7
akka.io
akk.io ÜbersichtPackage akka.io mit Akka Version 2.2 (2013) eingeführt
– Scala-Framework
– Basiert auf / entspricht dem Modul spray.io des Spray-Frameworks [http://spray.io/] (gemeinsame Entwicklung von Spray und Typesafe)
– nutzt speziell speziell den Netzwerkcode von Spray
der auf NIO- / NIO.2-Features basiert
– ist vergleichbar mit Netty
Prinzip– ein Manager-Actor kontrolliert die IO
– die Anwendung kommuniziert mit dem Manager über Kommando-NachrichtenVerbindungs-Management (connect, … close) undNachrichten-Transfer (read, write)
Seite 8
akka.io
TCP-Beispiel ein einfacher Echo-Server (1)
import akka.actor.{ Actor, ActorRef, Props, ActorSystem}import akka.io.{ IO, Tcp }import akka.util.ByteStringimport java.net.InetSocketAddress
class ConnectionHandler extends Actor { … }
class Server(managerActor: ActorRef) extends Actor { … }
object Server_App extends App {
implicit val system = ActorSystem("TCPEchoSystem")
val managerActor = IO(Tcp) val server = system.actorOf(Props(classOf[Server], managerActor), "server") println("starting server as actor " + server) }
Ein Manager-Actor für TCP wird erzeugt. Er dient als Schnittstelle zu Kommunikations-Diensten.
Ein Server wird erzeugt, er erhält eine Referenz auf den Manager-Actor um über diesen seine Kommunikation abwickeln zu können.
Der Server nimmt Verbindungen an
Der ConnectionHandler bedient Verbindungen
Seite 9
akka.io
TCP-Beispiel ein einfacher Echo-Server (2)
class Server(managerActor: ActorRef) extends Actor {
val handler = self //The Bind message is send to the TCP manager actor in order to bind to a listening socket. //handler: The actor which will receive all incoming connection requests in the form of Connected messages managerActor ! Tcp.Bind(handler, new InetSocketAddress("127.0.0.1", 4711))
// The TCP manager will reply either with a CommandFailed, // or it will spawn an internal actor representing the new connection. // This new actor will then send a Connected message to the original sender of the Connect message. def receive = { case Tcp.CommandFailed(_: Tcp.Bind) => context stop self //The actor sending the Bind message will receive a Bound message signalling //that the server is ready to accept incoming connections; //this message also contains the InetSocketAddress to which the socket //was actually bound (i.e. resolved IP address and correct port number). case b @ Tcp.Bound(localAddress) => println("Server ready") // The connection actor sends this message either to the sender of a Connect command (for outbound) // or (here) to the handler for incoming connections designated in the Bind message. // The connection is characterized by the remoteAddress and localAddress TCP endpoints. case c @ Tcp.Connected(remote, local) => println("Server accepted connection from " + remote + ", at " + local) val connectionHandler = context.actorOf(Props[ConnectionHandler]) // sender: internal actor representing the new connection val connection = sender() // In order to activate the new connection a Register message // must be sent to the connection actor, informing that one about // who shall receive data from the socket connection ! Tcp.Register(connectionHandler) } }
Alle Kommentare sind weitgehend aus der API-Doku oder dem Tutorial übernommen.
Seite 10
akka.io
TCP-Beispiel ein einfacher Echo-Server (3)
Ein ConnectionHandler nimmt Daten an und sendet sie zurück. … Solange bis die Verbindung geschlossen wird.
class ConnectionHandler extends Actor {
def receive = { case Tcp.Received(data) => println("Server received " + data + " from " + sender()) // send back answer: sender() ! Tcp.Write(data) case Tcp.PeerClosed => println("Connection closed") context stop self case x: Any => println("Server received " + x) }
}
Seite 11
akka.io
TCP-Beispiel ein einfacher Echo-Client (1)
import akka.actor.{ Actor, ActorRef, Props, ActorSystem}import akka.io.{ IO, Tcp }import akka.util.ByteStringimport java.net.InetSocketAddressimport akka.actor.ActorDSL._
class Client(managerActor: ActorRef) extends Actor { …}
object Client_App extends App { implicit val system = ActorSystem("TCPEchoSystem")
val managerActor = IO(Tcp)
val client = system.actorOf(Props(classOf[Client], managerActor), "client") for (i <- 1 to 10) { val v = scala.io.StdIn.readLine() client ! Some(v) } Thread.sleep(500) client ! "close" Thread.sleep(500) system.shutdown()}
Ein Manager-Actor für TCP wird erzeugt. Er dient als Schnittstelle zu Kommunikations-Diensten.
Ein Client wird erzeugt, er erhält eine Referenz auf den Manager-Actor um über diesen seine Kommunikation abwickeln zu können.
Seite 12
akka.io
TCP-Beispiel ein einfacher Echo-Client (2)
class Client(managerActor: ActorRef) extends Actor { //The Connect message is sent to the TCP manager actor which will spawn an internal actor managerActor ! Tcp.Connect(new InetSocketAddress("127.0.0.1", 4711)) def receive = { case Tcp.CommandFailed(_: Tcp.Connect) => println("connect failed") context stop self // The connection actor sends this message either to the sender of a Connect command // (here, for outbound connections) or to the handler for incoming connections designated in the Bind message. // The connection is characterized by the remoteAddress and localAddress TCP endpoints. case c @ Tcp.Connected(remote, local) =>
println("connected to " + remote + " at " + local) // sender: internal actor representing the new connection val connection = sender() // This message must be sent to a TCP connection actor after receiving the Connected message. // The connection will not read any data from the socket until this message is received, because // this message defines the actor which will receive all inbound data. connection ! Tcp.Register(self) // register at connection-actor context become { // change behavior
nächste Folie: Verhalten des Actors nach Verbindungsaufbau
} }}
Alle Kommentare sind weitgehend aus der API-Doku oder dem Tutorial übernommen.
Seite 13
akka.io
TCP-Beispiel ein einfacher Echo-Client (3)
class Client(managerActor: ActorRef) extends Actor { … def receive = { … case c @ Tcp.Connected(remote, local) =>
context become { // change behavior
case Tcp.CommandFailed(w: Tcp.Write) => println("write failed")
case Tcp.Received(data) => println("client received from connection : " + data)
case "close" => connection ! Tcp.Close case _: Tcp.ConnectionClosed => println("connection closed") context stop self
// application msg case msg @ Some(x) => println("client received user msg " + msg) println("... and sends to manager " + ByteString(x.toString + "\n") + " to " + connection) connection ! Tcp.Write(ByteString(x.toString + "\n")) // data from network case data: ByteString => println("Client actor received: " + data) // other msg case x: Any => println("handler received: " + x) } … }}
Seite 14
akka.io
UDP-Beispiel ein einfacher Echo-Server
import akka.actor.{Actor, ActorRef, Props, ActorSystem}import akka.io.{IO, Udp}import akka.util.ByteStringimport java.net.InetSocketAddress
class Server(localAdr: InetSocketAddress) extends Actor { import context.system val managerActor = IO(Udp) ! Udp.Bind(self, localAdr) def receive = { case Udp.Bound(local) => context become { case Udp.Received(data, remote) => val client = sender() println("Server received " + data + " from " + remote) client ! Udp.Send(data, remote) case Udp.Unbind => val client = sender() client ! Udp.Unbind case Udp.Unbound => context.stop(self) } } }
object Server_App extends App { val PORT = 4712 val system = ActorSystem("UDPEchoSystem") val server = system.actorOf(Props(classOf[Server], new InetSocketAddress("127.0.0.1", PORT)), "server") println("starting server as actor " + server) }
Seite 15
akka.io
UDP-Beispiel ein einfacher Echo-Client (1)
import akka.actor.{Actor, ActorRef, Props, ActorSystem, PoisonPill}import akka.io.{IO, Udp}import akka.util.ByteStringimport java.net.InetSocketAddress
class Client(localAdr: InetSocketAddress, remoteAdr: InetSocketAddress) extends Actor { … siehe nächste Folie … }
object Client_App extends App { val MyPORT = 4711 val RemotePORT = 4712
val system = ActorSystem("UDPEchoSystem") val client = system.actorOf( Props(classOf[Client], new InetSocketAddress("127.0.0.1", MyPORT), new InetSocketAddress("127.0.0.1", RemotePORT)), "client") Thread.sleep(500) for (i <- 1 to 10) { val v = scala.io.StdIn.readLine() client ! "Msg Nr. " + v } Thread.sleep(500) client ! PoisonPill Thread.sleep(500) system.shutdown()
}
Seite 16
akka.io
UDP-Beispiel ein einfacher Echo-Client (2)
class Client(localAdr: InetSocketAddress, remoteAdr: InetSocketAddress) extends Actor { import context.system IO(Udp) ! Udp.Bind(self, localAdr) def receive = { case Udp.Bound(local) =>
val server = sender() context become { case msg: String => println("client sends " + msg) server ! Udp.Send(ByteString(msg), remoteAdr) case Udp.Received(data, remote) => println("Client received " + data + " from " + remote) } } }
Seite 17
akka.io
Bytestring akka.io basiert auf NIO
NIO nutzt java.nio.ByteBuffer
– sehr low-level und schwierig zu nutzen
akka.util.ByteString
– Schnittstelle zwischen Anwendung und ByteBuffer
Seite 18
akka.io
Serialisierung – Serialisierung ist ein zentrales Thema aller verteilten Anwendung
– Java Serialisierung (Serializable, DataInput- DataOutputStream, ObjectInput-, ObjectOutputStream...)
– Scala Serialisierung auf Basis von Java-Features
– Diverse Serialisierungs-Frameworks (z.B. Protocol Buffers https://code.google.com/p/protobuf/)
– Akka: erweiterbare konfigurierbare Serialisierung
Seite 19
akka.io
Beispiel: einfache Serialisierung von / nach ByteString abstract class Msgcase class StringMsg(str: String) extends Msgcase class NumberMsg(d1: Double, d2: Double) extends Msg zu übertragende Nachrichten
object Msg { implicit val byteOrder = java.nio.ByteOrder.BIG_ENDIAN
val STRING_MSG = 0 val NUMBER_MSG = 1 def decode(msg: ByteString): Msg = { val byteIter: ByteIterator = msg.iterator val msgType: Int = byteIter.getInt
msgType match { case STRING_MSG => StringMsg(getString(byteIter)) case NUMBER_MSG => NumberMsg(byteIter.getDouble, byteIter.getDouble); } }
def encode(msg: Msg) : ByteString = msg match { case StringMsg(str) => val strBytes = str.getBytes val strBytesLen = strBytes.length new ByteStringBuilder().putInt(STRING_MSG).putInt(strBytesLen).putBytes(str.getBytes).result
case NumberMsg(d1, d2) => new ByteStringBuilder().putInt(NUMBER_MSG).putDoubles(Array[Double](d1, d2)).result } def getString(iter: ByteIterator): String = { val length = iter.getInt val bytes = new Array[Byte](length) iter getBytes bytes ByteString(bytes).utf8String } }
1 | Double | Double
0 | Len | Char | … | Char
Übertragungsformat für 2 Sorten von
Nachrichten
Deserialisierung mit einem ByteIterator
Serialisierung mit einem ByteStringBuilder
object SerialTest_App extends App { println(Msg.decode(Msg.encode(StringMsg("Hallo")))) println(Msg.decode(Msg.encode(NumberMsg(1.5, 2.5))))}
StringMsg(Hallo)NumberMsg(1.5,2.5)
Seite 20
Entfernte Aktoren
Beispiel: Sender und Empfänger
Empfänger-Prozess:127.0.0.1:4711
Sender-Prozess:127.0.0.1:2525
Actor-System: DistSystem
ReceiverActor ForwarderActor
create create
Msg
Msg
JVM JVM
Seite 21
Entfernte Aktoren
Beispiel: Empfänger
import akka.actor.{Actor, ActorSystem, Props}import com.typesafe.config.{ConfigFactory, ConfigValueFactory}
class ReceiverActor extends Actor { def receive = { case msg: String => println(s"Actor ${context.self} received $msg from $sender") case _ => println("Received non-string msg ") }}
object Receiver_App extends App { val config = ConfigFactory. parseString( """akka { loglevel = "DEBUG" actor { provider = "akka.remote.RemoteActorRefProvider" } remote { enabled-transports = ["akka.remote.netty.tcp"] netty.tcp { hostname = "127.0.0.1" port = 4711 } log-sent-messages = on log-received-messages = on } }""")
val system = ActorSystem("DistSystem",config)
val receiverActor = system.actorOf(Props[ReceiverActor], name = "receiverActor")
println(s" ${receiverActor.path} is ready")}
ReceiverActor
create
Dieser Aktor wird von einem entfernten Aktor angesprochen werden. Diese Tatsache ist für den Aktor selbst völlig transparent.
Das Aktorsystem muss konfiguriert werden, um entfernte Aktoren nutzen zu können. Die Konfiguration wird in der Regel in einer Konfigurationsdatei erfolgen. Hier wird sie der Übersicht halber programmatisch realisiert.
Der Empfänger ist ein eigenständiger Prozess
Seite 22
Entfernte Aktoren
Beispiel: Sender (1)
import akka.actor.{Actor, ActorSystem, ActorSelection, ActorRef, Props}import akka.util.Timeoutimport scala.concurrent.duration._import scala.concurrent.{Await, Future}import com.typesafe.config.{ConfigFactory, ConfigValueFactory}
case object SendCmd
class ForwarderActor(remoteReceiverActor: ActorRef) extends Actor { def receive = { case SendCmd => remoteReceiverActor ! "Hello from remote sender actor!" }}
object Sender_App extends App { val config = ConfigFactory. parseString( """akka { loglevel = "DEBUG" actor { provider = "akka.remote.RemoteActorRefProvider" } remote { enabled-transports = ["akka.remote.netty.tcp"] log-sent-messages = on log-received-messages = on netty.tcp { hostname = "127.0.0.1" port = 2526 } } }""") … nächste Folie … }
ForwarderActor
createMsg
Dieser Aktor leitet die Nachrichten des Threads an den entfernten Aktor weiter. Die Tastsache, dass die Nachricht an einen entfernten Aktor geht ist auch hier völlig irrelevant.
Auch auf der Sendeseite muss die Verwendung entfernter Aktoren konfiguriert werden.
Der Sender ist ein eigenständiger Prozess
Seite 23
Entfernte Aktoren
Beispiel: Sender (2)
object Sender_App extends App { val config = ... implicit val timeout = Timeout(3 seconds) val system = ActorSystem("DistSystem", config) // create actor selection: a path to a remote actor val receiverActorS : ActorSelection = system.actorSelection("akka.tcp://[email protected]:4711/user/receiverActor") println("receiverActor selection = " + receiverActorS) // resolve actor selection to actor path val futureActorR : Future[ActorRef] = receiverActorS.resolveOne val serverActorR : ActorRef = Await.result(futureActorR , 2.seconds) println("Remote receiverActorR resolved to: " + serverActorR)
// send to actor selection without resolving it to a actor ref // resolving will be done automatically receiverActorS ! "Greeting from sender" // create local actor via constructor with remote ActorRef parameter val forwarderActor = system.actorOf(Props( classOf[ForwarderActor], serverActorR), name = "remoteReceiverActor")
// send to local actor forwarderActor ! SendCmd
}
ActorSelection: Referenz auf einen Unterbaum der Aktorhierachie.
resolveOne: Suche die ActorReference die zur ActorSelection passt.
Sende direkt an eine ActorSelection
Erzeuge Forwarder übergib dabei Referenz auf entfernten Aktor
Seite 24
Entfernte Aktoren
Beispiel: Entfernten Aktor erzeugen
Empfänger-Prozess:127.0.0.1:4711
Sender-Prozess:127.0.0.1:2525
Actor-System: DistSystem
ReceiverActor ForwarderActor
create
Msg
Msgcreate
JVM JVM
Seite 25
Entfernte Aktoren
Beispiel: Entfernten Aktor erzeugen / Empfänger
import akka.actor.{Actor, ActorSystem, Props}import com.typesafe.config.{ConfigFactory, ConfigValueFactory}
class ReceiverActor extends Actor { def receive = { case msg: String => println(s"Actor ${context.self} received $msg from $sender") case _ => println("Received non-string msg ") }}
object Receiver_App extends App { val config = ConfigFactory. parseString( """akka { //loglevel = "DEBUG" actor { provider = "akka.remote.RemoteActorRefProvider" } remote { enabled-transports = ["akka.remote.netty.tcp"] netty.tcp { hostname = "127.0.0.1" port = 4711 } log-sent-messages = on log-received-messages = on } }""")
val system = ActorSystem("DistSystem",config) //val receiverActor = system.actorOf(Props[ReceiverActor], name = "receiverActor") println("receiver process is ready")}
Empfänger-Aktor: Ein Aktor dieser Klasse wird von entfernt (von einem anderen Prozess) erzeugt werden.
Hier wird nur das Aktorsystem erzeugt und die Klasse der Aktoren bereit gestellt.
Seite 26
Entfernte Aktoren
Beispiel: Entfernten Aktor erzeugen / Sender
import akka.actor.{Actor, ActorSystem, ActorSelection, ActorRef, Props, AddressFromURIString, Deploy}import akka.util.Timeoutimport scala.concurrent.duration._import scala.concurrent.{Await, Future}import com.typesafe.config.{ConfigFactory, ConfigValueFactory}import akka.remote.RemoteScope
case object SendCmd
class ForwarderActor(remoteReceiverActor: ActorRef) extends Actor { def receive = { case SendCmd => remoteReceiverActor ! "Hello from remote sender actor!" }}
object Sender_App extends App { val config = ConfigFactory.parseString( """akka { actor { provider = "akka.remote.RemoteActorRefProvider" } remote { enabled-transports = ["akka.remote.netty.tcp"] log-sent-messages = on log-received-messages = on netty.tcp { hostname = "127.0.0.1" port = 2526 } }}""") val system = ActorSystem("DistSystem", config) val address = AddressFromURIString("akka.tcp://[email protected]:4711") val receiverActorR : ActorRef = system.actorOf(Props[ReceiverActor].withDeploy(Deploy(scope = RemoteScope(address))))
val forwarderActor = system.actorOf(Props(classOf[ForwarderActor], receiverActorR), name = "remoteReceiverActor")
forwarderActor ! SendCmd}
Die Klasse ReceiverActor muss hier in diesem Prozess auf dem Klassenpfad zur Verfügung stehen und ebenfalls auf dem Klassenpfad der entfernten Anwendung auf der der Aktor erzeugt werden wird. Akka überträgt (im Gegensatz zu RMI) keinen Klassencode!
Erzeugung eines Aktors auf einem entfernten Prozess.
Seite 27
Protokoll-Implementierungen
ProtokolleVerteilte Algorithmen werden als Protokolle implementiert
In der Regel sind in einem verteilten System mehrere Protokolle gleichzeitig aktiv
Oft interagieren die Protokolle
Schichtenstruktur: Typisches Interaktionsmuster von ProtokollenEin Protokoll nutzt die Dienste, die von einem anderen bereit gestellt werden
OSI Konzept der Schichten
Seite 28
Protokoll-Implementierung
Physical-Layer / Link-Layer, VerbindungsschichtUnterste Schichten eines Protokollstacks nach OSI und TCP/IP
Stellen den elementarsten Kommunikationsdienst bereit:
Kommunikation von direkt (auf Hardware-Ebne) verbundenen Rechnern
Protokolle der Verbindungsschicht
– verbindungslos (z.B. Ethernet)
oder verbindungs-orientiert (z.B. PPP)
– mit Broadcast (für Broadcast-Medien: Ethernet-HW)
oder ohne Broadcast (für Punkt-zu-Punkt-Medien: Kabel, Modem, etc.)
Seite 29
Protokoll-Implementierung
Beispiel: Emulation Physical-LayerAufgabe des Physical-Layers
– Bereitstellung von Netzwerkgeräten (Sender/Empfänger)
– für ihre Nutzer: Netzwerktreiber / Protokoll-Implementierungen
Emulation Netzwerk-GerätAufgabe des Geräts
– Senderoutine für Bytes
– Registrierung einer Callback-Routine für den Empfang von Bytes
trait NetworkDevice { // low level send def sendWire(msg: Array[Byte]) // register callback routine for this device def register(cB: Array[Byte] => Unit) : Unit = { callBack = cB } // callback routine for this device var callBack : Array[Byte] => Unit = { (bytes: Array[Byte]) => { throw new NoSuchMethodException } }
}
Seite 30
Protokoll-Implementierung
Emulation Netzwerk-GerätBeispiel akk.io via UDP als „Netzwerk-Gerät
import akka.actor.{Actor, ActorRef, Props, ActorSystem}import akka.io.{IO, Udp}import akka.util.{ByteString, ByteIterator}import java.net.InetSocketAddress
class UdpProxy(localAdr: InetSocketAddress, remoteAdr: InetSocketAddress, udpDevice: UDPNetworkDevice) extends Actor { import context.system IO(Udp) ! Udp.Bind(self, localAdr) def receive = { case Udp.Bound(local) => val partner = sender() context become { case Udp.Received(data, remote) => val byteIter: ByteIterator = data.iterator val bytes: Array[Byte] = data.toArray[Byte] udpDevice.callBack(bytes) case msg: Array[Byte] => partner ! Udp.Send(ByteString(msg), remoteAdr) } } }
class UDPNetworkDevice(myPort: Int, remotePort: Int)(implicit system: ActorSystem) extends NetworkDevice { val proxy = system.actorOf( Props(classOf[UdpProxy], new InetSocketAddress("127.0.0.1", myPort), new InetSocketAddress("127.0.0.1", remotePort), this), "udpInterface") override def sendWire(msg: Array[Byte]) : Unit = { proxy ! msg }}
Seite 31
Protokoll-Implementierung
Protokoll-Handler– Implementierung einer Protokoll-Instanz
– wird an ein Netzwerk-Gerät gebunden
trait Codec[T] { // encode msg as byte-array def encode(msg: T): Array[Byte] // decode msg from byte-array def decode(bytes: Array[Byte]): T} Codec: Protokoll-spezifische Serialisierung
abstract class NetworkDriver[MSG_T](val codec: Codec[MSG_T]) {
// low level routine for frame transfer; will be set when connected to a device var sendWire: Array[Byte] => Unit = null
// used to encode and transfer msgs; to be used by derived classes final def sendMsg(msg: MSG_T) : Unit = { val bytes : Array[Byte] = codec.encode(msg) sendWire(bytes) } // called when a msg arrives; to be defined by derived classes def acceptMsg(msg: MSG_T) : Unit}
Treiber sind Ableitungen dieses Traits. Ein Treiber muss die Empfangsroutine definieren und kann Daten via sendWire senden. Ein Netzwerktreiber stellt die unterste Ebne einer Protokollimplementierung dar.
NetworkDriver
sendWire
konkreterNetworkDriver
acceptMsg
wird abhängig
vom Netzwerk-
Gerät gesetzt
wird abhängig
vom Protokoll definiert
Seite 32
Protokoll-Implementierung
Emulation Physical-Layer– Protokoll-Handler
– werden an Netzwerk-Geräte gebunden
DriverDriver DriverCodec Codec Codec
Physical-Layer
class PhysicalLayer { // all devices; devices are identified by symbols var devices : Map[Symbol, NetworkDevice] = Map[Symbol, NetworkDevice]() // install a new device; a device is always wired def installNetworkDevice(deviceId: Symbol, networkDevice: NetworkDevice): Unit = { devices = devices + (deviceId -> networkDevice) } def getLinkIds : Set[Symbol] = devices.keySet // install a handler at a device; the handler will accept all incoming msgs def installDriver[MSG_T](deviceId: Symbol, driver: NetworkDriver[MSG_T]): Unit = { val networkDevice = devices(deviceId) // install receive method of handler as callback of network device // the handler will get decoded bytes networkDevice.register( { (bytes: Array[Byte]) => driver.acceptMsg(driver.codec.decode(bytes)) }) // set low-level send routine driver.sendWire = networkDevice.sendWire } }
Seite 33
Protokoll-Implementierung
Protokoll-HandlerBeispiel: Echo-Protokoll – 1 : Nachrichten und deren (De-) Serialisierung
abstract class EchoMsgcase class EchoRequest(msg: String) extends EchoMsgcase class EchoReply(msg: String) extends EchoMsg
object EchoCodec extends Codec[EchoMsg] { val ECHO_REQUEST: Byte = 0 val ECHO_REPLY: Byte = 1 def encode(msg: EchoMsg): Array[Byte] = { msg match { case EchoRequest(str) => val strBytes : Array[Byte]= str.getBytes val strBytesLen = strBytes.length val result = new Array[Byte](strBytesLen+1) result(0) = ECHO_REQUEST strBytes.copyToArray(result, 1) result case EchoReply(str) => val strBytes = str.getBytes val strBytesLen = strBytes.length val result = new Array[Byte](strBytesLen+1) strBytes.copyToArray(result, 1) result(0) = ECHO_REPLY result } }
def decode(bytes: Array[Byte]): EchoMsg = { bytes(0) match { case ECHO_REQUEST => EchoRequest(new String(bytes, 1, bytes.length-1)) case ECHO_REPLY => EchoReply(new String(bytes, 1, bytes.length-1)) } }}
Seite 34
Protokoll-Implementierung
Protokoll-HandlerBeispiel: Echo-Protokoll – 2 : Protokoll-Handler Client-Seite
EchoClient
send
Codec
acceptMsg
Device
sendMsg
class EchoClient extends NetworkDriver[EchoMsg](EchoCodec) {
def send(str: String) { sendMsg(EchoRequest(str)) } def acceptMsg(msg: EchoMsg) : Unit = { msg match { case EchoReply(msg) => println("Client received reply " + msg) case _ => throw new Exception("unexpected Msg!") } }
}
Seite 35
Protokoll-Implementierung
Protokoll-HandlerBeispiel: Echo-Protokoll – 3 : Protokoll-Handler Server-Seite
EchoServer
Codec
acceptMsg
Device
sendMsg
class EchoServer extends NetworkDriver[EchoMsg](EchoCodec) {
def acceptMsg(msg: EchoMsg) : Unit = { msg match { case EchoRequest(msg) => println("server received " + msg) sendMsg(EchoReply(msg)) case _ => throw new Exception("unexpected Msg!") } }
}
Seite 36
Protokoll-Implementierung
Beispiel: Echo-Protokoll auf Physical-Layer / Test
import akka.actor.{ Actor, ActorRef, Props, ActorSystem}
class StringCodec extends Codec[String] { def encode(msg: String): Array[Byte] = msg.getBytes def decode(ba: Array[Byte]): String = new String(ba)}
object UDPNode_1 extends App { val myPort = 4711 val remotePort = 4712 implicit val system = ActorSystem("UDPSystem") val udpDevice = new UDPNetworkDevice(myPort, remotePort) object physicalLayer extends PhysicalLayer val stringCodec = new StringCodec val echoServer = new EchoServer physicalLayer.installNetworkDevice('A, udpDevice) physicalLayer.installDriver('A, echoServer)}
import akka.actor.{ Actor, ActorRef, Props, ActorSystem}
object UDPNode_2 extends App { val myPort = 4712 val remotePort = 4711 implicit val system = ActorSystem("UDPSystem") val udpDevice = new UDPNetworkDevice(myPort, remotePort) object physicalLayer extends PhysicalLayer val stringCodec = new StringCodec val echoClient = new EchoClient physicalLayer.installNetworkDevice('B, udpDevice) physicalLayer.installDriver('B, echoClient) Thread.sleep(500) echoClient.send("HALLO 1") echoClient.send("HALLO 2")}
Seite 37
Protokoll-Instanzen
Protokoll und Protokoll-Instanz – Auf einem Knoten ist in der Regel mehr als ein Protokoll aktiv
– Ein Protokoll kann zudem in mehr als einer Instanz gleichzeitig abgewickelt werden
– Verbindungsorientierte Protokolle: mehrere gleichzeitige Verbindungen
– Verbindungslose Protokolle die in mehreren Instanzen gleichzeitig abgewickelt werdenBeispiel: Broadcast
Mehrere Knoten können gleichzeitig an mehreren Broadcast beteiligt seinbei denen sie eventuell unterschiedliche Rollen spielen.
– Mehrere Protokolle Zuordnung der Nachrichten zu Automaten unterschiedlicher Protokolle
an Hand einer Protokoll-ID (Schicht-2 <=> Schicht-3)
– Mehrere Instanzen eines Protokolls Zuordnung der Nachrichten zu unterschiedlichen Instanzen des gleichen Automaten
an Hand einer SAP-Nummer („Port“)
Seite 38
Protokoll-Instanzen
Protokoll-Instanz
Protokoll Automat(Monitor)
cmd
Protokoll Automat(Monitor) Protokoll
Automat
cmdcmd
acceptPkt
sendWire
Empfangs-Prozess
resend
connect
erzeuge
receive
zustellen
registrieren
Beispiel: Protokoll-Instanzen bei einem
Verbindungsorientieten Protokoll
ServiceAccessPointsSAPs
Seite 39
Protokoll-Instanzen
Beispiel: Emulation einer Link-Layer-Implementierung Aufgabe des Link-Layers
– Allgemein: Bereitstellung einer Punkt-zu-Punkt-Kommunikation zwischen Netz-Knoten (Rechner) die direkt über ein Medium verbunden sind
– Protokoll-Dispatching: Installation von Protokoll-Handlern für das Behandeln von Nachrichten die an zu einem bestimmten Protokoll gehören
– Adressierung auf dem Link-Layer: Knoten-Identitäten festlegen / verwalten Namen / Adresse der lokalen Station feststellen / definieren Namen / Adressen der direkt erreichbaren Stationen feststellen / definieren
DriverDriver DriverCodec Codec Codec
Physical-Layer
DriverDriver DriverCodec Codec Codec
Link-LayerNode BNode A
sendTo(B, msg) (A, msg) = reciveFrom()
B A
Seite 40
Protokoll-Instanzen
Beispiel: Emulation einer Link-Layer-Implementierung Übersicht
object LinkLayer {
type NodeId = Int type DeviceId = Symbol type ProtocolId = Int case class Frame(protocolID: ProtocolId, payload: Array[Byte])}
abstract class LinkLayer(val localAddr: NodeId) { protected val physicalLayer: PhysicalLayer …
def sendTo(protId: ProtocolId, dest: NodeId, payload: Array[Byte]) { … } def connectPhysical(deviceId: DeviceId, remoteAdr: NodeId) : Unit = { …. } def registerProtocolHandler[MSG_T <: Any](handler: ProtocolHandler[MSG_T]) : Unit = { …. }}
Begleiter-Objekt: definiert Adresstypen und und Rahmenformat
Die Klasse ist abstrakt in Bezug auf den Physical-Layer mit dem der LL verbunden ist.
Sende eine Nachricht an einen bestimmten Knoten.
Verknüpfe ein Netzwerk-Gerät mit dem dem Namen der Station, die über dieses Gerät erreichbar ist.
Registriere einen Handler, der für die Nachrichten eines bestimmten Protokolls zuständig ist.
Seite 41
Protokoll-Instanzen
Beispiel: Emulation einer Link-Layer-Implementierung Protocol-Handler und deren Registrierung
abstract class ProtocolHandler[MSG_T <: Any](val protId: ProtocolId, val codec: Codec[MSG_T]) {
protected val ll: LinkLayer protected final def sendMsg(dest: NodeId, msg: MSG_T) : Unit = { val bytes : Array[Byte] = codec.encode(msg) ll.sendTo(protId, dest, bytes) } def acceptMsgFrom(msg: MSG_T, from: NodeId) : Unit}
Die Klasse ist abstrakt in Bezug auf den Link-Layer mit dem sie verbunden ist.
sendMsg wird von abgeleiteten Klassen zum Senden genutzt.
acceptMsgFrom wird von abgeleiteten Klassen definiert.
abstract class LinkLayer(val localAddr: NodeId) { …
// all installed protocol handlers, protocol handlers are identified by protocol ids private var handlers : Map[ProtocolId, ProtocolHandler[Any]] = Map[ProtocolId, ProtocolHandler[Any]]() def registerProtocolHandler[MSG_T <: Any](handler: ProtocolHandler[MSG_T]) : Unit = { handlers = handlers + (handler.protId -> handler.asInstanceOf[ProtocolHandler[Any]]) }
… }
Ein Protocol-Handler wird in einer map unter ihrer Prokollnummer gespeichert. Nachrichten werden vom Dispatcher an Hand der Nummer zugestellt.
Seite 42
Protokoll-Instanzen
Beispiel: Emulation einer Link-Layer-Implementierung Verbindung mit Netzwerkgeräten
abstract class LinkLayer(val localAddr: NodeId) { …
private var deviceToNode: Map[DeviceId, NodeId] = Map[DeviceId, NodeId]() private var nodeToDevice: Map[NodeId, DeviceId] = Map[NodeId, DeviceId]() def connectPhysical(deviceId: DeviceId, remoteAdr: NodeId) : Unit = { physicalLayer.installDriver( deviceId, new NetworkDriver[Frame](FrameCodec) { val device = physicalLayer.devices(deviceId)
override def acceptMsg(msg: Frame) : Unit = msg match { case Frame(protocolId, payload) => // dispatch to handler for protocolId handlers(protocolId).acceptMsgFrom(handlers(protocolId).codec.decode(payload), deviceToNode(deviceId)) case _ => throw new Exception("unexpected Msg!")
} }) deviceToNode = deviceToNode + (deviceId -> remoteAdr) nodeToDevice = nodeToDevice + (remoteAdr -> deviceId) }
…}
object FrameCodec extends Codec[Frame]{ def encode(msg: Frame): Array[Byte] = msg match { case Frame(protocolID: ProtocolId, payload: Array[Byte]) => val payloadLen = payload.length val result = new Array[Byte](payloadLen+1) result(0) = protocolID.toByte payload.copyToArray(result, 1) result } def decode(ba: Array[Byte]): Frame = Frame(ba(0).toInt, ba.slice(1, ba.length))}
Codec für Rahmen
Installation eines Netzwerktreibers auf dem Gerät das mit dem verbundenen Knoten kommuniziert und Nachrichten an Hand ihrer Protokoll-Id dispatcht.Der Absender ist der mit dem Gerät assoziierte Knoten.
Seite 43
Protokoll-Instanzen
Beispiel: Emulation einer Link-Layer-Implementierung Echo-Client und Server als Protocol-Handler
abstract class EchoClient extends ProtocolHandler[EchoMsg](1, EchoCodec) { val remoteId : NodeId override def acceptMsgFrom(msg: EchoMsg, from: NodeId) : Unit = { msg match { case EchoReply(str) => println(s"Client received $str"); case x: Any => println("Client received msg with wrong format " + msg) } } def sendUserMsg(str: String) { sendMsg(remoteId, EchoRequest(str)) } }
abstract class EchoServer extends ProtocolHandler[EchoMsg](1, EchoCodec) { val remoteId : NodeId override def acceptMsgFrom(msg: EchoMsg, from: NodeId) : Unit = { println(s"Server received $msg from $from") msg match { case EchoRequest(str) => sendMsg(from, EchoReply("ECHO "+ str)); case x: Any => println("Server received msg with wrong format " + msg) } } }
Seite 44
Protokoll-Instanzen
Beispiel: Emulation einer Link-Layer-Implementierung Echo-Nachrichten und ihr Codec
abstract class EchoMsgcase class EchoRequest(msg: String) extends EchoMsgcase class EchoReply(msg: String) extends EchoMsg
object EchoCodec extends Codec[EchoMsg] { val ECHO_REQUEST: Byte = 0 val ECHO_REPLY: Byte = 1 def encode(msg: EchoMsg): Array[Byte] = { msg match { case EchoRequest(str) => val strBytes : Array[Byte]= str.getBytes val strBytesLen = strBytes.length val result = new Array[Byte](strBytesLen+1) result(0) = ECHO_REQUEST strBytes.copyToArray(result, 1) result case EchoReply(str) => val strBytes = str.getBytes val strBytesLen = strBytes.length val result = new Array[Byte](strBytesLen+1) strBytes.copyToArray(result, 1) result(0) = ECHO_REPLY result } } def decode(bytes: Array[Byte]): EchoMsg = { bytes(0) match { case ECHO_REQUEST => EchoRequest(new String(bytes, 1, bytes.length-1)) case ECHO_REPLY => EchoReply(new String(bytes, 1, bytes.length-1)) } }}
Seite 45
Protokoll-Instanzen
Beispiel: Emulation einer Link-Layer-Implementierung Test
object LLEchoServer extends App { val myPort = 4711 val remotePort = 4712 val localNodeName = 1 val partnerNodeName = 2 implicit val system = ActorSystem("UDPSystem") val udpDevice = new UDPNetworkDevice(myPort, remotePort) object physicalLayerHere extends PhysicalLayer physicalLayerHere.installNetworkDevice('A, udpDevice) object linkLayer extends LinkLayer(localNodeName) { val physicalLayer = physicalLayerHere } linkLayer.connectPhysical('A, partnerNodeName)
object EchoServerInst extends EchoServer { val remoteId : NodeId = partnerNodeName val ll : LinkLayer = linkLayer } linkLayer.registerProtocolHandler(EchoServerInst) }
object LLEchoClient extends App { val myPort = 4712 val remotePort = 4711 val localNodeName = 2 val partnerNodeName = 1 implicit val system = ActorSystem("UDPSystem") val udpDevice = new UDPNetworkDevice(myPort, remotePort) object physicalLayerHere extends PhysicalLayer physicalLayerHere.installNetworkDevice('B, udpDevice) object linkLayer extends LinkLayer(localNodeName) { val physicalLayer = physicalLayerHere } linkLayer.connectPhysical('B, partnerNodeName)
object EchoClientInst extends EchoClient { val remoteId : NodeId = partnerNodeName val ll : LinkLayer = linkLayer } linkLayer.registerProtocolHandler(EchoClientInst) Thread.sleep(1000) for (i <- 1 to 10) { EchoClientInst.sendUserMsg("Hallo Nr " + i) }
}