Broadcasting message to all clients using Pika + sockjs

I'm new to realtime apps based on WebSockets, and I'm stuck at one point. My app has the following components:

  • Simple Python script which is fired when some generic event occurs. It receives data and sends it to the queue (RabbitMQ) using pika.
  • Tornado app (with sockjs-tornado) receiving messages from the queue (pika asynchronous client), processing their content, saving the new app state to the database and broadcasting data to clients (SockJS clients). Communication with clients is only in one direction - they just connect to the server and receive data.
  • The problem is that I can't figure out how to pass data received from the queue to all clients. I've set up a pub/sub exchange, so when a user connects to the server, a new connection to RabbitMQ is established for every user, but that's not what I want. Below is what I've got so far.

    common/pika_client.py :

    import logging
    
    import pika
    from pika.adapters.tornado_connection import TornadoConnection
    
    
    class PikaClient(object):
        """Callback-driven RabbitMQ client built on pika's ``TornadoConnection``.

        ``connect()`` starts a chain of callbacks: connection opened -> channel
        opened -> fanout exchange declared -> exclusive queue declared -> queue
        bound to the exchange.  The chain ends in ``on_queue_bound``, which
        subclasses override to start consuming messages.
        """

        def __init__(self, exchange, host='localhost', port=5672, vhost=''):
            """Store the connection settings; nothing is connected here.

            :param exchange: name of the exchange to declare and bind to.
            :param host: RabbitMQ host name.
            :param port: RabbitMQ port.
            :param vhost: RabbitMQ virtual host.
            """
            # Connection state, maintained by the callbacks below.
            self.connected = False
            self.connecting = False
            self.connection = None
            self.channel = None

            # Filled in later by on_queue_declared / add_message_handler.
            # Initialised here so that reading them early yields None
            # instead of raising AttributeError.
            self.queue_name = None
            self.message_handler = None

            self.host = host
            self.port = port
            self.vhost = vhost
            self.exchange = exchange

            # preparing logger
            self.log = logging.getLogger(__name__)
            self.set_log_level()

        def set_log_level(self, log_level=logging.WARNING):
            """Set the threshold of this client's logger."""
            self.log.setLevel(log_level)

        def connect(self):
            """Open a connection to RabbitMQ unless one is open or pending."""
            name = self.__class__.__name__
            self.log.info("CONNECTING")
            if self.connecting:
                self.log.info('%s: Already connecting to RabbitMQ' % name)
                return
            if self.connected:
                self.log.info('%s: Already connected to RabbitMQ' % name)
                return

            # Report the configured endpoint, not a hard-coded one.
            self.log.info('%s: Connecting to RabbitMQ on %s:%s' %
                (name, self.host, self.port))
            self.connecting = True

            try:
                param = pika.ConnectionParameters(
                    host=self.host,
                    port=self.port,
                    virtual_host=self.vhost
                )
                self.connection = TornadoConnection(param,
                    on_open_callback=self.on_connected)
            except Exception:
                # Do not stay stuck in the "connecting" state if the
                # connection object could not even be created.
                self.connecting = False
                raise
            self.connection.add_on_close_callback(self.on_closed)

        def on_connected(self, connection):
            """Connection is open: record it and request a channel."""
            self.log.info('%s: Connected to RabbitMQ on %s:%s' %
                (self.__class__.__name__, self.host, self.port))
            self.connected = True
            self.connecting = False
            self.connection = connection
            self.connection.channel(self.on_channel_open)

        def on_channel_open(self, channel):
            """Channel is open: declare the fanout exchange."""
            self.log.info('%s: Channel Open, Declaring Exchange %s' %
                (self.__class__.__name__, self.exchange))
            self.channel = channel
            self.channel.exchange_declare(
                exchange=self.exchange,
                type="fanout",
                callback=self.on_exchange_declared
            )

        def on_exchange_declared(self, frame):
            """Exchange exists: declare an exclusive queue for this client."""
            self.log.info('%s: Exchange Declared, Declaring Queue' %
                (self.__class__.__name__))
            self.channel.queue_declare(exclusive=True,
                callback=self.on_queue_declared)

        def on_queue_declared(self, frame):
            """Queue exists: remember its name and bind it to the exchange."""
            self.log.info('%s: Queue Declared, Binding Queue %s' %
                (self.__class__.__name__, frame.method.queue))
            self.queue_name = frame.method.queue
            self.channel.queue_bind(
                exchange=self.exchange,
                queue=frame.method.queue,
                callback=self.on_queue_bound
            )

        def on_queue_bound(self, frame):
            """Final step of the setup chain; override to start consuming."""
            # Adjacent literals keep the message on one logical line; a
            # quoted string may not span physical lines.
            self.log.info('%s: Queue Bound. To receive messages implement '
                          'on_queue_bound method' % self.__class__.__name__)

        def on_closed(self, connection, *args):
            """Reset all state after the connection closed, then reconnect.

            ``*args`` absorbs any extra positional arguments passed to the
            close callback.  NOTE(review): the callback signature appears to
            differ between pika versions -- confirm against the one in use.
            """
            self.log.info('%s: Connection Closed' % self.__class__.__name__)
            self.connected = False
            self.connecting = False
            self.connection = None
            self.channel = None
            # connect() stores the new connection on self.connection itself
            # and returns None, so its result must not be assigned.
            self.connect()

        def add_message_handler(self, handler):
            """Register the callable that will receive consumed messages."""
            self.message_handler = handler
    

    tracker.py

    from sockjs.tornado import SockJSConnection
    
    import settings
    from common.pika_client import PikaClient
    
    
    class QueueReceiver(PikaClient):
        """PikaClient that starts consuming once its queue is bound."""

        def on_queue_bound(self, frame):
            """Begin consuming from the bound queue.

            Every delivery is passed to the handler registered through
            ``add_message_handler``.
            """
            queue = self.queue_name
            self.log.info('Consuming on queue %s' % queue)
            self.channel.basic_consume(
                queue=queue,
                consumer_callback=self.message_handler
            )
    
    
    class TrackerConnection(SockJSConnection):
        """SockJS connection that relays queue messages to its client.

        Each connection creates and connects its own ``QueueReceiver`` when
        it is opened.
        """

        def on_open(self, info):
            """Create, configure and connect a receiver for this client."""
            receiver = QueueReceiver('clt')
            self.queue = receiver
            receiver.add_message_handler(self.on_queue_message)
            receiver.set_log_level(settings.LOG_LEVEL)
            receiver.connect()

        def on_queue_message(self, channel, method, header, body):
            """Send the message body to the client, then acknowledge it."""
            self.send(body)
            tag = method.delivery_tag
            self.queue.channel.basic_ack(delivery_tag=tag)
    

    It works, but like I mentioned, I'd like to have only one connection to queue, receive messages, do some stuff and broadcast results to clients using broadcast() method. Thanks in advance for any help.

    链接地址: http://www.djcxy.com/p/44770.html

    上一篇: Amfphp中的Web套接字用于所有客户端的数据同步

    下一篇: 向使用Pika + sockjs的所有客户广播消息