RPC using EventMachine & RabbitMQ

I've started playing around with the RabbitMQ RPC sample code provided in the AMQP gem documentation, trying to write some very simple code that performs synchronous remote calls:

require "amqp"

module RPC
  # Shared connection bootstrap for the RPC server and client.
  #
  # Mixes in EM::Deferrable and uses this object's own deferred status to hand
  # the connection, channel and requests queue over to the supplied callback.
  class Base
    include EM::Deferrable

    # Connects to the broker, opens a channel and declares the durable,
    # non-exclusive "rpc.queue" queue; then registers +rabbit_callback+ on this
    # object and calls +succeed+ with (connection, channel, requests_queue).
    #
    # When no EventMachine reactor is running, one is started with EM.run and
    # the setup runs inside it; otherwise the setup runs in the current reactor.
    #
    # @param rabbit_callback [Proc] converted to a block and registered via
    #   +callback+; the procs passed in this file accept
    #   (connection, channel, requests_queue)
    def rabbit(rabbit_callback)
      rabbit_loop = Proc.new {
        AMQP.connect do |connection|
          AMQP::Channel.new(connection) do |channel|
            channel.queue("rpc.queue", :exclusive => false, :durable => true) do |requests_queue|
              # NOTE(review): the same object (and so the same deferred status)
              # is reused by every call to #rabbit. Per the EventMachine
              # Deferrable docs, a callback added after an earlier +succeed+ is
              # presumably run immediately with the earlier arguments, i.e. a
              # stale connection/channel -- confirm; this would explain a
              # "closed connection" error on a second call.
              self.callback(&rabbit_callback)
              self.succeed(connection, channel, requests_queue)
            end # requests_queue
          end # AMQP.channel
        end # AMQP.connect

        # NOTE(review): `connection` is a block parameter of the AMQP.connect
        # block above and is not a local variable at this point; unless this
        # object responds to `connection`, these handlers would raise
        # NameError when a signal arrives -- verify.
        Signal.trap("INT")  { connection.close { EM.stop } }
        Signal.trap("TERM") { connection.close { EM.stop } }
      }

      # Only start a reactor when the caller is not already inside one.
      if !EM.reactor_running?
        EM.run do
          rabbit_loop.call
        end
      else
        rabbit_loop.call
      end
    end
  end

  # RPC server: answers every request found on the requests queue with the
  # current time.
  class Server < Base
    # Hands #rabbit a handler that consumes the requests queue. For each
    # delivery it logs the request, publishes Time.now.to_s on the default
    # exchange to the request's reply_to (echoing the request's message_id as
    # correlation_id), and then acknowledges the delivery.
    def run
      answer_requests = proc do |_connection, channel, requests_queue|
        deliveries = AMQP::Consumer.new(channel, requests_queue).consume

        deliveries.on_delivery do |metadata, _payload|
          request_id = metadata.message_id
          reply_queue = metadata.reply_to
          puts "[requests] Got a request #{request_id}. Sending a reply to #{reply_queue}..."

          channel.default_exchange.publish(
            Time.now.to_s,
            routing_key: reply_queue,
            correlation_id: request_id,
            mandatory: true
          )
          metadata.ack
        end
      end

      rabbit(answer_requests)
    end
  end

  # RPC client: publishes one request to the requests queue and returns the
  # payload of the reply whose correlation_id matches the request.
  class Client < Base

    # Sends +request+ and returns the reply payload.
    #
    # @param request [String] payload published to the requests queue
    # @return [String, nil] the payload stored by the reply handler, or nil if
    #   none has been stored by the time #rabbit returns
    def sync_push(request)
      result = nil
      sync_request = Proc.new do |connection, channel, requests_queue|
        # Random numeric id (as a string), used to pair the reply with this request.
        message_id = Kernel.rand(10101010).to_s

        # Exclusive, auto-deleted reply queue; the empty name presumably asks
        # the broker to generate one -- confirm against the AMQP gem docs.
        response_queue = channel.queue("", :exclusive => true, :auto_delete => true)
        response_queue.subscribe do |headers, payload|
          # Ignore any delivery that does not answer this particular request.
          if headers.correlation_id == message_id
            result = payload
            # Close the connection, then stop the reactor; presumably this is
            # what lets the EM.run call inside #rabbit return -- confirm.
            connection.close { EM.stop }
          end
        end

        # NOTE(review): publishing is delayed by 0.1 s. Presumably
        # response_queue.name is not available until the queue declaration
        # has completed, so publishing immediately would send an unusable
        # reply_to -- confirm; waiting for the declaration instead of a fixed
        # delay would be more reliable.
        EM.add_timer(0.1) do 
          puts "[request] Sending a request...#{request} with id #{message_id}"
          channel.default_exchange.publish(request,
                                           :routing_key => requests_queue.name,
                                           :reply_to    => response_queue.name,
                                           :message_id  => message_id)
        end
      end

      rabbit(sync_request)
      result
    end
  end
end

The idea is pretty simple: I want to have a message queue always ready (this is handled by rabbit method). Whenever the client wants to send a request, it starts by creating a temporary queue for the response along with a message id; it then publishes the request to the main message queue and waits for a response with the same message id in the temporary queue in order to know when the answer for this specific request is ready. I guess that the message_id is somehow redundant with the temporary queue (as the queue should also be unique).

I now run a dummy script using this client/server code:

# server session
>> server = RPC::Server.new
=> #<RPC::Server:0x007faaa23bb5b0>
>> server.run
Updating client properties
[requests] Got a request 3315740. Sending a reply to amq.gen-QCv8nP2dI5Qd6bg2Q1Xhk0...

and

# client session
>> client = RPC::Client.new
=> #<RPC::Client:0x007ffb6be6aed8>
>> client.sync_push "test 1"
Updating client properties
[request] Sending a request...test 1 with id 3315740
=> "2012-11-02 21:58:45 +0100"
>> client.sync_push "test 2"
AMQ::Client::ConnectionClosedError: Trying to send frame through a closed connection. Frame is #<AMQ::Protocol::MethodFrame:0x007ffb6b9c83d0 @payload="\x002\x00\n\x00\x00\x00\f\x00\x00\x00\x00", @channel=1>

There are two points I really don't understand:

  • related to EventMachine: in the Client code, why do I have to call EM.add_timer if I want my message to actually be published? And why doesn't using EM.next_tick work? My understanding is that "everything" is supposed to be "ready" when publish is called here.
  • related to AMQP: why does my client crash due to a closed connection on the second request? A brand new EM/AMQP loop is supposed to be created each time a new request is pushed.
  • Sadly, there is very little code available online dealing with EM/AMQP, so any help will be deeply appreciated! Any comment regarding the efficiency of this approach would also be much appreciated.


    Digging through the documentation, I finally found that I actually needed the once_declared callback to be sure that the queue is ready when the client starts using it.

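    Regarding the connection problem, it seems that using EM::Deferrable causes the issue, so the (quite unsatisfactory) solution simply consists of not including EM::Deferrable. (A likely cause: once a Deferrable has succeeded, a callback added later is run immediately with the arguments of the earlier succeed call, so the second request is handed the first request's already-closed connection and channel.)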

    require "amqp"
    
    module RPC
    
      # Connection bootstrap shared by the RPC server and client.
      module Base
        # Opens an AMQP connection and a channel, declares the durable,
        # non-exclusive "rpc.queue" queue and, once it is declared, invokes
        # +rabbit_callback+ with (connection, channel, requests_queue).
        # INT and TERM handlers that stop AMQP and then the reactor are
        # installed as part of the same setup.
        #
        # The setup runs in the current EventMachine reactor when one is
        # already running; otherwise a reactor is started with EM.run.
        # @do_not_stop_reactor records which case applied (true when the
        # reactor was already running) for use by the including class.
        #
        # @param rabbit_callback [#call] receives
        #   (connection, channel, requests_queue)
        def rabbit(rabbit_callback)
          bootstrap = proc do
            AMQP.start do |connection|
              AMQP::Channel.new(connection) do |channel|
                channel.queue("rpc.queue", exclusive: false, durable: true) do |requests_queue|
                  requests_queue.once_declared do
                    rabbit_callback.call(connection, channel, requests_queue)
                  end
                end
              end
            end

            # One shutdown handler shared by both signals.
            shutdown = proc { AMQP.stop { EM.stop } }
            Signal.trap("INT", &shutdown)
            Signal.trap("TERM", &shutdown)
          end

          if EM.reactor_running?
            # Someone else owns the reactor: remember not to stop it.
            @do_not_stop_reactor = true
            bootstrap.call
          else
            @do_not_stop_reactor = false
            EM.run { bootstrap.call }
          end
        end
      end
    
      # RPC server: answers every request found on the requests queue with
      # the current time.
      class Server
        include Base

        # Hands #rabbit a handler that consumes the requests queue. For each
        # delivery it logs the request, publishes Time.now.to_s on the default
        # exchange to the request's reply_to (echoing the request's message_id
        # as correlation_id), and then acknowledges the delivery.
        def run
          answer_requests = proc do |_connection, channel, requests_queue|
            deliveries = AMQP::Consumer.new(channel, requests_queue).consume

            deliveries.on_delivery do |metadata, _payload|
              request_id = metadata.message_id
              reply_queue = metadata.reply_to
              puts "[requests] Got a request #{request_id}. Sending a reply to #{reply_queue}..."

              channel.default_exchange.publish(
                Time.now.to_s,
                routing_key: reply_queue,
                correlation_id: request_id,
                mandatory: true
              )
              metadata.ack
            end
          end

          rabbit(answer_requests)
        end
      end
    
      # RPC client: publishes one request to the requests queue and returns
      # the payload of the reply whose correlation_id matches the request.
      class Client
        include Base

        # Sends +request+ and returns the reply payload.
        #
        # A reply queue is declared and subscribed to; once it is declared,
        # the request is published on the default exchange with the reply
        # queue's name as reply_to. When the matching reply arrives, AMQP is
        # stopped, and the reactor too unless @do_not_stop_reactor is set.
        #
        # @param request [String] payload published to the requests queue
        # @return [String, nil] the payload stored by the reply handler, or
        #   nil if none has been stored by the time #rabbit returns
        def sync_push(request)
          reply = nil

          round_trip = proc do |_connection, channel, requests_queue|
            request_id = Kernel.rand(10101010).to_s
            reply_queue = channel.queue("", exclusive: true, auto_delete: true)

            reply_queue.subscribe do |headers, payload|
              # Skip deliveries that do not answer this particular request.
              next unless headers.correlation_id == request_id

              reply = payload
              AMQP.stop { EM.stop unless @do_not_stop_reactor }
            end

            reply_queue.once_declared do
              puts "[request] Sending a request...#{request} with id #{request_id}"
              channel.default_exchange.publish(
                request,
                routing_key: requests_queue.name,
                reply_to: reply_queue.name,
                message_id: request_id
              )
            end
          end

          rabbit(round_trip)
          reply
        end
      end
    end
    
    链接地址: http://www.djcxy.com/p/61220.html

    上一篇: RabbitMQ RPC教程查询

    下一篇: RPC使用EventMachine&RabbitMQ