Akka - Monadische Komposition im Workflow

  • View
    125

  • Download
    4

  • Category

    Science

Preview:

DESCRIPTION

Presentation about a workflow system for digitization processes that faciliates monadic composition provided by the Akka framework for a decoupled, scalable, resilient and asynchronous toolchain. Developed at the University Library of Würzburg (project Libri Sancti Kiliani, http://vb.uni-wuerzburg.de)

Citation preview

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

●●●●●

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

● Digitalisierungszentrum der Universitätsbibliothek Würzburg

Wer

Was

Funktionale Anforderungen

● Retrodigitalisierung● Metadaten-Anreicherung und -Pflege● Export*) und Archivierung

● Qualität maximieren● automatisiertes Qualitätsmanagement● Metadaten gewinnen● direktes Feedback● hohe Fehlertoleranz● hohe Performance (viel IO, hohe CPU-

Lasten für Analysen)

*) http://vb.uni-wuerzburg.de/

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

Iacobus de Therinis (M.p.th.q.35, ca 1450)http://vb.uni-wuerzburg.de/ub/mpthq35/ueber.html

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

Defensor Locogiacensis (M.p.th.f.13, ca 750)http://vb.uni-wuerzburg.de/ub/mpthf13/ueber.html

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

● Digitalisierungszentrum der Universitätsbibliothek Würzburg

Wer

Was

Funktionale Anforderungen

● Retrodigitalisierung● Metadaten-Anreicherung und -Pflege● Export*) und Archivierung

*) http://vb.uni-wuerzburg.de/

● Qualität maximieren● automatisiertes Qualitätsmanagement● Metadaten gewinnen● direktes Feedback● hohe Fehlertoleranz● hohe Performance (viel IO, hohe CPU-

Lasten für Analysen)

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

●●●●●●

●●●●

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

...

EIP: “Pipes and Filters”Beispiel (Handschrift)

analysis modules information retrievalmodules

exportmodules

blurdetection

angleanalysis

ToCextraction

onlinepublishing

archive

QuelleZiel

...Zeitachse

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

...

EIP: “Pipes and Filters”Beispiel (Handschrift)

analysis modules information retrievalmodules

exportmodules

blurdetection

angleanalysis

ToCextraction

onlinepublishing

archive

QuelleZiel

Funktionale Anforderungen:● Performance● Robustheit

...Zeitachse

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

a: AKlassischer Workflow

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

a: A b: Bf: A → BKlassischer Workflow

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

a: A b: Bf: A → BKlassischer Workflow

c: Cg: B → C ...

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

a: A b: Bf: A → B

ma: M<A>

return

Klassischer Workflow

Monadischer Workflow

c: Cg: B → C ...

Monade return → bind → →

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

a: A b: Bf: A → B

ma: M<A>

return

Klassischer Workflow

Monadischer Workflow

f’: (M<A> x (A → M<B>)) → M<B>

mb: M<B>bind

c: Cg: B → C ...

Monade return → bind → →

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

a: A b: Bf: A → B

ma: M<A>

return

Klassischer Workflow

Monadischer Workflow

f’: (M<A> x (A → M<B>)) → M<B>

mb: M<B>bind

c: Cg: B → C ...

Monade return → bind → →

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

a: A b: Bf: A → B

ma: M<A>

return

Klassischer Workflow

Monadischer Workflow

f’: (M<A> x (A → M<B>)) → M<B>

mb: M<B>bind

returnreturn

g’: (M<B> x (B → M<C>)) → M<C>

ma: M<C>bind

c: Cg: B → C ...

...

Monade return → bind → →

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

...

exportmodules

blurdetection

angleanalysis

ToCextraction

onlinepublishing

archive

Quelle

Ziel

...Zeitachse

information retrievalmodules

analysis modules

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

...

exportmodules

blurdetection

angleanalysis

ToCextraction

onlinepublishing

archive

Quelle

Ziel

...Zeitachse

returnM<A>

information retrievalmodules

analysis modules

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

...

exportmodules

blurdetection

angleanalysis

ToCextraction

onlinepublishing

archive

Quelle

Ziel

...Zeitachse

returnM<A>

M<B>

bind

information retrievalmodules

analysis modules

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

...

exportmodules

blurdetection

angleanalysis

ToCextraction

onlinepublishing

archive

Quelle

Ziel

...Zeitachse

returnM<A>

M<B>

bind

information retrievalmodules

analysis modules

bind

M<X>

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

●●●●●●

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

●●●●●●

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

●●●●●●

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

●●●●●●

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

●●●●●●

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

●●●●●●

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

●○○

●○

●○○

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

●○○

●○

●○○

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

Fork-Join-Executor Threadpool-Executor

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

Fork-Join-Executor Threadpool-Executor

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

Fork-Join-Executor Threadpool-Executor

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

Fork-Join-Executor Threadpool-Executor

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

Fork-Join-Executor Threadpool-Executor

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

Fork-Join-Executor Threadpool-Executor

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

● …●

○●

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

●●●●

●●●●

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

●●●●

●●●●

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

Quelle: https://www.packtpub.com/books/content/dispatchers-and-routers

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

Quelle: http://letitcrash.com/post/17607272336/scalability-of-fork-join-pool

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

Quelle: http://letitcrash.com/post/17607272336/scalability-of-fork-join-pool

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

// dispatcher for p2p file transferfile-transfer {

executor = "thread-pool-executor"thread-pool-executor {

core-pool-size-min = 1core-pool-size-factor = 0.0core-pool-size-max = 1

}type = Dispatchertroughput = 5000 // give tcp slow start a chance …

}

...remote {

netty.tcp {port = 1337 maximum-frame-size = 10 MiBsend-buffer-size = 5 MiBreceive-buffer-size = 5 MiB

}}

val myActor = context.actorOf(Props[MyActor].withDispatcher("file-transfer"), "myactor2")

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

// dispatcher for fine-grained parallelism incl. work stealingimage-analysis { type = Dispatcher executor = "fork-join-executor" fork-join-executor { parallelism-min = 4 parallelism-factor = 1 parallelism-max = 16 }}

val myActor = context.actorOf(Props[MyActor].withDispatcher("image-analysis"), "myactor2")

// don’t do that → will run in default dispatcherval dummyActor = context.actorOf(Props[MyActor], "dummyActor")

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

●○○

●○○

●○

●○

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

●●●●●

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

● …val probe = new TestProbe(system) {

def expectUpdate(x: Int) = {expectMsgPF() {case Update(id, _) if id == x => true

}sender() ! "ACK"}

}

val probe = TestProbe()probe.setAutoPilot( new TestActor.AutoPilot { def run(sender: ActorRef, msg: Any): TestActor.AutoPilot = msg match { case "stop" ⇒ TestActor.NoAutoPilot case x ⇒ testActor.tell(x, sender); TestActor.KeepRunning }})

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

repository.createIndex( config, context ).andThen( doTestQuery( repository, config.getLabel() ), context ).andThen( shutdown(context), context );

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

repository.createIndex( config, context ).andThen( doTestQuery( repository, config.getLabel() ), context ).andThen( shutdown(context), context );

public Future<Boolean> createIndex(final IndexSetup setup, finalExecutionContextExecutorService ec) {

return future(new Callable<Boolean>(){

@Overridepublic Boolean call() throws Exception{

return createIndex(setup);}

}, ec);}

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

owner.tell( sr, getSelf() );

monitor.tell( new Update(status) );

worker.tell( PoisonPill.getInstance() );

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

●●●●

hendrik.schoeneberg@zuehlke.com01. Oktober 2014Hendrik Schöneberg

public class TarWorker extends Worker {

...

@Overridepublic void doWork(ActorRef workSender, Object work, final Promise<BoxedUnit> p)

throws Exception {if (work instanceof Tar) {

try {tar((Tar) work, workSender);p.success(BoxedUnit.UNIT);

} catch (Exception e) {p.failure(e);

}}

else {log().error("TarWorker#doWork(): can not compute message {}", work);p.failure(new IllegalStateException("Can not compute message: "

+ work));}

}

Recommended