Jun 21, 2013

The best Spring JMS Tutorial with ActiveMQ

In this post I'll look at Spring's messaging support and how it can be used to integrate with Message Oriented Middleware (MOM) offerings such as Apache ActiveMQ. Although the sample application in this post uses ActiveMQ as its message broker, the application itself is vendor agnostic and can integrate with any JMS-compliant messaging platform. I've kept the application as loosely coupled from ActiveMQ as possible and will highlight the bits that would need to change if you were to choose another messaging platform such as IBM MQSeries.

Technologies Used
The sample code in this post will be built and deployed as a simple web application. This doesn't have to be the case, but I've chosen to build a web app as these types of enterprise integration components tend to be deployed as web applications in the real world. The sample app will be built using the following stack.
  • Apache ActiveMQ - JMS message broker (not part of the actual application but used to test our application's JMS functionality)
  • Spring 3.1
  • Maven
  • Tomcat

Setting Up ActiveMQ
If you don't already have ActiveMQ, you'll need to download it from http://activemq.apache.org/download.html (the latest version at the time of writing is 5.6.0). To set up ActiveMQ, follow the steps below.
  1. Copy the downloaded zip to C:\Program Files and unzip. 
  2. Open a command window and cd to C:\Program Files\apache-activemq-5.6.0-bin\apache-activemq-5.6.0\bin.
  3. Start ActiveMQ by running activemq.bat.
ActiveMQ should start up as shown in figure 1.0 below.
Figure 1.0 ActiveMQ Start-up
Now that ActiveMQ has started we can open the admin console by navigating to http://localhost:8161/admin/index.jsp. On the home page we'll see some information about the broker and some administration menu options.
Figure 2.0 ActiveMQ Admin Console
Next we need to create two new queues: one that our sample application will consume messages from and another that it will write messages to. Click on the Queues link and create a new queue by entering a queue name and clicking submit. We'll name our queues TestQueueOne and TestQueueTwo, as shown in figure 3.0 below.
Figure 3.0 Create New Queues
Now that we have our message broker set up, let's start building an application to use it.

Creating the Project
We'll start off by creating a simple Spring web project like the one shown in figure 4.0 below.
Figure 4.0 Project Structure

Spring Configuration
We'll start by defining the Spring configuration for our application. I've commented the configuration below to help explain the main components. The most important parts are the DefaultMessageListenerContainer (used to consume messages) and the JmsTemplate (used to put messages onto a queue).
1:  <?xml version="1.0" encoding="UTF-8"?>  
2:  <beans xmlns="http://www.springframework.org/schema/beans"  
3:                 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
4:                 xmlns:context="http://www.springframework.org/schema/context"  
5:                 xmlns:jee="http://www.springframework.org/schema/jee"  
6:                 xsi:schemaLocation="http://www.springframework.org/schema/beans  
7:                                       http://www.springframework.org/schema/beans/spring-beans-3.0.xsd  
8:                                       http://www.springframework.org/schema/context  
9:                                       http://www.springframework.org/schema/context/spring-context-3.0.xsd  
10:                                      http://www.springframework.org/schema/beans  
11:                                      http://www.springframework.org/schema/beans/spring-beans-3.0.xsd  
12:                                      http://www.springframework.org/schema/jee  
13:                                      http://www.springframework.org/schema/jee/spring-jee-3.0.xsd">  
14:    
15:    
16:       <!-- Use Spring's JNDI support to look up the JMS Connection Factory and Queue definitions from the
17:            container. This means that specific connection details are not embedded in the application
18:        -->
19:       <jee:jndi-lookup id="mqConnectionFactory" jndi-name="java:comp/env/jms/mqConnectionFactory" />  
20:       <jee:jndi-lookup id="testQueueOne" jndi-name="java:comp/env/jms/testQueueOne" />  
21:       <jee:jndi-lookup id="testQueueTwo" jndi-name="java:comp/env/jms/testQueueTwo" />  
22:    
23:       <!-- Our message listener implementation that implements the JMS MessageListener interface and implements the  
24:             onMessage method to process incoming messages  
25:        -->  
26:       <bean id="testMessageListener" class="com.blog.spring.jms.TestMessageListener">  
27:          <property name="testMessageSender" ref ="testMessageSender" />  
28:       </bean>  
29:    
30:       <!-- DefaultMessageListenerContainer is the Spring equivalent of an EJB Message Driven Bean.
31:          It polls and consumes messages from a JMS queue. The configuration below is as follows
32:
33:          1. connectionFactory - the connection factory definition used to connect to the Message Broker
34:             which in our case is ActiveMQ
35:          2. destination - the Queue which the MessageListener container is listening on for incoming messages
36:          3. messageListener - the implementation class that will actually handle the incoming messages. The
37:             DefaultMessageListenerContainer takes messages from the queue and passes them to the message listener for
38:             processing. We've defined our message listener above (testMessageListener)
39:          4. concurrentConsumers - this is the number of threads that the DefaultMessageListenerContainer will
40:             spawn to handle incoming messages. The default is 1 but in our application we'll have 2 separate
41:             threads processing incoming messages.
42:        -->
43:       <bean id="poiMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
44:          <property name="connectionFactory" ref ="mqConnectionFactory" />  
45:          <property name="destination" ref ="testQueueOne"/>  
46:          <property name="messageListener" ref ="testMessageListener"/>  
47:          <property name="concurrentConsumers" value="2" />  
48:       </bean>  
49:    
50:       <!-- MessageSender is a simple POJO that we supply with a JmsTemplate and
51:            the Queue that we want to send messages to  
52:        -->  
53:       <bean id="testMessageSender" class="com.blog.spring.jms.TestMessageSender">  
54:          <property name="jmsTemplate" ref="jmsTemplate"/>  
55:          <property name="testQueue" ref="testQueueTwo"/>  
56:       </bean>  
57:    
58:       <!-- JmsTemplate is a Spring template that allows us to communicate with
59:            a message broker via JMS. JmsTemplate takes care of boilerplate code such as exception handling
60:            and resource management (opening and closing connections and sessions). This allows us to concentrate
61:            on solving the 'business' problem. We supply the JmsTemplate with the connection factory mentioned above
62:        -->
63:       <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  
64:          <property name="connectionFactory" ref="mqConnectionFactory" />  
65:       </bean>  
66:    
67:  </beans>  

JMS Resource Configuration
Next we'll configure the ActiveMQ Connection Factory and message Queues which we referenced on lines 19 to 21 above. These resources are defined in context.xml, a configuration file that Tomcat uses to look up runtime resources. This approach allows us to define connection details on the Servlet Container, outside of the actual application, which ensures that our application is not tightly coupled to the JMS implementation. The resource definitions in context.xml are the only place where we reference ActiveMQ specifics. The application itself does not contain any reference to ActiveMQ, which means that we could easily switch to another JMS broker without changing any code in our application. To integrate with a different JMS broker, all we'd need to do is update context.xml to define the Connection Factory and Queue resources specific to our new JMS implementation, IBM MQSeries for example. Our context.xml is defined below.
<?xml version="1.0" encoding="UTF-8"?>
<Context>

  <!--
        The ActiveMQ Connection Factory creates connections
        to the ActiveMQ broker. Tomcat will connect with the
        broker using a TCP connection on port 61616 - this is the
        default port for ActiveMQ
  -->
  <Resource name="jms/mqConnectionFactory"
            auth="Container"
            type="org.apache.activemq.ActiveMQConnectionFactory"
            description="JMS Connection Factory"
            factory="org.apache.activemq.jndi.JNDIReferenceFactory"
            brokerURL="tcp://localhost:61616" />

  <!--
        This is a reference to the first Queue we defined
        earlier in the ActiveMQ admin console
  -->
  <Resource name="jms/testQueueOne"
            auth="Container"
            type="org.apache.activemq.command.ActiveMQQueue"
            factory="org.apache.activemq.jndi.JNDIReferenceFactory"
            physicalName="TestQueueOne"/>

  <!--
        This is a reference to the second Queue we defined
        earlier in the ActiveMQ admin console
  -->
  <Resource name="jms/testQueueTwo"
            auth="Container"
            type="org.apache.activemq.command.ActiveMQQueue"
            factory="org.apache.activemq.jndi.JNDIReferenceFactory"
            physicalName="TestQueueTwo"/>

</Context>
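The resource names in context.xml and the jndi-name values in the Spring configuration need to line up: Tomcat binds each Resource beneath the java:comp/env/ prefix, so jms/testQueueOne becomes java:comp/env/jms/testQueueOne. The sketch below illustrates that mapping using only the JDK's javax.naming API. The JndiNames class and its methods are hypothetical helpers, not part of the sample application, and the lookup method can only succeed inside a container (such as Tomcat) that provides a JNDI environment.

```java
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

/** Hypothetical helper for illustration only; not part of the sample app. */
final class JndiNames {

    // Tomcat exposes <Resource> entries beneath this component environment prefix.
    static final String ENV_PREFIX = "java:comp/env/";

    private JndiNames() {}

    /** Maps a context.xml resource name, e.g. "jms/testQueueOne", to its full JNDI name. */
    static String fullName(String resourceName) {
        return ENV_PREFIX + resourceName;
    }

    /**
     * Roughly what a JNDI lookup amounts to. Outside a container there is no
     * naming provider, so the lookup call fails with a NamingException.
     */
    static Object lookup(String resourceName) throws NamingException {
        Context ctx = new InitialContext();
        try {
            return ctx.lookup(fullName(resourceName));
        } finally {
            ctx.close();
        }
    }
}
```

If a lookup fails at startup, comparing the output of fullName against the jndi-name attribute in the Spring configuration is a quick first check.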
The final part of our application configuration is web.xml. Here we simply define the context loader listener and point it at the Spring configuration file we defined earlier. 
<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns="http://java.sun.com/xml/ns/javaee"
           xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
           xsi:schemaLocation="http://java.sun.com/xml/ns/javaee
                               http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
           id="WebApp_ID"
           version="2.5">

     <!--
          Main configuration file for this Spring web application.
     -->
     <context-param>
          <param-name>contextConfigLocation</param-name>
          <param-value>
               /WEB-INF/config/spring-config.xml
          </param-value>
     </context-param>

     <!--
          Loads the Spring web application context using the config file defined above.
     -->
     <listener>
          <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
     </listener>

</web-app>

Message Listener
Now we're ready to start writing some code. You may be glad to know that you don't have to write very much code to consume messages from a JMS broker. In fact all we need is a class that implements the MessageListener interface - we implement the onMessage method, which is then invoked by the DefaultMessageListenerContainer (defined earlier) when a message is read from the queue. Our implementation of onMessage is very straightforward - we simply log the incoming message and then pass it to the message sender (defined later). The Message Listener class is defined below.
package com.blog.spring.jms;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.apache.log4j.Logger;

/**
 * Class handles incoming messages
 */
public class TestMessageListener implements MessageListener
{

     private TestMessageSender messageSender_i;
     private static final Logger logger_c = Logger.getLogger(TestMessageListener.class);

     /**
      * Method implements JMS onMessage and acts as the entry
      * point for messages consumed by Spring's DefaultMessageListenerContainer.
      * When DefaultMessageListenerContainer picks a message from the queue it
      * invokes this method with the message.
      */
     @Override
     public void onMessage(Message message)
     {
          logger_c.debug("Received message from queue [" + message + "]");

          /* The message must be of type TextMessage */
          if (message instanceof TextMessage)
          {
               try
               {
                    String msgText = ((TextMessage) message).getText();
                    logger_c.debug("About to process message: " + msgText);

                    /* call message sender to put message onto second queue */
                    messageSender_i.sendMessage(msgText);

               }
               catch (JMSException jmsEx_p)
               {
                    /* the failure is only logged, so the message is not retried here */
                    String errMsg = "An error occurred extracting message";
                    logger_c.error(errMsg, jmsEx_p);
               }
          }
          else
          {
               String errMsg = "Message is not of expected type TextMessage";
               logger_c.error(errMsg);
               throw new RuntimeException(errMsg);
          }
     }

     /**
      * Sets the message sender.
      *
      * @param messageSender_p the new message sender
      */
     public void setTestMessageSender(TestMessageSender messageSender_p)
     {
          this.messageSender_i = messageSender_p;
     }
}

MessageSender
Now that our application can consume messages, let's take a look at how we push messages onto a queue. We defined a JmsTemplate in the Spring configuration earlier - this class provides a convenient way of pushing messages onto a queue and saves us from having to write boilerplate code for opening and closing connections, handling JMS exceptions etc. This allows us to simply specify a queue and the message payload that we want to push onto that queue.
package com.blog.spring.jms;

import javax.jms.Queue;
import org.apache.log4j.Logger;
import org.springframework.jms.core.JmsTemplate;

/**
 * The TestMessageSender class uses the injected JmsTemplate to send a message
 * to a specified Queue. In our case we're sending messages to 'TestQueueTwo'.
 * The bean is declared in the Spring XML configuration, so no stereotype
 * annotation is needed on the class.
 */
public class TestMessageSender
{
     private JmsTemplate jmsTemplate_i;
     private Queue testQueue_i;
     private static final Logger logger_c = Logger.getLogger(TestMessageSender.class);

     /**
      * Sends message using the JmsTemplate. JmsTemplate reports failures as the
      * unchecked org.springframework.jms.JmsException, so no checked exception
      * is declared here.
      *
      * @param message_p the message text to send
      */
     public void sendMessage(String message_p)
     {
          logger_c.debug("About to put message on queue. Queue[" + testQueue_i + "] Message[" + message_p + "]");
          jmsTemplate_i.convertAndSend(testQueue_i, message_p);
     }

     /**
      * Sets the jms template.
      *
      * @param tmpl the jms template
      */
     public void setJmsTemplate(JmsTemplate tmpl)
     {
          this.jmsTemplate_i = tmpl;
     }

     /**
      * Sets the test queue.
      *
      * @param queue the new test queue
      */
     public void setTestQueue(Queue queue)
     {
          this.testQueue_i = queue;
     }
}
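To get a feel for what a "template" saves us, here is a minimal sketch of the pattern itself: open a resource, do the work, always close the resource, and turn checked exceptions into unchecked ones. Everything in it (MiniTemplate, FakeConnection, CheckedSendException) is a hypothetical stand-in written for this illustration; none of it is a Spring or JMS type, and the real JmsTemplate does considerably more.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in that mimics the shape of the template pattern. */
final class MiniTemplate {

    /** A checked exception, standing in for javax.jms.JMSException. */
    static final class CheckedSendException extends Exception {
        CheckedSendException(String message) {
            super(message);
        }
    }

    /** A fake connection that records what was sent and whether it was closed. */
    static final class FakeConnection implements AutoCloseable {
        final List<String> sent = new ArrayList<>();
        boolean closed;

        void send(String queue, String text) throws CheckedSendException {
            if (text == null) {
                throw new CheckedSendException("null payload");
            }
            sent.add(queue + ":" + text);
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** The last connection handed out, kept only so the example can be inspected. */
    FakeConnection last;

    /** Opens a connection, sends, always closes, and rethrows failures as unchecked. */
    void convertAndSend(String queue, String text) {
        last = new FakeConnection();
        try (FakeConnection connection = last) {
            connection.send(queue, text);
        } catch (CheckedSendException e) {
            throw new IllegalStateException("send failed", e);
        }
    }
}
```

The caller only names a queue and a payload, which is the same shape as the sendMessage method above; the open/close and exception handling stay inside the template.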

Testing the Application
Now that the application is complete, let's test it. Using the ActiveMQ admin console we'll put a message onto TestQueueOne. Conveniently, you can tell the broker to submit the message as many times as you like. We'll ask the broker to submit our message 100 times so that we have time to see it get picked up and processed in the Eclipse console. When we put a message onto TestQueueOne we should see the following:
  1. DefaultMessageListenerContainer will pick up the next message from TestQueueOne and invoke the onMessage method in our Message listener class with the message.
  2. onMessage will log the message payload and then call the TestMessageSender sendMessage method with the message text we've just received.
  3. TestMessageSender will use the JmsTemplate to put the message onto TestQueueTwo.
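The three steps above can be mimicked without a broker. The sketch below is an in-memory analogy only: it uses java.util.concurrent queues in place of TestQueueOne and TestQueueTwo, and a small thread pool in place of the listener container's concurrent consumers. The ForwardingSimulation class is hypothetical and involves no JMS or Spring code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** In-memory stand-in for the consume-and-forward flow; no JMS broker involved. */
final class ForwardingSimulation {

    private ForwardingSimulation() {}

    /**
     * Puts count copies of text on a first queue, lets the given number of
     * consumer threads move them to a second queue, and returns the size of
     * the second queue once forwarding has finished.
     */
    static int run(String text, int count, int consumers) {
        BlockingQueue<String> queueOne = new LinkedBlockingQueue<>();
        BlockingQueue<String> queueTwo = new LinkedBlockingQueue<>();
        for (int i = 0; i < count; i++) {
            queueOne.add(text);
        }

        CountDownLatch remaining = new CountDownLatch(count);
        ExecutorService pool = Executors.newFixedThreadPool(consumers);
        for (int c = 0; c < consumers; c++) {
            pool.submit(() -> {
                String msg;
                // poll() returns null once the first queue has been drained
                while ((msg = queueOne.poll()) != null) {
                    queueTwo.add(msg); // plays the role of sendMessage
                    remaining.countDown();
                }
            });
        }

        try {
            // Wait until every message has been forwarded, or give up after 5 seconds.
            remaining.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow();
        }
        return queueTwo.size();
    }
}
```

Calling ForwardingSimulation.run("Test Message!!", 100, 2) corresponds to the test described here: 100 messages in, two consumers, and every message ending up on the second queue.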
To test the application follow the steps below.
  • In the ActiveMQ admin console go to the TestQueueOne definition and click the SendTo link. Add a message and set the 'Number of Messages to Send' to 100. This will ensure that the broker puts the message onto the queue 100 times.
Figure 5.0 Send JMS Message
  • Click Send. 100 messages will be put onto the queue and you should see the application log the consumption of each message as shown in the log extract below.
 DEBUG: [Sep-16 22:59:29,911] spring.jms.TestMessageListener - Received message from queue [ActiveMQTextMessage {commandId = 96, responseRequired = false, messageId = ID:motown-58007-1347813056932-3:4:1:1:92, originalDestination = null, originalTransactionId = null, producerId = ID:motown-58007-1347813056932-3:4:1:1, destination = queue://TestQueueOne, transactionId = null, expiration = 0, timestamp = 1347832725483, arrival = 0, brokerInTime = 1347832725510, brokerOutTime = 1347832766192, correlationId = , replyTo = null, persistent = false, type = , priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@855da06, marshalledProperties = org.apache.activemq.util.ByteSequence@3c5cc430, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, text = null}]  
 DEBUG: [Sep-16 22:59:29,911] spring.jms.TestMessageListener - About to process message: Test Message!!  
 DEBUG: [Sep-16 22:59:29,911] spring.jms.TestMessageSender - About to put message on queue. Queue[queue://TestQueueTwo] Message[Test Message!!]  
 DEBUG: [Sep-16 22:59:29,947] spring.jms.TestMessageListener - Received message from queue [ActiveMQTextMessage {commandId = 97, responseRequired = false, messageId = ID:motown-58007-1347813056932-3:4:1:1:93, originalDestination = null, originalTransactionId = null, producerId = ID:motown-58007-1347813056932-3:4:1:1, destination = queue://TestQueueOne, transactionId = null, expiration = 0, timestamp = 1347832725483, arrival = 0, brokerInTime = 1347832725510, brokerOutTime = 1347832766192, correlationId = , replyTo = null, persistent = false, type = , priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@6e544a45, marshalledProperties = org.apache.activemq.util.ByteSequence@5fd83099, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, text = null}]  
 DEBUG: [Sep-16 22:59:29,947] spring.jms.TestMessageListener - About to process message: Test Message!!  
 DEBUG: [Sep-16 22:59:29,948] spring.jms.TestMessageSender - About to put message on queue. Queue[queue://TestQueueTwo] Message[Test Message!!]  
 DEBUG: [Sep-16 22:59:29,986] spring.jms.TestMessageListener - Received message from queue [ActiveMQTextMessage {commandId = 98, responseRequired = false, messageId = ID:motown-58007-1347813056932-3:4:1:1:94, originalDestination = null, originalTransactionId = null, producerId = ID:motown-58007-1347813056932-3:4:1:1, destination = queue://TestQueueOne, transactionId = null, expiration = 0, timestamp = 1347832725483, arrival = 0, brokerInTime = 1347832725510, brokerOutTime = 1347832766192, correlationId = , replyTo = null, persistent = false, type = , priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@6a5ebdf7, marshalledProperties = org.apache.activemq.util.ByteSequence@7209d9af, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, text = null}]  
 DEBUG: [Sep-16 22:59:29,986] spring.jms.TestMessageListener - About to process message: Test Message!!  
 DEBUG: [Sep-16 22:59:29,986] spring.jms.TestMessageSender - About to put message on queue. Queue[queue://TestQueueTwo] Message[Test Message!!]  
 DEBUG: [Sep-16 22:59:30,022] spring.jms.TestMessageListener - Received message from queue [ActiveMQTextMessage {commandId = 99, responseRequired = false, messageId = ID:motown-58007-1347813056932-3:4:1:1:95, originalDestination = null, originalTransactionId = null, producerId = ID:motown-58007-1347813056932-3:4:1:1, destination = queue://TestQueueOne, transactionId = null, expiration = 0, timestamp = 1347832725483, arrival = 0, brokerInTime = 1347832725511, brokerOutTime = 1347832766192, correlationId = , replyTo = null, persistent = false, type = , priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@64b2aaa6, marshalledProperties = org.apache.activemq.util.ByteSequence@de1abf0, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, text = null}]  
 DEBUG: [Sep-16 22:59:30,022] spring.jms.TestMessageListener - About to process message: Test Message!!  
 DEBUG: [Sep-16 22:59:30,022] spring.jms.TestMessageSender - About to put message on queue. Queue[queue://TestQueueTwo] Message[Test Message!!]  
 DEBUG: [Sep-16 22:59:30,052] spring.jms.TestMessageListener - Received message from queue [ActiveMQTextMessage {commandId = 100, responseRequired = false, messageId = ID:motown-58007-1347813056932-3:4:1:1:96, originalDestination = null, originalTransactionId = null, producerId = ID:motown-58007-1347813056932-3:4:1:1, destination = queue://TestQueueOne, transactionId = null, expiration = 0, timestamp = 1347832725484, arrival = 0, brokerInTime = 1347832725511, brokerOutTime = 1347832766192, correlationId = , replyTo = null, persistent = false, type = , priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@5adf20ae, marshalledProperties = org.apache.activemq.util.ByteSequence@6edaae1d, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, text = null}]  
 DEBUG: [Sep-16 22:59:30,053] spring.jms.TestMessageListener - About to process message: Test Message!!  
 DEBUG: [Sep-16 22:59:30,053] spring.jms.TestMessageSender - About to put message on queue. Queue[queue://TestQueueTwo] Message[Test Message!!]  
  • Now take a look at the number of messages on TestQueueTwo - it should now have 100 pending messages. These are the 100 messages that were consumed from TestQueueOne and pushed onto TestQueueTwo by our application. See screenshot below.  
Figure 6.0 Messages pushed to TestQueueTwo

(Source: Internet)
