Sending and Receiving messages using Spring’s AMQP

Introduction

The messaging standards that an application uses for communication varies and Advanced Message Queuing Protocol (ADQP) aims in providing standards with respect to messaging communication like the format of the message, various contracts to be implied, etc. Note that ADQP is just a protocol and it is not tied with any specific technology or language (unlike JMS which is tightly coupled with Java). Some of the popular implementations available are Qpid (from Apache), Rabbit MQ (from Vmware) etc.

also read:

Spring AMQP framework provides a complete end-to-end solution for integrating messaging solution providers into the application. Note that this framework itself doesn’t provide the messaging solutions such as sending and receiving messages, but instead it simplifies the job of integrating applications with existing messaging solution providers. The framework hierarchy streams into two divisions, one for the Java platform and the other for the .NET platform. The article covers only the integration support on the Java platforms. We will discuss about the various capabilities available in the form of classes/interfaces before moving on into demonstration.

Spring’s AMQP API

We will look into the core interfaces and classes of Spring’s AMQP API in this section.

Message

The Message class in Spring AMQP conforms to AMQP standards and as the name implies, it represents the message that an application can send or receive. However, an application never deals with this class directly, there are bunch of helper classes available for dealing with the data encapsulated in the message. Closely associated with the Message class is the MessageProperties class which defines the various properties of the message. One such property is the message type which can be plain, in serialized form, bytes or in JSON (Java script Object Notation). There are also other useful message properties such as message id, priority, headers, delivery mode, content length, reply to field etc.

Exchange/Queue/Binding

Exchange in AMQP defines the target for a sender application for sending messages. Queue refers to the target for the receiving applications for fetching the messages. Binding defines the linking between an Exchanger and a Queue. As mentioned, Exchange defines the destination where the messages will be placed, three different types of Exchanges are identified in AMQP. They are:

  • Direct
  • Topic
  • Fanout

In Direct Exchange, the queue that links with the Exchange using the Binding can have the link established only through the name of the queue that corresponds to the Exchange’s routing key. For example, if an Exchange is created with the routing key ‘test’, then the Queue that links with this Exchange must have the queue name configured as ‘test’. If the Queue fails to give to specify a name that is equivalent to the Exchange routing key, then the message won’t to be delivered to the Queue.
In Topic Exchange, the routing key may include patterns such as ‘*’ (match one character) and ‘#’ (match any character). For example, if the Exchange is created with the routing key ‘test*’, then the queue with name ‘test1′, ‘test2′ that links with the Binding can receive the message. In Fanout Exchange mode, whatever the routing key is specified in the Exchange, which means that whatever Queues that are bound with the exchange will have the messages delivered.

ConnectionFactory/Connection

ConnectionFactory represents the interface for creating Connection objects that can be used by the applications before sending or receiving messages. One concrete implementation of the ConnectionFactory interface namely SingleConnectionFactory is available that can be used for creating connection factory instance. Calling createConnection() on the ConnectionFactory creates and opens up a new connection to the messaging broker. The messaging broker properties such as the host name, port, username and password needs to be configured at the ConnectionFactory level before acquiring a Connection.

Setting up the environment

In this section, we will see how to make the environment ready for building a sample application that will be illustrated in the next section. The following packages need to be available for building and running the sample:

  • Spring AMQP and its dependencies – http://www.springsource.org/spring-amqp
  • Rabbit MQ Client libraries – http://www.rabbitmq.com/java-client.html
  • Rabbit MQ Server – http://www.rabbitmq.com/server.html
  • Erlang OTP – http://www.erlang.org/download/otp_win32_R13B03.exe

Spring AMQP and its dependencies

Unzip the downloaded distribution and place it in a folder, example – C:\Spring\AMQP. The distribution will contain the following the jars that has to be set to the class path for any AMQP application

  • spring-amqp-1.0.0.M1.jar
  • spring-erlang-1.0.0.M1.jar
  • spring-rabbit-1.0.0.M1.jar
  • spring-rabbit-admin-1.0.0.M1.jar

Note that the above libraries will have dependency with the core Spring libraries. We have used Spring 3 libraries in this example. So make sure that the following libraries are also available in the class path.

  • spring-aop-3.0.3.RELEASE.jar
  • spring-beans-3.0.3.RELEASE.jar
  • spring-context-3.0.3.RELEASE.jar
  • spring-context-support-3.0.3.RELEASE.jar
  • spring-core-3.0.3.RELEASE.jar
  • spring-expression-3.0.3.RELEASE.jar

Rabbit MQ Client libraries

Rabbit MQ client libraries will be required for client applications that need to send and receive messages. So make sure that the libraries are present in the classpath of the application.

  • commons-cli-1.1.jar
  • commons-io-1.2.jar
  • rabbitmq-client.jar

Erlang OTP

Rabbit MQ Server is dependant on Erand OTP. After downloading Erlang OTP, run the installation with the setup file. Once the installation is complete, make sure to create a environment variable ERLANG_HOME pointing to the installation’s home directory, for example “C:\Languages\Erl\Erl5.7.4″.
Rabbit MQ Server
Rabbit MQ Server provides the messaging solutions for sending and receiving messages. Installing the server is as simple as unzipping the distribution to a desired location. With the assumption that the environment variable is already set, locate the file ‘rabbitmq-server.bat’ found in RABBIT_MQ_SERVER_HOME/sbin directory and run it. It will produce an output similar to the following,

Activating RabbitMQ plugins ...
0 plugins activated:

+---+   +---+
|   |   |   |
|   |   |   |
|   |   |   |
|   +---+   +-------+
|                   |
| RabbitMQ  +---+   |
|           |   |   |
|   v2.0.0  +---+   |
|                   |
+-------------------+
AMQP 0-9-1 / 0-9 / 0-8
Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.
Licensed under the MPL.  See http://www.rabbitmq.com/

node           : rabbit@XYZ
app descriptor : f:/Messaging/RabbitMQ/Server/rabbitmq-server-windows-
2.0.0/rabbitmq_server-2.0.0/sbin/../ebin/rabbit.app
home dir       : C:\Documents and Settings\ XYZ
cookie hash    : R9IMCzDGrduFqrqU3VYieA==
log            : C:/Documents and Settings/ XYZ /Application Data/RabbitMQ/log/rabbit@XYZ.log
sasl log       : C:/Documents and Settings/XYZ/Application Data/RabbitMQ/log/rabbit@XYZ-sasl.log
database dir   : c:/Documents and Settings/XYZ/Application Data/RabbitMQ/db/rabbit@XYZ-mnesia
erlang version : 5.7.4

starting file handle cache server                                     ...done
starting worker pool                                                  ...done
starting database                                                     ...done
starting codec correctness check                                      ...done
-- external infrastructure ready
starting exchange type registry                                       ...done
starting exchange type topic                                          ...done
starting exchange type headers                                        ...done
starting exchange type fanout                                         ...done
starting exchange type direct                                         ...done
starting statistics event manager                                     ...done
starting internal event notification system                           ...done
starting logging server                                               ...done
-- kernel ready
starting alarm handler                                                ...done
starting node monitor                                                 ...done
starting cluster delegate                                             ...done
starting guid generator                                               ...done
starting memory monitor                                               ...done
-- core initialized
starting empty DB check                                               ...done
starting exchange recovery                                            ...done
starting queue supervisor and queue recovery                          ...done
-- message delivery logic ready
starting error log relay                                              ...done
starting networking                                                   ...done
-- network listeners available

broker running

Demonstration

In this example, we will see a simple demonstration on sending and receiving messages using Spring’s AMQP framework. The messaging broker that we will be using is Rabbit MQ Server. The demonstration is as simple as the sender application sends a message to the queue and the receiver applications picks up the message and displays it in the console.

Configuration

The Configuration class listed below does the job of returning a suitable Template object that will be used by the calling applications for sending and receiving message. The minimal requirements for a AMQP Template object is set by connection factory instance so that it can open up the connection to the specified messaging broker, specify the exchange mode (direct, topic, fanout) and to specify the queue name. Given these details, the template class is capable of creating connection, exchange and queue objects. Once can see that the Connection, Exchange, Queue classes are completely hidden from the application developer and the focus can be made only on the business logic.
TestConfiguration.java

package net.javabeat.articles.spring.amqp.test;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.AbstractRabbitConfiguration;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.SingleConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TestConfiguration extends AbstractRabbitConfiguration {

	private String routingKey = "test.queue";
	private String testQueueName = "test.queue";

	public ConnectionFactory getConnectionFactory() {
		SingleConnectionFactory connectionFactory = new SingleConnectionFactory("localhost");
		connectionFactory.setUsername("guest");
		connectionFactory.setPassword("guest");
		return connectionFactory;
	}

	@Override
	public RabbitTemplate rabbitTemplate() {
		RabbitTemplate rabbitTemplate = new RabbitTemplate(getConnectionFactory());
		rabbitTemplate.setRoutingKey(routingKey);
		rabbitTemplate.setQueue(testQueueName);
		return rabbitTemplate;
	}

}

The above configuration extends from AbstractRabbitConfiguration class and it is annotated with @Configuration annotation. Later on we will see the usage of this annotation. The template method rabbitTemplate() is overridden to return an instance of RabbitTemplate class. Before returning the instance, the mandatory properties such the Connection Factory instance, routing key and the queue name are specified. Note that in this example, the routing key and the queue name are the same which means the Binding will be established between Direct Exchange with the specified Queue. We have used the SingleConnectionFactory class for creating a ConnectionFactory instance for Rabbit MQ Server. The defaut username, password and port are ‘guest’, ‘guest’ and ‘5672’ respectively. If during the installation of Rabbit MQ Server, if any of these values got change, then appropriate values have to be specified to the ConnectionFactory instance.

Message Sender

The message Sender applications prepare the message to be sent, and it creates an instance of the Configuration class that we have created previously. Because we have used the annotations @Configuration and @Bean in the TestConfiguration class, it is now possible to instantiate the bean RabbitTemplate using context.getBean(RabbitTemplate.class).
MessageSender.java

package net.javabeat.articles.spring.amqp.test;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class MessageSender {

	public void send(){
		String messageToBeSent = "Test message";

		ApplicationContext context = new AnnotationConfigApplicationContext(TestConfiguration.class);
		RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
		rabbitTemplate.convertAndSend(messageToBeSent);

		System.out.println("Message '" + messageToBeSent + "' sent.");
	}

	public static void main(String[] args) {
		MessageSender sender = new MessageSender();
		sender.send();
	}

}

The above class makes use of the method convertAndSend() defined on the RabbitTemplate class for sending the message. In the later sections, we will see the variations of this method for sending messages.

Message Receiver

MessageReceiver.java

package net.javabeat.articles.spring.amqp.test;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class MessageReceiver {

	public void receive(){

		ApplicationContext applicationContext = new AnnotationConfigApplicationContext(TestConfiguration.class);
		RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);

		Object message = rabbitTemplate.receiveAndConvert();
		if (message instanceof String){

		String strMessage = (String)message;
			System.out.println("String message received is '" + strMessage + "'");
		}else{
			System.out.println("Unknown message received '" +  message + "'");
		}
	}

	public static void main(String[] args) {
		MessageReceiver receiver = new MessageReceiver();
		receiver.receive();
	}

}

The above class almost does the same job as that of the Message Sender except that it calls the method receiveAndConvert() defined on the RabbitTemplate class for receiving the message. Note that for running the application, the bat file ‘rabbitmq-server.bat’ has to be started. Once it is successfully started, open a command prompt and the run the class ‘MessageSender’ which will send the message. Now this message is stored in the Exchange of Rabbit MQ Server. Now open a different console for running the class Message Receiver which binds the queue to the Exchange for receiving the message which is then displayed in the console.

Miscellaneous

In this section, we will look into the overloaded functionalities present in Spring AMQP. We will look into the various ways of sending and receiving messages. We will also have a look at writing custom Message Converters and how to hook them.

Sending and Receiving Messages

A message producer application can send a message to the exchange in plenty of ways and the one that we have already seen is to call the method convertAndSend() defined on the RabbitTemplate object. For example, if we wish to post process the message once it is created, we can define a custom message post processor for achieving that. Have a look at the following code snippet,
MessagePostProcesserTest.java

package net.javabeat.articles.spring.amqp.misc;

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

public class MessagePostProcesserTest {

	public static void main(String[] args) {

		RabbitTemplate rabbitTempate = null;
		// Inititate the above object

		Object messageObject = null;
		// Inititate the above object

		CustomMessagePostProcessor postProcessor = new CustomMessagePostProcessor();
		rabbitTempate.convertAndSend(messageObject, postProcessor);
	}
}

class CustomMessagePostProcessor implements MessagePostProcessor{

	@Override
	public Message postProcessMessage(Message message) throws AmqpException {
		message.getMessageProperties().setAppId("TestApplication-1");
		return message;
	}

}

In the above class, we have ensured that the application id is always set on the message object before sending it by attaching a custom message post processor object. This scenario is usually used when the message comes from a different source and the current application has to modify the message properties.
It is also possible to specify the routing key that represents the exchange destination to which the message has to be sent. By setting this property, it will override the routing key that is already set on the Rabbit Template object. Consider the following code which just does that.
SendMessageTest1.java

package net.javabeat.articles.spring.amqp.misc;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

public class SendMessageTest1 {

	public static void main(String[] args) {

		RabbitTemplate rabbitTemplate = null;
		// Populate the rabbitTemplate object.

		String routingKey = "ABC";

		Object messageObject = null;
		// Populate the message object

		rabbitTemplate.convertAndSend(routingKey, messageObject);
	}
}

The other way to send a message is to make use of the Message Creator object which actually creates a message before sending it. The following code will demonstrate that.
SendMessageTest2.java

package net.javabeat.articles.spring.amqp.misc;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageCreator;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

public class SendMessageTest2 {

	public static void main(String[] args) {

		RabbitTemplate rabbitTemplate = null;
		// Populate the rabbitTemplate object.

		String routingKey = "ABC";

		Object messageObject = null;
		// Populate the message object

		MyMessageCreator creator = new MyMessageCreator();
		rabbitTemplate.send(routingKey, creator);
	}
}

class MyMessageCreator implements MessageCreator{

	@Override
	public Message createMessage() {

		MessageProperties properties = null;
		// Populate the properties object

		Message myCustomMessage = new Message("ABCD".getBytes(), properties);
		return myCustomMessage;
	}
}

In the above code, the message creating process is externalized from the code that sends the message with the help of MessageCreator interface. Messages can be received either in synchronous or in asynchronous fashion. In the example code that we saw in the last section, we received the message synchronously. Technically this means that the method call is blocked until the message is consumed by the message receiver application. The following code will explain the nature of receiving asynchronous messages.
ReceiveMessageTest1.java

package net.javabeat.articles.spring.amqp.misc;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;

public class ReceiveMessageTest1 {

	public static void main(String[] args) {

		SimpleMessageListenerContainer container = null;
		// Instantiate the above container

		MyMessageListener listener = new MyMessageListener();
		container.setMessageListener(listener);
	}
}

class MyMessageListener implements MessageListener{

	@Override
	public void onMessage(Message message) {

		byte[] messageContent = message.getBody();
		System.out.println("Message receied is " + messageContent);
	}

}

In the above code, we have ensured that the Simple Message Listener Container has been set with a message listener object so that the method MessageListener.onMessage() is called whenever a message arrives.

Message Converters

Message Converter does the job of converting messages between Message and a regular java object which is represented through MessgeConverter interface. Have a look at the following code.
ConverterTest1.java

package net.javabeat.articles.spring.amqp.convert;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.SimpleMessageConverter;

public class ConverterTest1 {

	public static void main(String[] args) {

		RabbitTemplate rabbitTemplate = null;
		// Instantiate the above object.

		SimpleMessageConverter converter = new SimpleMessageConverter();
		rabbitTemplate.setMessageConverter(converter);

		Object message = null;
		// Instantiate the above message object

		rabbitTemplate.convertAndSend(message);
	}
}

In the above code, before sending the message, we have created an instance of SimpleMessgeConverter object and attached it to the RabbitTemplte object. However, this is not needed, since if the message converter object is not explicitly specified by the caller, it will make use of SimpleMessageConverter for converting the message. If the encoding of the message object is set to text/plain, then this converter will convert the byte content into a String, whereas if the encoding is set to application/x-java-serialized-object, then it wil use Java Seriaization mechanism for de-serializing the byte array (found in the message) into a Java object. It is always possible to hook in a Custom Message Converter implementation by extending the interface MessageConverter.

Conclusion

This article provided introductory knowledge on Spring’s AMQP for sending and receiving message through Spring’s configuration mechanism. It outlined the benefits of this technology over the rest of the messaging solutions. Setting up the environment for running the sample was also explained in greater depth. Through the usage of the sample application, the various concepts like Exchange, Queue and Message objects were also demonstrated. Finally the article concluded with exploring the alternate APIs for sending and receiving messages through Message Creators and Message Post Processors along with Message Converters.

also read:

Comments

comments

About Krishna Srinivasan

He is Founder and Chief Editor of JavaBeat. He has more than 8+ years of experience on developing Web applications. He writes about Spring, DOJO, JSF, Hibernate and many other emerging technologies in this blog.

Trackbacks

  1. […] Job Scheduling support with JDK Timer and QuartzTesting Support in Spring FrameworkSpring PythonSending and Receiving messages using Spring’s AMQPSpring Framework FORM […]

Speak Your Mind

*