Technologies Used
The sample code in this post will be built and deployed as a simple web
application. Obviously this doesn't have to be the case but I've chosen
to build a web app as these type of enterprise integration components
tend to be deployed as web applications in the real world. The sample
app will be built using the following stack.
- Apache ActiveMQ - JMS Message Broker(not part of the actual application but used to test our applications JMS fucntionality)
- Spring 3.1
- Maven
- Tomcat
Setting Up ActiveMQ
If you don't already have ActiveMQ you'll need to download it at http://activemq.apache.org/download.html The latest version at the time of writing is 5.6.0. To set up ActiveMQ follow the steps below.
Now that ActiveMQ has started we can open the admin console by navigating to http://localhost:8161/admin/index.jsp. On the home page we'll see some information about the broker and some administration menu options.
Next we need to create 2 new Queues, one that our sample application
will consume messages from and another Queue that we'll write messages
to. Click on the Queues link and create a new Queue by entering a Queue
name and clicking submit. For our sample application we'll create 2
Queues, TestQueueOne and TestQueueTwo as shown in figure 3.0 below.
Now that we have our message broker set up lets take start building an application to use it.
Creating the Project
Message Listener
- Copy the downloaded zip to C:\Program Files and unzip.
- Open a command window and cd to C:\Program Files\apache-activemq-5.6.0-bin\apache-activemq-5.6.0\bin.
- Start ApacheMQ by calling activemq.bat
Figure 1.0 ActiveMQ Start-up |
Figure 2.0 ActiveMQ Admin Console |
Figure 3.0 Create New Queues |
Creating the Project
We'll start off by creating a simple Spring web project like the one shown in figure 4.0 below.
Spring Configuration
JMS Resource Configuration
Next we'll configure the ActiveMQ Connection Factory and Message Queues which we referenced on lines 19 to 21 above. These resources are defined in context.xml, a configuration file that Tomcat uses to look up runtime resources. This approach allows us to define connection details on the Servlet Container and outside of the actual application, which ensures that our application is not tightly coupled to the the JMS implementation. The resource lookups in context.xml is the only place where we reference ActiveMQ specifics. The application itself does not contain any reference to ActiveMQ, which means that we could easily switch to another JMS broker without changing any code in our application. To integrate with a different JMS broker all we'd need to do is update the context.xml to lookup Connection Factory and Queue definitions specific to our new JMS implementation, IBM MQSeries for example. Our context.xml is defined below.
Figure 4.0 Project Structure |
Spring Configuration
We'll start by defining the Spring configuration for our application.
I've commented the configuration below to help explain the main
components. The most important parts are the
DefaultMessageListenerContainer (used to consume messages) and the
JMSTemplate (used to put message onto a queue).
1: <?xml version="1.0" encoding="UTF-8"?>
2: <beans xmlns="http://www.springframework.org/schema/beans"
3: xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4: xmlns:context="http://www.springframework.org/schema/context"
5: xmlns:jee="http://www.springframework.org/schema/jee"
6: xsi:schemaLocation="http://www.springframework.org/schema/beans
7: http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
8: http://www.springframework.org/schema/context
9: http://www.springframework.org/schema/context/spring-context-3.0.xsd
10: http://www.springframework.org/schema/beans
11: http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
12: http://www.springframework.org/schema/jee
13: http://www.springframework.org/schema/jee/spring-jee-3.0.xsd">
14:
15:
16: <!-- Use Springs JNDI support to look up JMS Connection Factory and Queue definitions from the
17: container. This means that specific connection details are not embedded in the application
18: -->
19: <jee:jndi-lookup id="mqConnectionFactory" jndi-name="java:comp/env/jms/mqConnectionFactory" />
20: <jee:jndi-lookup id="testQueueOne" jndi-name="java:comp/env/jms/testQueueOne" />
21: <jee:jndi-lookup id="testQueueTwo" jndi-name="java:comp/env/jms/testQueueTwo" />
22:
23: <!-- Our message listener implementation that implements the JMS MessageListener interface and implements the
24: onMessage method to process incoming messages
25: -->
26: <bean id="testMessageListener" class="com.blog.spring.jms.TestMessageListener">
27: <property name="testMessageSender" ref ="testMessageSender" />
28: </bean>
29:
30: <!-- DefaultMessageListenerConatiner is the Spring equivalent to an EJB Message Driven Bean.
31: It polls and consumes messages from a JMS queue. The configuration below is as follows
32:
33: 1. connectionFactory - the connection factory definition used to connect to the Message Broker
34: which in our case is Active MQ
35: 2. destination - the Queue which the MessageListener container is listening on from incoming messages
36: 3. messageListener - the implementation class that will actually handle the incoming messages. The
37: DeafultMesssageListener takes messages from the queue and passes them to the message listener for
38: processing. We've defined our message listener above (testMessageListener)
39: 4. concurrentConsumers - this is the number of threads that the DeafultMesaegListenerContainer will
40: spawn to handle incoming messages. The default is 1 but in our application we'll have 2 separate
41: threads processing incoming messages.
42: -->
43: <bean id="poiMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
44: <property name="connectionFactory" ref ="mqConnectionFactory" />
45: <property name="destination" ref ="testQueueOne"/>
46: <property name="messageListener" ref ="testMessageListener"/>
47: <property name="concurrentConsumers" value="2" />
48: </bean>
49:
50: <!-- MessageSender is a simple POJO that we supply with a JMSTemplate and
51: the Queue that we want to send messages to
52: -->
53: <bean id="testMessageSender" class="com.blog.spring.jms.TestMessageSender">
54: <property name="jmsTemplate" ref="jmsTemplate"/>
55: <property name="testQueue" ref="testQueueTwo"/>
56: </bean>
57:
58: <!-- JMSTemplate is a Spring template that allows us to communicate with
59: a message broker via JMS. JMSTemplate takes care of boiler plate code such as exception handling
60: and resource management such as connection pooling. This allows us concentrate on solving the 'business'
61: problem. We supply the JMS template with the connection factory mentioned above
62: -->
63: <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
64: <property name="connectionFactory" ref="mqConnectionFactory" />
65: </bean>
66:
67: </beans>
JMS Resource Configuration
Next we'll configure the ActiveMQ Connection Factory and Message Queues which we referenced on lines 19 to 21 above. These resources are defined in context.xml, a configuration file that Tomcat uses to look up runtime resources. This approach allows us to define connection details on the Servlet Container and outside of the actual application, which ensures that our application is not tightly coupled to the the JMS implementation. The resource lookups in context.xml is the only place where we reference ActiveMQ specifics. The application itself does not contain any reference to ActiveMQ, which means that we could easily switch to another JMS broker without changing any code in our application. To integrate with a different JMS broker all we'd need to do is update the context.xml to lookup Connection Factory and Queue definitions specific to our new JMS implementation, IBM MQSeries for example. Our context.xml is defined below.
1: <?xml version="1.0" encoding="UTF-8"?>
2: <Context>
3:
4: <!--
5: Active MQ Connection Factory manages pooled connections
6: to the ActiveMQ broker. Tomcat will connect with the
7: broker using a TCP connection on port 61616 - this is the
8: default port for ActiveMQ
9: -->
10: <Resource name="jms/mqConnectionFactory"
11: auth="Container"
12: type="org.apache.activemq.ActiveMQConnectionFactory"
13: description="JMS Connection Factory"
14: factory="org.apache.activemq.jndi.JNDIReferenceFactory"
15: brokerURL="tcp://localhost:61616" />
16:
17: <!--
18: This is a reference to the first Queue we defined
19: earlier in the ActiveMQ admin console
20: -->
21: <Resource name="jms/testQueueOne"
22: auth="Container"
23: type="org.apache.activemq.command.ActiveMQQueue"
24: factory="org.apache.activemq.jndi.JNDIReferenceFactory"
25: physicalName="TestQueueOne"/>
26:
27: <!--
28: This is a reference to the second Queue we defined
29: earlier in the ActiveMQ admin console
30: -->
31: <Resource name="jms/testQueueTwo"
32: auth="Container"
33: type="org.apache.activemq.command.ActiveMQQueue"
34: factory="org.apache.activemq.jndi.JNDIReferenceFactory"
35: physicalName="TestQueueTwo"/>
36:
37: </Context>
The final part of our application configuration is web.xml. Here we
simply define the context loader listener and point it at the Spring
configuration file we defined earlier. 1: <?xml version="1.0" encoding="UTF-8"?>
2: <web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3: xmlns="http://java.sun.com/xml/ns/javaee"
4: xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
5: xsi:schemaLocation="http://java.sun.com/xml/ns/javaee
6: http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
7: id="WebApp_ID"
8: version="2.5">
9:
10: <!--
11: Main configuration file for this Spring web application.
12: -->
13: <context-param>
14: <param-name>contextConfigLocation</param-name>
15: <param-value>
16: /WEB-INF/config/spring-config.xml
17: </param-value>
18: </context-param>
19:
20: <!--
21: Loads the Spring web application context using the config file defined above.
22: -->
23: <listener>
24: <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
25: </listener>
26:
27: </web-app>
Message Listener
Now we're ready to start writing some code. You may be glad to know that
you that don't have to write very much code to consume messages from a
JMS broker. In fact all we need is a class that implements the
MessageListener interface - we override the onMessage method which is
then invoked by the DefaultMessageListenerContainer (defined earlier)
when a message is read from the queue. Our implementation of onMessage
is very straight forward - we simply log the incoming message and then
pass it to the message sender (defined later). The Message Listener
class is defined below.
MessageSender
Now that our application can consume messages lets take a look at how we
push messages onto a queue. We defined a JMSTemplate in the Spring
configuration earlier - this class provides a convenient way of pushing
messages onto a queue and saves us from having to write boiler plate
code for opening and closing connections, handling JMS exceptions etc.
This allows the developer to simply specify a queue and the message
payload that you want to push onto that queue.1:
2: package com.blog.spring.jms;
3:
4: import javax.jms.JMSException;
5: import javax.jms.Message;
6: import javax.jms.MessageListener;
7: import javax.jms.TextMessage;
8: import org.apache.log4j.Logger;
9:
10: /**
11: * Class handles incoming messages
12: *
13: * @see PointOfIssueMessageEvent
14: */
15: public class TestMessageListener implements MessageListener
16: {
17:
18: private TestMessageSender messageSender_i;
19: private static final Logger logger_c = Logger.getLogger(TestMessageListener.class);
20:
21: /**
22: * Method implements JMS onMessage and acts as the entry
23: * point for messages consumed by Springs DefaultMessageListenerContainer.
24: * When DefaultMessageListenerContainer picks a message from the queue it
25: * invokes this method with the message payload.
26: */
27: public void onMessage(Message message)
28: {
29: logger_c.debug("Received message from queue [" + message +"]");
30:
31: /* The message must be of type TextMessage */
32: if (message instanceof TextMessage)
33: {
34: try
35: {
36: String msgText = ((TextMessage) message).getText();
37: logger_c.debug("About to process message: " + msgText);
38:
39: /* call message sender to put message onto second queue */
40: messageSender_i.sendMessage(msgText);
41:
42: }
43: catch (JMSException jmsEx_p)
44: {
45: String errMsg = "An error occurred extracting message";
46: logger_c.error(errMsg, jmsEx_p);
47: }
48: }
49: else
50: {
51: String errMsg = "Message is not of expected type TextMessage";
52: logger_c.error(errMsg);
53: throw new RuntimeException(errMsg);
54: }
55: }
56:
57: /**
58: * Sets the message sender.
59: *
60: * @param messageSender_p the new message sender
61: */
62: public void setTestMessageSender(TestMessageSender messageSender_p)
63: {
64: this.messageSender_i = messageSender_p;
65: }
66: }
67:
MessageSender
1: package com.blog.spring.jms;
2:
3: import javax.jms.JMSException;
4: import javax.jms.Queue;
5: import org.apache.log4j.Logger;
6: import org.springframework.jms.core.JmsTemplate;
7: import org.springframework.stereotype.Service;
8:
9: /**
10: * The TestMessageSender class uses the injected JMSTemplate to send a message
11: * to a specified Queue. In our case we're sending messages to 'TestQueueTwo'
12: */
13: @Service
14: public class TestMessageSender
15: {
16: private JmsTemplate jmsTemplate_i;
17: private Queue testQueue_i;
18: private static final Logger logger_c = Logger .getLogger(TestMessageSender.class);
19:
20: /**
21: * Sends message using JMS Template.
22: *
23: * @param message_p the message_p
24: * @throws JMSException the jMS exception
25: */
26: public void sendMessage(String message_p) throws JMSException
27: {
28: logger_c.debug("About to put message on queue. Queue[" + testQueue_i.toString() + "] Message[" + message_p + "]");
29: jmsTemplate_i.convertAndSend(testQueue_i, message_p);
30: }
31:
32: /**
33: * Sets the jms template.
34: *
35: * @param template the jms template
36: */
37: public void setJmsTemplate(JmsTemplate tmpl)
38: {
39: this.jmsTemplate_i = tmpl;
40: }
41:
42: /**
43: * Sets the test queue.
44: *
45: * @param queue the new test queue
46: */
47: public void setTestQueue(Queue queue)
48: {
49: this.testQueue_i = queue;
50: }
51: }
Testing the Application
Now that the application is complete lets test it. Using the ActiveMQ admin console we'll put a message onto TestQueueOne. Conveniently you can tell the broker to submit the message as may times as you like. We'll ask the broker to submit our message 100 times so that we have time to see it get picked up and processed in the eclipse console. When we put a message onto TestQueueOne we should see the following
- DefaultMessageListenerContainer will pick up the next message from TestQueuOne and invoke the onMessage method in our Message listener class with the message payload.
- onMessage will log the message payload and then call the TestMessageSender sendMessage with the message we've just received.
- TestMessageSender will use the JMSTemplate to put the message onto TestQueueTwo.
- In the ActiveMQ admin console go to the TestQueueOne definition and click the SendTo link. Add a message and set the 'Number of Messages to Send' to 100. This will ensure that broker puts the message onto the queue 100 times.
Figure 5.0 Send JMS Message |
- Click Send. 100 messages will be put onto the queue and you should see the application log the consumption of each message as shown in the log extract below.
DEBUG: [Sep-16 22:59:29,911] spring.jms.TestMessageListener - Received message from queue [ActiveMQTextMessage {commandId = 96, responseRequired = false, messageId = ID:motown-58007-1347813056932-3:4:1:1:92, originalDestination = null, originalTransactionId = null, producerId = ID:motown-58007-1347813056932-3:4:1:1, destination = queue://TestQueueOne, transactionId = null, expiration = 0, timestamp = 1347832725483, arrival = 0, brokerInTime = 1347832725510, brokerOutTime = 1347832766192, correlationId = , replyTo = null, persistent = false, type = , priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@855da06, marshalledProperties = org.apache.activemq.util.ByteSequence@3c5cc430, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, text = null}]
DEBUG: [Sep-16 22:59:29,911] spring.jms.TestMessageListener - About to process message: Test Message!!
DEBUG: [Sep-16 22:59:29,911] spring.jms.TestMessageSender - About to put message on queue. Queue[queue://TestQueueTwo] Message[Test Message!!]
DEBUG: [Sep-16 22:59:29,947] spring.jms.TestMessageListener - Received message from queue [ActiveMQTextMessage {commandId = 97, responseRequired = false, messageId = ID:motown-58007-1347813056932-3:4:1:1:93, originalDestination = null, originalTransactionId = null, producerId = ID:motown-58007-1347813056932-3:4:1:1, destination = queue://TestQueueOne, transactionId = null, expiration = 0, timestamp = 1347832725483, arrival = 0, brokerInTime = 1347832725510, brokerOutTime = 1347832766192, correlationId = , replyTo = null, persistent = false, type = , priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@6e544a45, marshalledProperties = org.apache.activemq.util.ByteSequence@5fd83099, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, text = null}]
DEBUG: [Sep-16 22:59:29,947] spring.jms.TestMessageListener - About to process message: Test Message!!
DEBUG: [Sep-16 22:59:29,948] spring.jms.TestMessageSender - About to put message on queue. Queue[queue://TestQueueTwo] Message[Test Message!!]
DEBUG: [Sep-16 22:59:29,986] spring.jms.TestMessageListener - Received message from queue [ActiveMQTextMessage {commandId = 98, responseRequired = false, messageId = ID:motown-58007-1347813056932-3:4:1:1:94, originalDestination = null, originalTransactionId = null, producerId = ID:motown-58007-1347813056932-3:4:1:1, destination = queue://TestQueueOne, transactionId = null, expiration = 0, timestamp = 1347832725483, arrival = 0, brokerInTime = 1347832725510, brokerOutTime = 1347832766192, correlationId = , replyTo = null, persistent = false, type = , priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@6a5ebdf7, marshalledProperties = org.apache.activemq.util.ByteSequence@7209d9af, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, text = null}]
DEBUG: [Sep-16 22:59:29,986] spring.jms.TestMessageListener - About to process message: Test Message!!
DEBUG: [Sep-16 22:59:29,986] spring.jms.TestMessageSender - About to put message on queue. Queue[queue://TestQueueTwo] Message[Test Message!!]
DEBUG: [Sep-16 22:59:30,022] spring.jms.TestMessageListener - Received message from queue [ActiveMQTextMessage {commandId = 99, responseRequired = false, messageId = ID:motown-58007-1347813056932-3:4:1:1:95, originalDestination = null, originalTransactionId = null, producerId = ID:motown-58007-1347813056932-3:4:1:1, destination = queue://TestQueueOne, transactionId = null, expiration = 0, timestamp = 1347832725483, arrival = 0, brokerInTime = 1347832725511, brokerOutTime = 1347832766192, correlationId = , replyTo = null, persistent = false, type = , priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@64b2aaa6, marshalledProperties = org.apache.activemq.util.ByteSequence@de1abf0, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, text = null}]
DEBUG: [Sep-16 22:59:30,022] spring.jms.TestMessageListener - About to process message: Test Message!!
DEBUG: [Sep-16 22:59:30,022] spring.jms.TestMessageSender - About to put message on queue. Queue[queue://TestQueueTwo] Message[Test Message!!]
DEBUG: [Sep-16 22:59:30,052] spring.jms.TestMessageListener - Received message from queue [ActiveMQTextMessage {commandId = 100, responseRequired = false, messageId = ID:motown-58007-1347813056932-3:4:1:1:96, originalDestination = null, originalTransactionId = null, producerId = ID:motown-58007-1347813056932-3:4:1:1, destination = queue://TestQueueOne, transactionId = null, expiration = 0, timestamp = 1347832725484, arrival = 0, brokerInTime = 1347832725511, brokerOutTime = 1347832766192, correlationId = , replyTo = null, persistent = false, type = , priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@5adf20ae, marshalledProperties = org.apache.activemq.util.ByteSequence@6edaae1d, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, text = null}]
DEBUG: [Sep-16 22:59:30,053] spring.jms.TestMessageListener - About to process message: Test Message!!
DEBUG: [Sep-16 22:59:30,053] spring.jms.TestMessageSender - About to put message on queue. Queue[queue://TestQueueTwo] Message[Test Message!!]
- Now take a look at the number of messages on TestQueueTwo - it should now have 100 pending messages. These are the 100 messages that were consumed from TestQueueOne and pushed onto TestQueueTwo by our application. See screenshot below.
Figure 6.0 Messages pushed to TestQueueTwo |
No comments:
Post a Comment