Source code for gemstone.event.transport.rabbitmq

import json

try:
    import pika
except ImportError:
    pika = None

from gemstone.event.transport.base import BaseEventTransport


[docs]class RabbitMqEventTransport(BaseEventTransport): EXCHANGE_PREFIX_BROADCAST = "gemstone.broadcast." def __init__(self, host="127.0.0.1", port=5672, username="", password="", **connection_options): """ Event transport via RabbitMQ server. :param host: ipv4 or hostname :param port: the port where the server listens :param username: username used for authentication :param password: password used for authentication :param connection_options: extra arguments that will be used in :py:class:`pika.BlockingConnection` initialization. """ if not pika: raise RuntimeError("RabbitMqEventTransport requires 'pika' to run") super(RabbitMqEventTransport, self).__init__() self._handlers = {} self.connection = pika.BlockingConnection( pika.ConnectionParameters( host=host, port=port, credentials=pika.PlainCredentials(username=username, password=password), **connection_options ) ) self.channel = self.connection.channel() def register_event_handler(self, handler_func, handled_event_name): self._handlers[handled_event_name] = handler_func def start_accepting_events(self): for event_name, event_handler in self._handlers.items(): # prepare broadcast queues current_exchange_name = self.EXCHANGE_PREFIX_BROADCAST + event_name self.channel.exchange_declare( exchange=current_exchange_name, type="fanout" ) result = self.channel.queue_declare(exclusive=True) queue_name = result.method.queue self.channel.queue_bind(exchange=current_exchange_name, queue=queue_name) self.channel.basic_consume(self._callback, queue=queue_name, no_ack=True) self.channel.start_consuming() def _callback(self, channel, method, properties, body): if not method.exchange.startswith(self.EXCHANGE_PREFIX_BROADCAST): return event_name = method.exchange[len(self.EXCHANGE_PREFIX_BROADCAST):] self.on_event_received(event_name, body) def on_event_received(self, event_name, event_body): handler = self._handlers.get(event_name) if not handler: return if isinstance(event_body, bytes): event_body = event_body.decode() event_body = json.loads(event_body) self.run_on_main_thread(handler, [event_body], {}) def emit_event(self, event_name, event_body): exchange_name = self.EXCHANGE_PREFIX_BROADCAST + event_name self.channel.basic_publish( exchange=exchange_name, routing_key='', body=json.dumps(event_body) ) def __del__(self): if hasattr(self, "channel"): self.channel.close()