48
1 Cloud Data Management Kapitel 6: MapReduce und Datenbanken Dr. Michael Hartung Sommersemester 2012 Universität Leipzig Institut für Informatik http://dbs.uni-leipzig.de

Kapitel 6: MapReduce und Datenbanken - uni-leipzig.dedbs.uni-leipzig.de/file/CDM_SS_2012_06_MapReduce_LargeData.pdf · 2 Inhaltsverzeichnis • SQL-Anfrageformulierung mit MapReduce

  • Upload
    others

  • View
    0

  • Download
    0

Embed Size (px)

Citation preview

Page 1: Kapitel 6: MapReduce und Datenbanken - uni-leipzig.dedbs.uni-leipzig.de/file/CDM_SS_2012_06_MapReduce_LargeData.pdf · 2 Inhaltsverzeichnis • SQL-Anfrageformulierung mit MapReduce

1

Cloud Data Management

Kapitel 6: MapReduce undDatenbanken

Dr. Michael HartungSommersemester 2012

Universität LeipzigInstitut für Informatikhttp://dbs.uni-leipzig.de

Page 2: Kapitel 6: MapReduce und Datenbanken - uni-leipzig.dedbs.uni-leipzig.de/file/CDM_SS_2012_06_MapReduce_LargeData.pdf · 2 Inhaltsverzeichnis • SQL-Anfrageformulierung mit MapReduce

2

Inhaltsverzeichnis

• SQL-Anfrageformulierung mit MapReduce

• Joins mit MapReduce

• Data Warehousing mit MapReduce– Hive

• RDBMS + MapReduce– HadoopDB

Page 3: Kapitel 6: MapReduce und Datenbanken - uni-leipzig.dedbs.uni-leipzig.de/file/CDM_SS_2012_06_MapReduce_LargeData.pdf · 2 Inhaltsverzeichnis • SQL-Anfrageformulierung mit MapReduce

3

SQL-Anfrageformulierung mit MapReduce

• (manuelle) Umschreibung SQL → MapReduce

• Beispiel: CouchDB– Dokumenten-orientierte Datenbank

– kein Schema

– Dokumente in JSON-Format

• Anfragen durch View-Definitionen– Definition von map- und reduce-Funktion in Javascript (und anderen Sprachen)

Page 4: Kapitel 6: MapReduce und Datenbanken - uni-leipzig.dedbs.uni-leipzig.de/file/CDM_SS_2012_06_MapReduce_LargeData.pdf · 2 Inhaltsverzeichnis • SQL-Anfrageformulierung mit MapReduce

4

{“_id”:”1”, "name":"fish.jpg",”time":”17:46","user":"bob“,"camera":"nikon","info":{"width":100,"height":200,"size":12345},"tags":["tuna","shark"]}

{“_id”:”2”, "name":"trees.jpg",“time":”17:57”,"user":"john”,"camera":"canon","info":{"width":30,"height":250,"size":32091},"tags":["oak"]}

....

Beispieldaten

• Konzeptionell: Repräsentation als verschachtelte Tabelle

• Intern: Repräsentation als Dokumentenmenge (JSON-Format)

id name time user camera info tags

width height size

1 fish.jpg 17:46 bob nikon 100 200 12345 [tuna, shark]

2 trees.jpg 17:57 john canon 30 250 32091 [oak]

3 snow.png 17:56 john canon 64 64 1253 [tahoe, powder]

4 hawaii.png 17:59 john nikon 128 64 92834 [maui, tuna]

5 hawaii.gif 17:58 bob canon 320 128 49287 [maui]

6 island.gif 17:43 zztop nikon 640 480 50398 [maui]

Quelle: http://labs.mudynamics.com/wp-content/uploads/2009/04/icouch.html

Page 5: Kapitel 6: MapReduce und Datenbanken - uni-leipzig.dedbs.uni-leipzig.de/file/CDM_SS_2012_06_MapReduce_LargeData.pdf · 2 Inhaltsverzeichnis • SQL-Anfrageformulierung mit MapReduce

5

Selektion

• Selektion = Bedingung für Attributwert(e)– SQL: ... WHERE attr = “xy”

• MapReduce– map: Prüfung durch IF-Bedingung, Ausgabe der selektierten Dokumente

– reduce: Id-Funktion

• Beispiel– SQL: SELECT * FROM table WHERE user = “bob”

id name time user camera info tags

width height size

1 fish.jpg 17:46 bob nikon 100 200 12345 [tuna, shark]

5 hawaii.gif 17:58 bob canon 320 128 49287 [maui]

Page 6: Kapitel 6: MapReduce und Datenbanken - uni-leipzig.dedbs.uni-leipzig.de/file/CDM_SS_2012_06_MapReduce_LargeData.pdf · 2 Inhaltsverzeichnis • SQL-Anfrageformulierung mit MapReduce

6

Selektion: Beispiel

{id:1,user:bob ...}

{id:2,user:john ...}

{id:3,user:john ...}

{id:4,user:john ...}

{id:5,user:bob ...}

{id:6,user:zztop...}

map

shuf

fle +

sor

t

redu

ce

key value

null {id:1 ...}

null {id:5 ...}

key values

null [{id:1 ...}, {id:5 ...}]

[{id:1 ...},{id:5 ...}]

map

function (doc) {

if (doc.user == “bob”)

emit (doc.id, doc);

}

reduce

function (key, values) {

return values;

}

emit (null, doc);

key value key values [{id:1 ...}]

[{id:5 ...}]

map

shffl

+srt

redu

ceAlternative

Page 7: Kapitel 6: MapReduce und Datenbanken - uni-leipzig.dedbs.uni-leipzig.de/file/CDM_SS_2012_06_MapReduce_LargeData.pdf · 2 Inhaltsverzeichnis • SQL-Anfrageformulierung mit MapReduce

7

Projektion

• Projektion = Einschränkung des Ergebnisses auf Attribute– SQL: SELECT Attr1, Attr2 FROM ...

• MapReduce– map: Generierung eines neuen Dokuments

– reduce: Id-Funktion

• Duplikateliminierung– map: Key = Attribut(kombination)

– reduce: Ausgabe des ersten Values

• Beispiel– SQL: SELECT (DISTINCT) user FROM table

user

bob

john

john

john

bob

zztop

user

bob

john

zztop

Page 8: Kapitel 6: MapReduce und Datenbanken - uni-leipzig.dedbs.uni-leipzig.de/file/CDM_SS_2012_06_MapReduce_LargeData.pdf · 2 Inhaltsverzeichnis • SQL-Anfrageformulierung mit MapReduce

8

Projektion: Beispiel (ohne Duplikateliminierung)

{id:1,user:bob ...}

{id:2,user:john ...}

{id:3,user:john ...}

{id:4,user:john ...}

{id:5,user:bob ...}

{id:6,user:zztop...}

map

redu

ce

key value

1 {user:bob }

2 {user:john}

3 {user:john}

4 {user:john}

5 {user:bob}

6 {user:zztop}

[{user:bob }]

[{user:john}]

[{user:john}]

[{user:john}]

[{user:bob}]

[{user:zztop}]

map

function (doc) {

emit(doc.id,{“user”:doc.user});

}reduce

function (key, values) {

return values;

}

key values

1 [{user:bob }]

2 [{user:john}]

3 [{user:john}]

4 [{user:john}]

5 [{user:bob}]

6 [{user:zztop}]

shuf

fle +

sor

t

Page 9: Kapitel 6: MapReduce und Datenbanken - uni-leipzig.dedbs.uni-leipzig.de/file/CDM_SS_2012_06_MapReduce_LargeData.pdf · 2 Inhaltsverzeichnis • SQL-Anfrageformulierung mit MapReduce

9

Projektion: Beispiel (mit Duplikateliminierung)

{id:1,user:bob ...}

{id:2,user:john ...}

{id:3,user:john ...}

{id:4,user:john ...}

{id:5,user:bob ...}

{id:6,user:zztop...}

map

redu

ce

key value

bob {user:bob }

john {user:john}

john {user:john}

john {user:john}

bob {user:bob}

zztop {user:zztop}

[{user:bob }]

[{user:john}]

[{user:zztop}]

map

function (doc) {

emit(doc.user,{“user”:doc.user});

}reduce

function (key, values) {

return values[0];

}

key values

bob [{user:bob }, {user:bob }]

john [{user:john}, {user:john}, {user:john}]

zztop [{user:zztop}]shuf

fle +

sor

t

Page 10: Kapitel 6: MapReduce und Datenbanken - uni-leipzig.dedbs.uni-leipzig.de/file/CDM_SS_2012_06_MapReduce_LargeData.pdf · 2 Inhaltsverzeichnis • SQL-Anfrageformulierung mit MapReduce

10

Gruppierung und Aggregatfunktion

• Gruppierung– Zusammenfassen von Datensätzen mit gleichen Attributwerten

– Bildung aggregierter Attributwerte pro Gruppe durch Aggregatfunktionen (z.B. SUM)

• MapReduce– map: Gruppierungsattribut(e) als Key

– reduce: Anwendung der Aggregatfunktion

• Beispiel– SELECT camera, AVG(info.size) as avgsize

FROM TableGROUP BY camera

camera avgsize

canon 27543.3

nikon 51859

Page 11: Kapitel 6: MapReduce und Datenbanken - uni-leipzig.dedbs.uni-leipzig.de/file/CDM_SS_2012_06_MapReduce_LargeData.pdf · 2 Inhaltsverzeichnis • SQL-Anfrageformulierung mit MapReduce

11

Gruppierung und Aggregatfunktion: Beispiel

{id:1,user:bob ...}

{id:2,user:john ...}

{id:3,user:john ...}

{id:4,user:john ...}

{id:5,user:bob ...}

{id:6,user:zztop...}

map

redu

ce

key value

nikon 12345

canon 32091

canon 1253

nikon 92834

canon 49287

nikon 50398

27543.3

51859

map

function (doc) {

emit(doc.camera,

doc.info.size);

}

reduce

function (key, values) {

sum = 0;

for (i=0; i<values.length; i++) {

sum = sum + values[i];

}

return sum/values.length;

}

key values

shuf

fle +

sor

t

Page 12: Kapitel 6: MapReduce und Datenbanken - uni-leipzig.dedbs.uni-leipzig.de/file/CDM_SS_2012_06_MapReduce_LargeData.pdf · 2 Inhaltsverzeichnis • SQL-Anfrageformulierung mit MapReduce

12

Equi-Join + Mehrwertiges Attribut

• Equi-Join = Verknüpfung zweier Relationen über Attributgleichheit– SQL: ... WHERE Tab1.Attr1 = Tab2.Attr2

• Mehrwertige Attribute in 1NF – 1:N/N:M-Beziehung = weitere Relation(en), die durch Join verknüpft werden

• MapReduce– map: Join-Attribut als Key

– reduce: Iteration über Paare

• Beispiel (SQL)– SELECT Tab1.name AS name1, Tab2.name AS name2

FROM table AS Tab1, table AS Tab2WHERE Tab1.name < Tab2.nameAND EXISTS (SELECT tag FROM TagTab WHERE TagTab.id=Tab1.idINTERSECTSELECT tag FROM TagTab WHERE TagTab.id=Tab2.id

)

name1 name2

fish.jpg 17:46

trees.jpg 17:57

snow.png 17:56

hawaii.png 17:59

hawaii.gif 17:58

island.gif 17:43

hawaii.png island.gif

hawaii.gif hawaii.png

hawaii.gif island.gif

Page 13: Kapitel 6: MapReduce und Datenbanken - uni-leipzig.dedbs.uni-leipzig.de/file/CDM_SS_2012_06_MapReduce_LargeData.pdf · 2 Inhaltsverzeichnis • SQL-Anfrageformulierung mit MapReduce

13

Equi-Join + Mehrwertiges Attribut: Beispiel (1)

map

function (doc) {

for (i=0; i<doc.tags.length; i++) {

emit (doc.tags[i], doc.name);

}

reduce

function (key, values) {

var result = new Array();

for (i=0; i<values.length; i++) {

for (k=0; k<values.length; k++) {

if (values[i]<values[k] {

result.push ({name1:values[i], name2:values[k]});

}

}

}

return result;

}

Page 14: Kapitel 6: MapReduce und Datenbanken - uni-leipzig.dedbs.uni-leipzig.de/file/CDM_SS_2012_06_MapReduce_LargeData.pdf · 2 Inhaltsverzeichnis • SQL-Anfrageformulierung mit MapReduce

14

Equi-Join + Mehrwertiges Attribut: Beispiel (2)

{id:1,...}

{id:2,...}

{id:3,...}

{id:4,...}

{id:5,...}

{id:6,...}

map

redu

ce

key value

tuna fish.jpg

shark fish.jpg

oak tree.jpg

tahoe snow.png

powder snow.png

maui hawaii.png

tuna hawaii.png

maui hawaii.gif

maui island.gif

key value

maui [hawaii.png, hawaii.gif, island.gif]

oak [tree.jpg]

power [snow.png]

shark [fish.jpg]

tahoe [snow.png]

tuna [fish.jpg, hawaii.png]

shuf

fle +

sor

t

Page 15: Kapitel 6: MapReduce und Datenbanken - uni-leipzig.dedbs.uni-leipzig.de/file/CDM_SS_2012_06_MapReduce_LargeData.pdf · 2 Inhaltsverzeichnis • SQL-Anfrageformulierung mit MapReduce

15

Inhaltsverzeichnis

• SQL-Anfrageformulierung mit MapReduce

• Joins mit MapReduce

• Data Warehousing mit MapReduce– Hive

• RDBMS + MapReduce– HadoopDB

Page 16: Kapitel 6: MapReduce und Datenbanken - uni-leipzig.dedbs.uni-leipzig.de/file/CDM_SS_2012_06_MapReduce_LargeData.pdf · 2 Inhaltsverzeichnis • SQL-Anfrageformulierung mit MapReduce

16

Join-Realisierung mit MapReduce

• Join ist wichtige und teure Datenbank-Operation– unterschiedliche Join-Arten (Natural, Outer, ...), Anzahl beteiligter Relationen

– Fokus im Folgenden: Natural Join zwischen R und S

• Häufiger Anwendungsfall für WebApps: Logfile-Auswertung– Logfile ⋈ User über Attribut UserId

– Logfile (#Klicks) meist deutlich größer als Referenztabelle (#User)

– Geringe Join-Selektivität (nur x% der User pro Tag auf Website)

• Join-Performanz u.a. abhängig von– gleichmäßiger Lastbalancierung der Knoten

→ siehe Lastbalancierung bei Entity Matching

– Datenmenge, die zwischen Map- und Reduce-Phase sortiert und transferiert wird(ggf. unter Berücksichtigung der Lokalität)

• Verschiedene Verfahren– Repartition Join

– Broadcast Join

– Semi-JoinQuelle diese und nächste Folien: [MRJoin]

Page 17: Kapitel 6: MapReduce und Datenbanken - uni-leipzig.dedbs.uni-leipzig.de/file/CDM_SS_2012_06_MapReduce_LargeData.pdf · 2 Inhaltsverzeichnis • SQL-Anfrageformulierung mit MapReduce

17

Repartition Join

a b

a1 3

a2 3

a3 3

a4 2

a5 3

a6 2

a7 2

a8 2

b c

1 c12 c23 c34 c4

(3, R:a1)

(3, R:a2)

(3, R:a3)

(2, R:a4)

(3, R:a5)

(2, R:a6)

(2, R:a7)

(2, R:a8)

(1, S:c1)

(2, S:c2)

(3, S:c3)

(4, S:c4)

a b c

a4 2 c2a6 2 c2a7 2 c2a8 2 c2a1 3 c3a2 3 c3a3 3 c3a5 3 c3

map

shuf

fle +

sor

t

redu

ce

• Naiver Ansatz– map: Key=Join-Attribut, Value=Relationsname+Attribute

– reduce: Alle Paare mit unterschiedlichem Relationsnamen

R

S

Page 18: Kapitel 6: MapReduce und Datenbanken - uni-leipzig.dedbs.uni-leipzig.de/file/CDM_SS_2012_06_MapReduce_LargeData.pdf · 2 Inhaltsverzeichnis • SQL-Anfrageformulierung mit MapReduce

18

Repartition Join: Nachteile

• Alle Daten werden zwischen Map- und Reduce-Phase sortiert und an die Reduce Tasks geschickt – Verbesserung durch Broadcast Join, Semi-Join (nächste Folien)

• Reducer muss alle N Datensätze pro Key puffern– keine Reihenfolge bzgl. Input-Relation, da nur nach Join-Attribut sortiert

– Hadoop-Implementierung erlaubt nur sequentiellen Datenzugrifff

• Lösung– Erweiterung des Map-Keys um Relationsname, Gruppierung nur nach Join-Attribut

– Sortierung derart, dass Keys der kleineren Relation (S) vor Keys der größeren Relation (R) stehen → Reduce muss nur noch Datensätze von S puffern

• Beispielnormal(2, R:a4)

(2, S:c2)

(2, R:a6)

(2, R:a7)

(2, R:a8)

erweiterter Key(S:2, c2)

(R:2, a4)

(R:2, a6)

(R:2, a7)

(R:2, a8)

Page 19: Kapitel 6: MapReduce und Datenbanken - uni-leipzig.dedbs.uni-leipzig.de/file/CDM_SS_2012_06_MapReduce_LargeData.pdf · 2 Inhaltsverzeichnis • SQL-Anfrageformulierung mit MapReduce

19

Broadcast-Join

• Idee– Kleinere Relation (S) als

zusätzlichen map Input

– Join nur in map-Phase, kein Reduce notwendig

• Notwendiger Datentransfer – kleinere Relation muss an alle

n Knoten geschickt werden → n⋅|S| Datensätze

– kein Transfer von R, da jeder map-Task seine “lokale” map-Partition bearbeitet

• Vergleich Repartition-Join– beide Relationen werden auf alle reduce-

Tasks aufgeteilt → |R|+|S| Datensätze

• Dynamische Entscheidung möglich, welche Relation kleiner und ob Broadcast-Join vorteilhaft ist

a b

a1 3

a2 3

a3 3

a4 2

a5 3

a6 2

a7 2

a8 2

b c

1 c12 c23 c34 c4

map

R

S

a b c

a1 3 c3a2 3 c3a3 3 c3a4 2 c2a5 3 c3a6 2 c2a7 2 c2a8 2 c2

Page 20: Kapitel 6: MapReduce und Datenbanken - uni-leipzig.dedbs.uni-leipzig.de/file/CDM_SS_2012_06_MapReduce_LargeData.pdf · 2 Inhaltsverzeichnis • SQL-Anfrageformulierung mit MapReduce

20

Semi-Join

• Neben Anzahl der Datensätze auch Größe der zu transferierenden Datensätze entscheidend– Größe (Join-Attribut) << Größe (Datensatz)

• Semi-Join-Idee: R ⋈ S = R ⋈ ( Πb (R) S ) mit Join-Attribut b– Realisierung durch drei Map-Reduce-Schritte

1. R’ = Πb (R) – map extrahiert Join-Attribut, 1 reduce task entfernt Duplikate

2. S’ = R’ S– Broadcast-Join mit “kleinerer Relation” R’

3. R ⋈ S’: – Broadcast-Join mit kleinerer Relation S’

Page 21: Kapitel 6: MapReduce und Datenbanken - uni-leipzig.dedbs.uni-leipzig.de/file/CDM_SS_2012_06_MapReduce_LargeData.pdf · 2 Inhaltsverzeichnis • SQL-Anfrageformulierung mit MapReduce

21

Semi-Join: Beispiel

a b

a1 3

a2 3

a3 3

a4 2

a5 3

a6 2

a7 2

a8 2

(3, ⊥)

(3, ⊥)

(3, ⊥)

(2, ⊥)

(3, ⊥)

(2, ⊥)

(2, ⊥)

(2, ⊥)

map

shuf

fle +

sor

t

redu

ce

R (2, ⊥)

(2, ⊥)

(2, ⊥)

(2, ⊥)

(3, ⊥)

(3, ⊥)

(3, ⊥)

(3, ⊥)

b

R’

b c

1 c12 c23 c34 c4

map

b c

S’S

a b

a1 3

a2 3

a3 3

a4 2

a5 3

a6 2

a7 2

a8 2

map

R a b c

a1 3 c3a2 3 c3a3 3 c3a4 2 c2a5 3 c3a6 2 c2a7 2 c2a8 2 c2

Page 22: Kapitel 6: MapReduce und Datenbanken - uni-leipzig.dedbs.uni-leipzig.de/file/CDM_SS_2012_06_MapReduce_LargeData.pdf · 2 Inhaltsverzeichnis • SQL-Anfrageformulierung mit MapReduce

22

Evaluation in [MRJoin] #Datensätze

(Relation S)

Repartition

(erw. Key)

Broad-

cast

0.3 Millionen 145 GB 6 GB

10 Millionen 145 GB 195 GB

300 Millionen 151 GB 6240 GB

• Broadcast für kleine S

• Repartitioning: Nutzen des erweiterten Keys

• Semi-Join muss zweimal(große) Relation R einlesen

Repartitioning

Repart. (erw. Key)

Broadcast

Semi-Join

• Datenmenge, die durch das Netzwerkgeschickt wird (oben)

• geringer als Map-Output (z.T. gleicher Knoten für map und reduceTask)

Page 23: Kapitel 6: MapReduce und Datenbanken - uni-leipzig.dedbs.uni-leipzig.de/file/CDM_SS_2012_06_MapReduce_LargeData.pdf · 2 Inhaltsverzeichnis • SQL-Anfrageformulierung mit MapReduce

23

Inhaltsverzeichnis

• SQL-Anfrageformulierung mit MapReduce

• Joins mit MapReduce

• Data Warehousing mit MapReduce

– Hive

• RDBMS + MapReduce– HadoopDB

Page 24: Kapitel 6: MapReduce und Datenbanken - uni-leipzig.dedbs.uni-leipzig.de/file/CDM_SS_2012_06_MapReduce_LargeData.pdf · 2 Inhaltsverzeichnis • SQL-Anfrageformulierung mit MapReduce

24

Vergleich: Hadoop/MR vs. Parallele DBSHadoop / MapReduce Shared Nothing-RDBMS

Datengröße PB TB-PB

Struktur Semistrukturierte Daten Statisches Schema

Partitionierung Blöcke in DFS (Byteweise) Horizontal

Anfrage MapReduce-Programme Deklarativ (SQL)

Zugriff Batch Punkt/Bereich via Indexes

Updates Write once read many times Read and write many times

Scheduling

Verarbeitung

Datenfluss

Fehlertoleranz

Skalierbarkeit Linear, unbegrenzt Linear (existierende Setups), begrenzt

HW-Umgebung Heterogen (preiswerte Standard-HW) Homogen (teure High-End-Hardware)

SW-Kosten Frei / Open Source Sehr teuer

Page 25: Kapitel 6: MapReduce und Datenbanken - uni-leipzig.dedbs.uni-leipzig.de/file/CDM_SS_2012_06_MapReduce_LargeData.pdf · 2 Inhaltsverzeichnis • SQL-Anfrageformulierung mit MapReduce

25

Vorteile: MapReduce vs. Parallele DBS

• Vorteile MapReduce– Skalierbarkeit/Fehlertoleranz

– Konfigurationsaufwand

– Kosten

– Kein initialer Ladevorgang

• Vorteile SN-DBS– Deklarative Anfragesprache

– Anfragen werden um Größenordnungen schneller beantwortet

– (Zzt.) Arbeit auf komprimierten Daten

– Random Access

• Typische Anwendungsfälle MapReduce– ETL

– Data-Mining, Data-Clustering

– Analyse semistrukturierter Daten (Web-Logs, …)

– Einmal-Analysen eines Datenbestandes

Page 26: Kapitel 6: MapReduce und Datenbanken - uni-leipzig.dedbs.uni-leipzig.de/file/CDM_SS_2012_06_MapReduce_LargeData.pdf · 2 Inhaltsverzeichnis • SQL-Anfrageformulierung mit MapReduce

26

Datenanalyse: Beispiel Facebook

• Facebook– 4TB komprimierte Daten pro Tag

– 135TB komprimierte Daten werden pro Tag analysiert

• Aggregationen– Anzahl Clicks/Pageviews pro Tag/Monat/...

• Ad-hoc-Analyse– Wieviele Foto-Uploads zu Neujahr in den USA pro County/State?

• Data Mining– Nutzerverhalten als Funktion von Attributen (#Pageviews, #Sessions, Zeit, ...)

• Spam-Erkennung– (Verdächtige) häufige Muster in UGC (user generated content)

• Auswertung / Optimierung von Werbung– Anzahl AdClicks pro Nutzertyp/...

Page 27: Kapitel 6: MapReduce und Datenbanken - uni-leipzig.dedbs.uni-leipzig.de/file/CDM_SS_2012_06_MapReduce_LargeData.pdf · 2 Inhaltsverzeichnis • SQL-Anfrageformulierung mit MapReduce

27

Hive

• Datenbank / Data Warehouse basierend auf Hadoop

• Hive = MapReduce + SQL– SQL ist einfach und weit verbreitet

– MapReduce skaliert sehr gut

• Automatische Übersetzung von SQL nach MapReduce nötig– Programme schwer zu warten, kaum Reuse

– Barriere für Nicht-Experten

– Fehlende Ausdrucksmächtigkeit, z.B. hoher Zeitaufwand, um simple Count/Avg-Anfragen in MapReduce zu realisieren

Quellen für diese und folgende Folien: [Hive], [Hive1], [Hive2], [Hive3]

Page 28: Kapitel 6: MapReduce und Datenbanken - uni-leipzig.dedbs.uni-leipzig.de/file/CDM_SS_2012_06_MapReduce_LargeData.pdf · 2 Inhaltsverzeichnis • SQL-Anfrageformulierung mit MapReduce

28

Hive: Übersicht

• Verwaltung und Analyse strukturierter Daten mit Hilfe von Hadoop– keine Online-Datenbank, hohe Latenzzeit

• Datenhaltung im HDFS, Metadaten für Abbildung auf Tabellen– Komplexe Datentypen (u.a. Listen, Maps)

– Direkter Zugriff auf Dateien und unterschiedliche Datenformate

• Anfragen mit HiveQL, Ausführung mit MapReduce– Einbindung von Skripten (z.B. Python) in Anfragen

– Metadaten u.a. für Optimierung (u.a. Join, Group By)

• Skalierbarkeit und Fehlertoleranz– durch HDFS + MapReduce

• Erweiterbarkeit – User-Defined Table-Generating Functions (UDTF)

– User-Defined Aggregate Functions (UDAF)

Page 29: Kapitel 6: MapReduce und Datenbanken - uni-leipzig.dedbs.uni-leipzig.de/file/CDM_SS_2012_06_MapReduce_LargeData.pdf · 2 Inhaltsverzeichnis • SQL-Anfrageformulierung mit MapReduce

29

Hive: Architektur

• Metastore– Tabellen, Spalten/Typ

– Location, Partitionen

– (De)Serialisierungsinformationen

• CLI / Web-GUI– Browse Metastore

– Absetzen von Abfragen

• Thrift– Cross-language Service → HiveQL

• Compiler + Optimizer– Anfrageoptimierung und Übersetzung

des HiveQL-Statements in DAG von MapReduce Jobs

• Executor– Ausführung der MR-Jobs entsprechend

DAG

Page 30: Kapitel 6: MapReduce und Datenbanken - uni-leipzig.dedbs.uni-leipzig.de/file/CDM_SS_2012_06_MapReduce_LargeData.pdf · 2 Inhaltsverzeichnis • SQL-Anfrageformulierung mit MapReduce

30

Hive: Datentypen & Datenzugriff

• Datentypen– einfache und zusammengesetzte Datentypen

– Listen, Maps

• Flexible (De)Serialisierung von Tabellen– unterschiedliche (nutzerdefinierte) Formate, z.B. XML, JSON, CSV

– unterschiedliche Speicherung, z.B. Datei, ProtocolBuffer (geplant)

• Vorteil– keine Konvertierung (und Replizierung!) der Originaldaten in relationale Form

sondern direkter Hive-Zugriff

• Nachteil– keine Vorverarbeitung (z.B. Indexierung) möglich

– immer full Table/File Scans nötig

Page 31: Kapitel 6: MapReduce und Datenbanken - uni-leipzig.dedbs.uni-leipzig.de/file/CDM_SS_2012_06_MapReduce_LargeData.pdf · 2 Inhaltsverzeichnis • SQL-Anfrageformulierung mit MapReduce

31

Hive: Tabellen, Partitionen und Dateien

• Tabelle kann auf exist. Daten im HDFS verweisen– Tabelle hat korrespond. HDFS-Verzeichnis : /wh/pvs

– Definition von Spalten, anhand denen Daten partitioniert werden/wh/pvs/ds=20090801/ctry=US

/wh/pvs/ds=20090801/ctry=CA

– Bucketing: Aufteilen der Dateien eines Verz. anhand Hash-Wert (Datenparallelität)/wh/pvs/ds=20090801/ctry=US/part-00000 …

/wh/pvs/ds=20090801/ctry=US/part-00020

Partitionen (evtl. mehrere Level)

HDFS Dateien (evtl. als Hash-Bucket)

Tabelle

Clicksds=2090801

ds=2090802

Page 32: Kapitel 6: MapReduce und Datenbanken - uni-leipzig.dedbs.uni-leipzig.de/file/CDM_SS_2012_06_MapReduce_LargeData.pdf · 2 Inhaltsverzeichnis • SQL-Anfrageformulierung mit MapReduce

32

Hive: Tabellen

• ErstellungCREATE EXTERNAL TABLE pvs

(userid int, pageid int, ds string, stry string)

PARTITIONED ON(ds string, ctry string)

STORED AS textfile

LOCATION ‘/path/to/existing/file’

• Laden von Daten status_updates

(user_id int, status string, ds string)

LOAD DATA LOCAL

INPATH ‘/logs/status_updates’

INTO TABLE status_updates

PARTITION (ds=’2009-03-20’)

Page 33: Kapitel 6: MapReduce und Datenbanken - uni-leipzig.dedbs.uni-leipzig.de/file/CDM_SS_2012_06_MapReduce_LargeData.pdf · 2 Inhaltsverzeichnis • SQL-Anfrageformulierung mit MapReduce

33

Hive-QL

• SQL-ähnliche Anfragesprache– Selektion, Projektion, Equi-Join, Union, Sub-Queries, Group By, Aggregatfunktionen

• Erweiterung von Queries um – MapReduce Skripte

– UDF, auch auf komplexen Objekten (Lists, Map)

FROM (

FROM pv_users

SELECT TRANSFORM(pv_users.userid, pv_users.date)

USING 'map_script'

AS(dt, uid)

CLUSTER BY(dt)

) map

INSERT INTO TABLE pv_users_reduced

SELECT TRANSFORM(map.dt, map.uid)

USING 'reduce_script'

AS (date, count);

Page 34: Kapitel 6: MapReduce und Datenbanken - uni-leipzig.dedbs.uni-leipzig.de/file/CDM_SS_2012_06_MapReduce_LargeData.pdf · 2 Inhaltsverzeichnis • SQL-Anfrageformulierung mit MapReduce

34

Hive-QL: Anfrageübersetzung

• Hive-QL Query wird in DAG (directed acyclic graph) übersetzt

• Knoten: Operatoren– TableScan

– Select, Extract

– Filter

– Join, MapJoin, Sorted Merge Map Join

– GroupBy, Limit

– Union, Collect

– FileSink, HashTableSink, ReduceSink

– UDTF

• Graph repräsentiert Datenfluss

• Mehrere (parallele) Map/Reduce Phasen möglich

Page 35: Kapitel 6: MapReduce und Datenbanken - uni-leipzig.dedbs.uni-leipzig.de/file/CDM_SS_2012_06_MapReduce_LargeData.pdf · 2 Inhaltsverzeichnis • SQL-Anfrageformulierung mit MapReduce

35

Hive-QL: Anfrageübersetzung (Beispiel)

• Beispiel

SELECT *

FROM status_updates

WHERE status

LIKE ‘michael jackson’

Page 36: Kapitel 6: MapReduce und Datenbanken - uni-leipzig.dedbs.uni-leipzig.de/file/CDM_SS_2012_06_MapReduce_LargeData.pdf · 2 Inhaltsverzeichnis • SQL-Anfrageformulierung mit MapReduce

36

Hive: Anfrageübersetzung (2)

SELECT COUNT(*)

FROM status_updates

WHERE ds=‘2009-08-01’

Updates/Nutzer

Alle Updates

Zwischenspeicherndes Map-Outputs

Page 37: Kapitel 6: MapReduce und Datenbanken - uni-leipzig.dedbs.uni-leipzig.de/file/CDM_SS_2012_06_MapReduce_LargeData.pdf · 2 Inhaltsverzeichnis • SQL-Anfrageformulierung mit MapReduce

37

Hive: Anfrageübersetzung und -optimierung

• Anfragepläne können sehr komplex werden

• Anfrageoptimierung– Verwerfen nicht benötigter Spalten

• Berücksichtigung von (Outer-)Join-und Selektionsattributen

– Frühes Anwenden von Selektionsprädikaten

– Verwerfen nicht benötigter Partitionen

Page 38: Kapitel 6: MapReduce und Datenbanken - uni-leipzig.dedbs.uni-leipzig.de/file/CDM_SS_2012_06_MapReduce_LargeData.pdf · 2 Inhaltsverzeichnis • SQL-Anfrageformulierung mit MapReduce

38

Hive: Join

userid age ...

111 25 ...

222 32 ...

key value

111 <R,1>

111 <R,2>

222 <R,2>

pageId userId ...

1 111 ...

2 111 ...

1 222 ...

key value

111 <S,25>

222 <S,32>

key value

111 <R,1>

111 <R,2>

111 <S,25>

key value

222 <R,2>

222 <S,32>

pageId age

1 25

2 25

pageId age

2 32

page_view

user map

shuf

fle +

sor

t

redu

ce

INSERT INTO TABLE pv_users

SELECT pv.pageid, u.age

FROM page_view pv

JOIN user u ON (pv.userid = u.userid)

pv_users

• Key = Join-Key, Value mit Flag (R oder S) zur Unterscheidung d. Tabellen

• Mehrweg-Join mit selben Join-Key → 1 MapReduce job

• Mehrweg-Join mit n Join-Keys → n MapReduce job

Page 39: Kapitel 6: MapReduce und Datenbanken - uni-leipzig.dedbs.uni-leipzig.de/file/CDM_SS_2012_06_MapReduce_LargeData.pdf · 2 Inhaltsverzeichnis • SQL-Anfrageformulierung mit MapReduce

39

Join: Performanzsteigerung durch MapJoin

• MapJoin (aka Broadcast Join)– kleine Tabelle als zusätzlicher Map-

Input

– kann vorher zu Hash-Tabelle umgewandelt werden (ggf. zusätzlich komprimiert)

– kein Reduce notwendig

– n Wege-Join möglich, wenn n-1 Tabellen für map verfügbar

• Dynamische Join-Entscheidung– Bestimmung großer/kleiner Tabelle

zur Laufzeit

– Anwendung von MapJoin falls kleine Tabelle(n) “klein genug”

pageId userId ...

1 111 ...

2 111 ...

1 222 ...

page_view

pageId userIds

1 [111,222]

2 [111]

HashTable

userid age ...

111 25 ...

222 32 ...

user

map

pageId age

1 25

2 25

2 32

pv_users

Page 40: Kapitel 6: MapReduce und Datenbanken - uni-leipzig.dedbs.uni-leipzig.de/file/CDM_SS_2012_06_MapReduce_LargeData.pdf · 2 Inhaltsverzeichnis • SQL-Anfrageformulierung mit MapReduce

40

Hive: Group ByINSERT INTO TABLE pageid_age_sum

SELECT pageid, age, count(*)

FROM pv_users

GROUP BY pageid, age

pageId age

1 25

1 25

pv_users

pageId age

1 25

2 32

key value

key value

map

shuf

fle +

sor

t

redu

ce

key value

key value

pageId age count

1 25 3

pageId age count

2 32 1

pageid_age_sum

• Key = Gruppierungsattribute

• Reduce = Aggregationsfunktion– “Voraggregation” durch Combiner in Map möglich (z.B (<1,25>,2))

Page 41: Kapitel 6: MapReduce und Datenbanken - uni-leipzig.dedbs.uni-leipzig.de/file/CDM_SS_2012_06_MapReduce_LargeData.pdf · 2 Inhaltsverzeichnis • SQL-Anfrageformulierung mit MapReduce

41

Nutzer-definierte Skripte

• Verwendung von Skripten in HiveQL-Anfragen mittels TRANSFORM-Operator– Daten(de-)serialisierung

– Austausch per stdin/stdout

firstletter.py

import sys

for line in sys.stdin:

line = line.strip()

id, title = line.split('\t')

firstl = title[:1]

print '\t'.join([id, title, firstl])

ADD FILE firstletter.py;

SELECT firstl, count(id) AS n

FROM (

SELECT

TRANSFORM (id, title)

USING 'python firstletter.py'

AS id, title, firstl

FROM item ) f

GROUP BY firstl;

id title

1 Body Snatcher

2 Armageddon

3 AI

firstl n

B 1

A 2

Page 42: Kapitel 6: MapReduce und Datenbanken - uni-leipzig.dedbs.uni-leipzig.de/file/CDM_SS_2012_06_MapReduce_LargeData.pdf · 2 Inhaltsverzeichnis • SQL-Anfrageformulierung mit MapReduce

42

Hive: Sortierung

• Sortierung durch zusätzlichen Reduce-Step

• SORT BY = Sortierung pro Reducer

• ORDER BY = globale Sortierung = SORT BY mit nur einem Reducer

id title

1 Body Snatcher

2 Armageddon

3 AI

4 Vegas Vacation

5 Vermin

6 The Visitors

key val

B 1

A 1

A 1

V 1

V 1

T 1

key val

A 1

A 1

B 1

key val

T 1

V 1

V 1

key val

A 2

B 1

key val

T 1

V 2

map

1

shuf

fle +

sor

t

redu

ce1

key val

key val

redu

ce2

key val

redu

ce2

SELECT firstl, count(id) AS n

FROM ... GROUP BY firstl

SORT BY | ORDER BY n DESC

SORT BY

ORDER BY

Page 43: Kapitel 6: MapReduce und Datenbanken - uni-leipzig.dedbs.uni-leipzig.de/file/CDM_SS_2012_06_MapReduce_LargeData.pdf · 2 Inhaltsverzeichnis • SQL-Anfrageformulierung mit MapReduce

43

Inhaltsverzeichnis

• SQL-Anfrageformulierung mit MapReduce

• Joins mit MapReduce

• Data Warehousing mit MapReduce– Hive

• RDBMS + MapReduce

– HadoopDB

Page 44: Kapitel 6: MapReduce und Datenbanken - uni-leipzig.dedbs.uni-leipzig.de/file/CDM_SS_2012_06_MapReduce_LargeData.pdf · 2 Inhaltsverzeichnis • SQL-Anfrageformulierung mit MapReduce

44

HadoopDB

• Ziel: Kombination Fehlertoleranz+Skalierbarkeit von Hadoop mit Performance paralleler DBS

• Idee von HadoopDB– Viele unabhängige Single-Node DBS (PostgreSQL/MySQL)

– Hadoop als Koordinator und Kommunikations-Layer

– Anfragen mit MapReduce parallelisiert

– Großteil der Arbeit wird in DBS-Nodes verrichtet

MapReduce Shared Nothing-RDBMS

Fehlertoleranz Neustart des Map-/Reduce-Tasks Query-Restart (z. T. Operator-Restart)

Skalierbarkeit + o

Performanz o (u.a. TableScan auf Daten, Join, synchrone MR-Phasen, materialisierte Zwischenergebnisse, ...)

+ (u.a. effiziente Zugriff mit Indizes, asynchrones Pipelining, kostenbasierte Anfrageoptimierung, ...)

Page 45: Kapitel 6: MapReduce und Datenbanken - uni-leipzig.dedbs.uni-leipzig.de/file/CDM_SS_2012_06_MapReduce_LargeData.pdf · 2 Inhaltsverzeichnis • SQL-Anfrageformulierung mit MapReduce

45

HadoopDB: Architektur

Page 46: Kapitel 6: MapReduce und Datenbanken - uni-leipzig.dedbs.uni-leipzig.de/file/CDM_SS_2012_06_MapReduce_LargeData.pdf · 2 Inhaltsverzeichnis • SQL-Anfrageformulierung mit MapReduce

46

HadoopDB: Architektur (2)

• DB-Connector– Hadoop InputFormat-Implementierung

– “Datenbanken für Hadoop wie HDFS-Blöcke”

• Katalog– Metadaten über DB (Location, Driver, …)

– Metadaten über gehaltene Daten (Partitionierung, Replikation, …)

• Data Loader– Partitionierung der Daten während des Ladens

• SQL to MapReduce to SQL Planner– Erweitert Hive

– Hive ist regelbasiert, ohne kostenbasierte Anfrageoptimierung

– Anpassung der Hive Ausführungspläne• Erstellen von Single-Node-Queries (Optimierung durch DBS)

• Kombination mit MapReduce

Page 47: Kapitel 6: MapReduce und Datenbanken - uni-leipzig.dedbs.uni-leipzig.de/file/CDM_SS_2012_06_MapReduce_LargeData.pdf · 2 Inhaltsverzeichnis • SQL-Anfrageformulierung mit MapReduce

47

Zusammenfassung

• MapReduce ist kein DBMS, kann aber zur “datenbank-artigen” Verarbeitung großer Datenmengen genutzt werden– SQL-Anfragen können automatisch in MapReduce-Programme transformiert werden

– MR kann flexibel auf die (semi-strukturierten) Originaldaten (d.h. Dateien) zugreifen

• RDBMS sind “pro Knoten” effizienter als MapReduce– ... aber MapReduce skaliert deutlich besser und ist fehlertoleranter

• Kombination der Stärken von RDBMS und MapReduce sinnvoll– ... und Gegenstand aktueller Forschung

Page 48: Kapitel 6: MapReduce und Datenbanken - uni-leipzig.dedbs.uni-leipzig.de/file/CDM_SS_2012_06_MapReduce_LargeData.pdf · 2 Inhaltsverzeichnis • SQL-Anfrageformulierung mit MapReduce

48

Quellen & Literatur

• [MRJoin] Blanas et al.: A Comparison of Join Algorithms for Log Processing in MapReduce. SIGMOD 2010

• [Hive] http://hadoop.apache.org/hive/

• [Hive1] http://www.slideshare.net/zshao/hive-data-warehousing-analytics-on-hadoop-presentation

• [Hive2] http://www.slideshare.net/ragho/hive-user-meeting-august-2009-facebook

• [Hive3] http://www.slideshare.net/jsichi/hive-evolution-apachecon-2010