This can be quite useful, for example, when you want to move common processing off to a remote machine (like image processing or anything else that takes time and CPU) so you don't consume those resources on the current machine.
In Erlang the code for this is quite straightforward (this is a simplified variation taken from: http://www.erlang.org/doc/getting_started/conc_prog.html):
-module(async).
-export([start/0, ping/1, pong/0]).

ping(Pong_PID) ->
    Pong_PID ! {ping, self()},
    receive
        pong ->
            io:format("Ping received pong~n", [])
    end.

pong() ->
    receive
        finished ->
            io:format("Pong finished~n", []);
        {ping, Ping_PID} ->
            io:format("Pong received ping~n", []),
            Ping_PID ! pong,
            pong()
    end.

start() ->
    Pong_PID = spawn(async, pong, []),
    spawn(async, ping, [Pong_PID]).
In this example, the 'start()' function begins by spawning the receiver process 'pong'. This process is set up to receive messages and respond to them using the syntax:
Ping_PID ! pong
Thus we have our receiver/processor ready and waiting. The second step is to spawn a second process responsible for sending the message, i.e. the 'ping'. Now here is where it gets interesting.
The outbound message is sent with:
Pong_PID ! {ping, self()}

after which the sender blocks, waiting for the reply:

receive
    pong ->
        io:format("Ping received pong~n", [])
end.
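To make the flow concrete outside of Erlang, here is my own illustrative sketch (not part of the Erlang example) of the same blocking request/reply pattern in plain Ruby, using two threads with thread-safe queues standing in for Erlang mailboxes:

```ruby
require 'thread'

log = []  # collects the output so we can see the ordering at the end

# Each "process" gets its own mailbox; Queue#pop blocks like Erlang's receive.
ping_mailbox = Queue.new
pong_mailbox = Queue.new

pong = Thread.new do
  msg = pong_mailbox.pop               # receive {ping, Ping_PID}
  log << "Pong received #{msg[:body]}"
  msg[:reply_to].push(:pong)           # Ping_PID ! pong
end

ping = Thread.new do
  # Pong_PID ! {ping, self()} -- we pass our own mailbox as the reply address
  pong_mailbox.push(:body => :ping, :reply_to => ping_mailbox)
  reply = ping_mailbox.pop             # blocks until pong replies
  log << "Ping received #{reply}"
end

[pong, ping].each(&:join)
puts log
```

Because ping's pop cannot return until pong has logged and replied, the ordering of the two lines is deterministic, just as in the Erlang version.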
I recently saw this pattern echoed when I was looking at the TorqueBox platform:
http://torquebox.org/documentation/2.0.3/messaging.html#synchronous-messaging
Here we have a very basic copy of the same concept:
queue = TorqueBox::Messaging::Queue.new('/queues/foo')

Thread.new {
  queue.receive_and_publish(:timeout => 5000) { |message| message.upcase }
}

message = queue.publish_and_receive "ping", :timeout => 5000
Which is interesting on its own, but it does require you to write your own TorqueBox service to listen for these requests.
I figured we could do better than that, though. Erlang supports the concept of process groups, supervisors, and workers for scaling up a set of workers, and I wanted to mimic this somehow.
So ... TorqueBox already has its own concept of workers that can receive messages on queues and scale up as your needs grow - these are called Message Processors:
http://torquebox.org/documentation/2.0.3/messaging.html#messaging-consumers
Now the beauty of these Message Processors is that they are pretty much just Ruby objects with little knowledge of the fact that they sit at the end of a queue. What I like about them is that they let me think in terms of the Actor model; however, they don't support the synchronous style described above out of the box, so I set out to mimic it myself.
First of all, synchronous messages carry some extra data to tie the request to its response. In particular, the call 'publish_and_receive' sets the JMS message property synchronous = 'true'.
So let's start by making sure our message processor is set up correctly. In your torquebox.yml you add the following:
queues:
  /queues/ping_pong:
    durable: true

messaging:
  /queues/ping_pong:
    PingPong:
      selector: "synchronous = 'true'"
      concurrency: 5
Note the selector setting - it means this processor will only receive messages that carry this property in the JMS message. This is important: it ensures the receiver doesn't pick up the response messages we're going to publish back onto the same queue later on. I've also set the concurrency to 5 just to show how you can control the number of workers/receivers.
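To illustrate what the selector buys us, here is a hypothetical plain-Ruby simulation (not TorqueBox or JMS code) of how a broker-side selector filters deliveries by message property:

```ruby
# Two messages sit on the same queue: the original request (published by
# publish_and_receive, which stamps synchronous = 'true') and our reply,
# which deliberately lacks that property.
messages = [
  { :properties => { 'synchronous' => 'true' }, :body => 'ping request' },
  { :properties => {},                          :body => 'pong reply'   }
]

# Predicate equivalent to the JMS selector "synchronous = 'true'"
selector = lambda { |msg| msg[:properties]['synchronous'] == 'true' }

# The "broker" only hands the consumer messages matching its selector,
# so the processor never consumes its own replies.
delivered = messages.select(&selector)
puts delivered.map { |m| m[:body] }.inspect
```

Without the selector, the message processor would race the original sender for the reply we publish back onto the queue.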
Now for the receiver code (ping_pong.rb):
require 'rubygems'
require 'torquebox-messaging'

class PingPong < TorqueBox::Messaging::MessageProcessor
  def initialize(hash = {})
    @logger = TorqueBox::Logger.new
  end

  def on_message(body)
    # The synchronous methods always put the user message inside
    # a :message parameter, hence the need to descend here.
    message = body[:message][:command]

    # This should result in 'ping' being displayed in your logs
    @logger.info(message)

    sync_reply({'command' => 'pong'})
  end

  def sync_reply(reply)
    queue_name = message.jms_message.jms_destination.name
    jms_message_id = message.jms_message.jms_message_id
    queue = TorqueBox::Messaging::Queue.new(queue_name)
    queue.publish(reply, {
      :correlation_id => jms_message_id,
      :encoding => :json
    })
  end
end
What's happening here is that when the queue receives a message matching the selector above, it sends the entire body of the message to the #on_message method. The body is already decoded using JSON.parse, so we can treat it like a hash. The main difference from normal Message Processor behaviour is that #publish_and_receive always embeds the user's message inside the 'message' parameter, so we need to retrieve things from there instead.
At the end of processing (in this case we just log the received message), I've provided an extra method called 'sync_reply'. This method is responsible for building the reply and adding the necessary 'correlation_id' setting to ensure the response is only picked up by the original sender.
The important part here is that we must make sure:
- We send the message back on the same queue as it was sent out on. We do this by unpacking the JMS message and pulling out the 'jms_destination'.
- We make sure we add a :correlation_id field when sending the message back; this correlation_id must match the original sender's 'jms_message_id'. This is because the original sender is now waiting using a receive call with a selector designated to only match that correlation_id.
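The correlation step above can be sketched with a small in-memory simulation of my own (hypothetical values, not TorqueBox code): the sender remembers its request's message id and only accepts a reply whose correlation id equals it, which is exactly what the selector-based receive does for real.

```ruby
# The id the broker assigned to our outbound request (hypothetical value)
pending_request_id = 'msg-42'

# Replies sitting on the queue: one belonging to some other caller,
# and one tagged with our request's id.
replies = [
  { :correlation_id => 'msg-99', :body => { 'command' => 'pong' } },
  { :correlation_id => 'msg-42', :body => { 'command' => 'pong' } }
]

# Equivalent in spirit to receiving with the selector
# "JMSCorrelationID = 'msg-42'": ignore everything else on the queue.
mine = replies.find { |r| r[:correlation_id] == pending_request_id }
puts mine[:body]['command']
```

This is why the reply must carry :correlation_id - without it, the waiting sender's selector never matches and the publish_and_receive call simply times out.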
Finally, the sender side looks like this:

queue = TorqueBox::Messaging::Queue.new("/queues/ping_pong")
response = queue.publish_and_receive(
  {:command => "ping"},
  {:encoding => :json, :timeout => 60000}
)
puts response.inspect
And thus we have achieved synchronous messaging using TorqueBox Message Processors.