Stream Processing Plattformen & die Qual der Wahl · And some mind breaking Bulletpoints 1 •...

Preview:

Citation preview

Stream Processing Plattformen & die Qual der Wahl_

Matthias Niehoff

Die Basics

2

Warum Stream Processing?_

3

•Infinite and continuous data

Unendliche, kontinuierliche Daten_

4

•And some mind breaking Bulletpoints 1

•And some mind breaking Bulletpoints 2

•And some mind breaking Bulletpoints 3 • Or some great Sub-Bulletpoints 1 • Or some great Sub-Bulletpoints 2 • Or some great Sub-Bulletpoints 3

• And some mind breaking Bulletpoints 4 • And some mind breaking Bulletpoints 5

Geschwindigkeit & Real Time_

5

Erst Verarbeiten, dann speichern_

6

Persistenz

Query

Persistenzstream processing

stream processing

stream processing

•Unbegrenzter Datenstrom

•Kontinuierliche Verarbeitung, Aggregation und Analyse

•MapReduce ähnliches Verarbeitungsmodell

•In-Memory Verarbeitung

•Latenz im Bereich von Millisekunden oder Sekunden

•Skalieren durch Verteilen

•Häufig modelliert als DAG

Distributed Stream Processing_

7

• Eventzeit: • Zeitpunkt, an dem das Event aufgetreten ist

• Verarbeitungszeit: • Zeitpunkt, an dem das Event vom System beobachtet wurde

Eventzeit vs. Verarbeitungszeit_

8

Event

Verarbeitung

1 2 3 4 5 6 7 8 9t in Minuten

•Differenz ist nicht nur != 0

•Differenz schwankt stark • Ressourcen bedingt (CPU, Netzwerk,..) • Software bedingt (verteilte Systeme..) • Daten bedingt (Schlüsselverteilung, Varianzen in Daten selbst)

• Analyse nach Verarbeitungszeit • einfacher aber ggfs. zu ungenau

• Analyse nach Eventzeit • komplexer, dafür genauer

Eventzeit vs. Verarbeitungszeit_

9

•Nicht triviale Anwendungen benötigen meist einen State • z.b. Aggregationen über einen längeren / unendlichen Zeitraum • (input, state) -> (output, state’) • gespeichert in Memory • interessant im Fehlerfall

State & Window Verarbeitung_

10

• Window als (zeitlich) begrenzter State • Tumbling Window • Sliding Window • Session Window

• Unterschiedliche Trigger • Zeit • Anzahl

Windowing & Sliding_

11

Tumbling Window_

12

Sliding Window_

13

Session Window_

14

Zeit

User 1

User 2 Inaktivität

Inaktivität

•Mit Verarbeitungszeit einfach

•Mit Eventzeit schwerer • Vollständigkeit (out of order Events) • Buffering

• Strategien bei Eventzeit Windows • Watermarks • Trigger • Akkumulation

• Mehr Informationen • https://www.oreilly.com/ideas/the-world-beyond-batch-

streaming-101 • http://www.vldb.org/pvldb/vol8/p1792-Akidau.pdf

Window Verarbeitung und Zeiten_

15

Die Kandidaten

16

•Gestartet 2010 durch BackType/Twitter, Apache seit 2013

•Pionier im Big Data / Stream Bereich

•Technologie der Lambda Architektur

•Low Level API

•Spouts und Bolts beschreiben eine Topologie

•Trident: High Level Erweiterung auf Storm Basis • Aggregationen • State & Window Operationen • Join, Merge, Group, --

Apache Storm_

17

•Open Source (2010) & Apache Projekt (2013)

•Einheitliche Batch & Stream Verarbeitung

•Breite Akzeptanz

•RDD als Basis

Apache Spark_

18

•Entwickelt bei LinkedIn, Open Source 2013

•Verfolgt den Log Ansatz von Kafka

•Ausgeführt auf YARN

•Geeignet für große States

•Erweiterbar über APIs

Apache Samza_

19

•Gestartet 2008 als europäisches Forschungsprojekt

•Low Latency Streaming und High Throughput Batch Processing

•Flexible States und Windows

•Streaming First Ansatz

Apache Flink_

20

Die Analyse

21

•Runtime

•Programming Model

•Skalierbarkeit

•Latenz

•Durchsatz

•Resilienz / Delivery Guarantees

•Reife

•Community

Aspekte von Streaming Anwendungen_

22

Runtime - Native Streaming_

23

Empfänger

Verarbeitung

Senke

geringe Latenz geringer Durchsatzflexibel Fehlertoleranz komplexer

Lastenverteilung komplexer

Runtime - Microbatching_

24

Empfänger

Verarbeitung

Senke

Microbatches

hoher Durchsatz höhere Latenzeinfacher Fehlertolerant weniger Flexibel (z.B. Windows)

State Verarbeitung komplexer

•Operatoren und Quellen als Komponenten

•Eigene Komponenten

•manuelle Topologie Definition

Programmiermodell_

25

Komponentenbasiert

•High Level API

•Higher Order Functions

•Abstrakte Datentypen

•Fortgeschrittene Operationen inkludiert

•Eingebaute Optimierungen

Deklarativ

Word Count - Flink_

26

valenv=StreamExecutionEnvironment.getExecutionEnvironmentvaltext=env.socketTextStream("localhost",9999)

valcounts=text.flatMap(_.toLowerCase.split("\\W+")).filter(_.nonEmpty).map(_,1).groupBy(0).sum(1)

counts.print

env.execute("ScalaSocketStreamWordCount")

Word Count - Spark_

27

valsparkConf=newSparkConf().setAppName("StreamingWordCount")valssc=newStreamingContext(sparkConf,Seconds(1))ssc.checkpoint(".")

valmappingFunc=(key:String,value:Option[Int],state:State[Int])=>{valsum=value.getOrElse(0)+state.getOption.getOrElse(0)valoutput=(key,sum)state.update(sum)output}

valwordCountState=StateSpec.function(mappingFunc)

Word Count - Spark_

28

vallines=ssc.socketTextStream(args(0),args(1).toInt)valwords=lines.flatMap(_.split(""))valwordsWithCount=words.map(x=>(x,1))valstateDstream=wordsWithCount.mapWithState(wordCountState)stateDstream.print()ssc.start()ssc.awaitTermination()

Word Count - Storm_

29

TopologyBuilderbuilder=newTopologyBuilder();

builder.setSpout("spout",newRandomSentenceSpout(),5);

builder.setBolt("split",newSplitSentence(),8).shuffleGrouping(„spout");

builder.setBolt("count",newWordCount(),12).fieldsGrouping("split",newFields("word"));

Word Count - Storm_

30

Configconf=newConfig();conf.setMaxTaskParallelism(3);

LocalClustercluster=newLocalCluster();cluster.submitTopology("word-count",conf,builder.createTopology());Thread.sleep(10000);cluster.shutdown();

Word Count - Storm_

31

publicstaticclassWordCountextendsBaseBasicBolt{Map<String,Integer>counts=newHashMap<String,Integer>();publicvoidexecute(Tupletuple,BasicOutputCollectorcollector){Stringword=tuple.getString(0);Integercount=counts.get(word);if(count==null)count=0;count++;counts.put(word,count);collector.emit(newValues(word,count));}publicvoiddeclareOutputFields(OutputFieldsDeclarerdeclarer){declarer.declare(newFields("word","count"));}}

Word Count - Storm Trident_

32

TridentTopologytopology=newTridentTopology();TridentStatewordCounts=topology.newStream("spout1",spout).each(newFields("sentence"),newSplit(),newFields("word")).groupBy(newFields("word")).persistentAggregate(newMemoryMapState.Factory(),newCount(),newFields("count")).parallelismHint(6);

Trident

Word Count - Storm Trident_

33

publicclassSplitextendsBaseFunction{publicvoidexecute(TridentTupletuple,TridentCollectorcollector){Stringsentence=tuple.getString(0);for(Stringword:sentence.split("")){collector.emit(newValues(word));}}}

Trident

Word Count - Samza_

34

classWordCountTaskextendsStreamTaskwithInitableTask{

privatevarstore:CountStore=_

definit(config:Config,context:TaskContext){this.store=context.getStore("wordcount-store").asInstanceOf[KeyValueStore[String,Integer]]}

Word Count - Samza_

35

overridedefprocess(envelope:IncomingMessageEnvelope,collector:MessageCollector,coordinator:TaskCoordinator){

valwords=envelope.getMessage.asInstanceOf[String].split("")

words.foreach{key=>valcount:Integer=Option(store.get(key)).getOrElse(0)store.put(key,count+1)collector.send(newOutgoingMessageEnvelope(newSystemStream("kafka","wordcount"),(key,count)))}}

•Maximal mögliche Garantien

•Beeinflussen Performance

•Nicht in jeder Kombination möglich (abhängig von Quelle)

Zustellungsgarantien_

36

Trident

At-least-once Exactly-once* Exactly-once* At-least-once Exactly-once*

•Abhängig von der Runtime

•Höhere Latenz --> höherer Durchsatz

Latenz & Durchsatz_

37

~50ms 500ms 30.000ms

Samza

Storm

Flink

Spark Streaming

Storm Trident

Custom

Latenz & Durchsatz_

38

https://yahooeng.tumblr.com/post/135321837876/benchmarking-streaming-computation-engines-at

Throughput

Late

nz

•Viele Variablen, Unparteiische Tests schwierig

•Latenz vs. Durchsatz

•Delivery Guarantees

•Fehlertoleranz

•Tuning

•Netzwerk, Daten Lokalität, Serialisierung

Performance_

39

•Skalieren durch Partitionierung • Partitionieren der Daten • Partitionieren des Flows

Skalierbarkeit_

40

•Erneute Verarbeitung nicht einfach möglich

•Anfang und Ende schwer zu bestimmen

•State muss auch gesichert werden

•Verschiedene Ansätze • Record Ack • Micro Batching • Transactional Updates • Snapshots

Fault Tolerance_

41

Fault Tolerance - Storm_

42

Ack Ack

Ack

Ack

AckAck

•Fehlgeschlagene Microbatches werden wiederholt

•Batch Acknowledge statt Record Acknowledge

• Checkpoints für States

Fault Tolerance - Spark & Storm Trident_

43

•Transaktionale Updates auf Transaction Log

•Kafka als Transaction Log

Fault Tolerance - Samza_

44

partition 0

partition 1

partition 2

Checkpoint partition 0: offset .. partition 1: offset .. partition 2: offset ..

SamzaKafka

Kafka

•Distributed Checkpoints

Fault Tolerance - Flink_

45

•Native Eventzeitverarbeitung nur in Flink • Out-of-order Events • Watermarks • Trigger

• Eventzeit als Key in anderen Framework möglich • Keine out-of-order Events

Event- & Verarbeitungszeit_

46

finalStreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

Das Ergebnis

47

Überblick_

48

Trident

Runtime Nativ Microbatching Microbatching Nativ Nativ

Programmier-modell Komponentenbasiert Deklarativ Komponenten

basiert Deklarativ

Durchsatz Gering Mittel Hoch Hoch Hoch

Latenz Gering Mittel Mittel Gering Gering

Garantien At-least-once Exactly-once* Exactly-once* At-least-once Exactly-once*

Eventzeit Handling Nein Nein Nein Nein Ja

Reife & Community Hoch Hoch Hoch Mittel Mittel

•bereits Spark Batch Anwendungen vorhanden sind

•viele Umsysteme integriert werden

•eine große Community wichtig ist

•Scala kein Problem ist

•Latenz kein Kriterium ist

Spark wenn..._

49

•Sehr niedrige Latenz, niedriges Volumen

•At-Least Once Verarbeitung

•Zustandslose Verarbeitung

•Ggfs. Heron als Alternative

Storm für ..._

50

•Kafka ist omnipräsent

•Große States

•Kein Exactly Once

•Kafka Streams als Alternative

Samza wenn ..._

51

•Für Eventzeit Verarbeitung

•Für pures Streaming

•Sehr gute Konzepte

•Etwas weniger Umsysteme

•Nutzen und Mitarbeit an einem jungen Projekt

Und Flink ..._

52

•Apache Beam • High Level API für Streaming Runner

•Google Cloud Data Flow • Googles Cloud Streaming Framework; Beam Implementierung

•Apex • YARN based direct-streaming with checkpointing

•Flume • Logfile Streaming insb. in HDFS

•Kafka Streams • Streaming integriert in Kafka ab 0.10, einfache Anwendungen

• Heron • Storm Nachfolger, API kompatibel, verbesserter Throughput &

Latency

Ein Satz zu_

53

Questions?

Matthias Niehoff, IT-Consultant

90

codecentric AG Zeppelinstraße 2 76185 Karlsruhe, Germany

mobil: +49 (0) 172.1702676 matthias.niehoff@codecentric.de

www.codecentric.de blog.codecentric.de

matthiasniehoff

• Logfile: Linux Screenshots, Flickr

• Sensors, IT Network: Wikipedia

• Devices: Brad Forst, Flickr

• Speed: Rool Paap, Flickr

• Graph: Wikipedia

• Stateful Processing: data-artisans.com

• Window & Sliding Windows, Flink Übersicht, Flink Fault Tolerance: Apache Flink

• Storm Topologien: Apche Storm

• Spark Übersicht: Apache Spark

• Samza Übersicht: Apache Samza

• Unendliche Daten: https://i.ytimg.com/vi/9rE3kbGmP4w/maxresdefault.jpg

Picture Reference_

55

Recommended