Introduction
RabbitMQ is a fantastic way for small applications to communicate with each other. That is why it is an ideal technology to use for developing micro services. Micro services are small applications that can be developed quickly, can easily be tested in isolation, and can be monitored and maintained with ease. To facilitate the communications, using RabbitMQ or any other messaging broker technologies is one way to get your micro services ecosystem up quickly. There are a couple things I prefer RabbitMQ over the other messaging broker technologies:
- RabbitMQ supports mainly queue and async based messaging exchange. This allows the sender to send as many messages as possible without caring whether they are processed at destination.
- RabbitMQ can be used as a temporary message storage and a retry mechanism, and you can write your micro service in a way that for any unacknowledged messages, the message get retried until it is either processed or pull out of the queue by force. RabbitMQ when setup correctly, will never lose message (I am sure this is true with many other alternatives out there, so similar technique can apply to these messaging brokers).
- You can design your system and use RabbitMQ as a ESB. RabbitMQ supports several different ways of routing messages which can be taken advantages to move messages like an ESB.
- It has a good simple and easy to use security system.
- It is fairly easy to setup and manage.
One question people might ask is what about synchronous message processing? I used to consider synchronous processing is important and necessary. It is not. As far as I can tell any synchronous operations between two service app can be expressed in asynchronous ways, the down side might be that the operation might be a bit hard to understand. By doing so, you can gain an advantage, both service apps do not have to be up in order to talk to each other, either one can send messages as fast as they could, and expect the other to process successfully. And it will because RabbitMQ guarantees the message will be delivered. This is basically a primer on async messaging service architecture.
When I started working on RabbitMQ, I was using .NET, and I didn't like it much. What I need was a mature framework that has a good documentation, and a great wrapper over the RabbitMQ barebone APIs. Before I left that job, I started experimenting with Spring AMQP. I liked what I saw. This tutorial article is the summary of what I have learned.
Prerequisite
In order to run the this sample project and its associated client application. You must do the following
- Install latest updated JDK for 1.7
- Install latest Maven 3.3.x
- Install Erlang and RabbitMQ. You can go to RabbitMQ official site and check the installation guide
- Get familiar with RabbitMQ, you have to create a user, a virtual host, and associate the user with the virtual host.
This tutorial is aimed for developers who had previous experience with Spring framework, and RabbitMQ messaging framework. If you are unfamiliar with this, please skip this tutorial. Also having some knowledge of Maven build tools can also help you understand this tutorial.
For this sample project, I have created a user for RabbitMQ called "testuser" and the password is "secret". This is only for test purpose. Please do not use this credential in production systems. I assigned administrator role my "testuser". Then I created a virtual host called "hanbo1", and give full access of this virtual host to the user. That is the pre-development setup I have to do for this sample project. Again, if you find these simple steps of preparation hard to understand, you should first get familiar with the related technologies before reading any further.
Objective
My objective for this sample project is as the following:
- I want to create a sample micro service that can receive mock user requests and handle it. The sample request should be received as a POJO (Plain Old Java Object).
- I want to create a sample client application that can send the request as a POJO, which will be received by the sample micro service.
These two objectives should sufficiently demo the capability of Spring AMQP. The micro service and the client both deals with requests (and responses) as regular POJO, the transportation of the requests and response would be in the form of JSON strings. The conversion between POJO and JSON string is automatically taken care by Spring or other 3rd party components.
A Quick Primer on RabbitMQ
In order to be able to receive or send messages to RabbitMQ messaging broker, you need the following information:
- The server URI, the user name, password, and virtual host. These help the service application connect to the RabbitMQ messaging broker.
- The exchange name, the queue name, and the routing key
The difference between RabbitMQ and JMS based messaging brokers is that RabbitMQ is mainly queue based, ideal for asynchronous message processing. JMS messaging broker supports both synchronous and asynchronous message processing. The way message flow through RabbitMQ messaging broker is different from JMS messaging brokers. The message first hits an exchange setup either manually by user or automatically by application. Then the exchange has one or more routing keys stored, each routing key will route the message to a named queue. Whatever is subscribed to the queue will then receive the message. There can be multiple listeners per queue, and one message can only be consumed by one consumer at a time.
One cool thing about RabbitMQ, and possibly for some other types of messaging systems is that if the message received by client is not acknowledged, the message goes back to the queue. In the case of RabbitMQ, the message goes back to the top of queue. This is great as a retry mechanism. In order to use this to your advantage, what you need to do is to configure your application context in a way that whenever an exception happened, the unhandled message goes back to the queue. This article will demonstrate how this can be done (with minimum effort). By doing this, it can also blow up in your face. Imagine this, when an unhandled exception occurred repeatedly, the message also goes back to the queue repeatedly and the application would stuck in a loop. The easiest way to correct this is to put a catch block for all known exceptions, and hope that there is no unhandled exceptions that would cause this type of issues. If uncaught exception would be a concern, then finding a way to catch all the exceptions. But in doing so. If you do this, you might lose the advantage of using RabbitMQ as a retry mechanism.
That is about it as a quick primer. Next. I will explain each step I have taken to create this workable service and client program.
Step 1 -- Create the project folder structure
This tutorial consists of two projects, one is for the service, and the other is the client program. Both projects should have similar file structures, Here is the file structure for the service application:
<project base dir>
|
-- src
|
-- main
|
-- java
|
-- org
|
-- hanbo
|
-- amqp
|
-- sample
|
-- messages
|
-- UserDetail.java
|
-- services
|
-- UserDetailService.java
|
-- Main.java
|
-- RabbitMessageListener.java
|
-- resources
|
-- application-context.xml
|
-- log4j.properties
|
---- pom.xml
The project file structure for the client program is almost the same. The main difference is in the package directories. You should be able to check it after downloading the sample zip archive (on the top).
Step 2 - Setting up Maven POM File
The first thing I did is create a Maven pom.xml for my project. Inside I define all the dependencies I need. You might ask how would one know what dependencies to use. Here is how I figure them out:
- This is a spring based application, so I will include some most core spring dependencies. Like spring-core, spring-context, and spring-beans. Once I have these three core dependencies, I can add the Spring AMQP dependencies.
- I am only playing with Spring AMQP, by searching online I found that the only ones I need is spring-amqp, and spring-rabbit.
- Later, I have to use the Jackson framework for converting between regular Java objects and JSON formatted strings. So I added some Jackson related dependencies.
- I also added a couple logging related dependencies so that when I ran the sample project, I can trouble-shoot by looking at a nice looking log file (if necessary).
My Maven POM file looks like the following:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.hanbo.spring-amqp-test</groupId>
<artifactId>spring-amqp-testapp</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<name>spring-amqp-testapp</name>
<url>http://maven.apache.org</url>
<properties>
<spring.version>3.2.11.RELEASE</spring.version>
<spring.amqp.version>1.3.9.RELEASE</spring.amqp.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-amqp</artifactId>
<version>${spring.amqp.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>${spring.amqp.version}</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.1.3</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>1.9.13</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.9.13</version>
</dependency>
</dependencies>
<build>
<finalName>amqp-test</finalName>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<phase>install</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
The first part of the POM file defines all the necessary dependencies. What is cool about Maven is that when building the binary executable, it will not only download the specified dependencies, but also any needed dependencies that are not specified in the POM file. The second part of the POM file defines the build process. I choose not only the compiler plugin, but also the dependency plugin so that it takes all the needed jars into a specific location. This allows me to easily copy not only my jar but all the dependency jars to anywhere I want.
Note that I am using the older version of the spring-amqp dependencies. It is possible that when you download the newer version of the dependencies certain things in this sample project might not work. If this happens, please do you best by looking up answers online.
Step 3 - The Main Entry
The easiest part of the service project is the main entry class, here is the source code:
package org.hanbo.amqp.sample;
import org.springframework.context.support.GenericXmlApplicationContext;
public class Main
{
@SuppressWarnings("resource")
public static void main(String[] args)
{
GenericXmlApplicationContext context =
new GenericXmlApplicationContext("classpath:/application-context.xml");
context.registerShutdownHook();
}
}
In this class, there is just one method which is the main entry of the entire application. It does only two things, first loading the application context file from classpath. Then it will register the shutdown hook. The shutdown hook is used when application receives a shutdown signal from the system, it will perform clean up before exiting.
When the application starts, it will run for ever. When user press Ctrl+C, the application will trap the shutdown signal and perform the clean up.
Step 4 - Application Context
Before I get into the details on how to create the XML file for defining application context, I just want to point out that it is not necessary to have an XML definition of your application context. As an alternative, you can create a Java class (POJO) for the dependency injection configuration. The reason I choose to use an XML is that I have a lot of projects that uses XML based application context definitions, creating a sample project out of these existing projects is very easy for me. I also like the separation of configuration from the Java source code. It makes the source code cleaner. And it is just a personal preference.
There is a lack of documentation. I used to do extensive research online trying to find out what elements I need to create such an application context. It all changed when I found the XSD (spring-rabbit-1.3.xsd), and I had no problem creating the XML file. Later, I will explain how you can utilize the XSD files to find the elements you need.
Let me show you the entire application context definition:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd">
<context:component-scan base-package="org.hanbo.amqp.sample" />
<rabbit:connection-factory id="connectionFactory"
host="localhost"
port="5672"
virtual-host="hanbo1"
username="testuser" password="secret"/>
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
message-converter="defaultMessageConverter" />
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:queue name="testQueue2" durable="true"/>
<rabbit:direct-exchange name="test-exchange2" durable="true">
<rabbit:bindings >
<rabbit:binding queue="testQueue2" key="org.hanbo.amqp.test2"/>
</rabbit:bindings>
</rabbit:direct-exchange>
<rabbit:listener-container
connection-factory="connectionFactory"
acknowledge="auto"
requeue-rejected="true"
message-converter="defaultMessageConverter">
<rabbit:listener ref="messageListener" method="handleIncoming" queues="testQueue2"/>
</rabbit:listener-container>
<bean id="messageListener" class="org.hanbo.amqp.sample.RabbitMessageListener"></bean>
<bean id="defaultMessageConverter"
class="org.springframework.amqp.support.converter.JsonMessageConverter"></bean>
</beans>
This application context is the heart of the service application. It ties all components together to accomplish a common goal. Let me just walk through all the components of this file, and it will make sense. The first is the root element of this XML file:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd">
...
</beans>
This root element (called "beans") has a number of namespaces inside its definition, which is a way to import additional xsd element definitions from various xsd files. One of them is the xmlns:rabbit
.
xsi:schemaLocation="
...
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd"
This imports a handful of spring-amqp specific xml object definitions that I can use. It took me a while to figure out how to use the elements I needed. If you try to find a formal definition of each of the elements from the official Spring AMQP reference document, you are wasting your time. Here is the trick I have learned, by copying and pasting the URI of this namespace (http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd) to the browser navigation bar, I was able to get the full XSD. From there I was able to find out the elements I need. In order to utilize this XSD file, you need to know how XSD syntax work. Further more, you need to be able to correlated these elements to the RabbitMQ configurations.
The next part of this file is a no-brainer. I want Spring to scan any Java packages I have defined for injectable beans.
<context:component-scan base-package="org.hanbo.amqp.sample" />
What is nice about this is that Spring will scan not only the specified package, but also the sub packages under it.
For the next couple blocks, it is about the RabbitMQ related beans. The first one is called rabbit:connection-factory
. It defines a connection factory which manufactures connections to the RabbitMQ broker. I don't use this bean directly in my code, instead I pass it to the next bean. Before I get to the next bean, let's examine the definition of rabbit:connection-factory
:
<rabbit:connection-factory id="connectionFactory"
host="localhost"
port="5672"
virtual-host="hanbo1"
username="testuser" password="secret"/>
The definition of this bean is not hard to understand:
- The attribute id defines the name of the bean. Other beans can use this id to reference this bean.
- The attribute host defines the location of the server. In this case it would be localhost.
- The attribute port defines the the port number which AMQP broker is listening.
- The attribute virtual-host (I found in the xsd) defines the source or destination virtual host.
- The username and password attributes defines the user credential that can be used to connect to the RabbitMQ broker. Note that a user can be assigned to a specific virtual host and not to another. In this case, the user "testuser" is assigned to virtual host "hanbo1". If I specify a different virtual host, the authorization will fail.
The next block defines two beans, 1) one is called rabbit:template
, and 2) the other is called rabbit:admin
.
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
message-converter="defaultMessageConverter" />
<rabbit:admin connection-factory="connectionFactory"/>
I don't know why they choose the name rabbit:template
. This bean is a sender and receiver, very much like a combination of JMS consumer and producer, capable of both sending message and receiving message. This bean has an id which allows it to be used in Java code as an object. It also needs a reference to a connection factory, which is defined earlier. Finally, this bean takes a reference of message converter. The reason for this is to have automatic conversion of POJOs and non plain old Java object formats (like XML or JSON). With this converter, the service application or the client can deal with Java objects without worrying explicit conversions. Although there is nothing wrong about explicit conversion. What you need to do is make sure you create a common method or object for such conversion and use it in the key locations where conversion is necessary. What you don't want to do is duplicate the same conversion code segment all over your application. If you do so, and someone tells you that you need to change the conversion process, you will have a hard time locating all the places and make the change.
The next one is rabbit:admin
. As name suggested, it is used to do administration against the RabbitMQ broker. The reason why this is necessary because when the service/client application first logged into the RabbitMQ broker, the queues and routing key might not be registered (created). Rather than creating the queues and routing keys manually, your service application and client can use this rabbit:admin
to create them automatically. This bean only takes the id of the connection factory bean.
Next, I will configure the exchange and queue, and how messages can be routed via the routing key. There are several types of exchanges:
- Direct exchange queues all the messages to specific queues based on the routing key. This is what I am using.
- Fanout exchange queues the same messages to multiple queues, it ignores the routing key.
- Topic based exchange queues the messages to queues based on routing key and some matching patterns. Basically routing message by routing key and some topic associated with the queue.
- Header based exchange queues the messages by looking at the header data, and match the message to specific queues.
What I use is the direct exchange, which allows me to route messages by the routing key. The bean definition for this is as the following:
<rabbit:queue name="testQueue2" durable="true"/>
<rabbit:direct-exchange name="test-exchange2" durable="true">
<rabbit:bindings >
<rabbit:binding queue="testQueue2" key="org.hanbo.amqp.test2"/>
</rabbit:bindings>
</rabbit:direct-exchange>
The first line declares the queue. It has a name, and another attribute called durable which is set to "true". At the RabbitMQ end, this will create a durable queue, which is when it doesn't exist, the application will create it; and it will not disappear if the application exits. The lines after the queue bean is defining the exchange bean. The definition is quite readable, the bean will create an exchange (if it doesn't exists) called "test-exchange2". Again, the exchange will be a durable exchange. Inside the bean definition, there is the binding property where I bind the queue to this exchange and assign the routing key. I believe it is possible to assign multiple routing keys to the same queue. But I didn't try in this example.
The next one is the definition of listeners. Listeners (or observers) are put into a listener container. Defining the listener container bean was the most time consuming work, I had to check online to find how to properly configure this bean. The listeners can consume messages off the queue then pass it to a service object for processing. The process should be as transparent as possible. That is, the listener should automatically acknowledge the message; and if exception prevents the message to be processed, automatically return the message back to the queue; finally, text based message should be converted into a POJO for the back end services to handle. Here is my definition of the listener container and listener beans:
<rabbit:listener-container
connection-factory="connectionFactory"
acknowledge="auto"
requeue-rejected="true"
message-converter="defaultMessageConverter">
<rabbit:listener ref="messageListener" method="handleIncoming" queues="testQueue2"/>
</rabbit:listener-container>
The definition of the listener-container bean first takes a reference of the connection factory object. It is set to acknowledge the messages automatically (acknowledge = "auto"). If the listener service object failed to process the message due to unhandled exceptions, the message should be returned to the queue (requeue-rejected = "true"). As I have mentioned before, I would show you how to configure your application to use RabbitMQ as a temporary storage and retry mechanism, this is it!
Then I associate the message converter as the default message converter for the container. This default message converter converts JSON formatted messages into POJO. Here is the definition of the message converter:
<bean id="defaultMessageConverter"
class="org.springframework.amqp.support.converter.JsonMessageConverter"></bean>
Inside the listener container, there can be one or more listeners. Each listener is a user-defined object that can handle the incoming message object (after it is converted to POJO). The listener properties in listener container defines the reference to the bean instantiated from the user defined object type. The listener properties also specifies the method inside the user defined class that will handle the message. The last attribute of the listener line specifies which queue to pull the message from.
Finally here is the definition of the message listener bean:
<bean id="messageListener" class="org.hanbo.amqp.sample.RabbitMessageListener"></bean>
This are the basic elements of my application context. In the next section, I will quickly go over the client program's main entry and application context; then to the source code of the service application.
Step 5 -- The Main Entry and Application Context of Client Program
The client program is the simplest program I have written since I held my first job. Here is the code:
package org.hanbo.amqp.sample.client;
import org.hanbo.amqp.sample.messages.UserDetail;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.context.support.GenericXmlApplicationContext;
public class ClientMain
{
public static void main(String[] args)
{
GenericXmlApplicationContext context =
new GenericXmlApplicationContext("classpath:/application-context.xml");
AmqpTemplate sender = context.getBean(AmqpTemplate.class);
for (int i = 0; i < 100; i++)
{
sendAmqpMessage(i, sender);
}
context.close();
}
private static void sendAmqpMessage(int id, AmqpTemplate sender)
{
UserDetail userDetail = new UserDetail();
userDetail.setUserActive(id % 2 == 1? false : true);
userDetail.setUserEmail(String.format("testuser%d@test.org", id));
userDetail.setUserId(123 + id);
userDetail.setUserName(String.format("testuser%d", id));
sender.convertAndSend("test-exchange2", "org.hanbo.amqp.test2", userDetail);
}
}
This is the only Java source code for client program, all it does is get a message sender (a.k.a rabbit:template
), and send 100 messages. The message is created as a POJO, the sender will convert the message object into JSON format and sent. You can find this in the second method in above code. The first parameter is the name of the exchange; the second parameter is the routing key; the last one is the message object. The code is simple and self explanatory. This amqp template object type (a.k.a Java class RabbitTemplate
) has a lot of methods. The one I picked for sending the message is the one I feel most useful/appropriate to my design. You should do the same.
I also create the application context for this client program. What I have done is taking the application context from previous section, and strip off anything I don't need for the client, test it out. And that would be what the client program needed. Here it is:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd">
<context:component-scan base-package="org.hanbo.amqp.sample" />
<rabbit:connection-factory id="connectionFactory"
host="localhost"
port="5672"
virtual-host="hanbo1"
username="testuser" password="secret"/>
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" message-converter="defaultMessageConverter" />
<rabbit:admin connection-factory="connectionFactory"/>
<bean id="defaultMessageConverter" class="org.springframework.amqp.support.converter.JsonMessageConverter"></bean>
</beans>
You should be able to compare the above one with the one in previous section, The difference is obvious -- no listener container, and no queue and exchange. application only receives. Now that we have seen the source code of the sample client, let's take a look of source code of the service application.
Step 6 -- The Message Object Definintion
In order to have client application and service application, a common message object type should be defined. Both applications should include the same message object type. This way, the sender side should be able to create the message object, pass it to the convertAndSend()
of the RabbitTemplate class. The message gets converted into a JSON string (well, or XML if you use a serializing framework for XML) and send to the receiving end:
package org.hanbo.amqp.sample.messages;
public class UserDetail
{
private String userName;
private String userEmail;
private int userId;
private boolean userActive;
public String getUserName()
{
return userName;
}
public void setUserName(String userName)
{
this.userName = userName;
}
public String getUserEmail()
{
return userEmail;
}
public void setUserEmail(String userEmail)
{
this.userEmail = userEmail;
}
public int getUserId()
{
return userId;
}
public void setUserId(int userId)
{
this.userId = userId;
}
public boolean isUserActive()
{
return userActive;
}
public void setUserActive(boolean userActive)
{
this.userActive = userActive;
}
}
This message type class UserDetail
is just a dummy class I created to demo the automatic conversion of the message object between two sides using JSON. What is more interesting is the listener class of the service application. That is the last piece of this example.
Step 7 -- The Listener Class of the Service Application
The source code of the listener class looks like this:
package org.hanbo.amqp.sample;
import org.hanbo.amqp.sample.messages.UserDetail;
import org.hanbo.amqp.sample.services.UserDetailService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class RabbitMessageListener
{
@Autowired
private UserDetailService userDetailService;
public void handleIncoming(UserDetail userDetail)
throws Exception
{
userDetailService.processUserDetail(userDetail);
}
}
I use the @Component annotation to mark the class as a class that can be injected into other classes in the same spring application. Inside the class, there is just one property which is injected by the spring framework. I implemented this listener class not as the primary message handler. Instead, the message is handed off to the property named userDetailService
.
This class contains only one method handleIncoming()
. If you go back to the application context of the service application, you will see that this method is specified as the message handler method. Let me show you the segment:
<rabbit:listener-container
connection-factory="connectionFactory"
acknowledge="auto"
requeue-rejected="true"
message-converter="defaultMessageConverter">
<rabbit:listener ref="messageListener" method="handleIncoming" queues="testQueue2"/>
</rabbit:listener-container>
You should see the part method="handleIncomong"
. All this handleIncoming()
does is invoking the userDetailService's processUserDetail()
which will process the message. The definition of the UserDetailService
looks like this:
package org.hanbo.amqp.sample.services;
import org.hanbo.amqp.sample.messages.UserDetail;
import org.springframework.stereotype.Service;
@Service
public class UserDetailService
{
public void processUserDetail(UserDetail userDetail)
{
System.out.println(
String.format("User Name: %s", userDetail.getUserName())
);
System.out.println(
String.format("User Email: %s", userDetail.getUserEmail())
);
System.out.println(
String.format("User ID: %d", userDetail.getUserId())
);
System.out.println(
String.format("User Active: %b", userDetail.isUserActive())
);
}
}
I implemented this UserDetailService
class simply outputs the UserDetail object to the commandline window.
Essentially, all the code segments listed here are the key elements to make up this simple demo project. Next I will show you how to run the application.
Step 8 -- Run the Service and Client
Assuming you have installed Java 1.7, Maven and RabbitMQ broker, also properly configured the sample project with your own settings, you should be ready to test the sample project. There are several ways to do this:
- Run with Maven
- Run using Java
Using Maven to run the two program, first start the service program by using the following command:
mvn exec:java -Dexec.mainClass="org.hanbo.amqp.sample.Main"
To stop the service program, just hit Ctrl + C, and it will stop.
Once the service program is running successfully, time to try the client program. Open a new commandline window and run the following command line:
mvn exec:java -Dexec.mainClass="org.hanbo.amqp.sample.client.ClientMain"
The client will send 100 test messages to the service program. You will see the UserDetail being printed out on the console that runs the service program. Once the 100 messages are sent, the client program will shut down automatically.
If you want to be adventurous, feel free to try running the two programs using Java command. You can get all the dependent jars in target/lib after yo build with Maven command. Put dependent jars plus the jar of the sample programs to two locations (one for the service program and one for the client program). Then run the following command(s) (service first, then the client program):
java -cp<all the jars including the jar of the service or client program> "class of the main entry"
Good luck.
References
History
- 05/17/2016 initial draft.