Kaninchen & Schlangen: RabbitMQ & Python

Preview:

DESCRIPTION

RabbitMQ ist ein Message Broker, der das Advanced Message Queuing Protocol (AMQP) nutzt. Man kann RabbitMQ zum Entkoppeln der verschiedenen Teile einer Applikation nutzen und die Nachrichten asynchron verarbeiten. Das bringt mehr Flexibilität, Skalierbarkeit und Performance.In diesem Vortrag werden verschiedene Python-Bibliotheken zur Nutzung des AMQP und ein Distributed Task Queue, der mit RabbitMQ arbeitet, vorgestellt.

Citation preview

Kaninchen & SchlangenRabbitMQ & Python

Markus Zapke-Gründemann11. DZUG-Tagung zu Zope, Plone und Python

MarkusZapke-Gründemann• Softwareentwickler seit 2001

• Schwerpunkt: Web Application Development mit Python und PHP

• Django, symfony & Zend Framework

• Freier Softwareentwickler und Berater seit 2008

• www.keimlink.de

Übersicht

• Warum Queues?

• AMQP

• RabbitMQ

• Python Bibliotheken

• Carrot

• Celery

Warum Queues?

• Entkoppeln von Informationsproduzenten und -konsumenten

• Asynchrone Verarbeitung

• Load Balancing

• Skalierbarkeit

AMQP

• Advanced Message Queuing Protocol

• Offenes Protokoll

• Plattformunabhängig

• Port 5673/tcp

• http://www.amqp.org/

AMQP

• Offener Standard für Messaging Middleware

• Virtual Hosts

• Exchange (durable oder auto-deleted)

• Binding

• Queue (durable oder auto-deleted)

Producer - Consumer

Quelle: http://www.redhat.com/docs/en-US/Red_Hat_Enterprise_MRG/1.0/html/Messaging_Tutorial/index.html

Fanout Exchange

Quelle: http://www.redhat.com/docs/en-US/Red_Hat_Enterprise_MRG/1.0/html/Messaging_Tutorial/index.html

Direct Exchange

Quelle: http://www.redhat.com/docs/en-US/Red_Hat_Enterprise_MRG/1.0/html/Messaging_Tutorial/index.html

Topic Exchange

Quelle: http://www.redhat.com/docs/en-US/Red_Hat_Enterprise_MRG/1.0/html/Messaging_Tutorial/index.html

RabbitMQ

• AMQP Message Broker

• Erlang

• Open Source

• Mitglied in der AMQP Working Group

• XMPP, SMTP, STOMP und HTTP (mit Adaptern)

• http://www.rabbitmq.com/

Virtual Host Access Controlmit RabbitMQ

$ rabbitmqctl add_user username secret$ rabbitmqctl add_vhost message-vhost$ rabbitmqctl set_permissions -p message-vhost username ".*" ".*" ".*"

Konfiguration

LesenSchreiben

Python Bibliotheken

• amqplib - Python AMQP Client

• carrot - AMQP Messaging Framework

• pika - Python AMQP Client

• txAMQP - Python AMQP Library für Twisted

Carrot

• AMQP Messaging Framework

• High-Level Interface

• Benutzt amqplib

• Serialisierung (Pickle, JSON, YAML)

• Autor: Ask Solem

• http://github.com/ask/carrot/

exchange = 'messaging'queue = 'mbox'routing_key = 'message'

conf.py

connection.pyfrom carrot.connection import BrokerConnection

conn = BrokerConnection(hostname='localhost', userid='username', password='secret', virtual_host='message-vhost')

import datetime

from carrot.messaging import Publisherfrom carrot.utils import gen_unique_id

from conf import exchange, queue, routing_keyfrom connection import conn

publisher = Publisher(connection=conn, exchange=exchange, routing_key=routing_key, queue=queue, serializer='pickle')data = {'message_id': gen_unique_id(), 'timestamp': datetime.datetime.now(), 'message': 'Lorem ipsum dolor sit amet.'}publisher.send(data)

publisher.py

consumer.pyfrom carrot.messaging import Consumer

from conf import exchange, queue, routing_keyfrom connection import conn

class MessageConsumer(Consumer): def receive(self, message_data, message): data = (message.delivery_tag, message_data['message'], message_data['message_id'], message_data['timestamp'].isoformat()) print 'Message %d "%s" with id %s sent at %s.' % data message.ack()

if __name__ == '__main__': consumer = MessageConsumer(connection=conn, queue=queue, exchange=exchange, routing_key=routing_key) consumer.wait()

CeleryDistributed Task Queue• Backends: RabbitMQ, STOMP, Redis, Ghetto

Queue

• Clustering mit RabbitMQ

• Webhooks

• Django-Integration (optional)

• Autor: Ask Solem

• http://github.com/ask/celery/

CeleryDistributed Task Queue

• Serialisierung (Pickle, JSON, YAML)

• Parallele Ausführung

• Zeitgesteuerte Ausführung

• SQLAlchemy, carrot, anyjson

Python Task# Task als Klassefrom celery.task import Taskfrom myproject.models import User

class CreateUserTask(Task): def run(self, username, password): User.create(username, password)

>>> from tasks import CreateUserTask>>> CreateUserTask().delay('john', 'secret')

# Task als Funktion mit Decoratorfrom celery.decorators import taskfrom myproject.models import User

@task()def create_user(username, password): User.create(username, password)

>>> from tasks import create_user>>> create_user.delay('john', 'secret')

Python Task@task() # Benutzt pickle, um das Objekt zu serialisieren.def check_means(user): return user.has_means()

>>> from tasks import check_means>>> result = check_means.delay(user)>>> result.ready() # Gibt True zurück wenn der Task beendet ist.False>>> result.result # Task ist noch nicht beendet, kein Ergebnis verfügbar.None>>> result.get() # Warten bis der Task fertig ist und Ergebnis zurückgeben.93.27>>> result.result # Jetzt ist ein Ergebnis da.93.27>>> result.successful() # War der Task erfolgreich?True

Python TaskKonfiguration

$ celeryd --loglevel=INFO

# celeryconfig.pyBROKER_HOST = "localhost"BROKER_PORT = 5672BROKER_USER = "myuser"BROKER_PASSWORD = "mypassword"BROKER_VHOST = "myvhost"

CELERY_RESULT_BACKEND = "database"CELERY_RESULT_DBURI = "mysql://user:password@host/dbname"

CELERY_IMPORTS = ("tasks", )

Zeitgesteuerter Task# Periodic taskfrom celery.decorators import periodic_taskfrom datetime import timedelta

@periodic_task(run_every=timedelta(seconds=30))def every_30_seconds(): print("Running periodic task!")

# crontabfrom celery.task.schedules import crontabfrom celery.decorators import periodic_task

@periodic_task(run_every=crontab(hour=7, minute=30, day_of_week=1))def every_monday_morning(): print("Execute every Monday at 7:30AM.")

$ celerybeat$ celeryd -B

Fotos der Titelfolie

• Kaninchen: Gidzy http://www.flickr.com/photos/gidzy/3614942864/

• Grüne Python: Frank Wouters - http://www.flickr.com/photos/frank-wouters/540417590/

Lizenz

Dieses Werk ist unter einem Creative Commons Namensnennung-Weitergabe unter gleichen

Bedingungen 3.0 Unported Lizenzvertrag lizenziert. Um die Lizenz anzusehen, gehen Sie bitte zu

http://creativecommons.org/licenses/by-sa/3.0/ oder schicken Sie einen Brief an Creative Commons, 171 Second Street, Suite 300, San Francisco, California

94105, USA.

Recommended