Monday, 11 June 2012

Torquebox: Making Message Processors behave Synchronously

In Erlang, using processes and built-in functions, you can send messages to processes anywhere in your cluster, and there is a pattern that extends this to wait for a returned message as well. This lets you call out to a separate utility process to do some work and wait for its response.

This is useful, for example, when you want to move expensive processing (image processing, or anything else that takes time and CPU) off to a remote machine so that it doesn't consume CPU on the current one.

In Erlang the code for this is quite straightforward (this is a simplified variation of the example at http://www.erlang.org/doc/getting_started/conc_prog.html):

-module(async).

-export([start/0, ping/1, pong/0]).

ping(Pong_PID) ->
    Pong_PID ! {ping, self()},
    receive
        pong ->
            io:format("Ping received pong~n", [])
    end.

pong() ->
    receive
        finished ->
            io:format("Pong finished~n", []);
        {ping, Ping_PID} ->
            io:format("Pong received ping~n", []),
            Ping_PID ! pong,
            pong()
    end.

start() ->
    Pong_PID = spawn(async, pong, []),
    spawn(async, ping, [Pong_PID]).

In this example, the start() function begins by spawning the receiver process, pong. This process is set up to receive messages and respond to them using the syntax:

Ping_PID ! pong

Thus we have our receiver/processor ready. The second step is to spawn another process that is responsible for sending the message, i.e. the 'ping'. Now here is where it gets interesting.

The outbound message is sent with:

Pong_PID ! {ping, self()}

This sends a tuple containing the atom 'ping' and a reference to the sender's own process (the PID returned by self()). The sender then blocks indefinitely, waiting for a response of 'pong':

receive
    pong ->
        io:format("Ping received pong~n", [])
end.

Quite simple really. The entire point here is that these two processes can live on completely separate machines (spawn/4 accepts a node name for this): the receiver, or processor, does the work on its own, while the requester simply waits for a response.
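For comparison, here is a rough single-machine analogue of the Erlang example in plain Ruby, using only core classes: each "process" is a Thread whose inbox is a Queue, and the requester passes its own inbox along with the request, much as ping passes self(). start_pong and ping are hypothetical names chosen for illustration; unlike Erlang processes, these threads cannot span machines.

```ruby
# Rough Ruby analogue of the Erlang ping/pong example (illustration only):
# each "process" is a Thread whose inbox is a Queue.

# Spawns the responder and returns [its inbox, its thread].
def start_pong(log)
  inbox = Queue.new
  thread = Thread.new do
    loop do
      msg = inbox.pop                    # blocks, like Erlang's receive
      if msg == :finished
        log << "Pong finished"
        break
      elsif msg.is_a?(Array) && msg.first == :ping
        log << "Pong received ping"
        reply_to = msg[1]
        reply_to << :pong                # like Ping_PID ! pong
      end
    end
  end
  [inbox, thread]
end

# Sends a request carrying its own inbox (like self()) and waits for the reply.
def ping(pong_inbox, log)
  my_inbox = Queue.new
  pong_inbox << [:ping, my_inbox]        # like Pong_PID ! {ping, self()}
  reply = my_inbox.pop                   # blocks until pong answers
  log << "Ping received pong" if reply == :pong
  reply
end

log = Queue.new
pong_inbox, pong_thread = start_pong(log)
ping(pong_inbox, log)
pong_inbox << :finished
pong_thread.join
p Array.new(log.size) { log.pop }
# prints ["Pong received ping", "Ping received pong", "Pong finished"]
```

The reply channel travels with the request, so the responder needs no prior knowledge of who is asking; that is the property the TorqueBox version below recreates with correlation ids.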

I recently saw this pattern echoed when I was looking at the Torquebox platform:

http://torquebox.org/documentation/2.0.3/messaging.html#synchronous-messaging

Here we have a very basic copy of the same concept:

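require 'torquebox-messaging'

queue = TorqueBox::Messaging::Queue.new('/queues/foo')

# Responder: receive a synchronous request and publish the block's result
Thread.new {
  queue.receive_and_publish(:timeout => 5000) { |message| message.upcase }
}

# Requester: publish "ping" and block (up to 5000 ms) for the reply
message = queue.publish_and_receive "ping", :timeout => 5000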

This is interesting on its own, but it requires you to write your own Torquebox service to listen for these requests.

I figured we could do better than that, though. Erlang supports process groups, supervisors and workers for scaling up a pool of workers, and I wanted to mimic this somehow.

So... Torquebox already has its own concept of workers that receive messages from queues and scale up as your needs grow; these are called Message Processors:

http://torquebox.org/documentation/2.0.3/messaging.html#messaging-consumers

Now the beauty of these Message Processors is that they are pretty much plain Ruby objects with little knowledge of the fact that they sit at the end of a queue. What I like about them is that they let me think in terms of the Actor model. However, they don't support the synchronous style described above out of the box, so I set out to mimic it myself.

First of all, synchronous messages carry some extra data to distinguish requests from responses. In particular, publish_and_receive sets the message property synchronous = 'true'.

So let's start by making sure our message processor is set up correctly. In your torquebox.yml, add the following:

queues:
  /queues/ping_pong:
    durable: true

messaging:
  /queues/ping_pong:
    PingPong:
      selector: "synchronous = 'true'"
      concurrency: 5

Note the selector setting: it means the processor will only receive messages that have this property set on the JMS message. This is important, because it ensures the processor doesn't consume the response messages we will later publish back onto the same queue. I've also set the concurrency to 5 to show how you can control the number of workers/receivers.
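To make the selector's effect concrete, here is a tiny in-memory stand-in for a queue whose consumers take only messages whose properties match a selector. SelectiveQueue and the SYNCHRONOUS lambda are hypothetical illustrations of the semantics, not TorqueBox or HornetQ code; a real JMS selector is an SQL-like string evaluated by the broker.

```ruby
# Hypothetical in-memory stand-in for a JMS queue, used only to illustrate
# selector semantics; it is not TorqueBox or HornetQ code.
class SelectiveQueue
  def initialize
    @messages = []
    @lock = Mutex.new
  end

  def publish(body, properties = {})
    @lock.synchronize { @messages << { body: body, properties: properties } }
  end

  # Removes and returns the first message whose properties satisfy the
  # selector; non-matching messages stay queued for other consumers.
  def receive(selector)
    @lock.synchronize do
      index = @messages.index { |m| selector.call(m[:properties]) }
      index && @messages.delete_at(index)
    end
  end

  def size
    @lock.synchronize { @messages.size }
  end
end

# Plays the role of the selector "synchronous = 'true'".
SYNCHRONOUS = ->(props) { props['synchronous'] == 'true' }

queue = SelectiveQueue.new
queue.publish({ 'command' => 'pong' }, { 'correlation_id' => 'ID:1' }) # a reply
queue.publish({ 'command' => 'ping' }, { 'synchronous' => 'true' })    # a request

request = queue.receive(SYNCHRONOUS)
puts request[:body]['command']  # prints ping (the request, skipping the reply)
puts queue.size                 # prints 1 (the reply is left on the queue)
```

Even though the reply was published first, the selector makes the processor skip it, which is exactly why the request and reply can share one queue.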

Now for the receiver code (ping_pong.rb):

require 'rubygems'
require 'torquebox-messaging'

class PingPong < TorqueBox::Messaging::MessageProcessor
  def initialize(hash = {})
    @logger = TorqueBox::Logger.new
  end

  def on_message(body)
    # The synchronous methods always put the user message inside
    # a :message parameter, hence the need to descend here.
    # (Named 'command' so it doesn't shadow the processor's #message.)
    command = body[:message][:command]

    # This should result in 'ping' being displayed in your logs
    @logger.info(command)

    sync_reply({'command' => 'pong'})
  end

  def sync_reply(reply)
    # 'message' is the MessageProcessor accessor holding the raw TorqueBox
    # message currently being processed (not the decoded body).
    queue_name = message.jms_message.jms_destination.name
    jms_message_id = message.jms_message.jms_message_id

    # Reply on the same queue, tagged with the request's id so that only
    # the waiting sender's selector matches it.
    queue = TorqueBox::Messaging::Queue.new(queue_name)
    queue.publish(reply, {
      :correlation_id => jms_message_id,
      :encoding => :json
    })
  end
end

What's happening here is that when the queue receives a message matching the selector above, the processor passes the entire body of the message to #on_message. The body has already been decoded from JSON, so we can treat it like a hash. The main difference from normal Message Processor behaviour is that #publish_and_receive always embeds the user's message inside the 'message' parameter, so we need to retrieve things from there instead.
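One detail worth guarding against is key type: plain JSON.parse returns string keys unless symbolize_names: true is passed, and whether the decoded body arrives with symbol or string keys has not been verified here. Below is a defensive sketch using a hypothetical extract_command helper and a hand-built envelope in the shape described above; it accepts either key style.

```ruby
require 'json'

# Hypothetical helper: pull the user's command out of the envelope that
# publish_and_receive is described as adding (payload under "message").
# Accepts symbol or string keys, since plain JSON.parse yields string keys.
def extract_command(body)
  inner = body[:message] || body['message'] || {}
  inner[:command] || inner['command']
end

# Hand-built envelope for illustration: {"message":{"command":"ping"}}
wire = JSON.generate({ message: { command: 'ping' } })

string_keys = JSON.parse(wire)                          # {"message" => {...}}
symbol_keys = JSON.parse(wire, symbolize_names: true)   # {message: {...}}

puts extract_command(string_keys)  # prints ping
puts extract_command(symbol_keys)  # prints ping
```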

At the end of processing (in this case we just log the received message) I've provided an extra method called 'sync_reply'. It is responsible for creating the reply and setting the necessary 'correlation_id' so that the response is only picked up by the original sender.

The important part here is that we must make sure:
  • We send the reply back on the same queue the request arrived on. We do this by unpacking the JMS message and pulling out the 'jms_destination'.
  • We add a :correlation_id field when sending the reply, and this correlation_id must match the original sender's 'jms_message_id'. This is because the original sender is now waiting in a receive call whose selector only matches that correlation_id.
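Putting the two rules together, here is a self-contained sketch of the whole round trip on one shared queue: a pool of five workers (mirroring concurrency: 5) that only take messages marked synchronous = 'true', and requesters that wait only for the reply whose correlation_id equals the id of the message they sent. SelectiveQueue, start_workers and request_reply are hypothetical in-memory stand-ins written for illustration, not TorqueBox classes; the 'stop' command is likewise an invented way to shut the pool down.

```ruby
require 'securerandom'

# Hypothetical in-memory stand-in for a JMS queue (not TorqueBox/HornetQ
# code): receivers block until a message matching their selector arrives.
class SelectiveQueue
  def initialize
    @messages = []
    @lock = Mutex.new
    @changed = ConditionVariable.new
  end

  # Enqueues a message and returns its generated id (cf. jms_message_id).
  def publish(body, properties = {})
    id = "ID:#{SecureRandom.uuid}"
    @lock.synchronize do
      @messages << { id: id, body: body, properties: properties }
      @changed.broadcast                 # wake every waiting receiver
    end
    id
  end

  # Removes and returns the first message whose properties satisfy the
  # selector block, or returns nil if none arrives within timeout seconds.
  def receive(timeout, &selector)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @lock.synchronize do
      loop do
        index = @messages.index { |m| selector.call(m[:properties]) }
        return @messages.delete_at(index) if index
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        return nil if remaining <= 0
        @changed.wait(@lock, remaining)
      end
    end
  end
end

# Worker pool (cf. concurrency: 5): only takes messages marked synchronous,
# and replies on the same queue with correlation_id = the request's id.
def start_workers(queue, count)
  Array.new(count) do
    Thread.new do
      loop do
        request = queue.receive(5) { |props| props['synchronous'] == 'true' }
        break if request.nil? || request[:body]['command'] == 'stop'
        reply = { 'command' => 'pong', 'seq' => request[:body]['seq'] }
        queue.publish(reply, { 'correlation_id' => request[:id] })
      end
    end
  end
end

# Requester: publish a synchronous request, then wait only for the reply
# whose correlation_id matches the id of the message just sent.
def request_reply(queue, body, timeout)
  id = queue.publish(body, { 'synchronous' => 'true' })
  reply = queue.receive(timeout) { |props| props['correlation_id'] == id }
  reply && reply[:body]
end

queue = SelectiveQueue.new
workers = start_workers(queue, 5)
requesters = Array.new(10) do |i|
  Thread.new { request_reply(queue, { 'command' => 'ping', 'seq' => i }, 5) }
end
replies = requesters.map(&:value)
5.times { queue.publish({ 'command' => 'stop' }, { 'synchronous' => 'true' }) }
workers.each(&:join)
p replies.map { |r| r['seq'] }
# prints [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

Ten concurrent requesters share one queue and five workers, yet each gets the reply to its own request, because the worker selector ignores replies and each requester's selector matches only its own correlation_id.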

Now that you have a receiver built, sending a message is simple: use the built-in publish_and_receive method from anywhere else in your application:

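require 'torquebox-messaging'

queue = TorqueBox::Messaging::Queue.new("/queues/ping_pong")
# Publishes with synchronous = 'true', then blocks for up to 60000 ms
# waiting for the reply carrying the matching correlation_id
response = queue.publish_and_receive(
  {:command => "ping"},
  {:encoding => :json, :timeout => 60000 }
)
puts response.inspect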
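Because publish_and_receive waits at most :timeout milliseconds, callers need a plan for when no reply arrives. The sketch below assumes a nil result signals an expired timeout (check this against the TorqueBox documentation for your version) and turns it into an explicit error. ReplyTimeout and call_synchronously are hypothetical names, and the blocks passed in are stand-ins for a real queue.

```ruby
# Raised when no reply arrives before the timeout (hypothetical error class).
class ReplyTimeout < StandardError; end

# Hypothetical helper: runs the given synchronous call and turns a nil
# result, assumed here to mean "timed out", into an explicit error.
def call_synchronously(timeout_ms)
  reply = yield(timeout_ms)
  raise ReplyTimeout, "no reply within #{timeout_ms} ms" if reply.nil?
  reply
end

# In the application the block would wrap the real call, e.g.
#   call_synchronously(60_000) do |ms|
#     queue.publish_and_receive({ :command => "ping" },
#                               { :encoding => :json, :timeout => ms })
#   end
# Stand-ins for a responsive and an unresponsive processor:
puts call_synchronously(60_000) { |_ms| { 'command' => 'pong' } }.inspect

begin
  call_synchronously(10) { |_ms| nil }
rescue ReplyTimeout => e
  puts e.message  # prints no reply within 10 ms
end
```

Raising keeps a silent timeout from flowing onward as a nil response; callers that prefer a fallback value can rescue ReplyTimeout instead.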

And thus we have achieved synchronous messaging using Torquebox Message Processors.
