Source code for gemstone.event.transport.redis_transport

import urllib.parse

import simplejson as json

try:
    import redis
except ImportError:
    redis = None

from gemstone.event.transport.base import BaseEventTransport


[docs]class RedisEventTransport(BaseEventTransport): def __init__(self, redis_url): """ Event transport that uses a Redis server as message transport by using the PUBSUB mechanism. :param redis_url: A string that specifies the network location of the redis server: - ``redis://[:password@]hostaddr:port/dbnumber`` (plaintext) - ``rediss://[:password@]hostaddr:port/dbnumber`` (over TLS) - ``unix://[:password@]/path/to/socket?db=dbnumber`` (Unix socket) """ if not redis: raise RuntimeError("RedisEventTransport requires 'redis' to run") super(RedisEventTransport, self).__init__() conn_details = urllib.parse.urlparse(redis_url) if conn_details.scheme not in ("redis", "rediss", "unix"): raise ValueError( "Invalid redis url: scheme '{}' not allowed".format(conn_details.scheme)) self.connection_pool = redis.ConnectionPool.from_url(redis_url) self.handlers = {} def get_pubsub(self): conn = self.get_redis_connection() pubsub = conn.pubsub(ignore_subscribe_messages=True) return pubsub def get_redis_connection(self): return redis.StrictRedis(connection_pool=self.connection_pool) def start_accepting_events(self): pubsub = self.get_pubsub() pubsub.subscribe(*tuple(self.handlers.keys())) for message in pubsub.listen(): event_name = message["channel"].decode() event_data = message["data"].decode() self.on_event_received(event_name, json.loads(event_data)) def register_event_handler(self, handler_func, handled_event_name): self.handlers[handled_event_name] = handler_func def emit_event(self, event_name, event_body): conn = self.get_redis_connection() conn.publish(event_name, json.dumps(event_body)) def on_event_received(self, event_name, event_body): handler = self.handlers.get(event_name, None) if not handler: return self.run_on_main_thread(handler, (event_body,), {})