30
Verteilte Systeme CS5001 Th. Letschert TH Mittelhessen Gießen University of Applied Sciences Aktoren - Aktor-Modelle und -Systeme - Aktoren in Scala / Akka (Kurzeinführung)

Verteilte Systeme CS5001 Th. Letschert - homepages.thm.dehg51/Veranstaltungen/VS-13/Folien/vs-07.pdf · Ein Aktor ist eine Laufzeit-Entität die sich entsprechend dem Aktor-Paradigma

Embed Size (px)

Citation preview

Verteilte Systeme CS5001

Th. LetschertTH Mittelhessen Gießen

University of Applied Sciences

Aktoren - Aktor-Modelle und -Systeme - Aktoren in Scala / Akka (Kurzeinführung)

Seite 2

Aktoren

Aktoren

– Abgeschlossene (re-) aktive Objekte: Zustand + Verhalten

– Aktoren habeneinen jeweils eigenen Zustand (Speicher, ...) es gibt keinen gemeinsamen Zustand (Speicher, ...)

– Sie interagieren ausschließlich über asynchrone Nachrichten

– Jeder Aktor hat eine Mailboxeine Queue für alle eingehenden Nachrichten

– NachrichtenverarbeitungEintreffende Nachrichten aktivieren lokale HandlerHandler erden aktiv wenn die Mailbox eine passende Nachricht enthält

– ModellVariante der Modelle verteilte Systeme mit asynchroner Kommunikationmit sehr hoher Ausdruckskraft (dynmaische Topologie alles kann gesendet werden) [ Diese Ausdrucksmächtigkeit muss nicht unbedingt ausgenutzt werden ! ]

– Programmiersprache / ProgrammiersystemeProgrammiersprachen / Bibliothekenmit Ausdrucksmittel entsprechend dem Aktor-Modell

Computer science is about understanding and construction.

R. Milner

Seite 3

Aktor-Modell

Das Aktor-Modell - Prinzipien

– Ein Aktor ist eine atomare Entität sie hat ein Verhalten – d. h. sie reagiert in einer

bestimmten Weise auf eintreffende Nachrichten sie kann als Teil ihres Verhaltens Nachrichten erzeugen und an andere Aktoren

versenden sie kann ihr Verhalten mit dem Eintreffen von Nachrichten ändern Nachrichten sind

unveränderliche Werte (Version A des Aktor-Modells) Aktoren (Version B des Aktor-Modells)

– Verwendung Theoretische Untersuchungen zu den Grundlagen der Informatik

Was ist die Essenz verteilter und nebenläufiger Systeme Autoren: Carl Hewitt / Gul Agha / Irene Greif ...

– BeschreibungGul A. Agha. ACTORS: A Model of Concurrent Computation in Distributed Systems. Series in Articial Intelligence. The MIT Press, Cambridge, Massachusetts, 1986.

Carl Hewitt on the actor model - live:http://ak.channel9.msdn.com/ch9/01ad/2ca509cc-d410-4663-8679-6ece29cd01ad/LangNEXT2012HewittMeijerSzyperkiActors_mid.mp4

Seite 4

Aktor-Modell

Das Aktor-Modell - Charakteristika

– Asynchrones Modell Senden und Empfangen sind entkoppelt Nachrichten können verloren gehen Aktive Elemente können nicht ausfallen

– Nachrichtenzustellung Die Zustellung der Nachrichten ist außerhalb des Modells

gesendete Nachrichten können in beliebiger Reihenfolgeirgendwann zugestellt werden – oder auch nicht Einzige Restriktion: Nur gesendete Nachrichten werden ausgeliefert

sie kann ihr Verhalten mit dem Eintreffen von Nachrichten ändern

Nachrichten sind unveränderliche Werte (Version A des Aktor-Modells) Aktoren (Version B des Aktor-Modells)

– Toplogie Aktornetze sind dynamisch keine Kanäle / Ports o.Ä. Nachrichten werden an Aktoren gesendet alles ist ein Aktor – auch Nachrichten

The completely unknown

outer space

Seite 5

Aktoren

Aktor-Systeme

– Programmiersprachen / Programmiersysteme denen das Aktor-Modell zugrunde liegt.

Wikipedia: Actor Implementierungen

Computer science is about understanding and construction.

R. Milner

Seite 6

Aktor-Systeme

Akka

– Bibliothek / Toolkit für verteilte und nebenläufige Anwendungen– Basiert auf dem Aktormodell– Java und Scala-API– Scala 2.10: übernimmt Aktoren im Akka-Stil– Warum Akka

Einsatz: Verteilte, skalierbare, robuste Anwendungen mit hohen Durchsatz (geeignet für Realzeit-Anwendungen)moderne Variante der Middleware-Idee (natürlich nicht die einzige Option)

http://akka.io/

Seite 7

Akka-Installation

Installation (Eclipse)

Installiere Eclipse (Eclispe IDE for Java-Developers)

Passe eclipse.ini an [ siehe http://scala-ide.org/docs/user/advancedsetup.html ]

Starte Eclipse und installiere Updates

Installiere Scala Plugin für Scala 2.10.x [ siehe http://scala-ide.org/ ]

Installiere Akka 2.1.1 für Scala 2.10 [ siehe http://typesafe.com/stack/downloads/akka ]

Definiere User-Library (Window/Preferences/Java/Build Path/User Libraries), z.B. mit dem Namen AkkaLib und füge akka-2.1.1/lib/akka/*.jar als externe Jars hinzu

Wechsle in die Scala-Perspektive und definiere ein Scala-Projekt mit der User-Library AkkaLib

Akka wird hier (zunächst) nur als Bibliothek für logisch verteilt aber nicht real verteilte Scala-Anwendungen mit Akka-Aktoren genutzt, d.h. für Stufe 1 der Entwicklung verteilter Anwendungen: Erprobe Algorithmen in einer einfachen lokalen Umgebung.

Literatur: - Scala-Literatur - im Akk-Download enthaltene Dokumentation

Seite 8

Aktoren: Erzeugen, senden, empfangen

Aktor erzeugen / Nachricht senden / Nachricht empfangenErzeuge ein Aktor-System

Definiere eine Aktor-Klasse als Unterklasse von akka.actor.Actor

Erzeuge eine Aktor-Referenz vom Typ akka.actor.ActorRef

Aktoren werden indirekt über Objekte von diesem Typ angesprochen

Um eine Aktor-Referenz zu erzeugen wird

– ein Aktor-System akka.actor.ActorSystem und

– eine Konfigurationsklasse akka.actor.Props

benötigt.

Beispiel: package examples

import akka.actor.Actorimport akka.actor.ActorSystemimport akka.actor.Props

class MyActor extends Actor { def receive = { case s: String => println("Actor received \"" + s + "\" from " + sender ) case _ => println("Actor received unknown msg from " + sender ) }}

object Example_1 extends App { val system = ActorSystem("MySystem") val myActor = system.actorOf(Props[MyActor], name = "myActor") myActor ! "Hello actor"}

Actor received "Hello actor" from Actor[akka://MySystem/deadLetters]Der Sender ist kein Aktor, darum „deadLetter“ als Absender.

main-Thread sendet

Aktor empfängt

sende mit Aktor ! msg

empfange mit receive

Seite 9

Aktoren: Erzeugen, senden, empfangen

Aktor-DSL, Life-cycle eventsDie Actor-DSL erlaubt eine vereinfachte Notation für die Erzeugung von Aktoren

Aktoren können mit Life-cycle events ausgestattet werden

Beispiel:

package examples

import akka.actor.Actorimport akka.actor.ActorSystemimport akka.actor.Propsimport akka.actor.ActorDSL._

class MyActor extends Actor { def receive = { case s: String => println("Actor received \"" + s + "\" from " + sender ) case _ => println("Actor received unknown msg from " + sender ) }}

object Example_1 extends App {

implicit val system = ActorSystem("MySystem") val myActor_1 = system.actorOf(Props[MyActor], name = "myActor") val myActor_2 = actor(new Act { whenStarting { myActor_1 ! "Hello from an actor" } })

}

Actor received "Hello from an actor" from Actor[akka://MySystem/user/$a]

Aktor wird mir der Actor-DSL erzeugtund mit einem life-cycle event ausgestattet. Nach dem Start sende!Alle erzeugten Aktoren werden automatisch gestartet.

MyActor-Instanzen sind rein reaktiv: sie agieren nur beim Empfang von Nachrichten.

Seite 10

Aktoren: Erzeugen, senden, empfangen

Aktoren aus Aktor-Klassen mit KonstruktorAktoren können auf zwei Arten erzeugt werden

Aktoren aus Aktor-Klassen via Default-Konstruktor: system.actorOf(Props[Aktor-Klasse], ...)

Aktoren aus Aktor-Klassen via Konstruktor mit Parameter: system.actorOf(Props(Aktor-Instanz), ...)

Beispiel:

package examples

import akka.actor.Actorimport akka.actor.ActorSystemimport akka.actor.Propsimport akka.actor.ActorDSL._import akka.actor.ActorRef

object Example_2 extends App {

val ringSize = 10 val actorArray = new Array[ActorRef](ringSize); class HelloActor(id: Int) extends Actor {

override def preStart() { // Start-Hook ~ whenStarting if (id == 0) { actorArray(1) ! "Hello form actor 0" } }

def receive = { case s: String => println("Actor " + id + " received \"" + s + "\"") actorArray((id+1)%ringSize) ! "Hello form actor " + id } }

val system = ActorSystem("HelloSystem") for (i <- 0 until actorArray.size) actorArray(i) = system.actorOf(Props(new HelloActor(i)), name = "helloActor_"+i) }

Ein Ring aus 10 Aktoren. Der erste wird mit einem life-cycle event ausgestattet.Sie senden sich im Kreis Nachrichten zu.

Die Aktoren werden aus Instanzen erzeugt, die mit einem parametrisierten Konstruktor erzeugt wurden.

Seite 11

Aktoren-System: Strukturen, Komponenten

AktorEin Aktor ist eine Laufzeit-Entität die sich entsprechend dem Aktor-Paradigma verhält.

Aktor-InstanzEine Aktor-Instanz ist eine Instanz einer von Klasse die von akka.actor.Actor abgeleitet ist.

Aktor-ReferenzAktoren werden vom Aktor-System erzeugt. Das Aktor-System liefert dabei eine Aktor-Referenz

vom Typ akka.actor.ActorRef als „Handle“ für den Aktor.

Aktor-SystemAktoren werden vom Aktor-System erzeugt. Das Aktor-System liefert dabei eine Aktor-Referenz

vom Typ akka.actor.ActorRef als „Handle“ für den Aktor.

Aktor-HierarchieAktoren stehen in einem hierarchischen Verhältnis zueinander. Jeder ist seinem Erzeuger und Aufseher (Supervisor) untergeordnet. An der Spitze

Seite 12

Aktoren-System: Strukturen, Komponenten

System-AktorenEin Aktor-System hat stets drei Guardian-Aktoren, die top-level Supervisors

– Root Guardian Actor

erzeugt und beaufsichtigt die beiden anderen

– (User) Guardian Actor

erzeugt und beaufsichtigt alle Aktoren des Benutzers

– System Guardian Actor

für System-Aufgaben

Graphik aus der Akka-Doku

Seite 13

Aktoren-System: Strukturen, Komponenten

Beispiel Aktor und untergeordnete Aktoren

import akka.actor.Actorimport akka.actor.ActorSystemimport akka.actor.Propsimport akka.actor.ActorRefimport akka.actor.ActorDSL._

abstract sealed class Msgcase object Create extends Msgcase object Kill extends Msgcase class Info(s: String) extends Msg

class Child extends Actor { def receive = { case s: String => println("Child received \"" + s + "\" from " + sender ) }}

class Parent extends Actor { var child : Option[ActorRef] = None def receive = { case Create => child match { case Some(a)=> println("Create ignored") case None => child = Some(context.actorOf(Props[Child], name = "child")) } case Info(s) => child match { case Some(a)=> a ! s case None => println("No child to send to!") } case Kill => child match { case Some(a)=> context.stop(a) child = None case None => println("No child to kill!") } }}

object Example_3 extends App { val system = ActorSystem("MySystem") val parent = system.actorOf(Props[Parent], name = "parent") parent ! Info("Hello 1") parent ! Create parent ! Info("Hello 2") parent ! Kill parent ! Info("Hello 3")}

No child to send to!Created Child akka://MySystem/user/parent/childNo child to send to!Child received "Hello 2" from Actor[akka://MySystem/user/parent]

Erzeugung eines User-Aktors durch das System, d.h. den User Guardian.

Erzeugung eines User-Aktors durch durch einen anderen User-Aktor.

Seite 14

import akka.actor.Actorimport akka.actor.ActorSystemimport akka.actor.Propsimport akka.actor.ActorRefimport akka.actor.ActorDSL._

class Slave(i: Int) extends Actor { def receive = { case s:String => println(self + " received " + s) case _ => println(self + " received unknown msg") }}

class Master extends Actor { var slave : Array[Option[ActorRef]] = Array(None, None, None) override def preStart() { println ("This is the master " + context.self.path) for ( i <- 0 until 3 ) { slave(i) = Some(context.actorOf(Props(new Slave(i)), name = "Slave_"+i)) println("Master created " + slave(i).get.path) } } def receive = { case _ => println("Master has slaves: ") for (c <- context.children) { println(" -> " + c.path) }

}}

Aktoren-System: Strukturen, Komponenten

Pfade

Jeder Benutzer-Aktor hat einen hierarchisch aufgebauten Pfad akka://Aktor-System/user/name1/name2/... Über den Pfad kann auf eine Aktor-Referenz zugegriffen werden. Beispiel:

object Example_4 extends App { val system = ActorSystem("MySystem") val master = system.actorOf(Props[Master], name = "master") master ! "Your Slaves?" system.actorFor("akka://MySystem/user/master/Slave_2") ! "Hi"}

This is the master akka://MySystem/user/masterMaster created akka://MySystem/user/master/Slave_0Master created akka://MySystem/user/master/Slave_1Master created akka://MySystem/user/master/Slave_2Master has slaves: Actor[akka://MySystem/user/master/Slave_2] received Hi -> akka://MySystem/user/master/Slave_0 -> akka://MySystem/user/master/Slave_1 -> akka://MySystem/user/master/Slave_2

Bezug auf einen existierenden Aktor.

Aktor-Erzeugung.

Seite 15

Aktoren-System: Strukturen, Komponenten

Referenz, Pfad, Context

Aktor-Referenzen, -Pfade und -Kontexte sind verknüpft.

Graphik aus der Akka-Doku

Seite 16

Aktoren: Lebenszyklus

Erzeugung ~> Start

Aktoren werden bei ihrer Erzeugung sofort gestartet. Eine besondere Start-Anweisung ist unnötigVarianten der Erzeugung:

– Im System-Kontext:val system = ActorSystem("MySystem")

val myActor = system.actorOf(Props[MyActor], name = "myActor")

– Im System-Kontext mit Konstruktor-Parametern:val system = ActorSystem("HelloSystem")

val myActor = system.actorOf(Props(new MyActor(x)), name = "myActor")

– Innerhalb eines Aktors:context.actorOf(Props(new MyActor(x)), name = "myActor") bzw.:

context.actorOf(Props(new MyActor(x)), name = "myActor")

Seite 17

Aktoren: Lebenszyklus

Erzeugung / Start mit Aktor-DSL

Aktoren können der Aktor-DSL erzeugt und gestartet werden

import akka.actor.ActorDSL._import akka.actor.ActorSystem

object Example_5 extends App { implicit val system = ActorSystem("MySystem")

val actor_1 = actor(system)(new Act {whenStarting {

actor_2 ! "Hello"}

})

val actor_2 = actor(new Act {become {

case s:String => println("actor 2 received "+s+" from "+sender)

} })}

(system) ist hier impliziter Parameter

import akka.actor.Actorimport akka.actor.ActorSystemimport akka.actor.ActorDSL._

object Example_6 extends App { implicit val system = ActorSystem("MySystem")

val actor_parent = actor(new Act {

val actor_child = actor(context)( new Act {become {

case s:String => println("actor child received " + s + " from " + sender)

}})

whenStarting {

actor_child ! "Hello"}

})}

Erzeugung im System

Erzeugung im Aktor-Kontext

Erzeugung im System

Seite 18

Aktoren: Lebenszyklus

Stoppen von Aktoren: Aktoren können mit Giftpillen und der Stop-Methode gestoppt werden

System beenden: Das gesamte Aktorsystem kann mit shutdown herunter gefahren werden

import akka.actor.ActorSystemimport akka.actor.ActorDSL._import akka.actor.PoisonPill

object Example_7 extends App { implicit val system = ActorSystem("MySystem")

val actor_1 = actor(system)(new Act {override def postStop() {

println("actor 1 got stopped")}whenStarting {

actor_2 ! "Hello"actor_2 ! PoisonPillactor_2 ! "Still alive ?"

} })

val actor_2 = actor(new Act {override def postStop() {

println("actor 2 got stopped")}become {

case s:String => println("actor 2 received " + s + " from " + sender)

} })

Thread.sleep(1000)

println("main stops actor 1") system.stop(actor_1)

system.shutdown}

Actor 1 „tötet“ Aktor 2 mit einer Giftpille

Actor 1 wird vom main-Thread gestoppt

Das Aktor-System wird herunter gefahren

Seite 19

Aktoren: Lebenszyklus

Stoppen von Aktoren:

Graceful Stop

import akka.actor.Actorimport akka.actor.ActorSystemimport akka.actor.Propsimport akka.actor.ActorRefimport akka.actor.ActorDSL._import akka.actor.Terminatedimport akka.pattern.gracefulStopimport scala.concurrent.Awaitimport scala.concurrent.Futureimport scala.concurrent.duration._

class Helper(i: Int) extends Actor { def receive = { case s:String => println(

self + " received " + s) }}

class Worker extends Actor { var helper : Array[Option[ActorRef]] = Array(None, None, None) override def preStart() { for ( i <- 0 until 3 ) { helper(i) = Some(context.actorOf(Props(new Helper(i)), name = "helper_"+i)) context.watch(helper(i).get) } } def receive = { case msg: String => for (h <- context.children) { h ! msg } case Terminated(child) => println("Oops someone stopped " + child) }}

object Example_7_1 extends App { val system = ActorSystem("MySystem") val worker = system.actorOf(Props[Worker], name = "worker") worker ! "Hi" Thread.sleep(100) //system.actorFor("akka://MySystem/user/worker/helper_1") ! PoisonPill try {// graceful Stop val actorRef = system.actorFor("akka://MySystem/user/worker/helper_1") val stopped: Future[Boolean] = gracefulStop(actorRef, 5 seconds)(system) Await.result(stopped, 6 seconds) println(stopped.value.get.get) } catch { case e: akka.pattern.AskTimeoutException => println (e) } Thread.sleep(100) worker ! "How are you?" Thread.sleep(100) system.shutdown}

Der Aktor wird „mit Anmut“ gestoppt.

Seite 20

Aktoren: Lebenszyklus

Start- / Stopp-Hooks

preStart / whenSarting

wird vor dem Start des Aktors aufgerufen

postStop / whenStopping

wird nach dem Stopp aktiviert

import akka.actor.Actorimport akka.actor.ActorSystemimport akka.actor.ActorDSL._import akka.actor.Props

class HookedActor extends Actor { override def preStart { println("I'm going to be started") } override def postStop { println("I was stoped") } def receive = { case _ => println("I've got a msg") } }

object Example_8 extends App { implicit val system = ActorSystem("MySystem") val actor_1 = system.actorOf(Props[HookedActor], name = "actor_1") val actor_2 = actor(new Act {

whenStarting {println("I'm going to be started - DSL version")

}whenStopping {

println("I was stopped - DSL version")}become {

case _ => println("I've got a msg")}

}) system.shutdown}

Seite 21

Aktoren: Lebenszyklus

Überwachung

context.watch

überwacht unter-geordneten Aktor

import akka.actor.Actorimport akka.actor.ActorSystemimport akka.actor.Propsimport akka.actor.ActorRefimport akka.actor.ActorDSL._import akka.actor.Terminatedimport akka.actor.PoisonPill

class Helper(i: Int) extends Actor { def receive = { case s:String => println(self + " received " + s) }}

class Worker extends Actor { var helper : Array[Option[ActorRef]] = Array(None, None, None) override def preStart() { for ( i <- 0 until 3 ) { helper(i) = Some(context.actorOf(Props(new Helper(i)), name = "helper_"+i)) context.watch(helper(i).get) } } def receive = { case msg: String => for (h <- context.children) { h ! msg } case Terminated(child) => println("Oops someone stopped " + child) }}

object Example_9 extends App { val system = ActorSystem("MySystem") val worker = system.actorOf(Props[Worker], name = "worker") worker ! "Hi" system.actorFor("akka://MySystem/user/worker/helper_1") ! PoisonPill}

Überwachung aktivieren

Stopp bemerken

Aktor stoppen

Seite 22

supervised received msg no 1 Blasupervised received msg no 2 BlaBlasupervised received msg no 3 dangerousMSG

java.lang.Exception: can not handle this msgat ...etc...

supervised received msg no 1 Blubbersupervised received msg no 2 BlubberBlubber

Aktoren: Lebenszyklus

Restart

abgestürzte Aktoren werden automatisch neu gestartet

import akka.actor.Actorimport akka.actor.ActorSystemimport akka.actor.Propsimport akka.actor.ActorRef

class Supervised() extends Actor { var msgCount = 0 def receive = { case msg:String => msgCount = msgCount + 1 println("supervised received msg no " + msgCount + " " + msg) if (msg == "dangerousMSG") { throw new Exception("can not handle this msg") } }}

class Supervisor extends Actor { var supervised : Option[ActorRef] = None override def preStart() { supervised = Some(context.actorOf(Props(new Supervised()), name = "supervised")) } def receive = { case msg: String => for (h <- context.children) { h ! msg } }}

object Example_10 extends App { val system = ActorSystem("MySystem") val supervisor = system.actorOf(Props[Supervisor], name = "supervisor") supervisor ! "Bla" supervisor ! "BlaBla" supervisor ! "dangerousMSG" Thread.sleep(250) supervisor ! "Blubber" supervisor ! "BlubberBlubber"}

Absturz

neu gestartet geht es weiter

Seite 23

Aktoren: Lebenszyklus

Restart

Wenn ein Fehler (eine Exception) in einem Aktor auftritt werden folgende Aktionen ausgeführt:

– der Aktor wird angehalten

– der preRestart-Hook wird ausgeführt

mit Default-Verhalten: alle Kinder (Aktoren die er beaufsichtigt) werden gestoppt der postStop-Hook wird ausgeführt

– der Aktor durch eine neu erzeugte Instanz ersetzt

– Die Aktor-Referenz zeigt auf den neuen Aktor

– Die Mailbox bleibt in ihrem Zustand

alle nicht empfangen Nachrichten sind weiterhin nicht empfangen

(mit Ausnahme der Nachricht, die zum Fehler führte)

– der postRestart-Hook wird aufgerufen

mit Default-Verhalten:Aufruf des preStart-Hooks

– alle Kinder werden mit dem gleichen Verfahren (rekursiv) neu gestartet

– der Aktor beginnt seine Nachrichtenverarbeitung aufs neue

Seite 24

Aktoren: Nachrichten Senden und Empfangen

Empfangen

Aktoren sind reaktiv. Nachrichten werden von der receive-Methode verarbeitet

import akka.actor.Actorimport akka.actor.Propsimport akka.actor.ActorSystemimport akka.actor.ActorDSL._

class ReceivingActor extends Actor {

def receive = {case "test" => println("received \"test\" from " + sender)case _ => println("received unknown message from " + sender)

}}

object Example_11 extends App { val system = ActorSystem("MySystem") val receivingActor = system.actorOf(Props[ReceivingActor], name = "receivingActor")

val sender = actor(system)(new Act { whenStarting {

receivingActor ! "test" } })

system.shutdown

}

Seite 25

Aktoren: Nachrichten Senden und Empfangen

VerhaltensänderungenAktoren können ihr Verhalten ändern mit context.become

import akka.actor.Actorimport akka.actor.Propsimport akka.actor.ActorSystemimport akka.actor.ActorDSL._

class ChangingActor extends Actor { def receive = {

case msg : String => println("received " + msg)if (msg == "Sugar") { context.become(friendly)}

}

def friendly: Receive = {case msg: String => println("received " + msg + " Thank you very much !")

}}

object Example_12 extends App { val system = ActorSystem("MySystem") val changingActor = system.actorOf(Props[ChangingActor], name = "changingActor")

val sender = actor(system)(new Act { whenStarting {

changingActor ! "Blubber"changingActor ! "Sugar"changingActor ! "BlaBla"

} }) system.shutdown

}

import context._...become(friendly)

val myDSLActor = actor(new Act {become {

case _ => println("Got a msg")}

})

Aktor-DSL: mit become wird das vordefinierte Verhalten ersetzt

Seite 26

Aktoren: Nachrichten Senden und Empfangen

VerhaltensänderungenAktoren können ihr Verhalten ändern mit context.unbecome

import akka.actor.ActorSystemimport akka.actor.Actorimport akka.actor.Propsimport akka.actor.ActorRef

class PingPong(partner: ActorRef) extends Actor { import context._ def receive = {

case "Ping" => partner ! "Pong" become ({

case "Pong" => partner ! "Ping" unbecome }) }}

class PongPing(partner: ActorRef) extends Actor { import context._ def receive = {

case "Pong" => partner ! "Pong" become ({ case "Ping" => partner ! "Ping" unbecome })

}}object Example_13 extends App { val system = ActorSystem("MySystem") val pingpong: ActorRef = system.actorOf(Props(new PingPong(pongping)), name = "pingpong") val pongping: ActorRef = system.actorOf(Props(new PongPing(pingpong)), name = "pongping") pingpong ! "Ping"}

mit unbecome wird das ursprüngliche Verhalten wieder eingesetzt

Seite 27

Aktoren: Nachrichten Senden und Empfangen

sendentell / ! : Senden und vergessen

Asynchrone Nachrichtenzustellung. Der Sender wird durch das Senden nicht blockiert.

import akka.actor.Actorimport akka.actor.Propsimport akka.actor.ActorSystemimport akka.actor.ActorDSL._

object Example_14 extends App { val system = ActorSystem("MySystem")

val act_1 = actor(system)(new Act { whenStarting { act_2 ! "How are you?" } context.become { case s:String => println("Actor " + sender + " is " + s) }

}) val act_2 = actor(system)(new Act {

context.become { case "How are you?" => sender.tell("Fine") //oder: sender.tell("Fine", self) }

}) system.shutdown

}

Actor Actor[akka://MySystem/deadLetters] is Fine

Die Aktor-Referenz des Senders wird bei Verwendung von ! implizit mit gesendet. Auf sie kann beim Empfänger über sender zugegriffen werden. Bei tell ist sie optionales zweites Argument.

Nachricht ohne Absenderangabe

Seite 28

Aktoren: Nachrichten Senden und Empfangen

sendenask / ? : Senden mit Erzeugung eines Future-Objekts für die Antwort

Asynchrone Nachrichtenzustellung. Der Sender wird durch das Senden nicht blockiert.

Es wird dabei ein Future-Objekt erzeugt, das die Antwort enthalten wird (eventuell irgendwann)

import akka.actor.Actorimport akka.actor.Propsimport akka.pattern.askimport scala.concurrent.duration._import akka.util.Timeoutimport scala.actors.Futureimport scala.concurrent.Awaitimport akka.actor.ActorSystemimport akka.actor.ActorDSL._

class Fib extends Actor { implicit val timeout: Timeout = 15 seconds def receive = { case x:Int => if (x <= 2) { sender ! 1 } else { // version 1 val future_1 = context.actorOf(Props[Fib], "f_1").ask(x-2)//implicit argument:(15 seconds) val v1 = Await.result(future_1, timeout.duration).asInstanceOf[Int] // version 2 val helper_2 = context.actorOf(Props[Fib], "f_2") val future_2 = helper_2 ? (x-1) //implicit argument:(15 seconds) val v2 = Await.result(future_2, 15 seconds).asInstanceOf[Int] sender ! (v1 + v2) } }}

object Example_15 extends App {

val system = ActorSystem("MySystem") val fibActor = system.actorOf(Props[Fib], name = "fibActor") val startActor = actor(system)(new Act { whenStarting { fibActor ! 20 } become { case v => println("Result: " + v) system.shutdown } }) }

Result: 6765

Seite 29

Aktoren: Nachrichten Senden und Empfangen

sendenask / ? : Senden mit Erzeugung eines Future-Objekts für die Antwort

Beispiel 2

import akka.actor.Actorimport akka.actor.Propsimport akka.pattern.askimport scala.concurrent.duration._import akka.util.Timeoutimport scala.actors.Futureimport scala.concurrent.Awaitimport akka.actor.ActorSystemimport akka.actor.ActorDSL._

class Fibo extends Actor {

implicit val timeout: Timeout = 15 seconds import context.dispatcher

def receive = { case x:Int => if (x <= 2) { sender ! 1 } else { sender ! ( for { v1 <- context.actorOf(Props[Fibo], "f_1").ask(x-1).mapTo[Int] v2 <- context.actorOf(Props[Fibo], "f_2").ask(x-2).mapTo[Int] } yield v1+v2 ) } } }

object Example_16 extends App { val system = ActorSystem("MySystem") val fibActor = system.actorOf(Props[Fibo], name = "fibActor") val startActor = actor(system)(new Act { whenStarting { fibActor ! 20 } become { case v => println("Result: " + v) system.shutdown } }) }

Seite 30

Aktoren: Nachrichten zur Seite legen

empfangen und weglegenstash / unstash

Nachrichten, die momentan nicht bearbeitet werden können, werden zurück gelegt und später wieder hervor geholt.

class Buffer[T](size: Int) extends Actor with Stash {

var count = 0; var placeToPut = 0; var placeToTake = 0; val items = new ArraySeq[T](size)

def receive = {

case PutMsg(data:T) => { if (count == size) { stash } else { items(placeToPut) = data placeToPut = (placeToPut + 1) % size count = count + 1; sender ! OKMsg unstashAll() } }

case GetMsg => { if (count == 0) { stash } else { sender ! AnswerMsg(items(placeToTake)) placeToTake = (placeToTake + 1) % size count = count-1 unstashAll() } } }}

synchronisierter Puffer mit beschränkter Kapazität.

Für Aktoren mit Stash muss die Mailbox explizit konfiguriert werden. Z.B. mit der Datei application.conf mit dem Inhalt:

DequeBasedMailboxDispatcher { mailbox-type = "akka.dispatch.UnboundedDequeBasedMailbox" }

im Wurzelverzeichnis des Klassenpfades (bin-Verzeichnis bei Eclipse-Projekten).