Saturday, March 12, 2011

Spring Integration 2: Integrating RabbitMQ and Web Service

In this tutorial we'll integrate RabbitMQ with an existing Spring Integration project. Our purpose is to replace the database with a real message broker. We'll retain the same pattern we employed in the current project to show how easy it is to switch from a database to a message broker. To fully appreciate this tutorial, please first read Spring Integration: Integrating JDBC and WS System for a good comparison.

What is Spring Integration?
Spring Integration provides an extension of the Spring programming model to support the well-known Enterprise Integration Patterns. It enables lightweight messaging within Spring-based applications and supports integration with external systems via declarative adapters. Those adapters provide a higher-level of abstraction over Spring's support for remoting, messaging, and scheduling. Spring Integration's primary goal is to provide a simple model for building enterprise integration solutions while maintaining the separation of concerns that is essential for producing maintainable, testable code.

Source: Spring Integration 2 Reference

What is RabbitMQ?
RabbitMQ is based on a proven platform and offers a reliable, highly available, scalable and portable messaging system with predictable and consistent throughput and latency.

RabbitMQ is 100% open source and 100% based on open standard protocols freeing users from dependency on proprietary vendor-supplied libraries.

RabbitMQ is designed from the ground up to interoperate with other messaging systems: it is the leading implementation of AMQP, the open standard for business messaging, and, through adapters, supports XMPP, SMTP, STOMP and HTTP for lightweight web messaging.

Source: http://www.rabbitmq.com

Background

Scenario

A mobile application sends reports as text messages. A receiver application processes these messages and delegates further processing to a publisher. The publisher's responsibility is to publish these messages to a message broker, RabbitMQ.

Purpose

Our job is to develop an integration backend that handles messages and forwards them to a web service.

The whole system is composed of three major systems.
1. A mobile reporting system (3rd party)
2. Integration backend (our system)
3. Web service (3rd party)

The Client

Our client is a small-time computer parts reseller. The reports they send every day are of three types: Sales, Inventory, and Order. To make their data manageable they've decided to enroll in our system. The reports they send contain the following details:

Report Type | Details
Sales | Branch name, Amount, Remarks
Inventory | Branch name, Product name, Beginning inventory, Ending inventory
Order | Branch name, Product name, Quantity

Mobile Reporting System

The mobile reporting system is composed of a mobile application (based on J2ME, Android, and Symbian) for sending Sales, Order, and Inventory reports, and a mobile backend that processes reports by storing them as text messages in a MySQL database.

Messages sent by the mobile application follow this format:
{Unique id};{Branch name};{Keyword};{Data1};{Data2};{Data3…}

Examples of this format are the following:
1234567;Branch A;SALES;3000.50;Pending approval
1234568;Branch A;INVENTORY;Printer;30;30
1234569;Branch B;ORDER;Keyboard;50
These are plain Strings whose meaning depends on the client's requirements. It's up to the developer to interpret the data. Remember that we have no control over the mobile reporting system. All we have is access to the message broker, which runs on RabbitMQ.
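
To make the format concrete, here is a minimal sketch of how one such line could be taken apart in plain Java. It is not part of the tutorial's source code: the class name ReportLine and its accessors are hypothetical, and it assumes that no field ever contains a semicolon.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of the tutorial's source code.
// Assumes fields never contain the ';' delimiter themselves.
class ReportLine {

    private final String id;
    private final String branch;
    private final String keyword;
    private final List<String> data;

    private ReportLine(String id, String branch, String keyword, List<String> data) {
        this.id = id;
        this.branch = branch;
        this.keyword = keyword;
        this.data = data;
    }

    // Splits "{Unique id};{Branch name};{Keyword};{Data1};..." into its parts.
    static ReportLine parse(String raw) {
        String[] parts = raw.split(";");
        if (parts.length < 3) {
            throw new IllegalArgumentException("Expected at least id, branch and keyword: " + raw);
        }
        // Everything after the keyword is report-specific data
        List<String> data = Arrays.asList(Arrays.copyOfRange(parts, 3, parts.length));
        return new ReportLine(parts[0], parts[1], parts[2], Collections.unmodifiableList(data));
    }

    String getId() { return id; }
    String getBranch() { return branch; }
    String getKeyword() { return keyword; }
    List<String> getData() { return data; }
}
```

For the first example above, ReportLine.parse(...) would yield the id 1234567, the branch "Branch A", the keyword SALES, and two data fields. How those data fields are interpreted still depends on the keyword.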

Using RabbitMQ's management panel, we can see how many messages are in the queue:

For more info on RabbitMQ's management panel, please visit the RabbitMQ management plugin page.

Integration Backend

Currently the integration backend is made up of normal Spring beans. The system works, but our challenge is to make it simpler by using Spring Integration. As mentioned earlier, the integration backend's purpose is to process messages from the message broker and submit the results to a web service.

Web Service

The web service is an external service provided by a third-party company. All we know about it is its WSDL, which serves as the contract of the service. As with the mobile reporting system, we have no control over the web service implementation.

What's included?

In order to fully test-drive this tutorial, we need all three systems. With just the integration backend, it would be difficult to test the application. Consequently, this tutorial includes the following applications:

1. RabbitMQ producer
2. The integration backend (the main content of this tutorial)
3. A web service provider application based on Spring WS 2.

Development

After a brief review of the systems' background, we now begin the development of the application.

The Big Picture

Spring Integration is all about patterns and POJOs (Plain Old Java Objects). So it's important that we see the big picture first before dealing with the details. Below is a graph of the whole integration backend system. The graph is generated using the free SpringSource Tool Suite.


At first glance the system looks complicated, but it's actually quite straightforward. Take a moment to study the graph. Pay attention to where the arrows are pointing.

Simpler Graphs

To make the graph easier to understand, I've broken it down into simpler ones.

The graph below shows that the data's entry point is the amqpInbound adapter. A Service Activator handles the messages and converts them to an ArrayList, then passes the result to chain1. This chain contains a message splitter and a filter for sifting out invalid messages. After chain1, data is passed to chain2. This chain contains a header enricher, a content transformer, and a router which routes messages to the appropriate channels.


The graph below is the next step in the flow. The router decides which channel each message should be sent to. Notice we have four channels here:
rejectedMessageChannel
salesChannel
inventoryChannel
orderChannel

After the data has been routed, each valid message gets transformed to a Java object. Then it's passed to an aggregator. The aggregator's job is to collect all these messages. Once it has collected them, it sends the combined result to a web service outbound adapter.

The whole system can be summarized as follows:
AMQP adapter >> Message Handler >> Splitter >> Filter >> Transformer >> Router >> Transformer >> Aggregator >> WS adapter

The Finer Details

After a brief overview of the big picture, let's now study the system in depth.

The AMQP Inbound Adapter


The inbound AMQP adapter's purpose is to listen for messages pushed by RabbitMQ, our message broker. Incoming messages are passed to the amqpInboundChannel as a byte array.
<amqp:inbound-channel-adapter channel="amqpInboundChannel"
queue-name="mobile.queue" connection-factory="rabbitConnectionFactory" />
Here we've declared the queue name mobile.queue. This name matches the queue name used by our RabbitMQ producer. We've also referenced a connection factory, rabbitConnectionFactory, which is declared in the rabbit-context.xml file.

rabbit-context.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
 xmlns:p="http://www.springframework.org/schema/p" 
 xsi:schemaLocation="http://www.springframework.org/schema/beans 
      http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">
        
        <!-- Provides connection to the RabbitMQ broker -->
 <bean id="rabbitConnectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory"
  p:username="guest" p:password="guest" p:port="5672">
  <constructor-arg value="localhost" />
 </bean>

 <!-- A template for sending messages and performing other commands to RabbitMQ -->
 <bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate"
  p:connectionFactory-ref="rabbitConnectionFactory" />
 
 <!-- This helps in configuring the AMQP broker, like creating a new queue -->
 <bean id="amqpAdmin" class="org.springframework.amqp.rabbit.core.RabbitAdmin">
  <constructor-arg ref="rabbitConnectionFactory"/>
 </bean>

</beans>

What is RabbitTemplate?
As with many other high-level abstractions provided by the Spring Framework and related projects, Spring AMQP provides a "template" that plays a central role. The interface that defines the main operations is called AmqpTemplate

The implementations of that protocol provide their own client libraries, so each implementation of the template interface will depend on a particular client library. Currently, there is only a single implementation: RabbitTemplate.

Source: Spring AMQP - Reference Documentation

Let's run our application and enable DEBUG logging to see what the amqp:inbound-channel-adapter does behind the scenes:
WAIT FOR MESSAGE
[10:56:18] (SimpleMessageListenerContainer.java:receiveAndExecute:377) Waiting for message from consumer.
[10:56:18] (BlockingQueueConsumer.java:nextMessage:131) Retrieving delivery for Consumer: tag=[amq.ctag-UvMqIuYyFN5Flxs6sFip2A==], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@localhost:5672/,1), acknowledgeMode=AUTO local queue size=0

WAIT FOR MESSAGE
[10:56:19] (SimpleMessageListenerContainer.java:receiveAndExecute:377) Waiting for message from consumer.
[10:56:19] (BlockingQueueConsumer.java:nextMessage:131) Retrieving delivery for Consumer: tag=[amq.ctag-UvMqIuYyFN5Flxs6sFip2A==], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@localhost:5672/,1), acknowledgeMode=AUTO local queue size=0

WAIT FOR MESSAGE
[10:56:20] (SimpleMessageListenerContainer.java:receiveAndExecute:377) Waiting for message from consumer.
[10:56:20] (BlockingQueueConsumer.java:nextMessage:131) Retrieving delivery for Consumer: tag=[amq.ctag-UvMqIuYyFN5Flxs6sFip2A==], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@localhost:5672/,1), acknowledgeMode=AUTO local queue size=0
Notice how the inbound AMQP adapter waits for incoming messages.

When there are messages to be processed, here's what we see instead:
AN AMQP CONNECTION RECEIVES A MESSAGE
[AMQP Connection localhost:5672 10:56:59] (BlockingQueueConsumer.java:handleDelivery:185) Storing delivery for Consumer: tag=[amq.ctag-UvMqIuYyFN5Flxs6sFip2A==], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@localhost:5672/,1), acknowledgeMode=AUTO local queue size=0

PRINT OUT THE MESSAGE
[10:56:59] (BlockingQueueConsumer.java:handle:104) Received message: (Body:'1234567;Branch A;SALES;3000.50;Pending approval'; ID:null; Content:text/plain; Headers:{keyword=SALES}; Exchange:; RoutingKey:mobile.queue; Reply:null; DeliveryMode:PERSISTENT; DeliveryTag:1)

SEND TO amqpInboundChannel
[10:56:59] (AbstractMessageChannel.java:preSend:224) preSend on channel 'amqpInboundChannel', message: [Payload=[B@1caefb0][Headers={timestamp=1299855419548, id=cc375bd3-5c17-4b0a-8c4b-0325c8ecdd27, keyword=SALES}]

The Message Handler (Service Activator)

All messages passed to the amqpInboundChannel are in the form of a byte array. To process this type we use a Service Activator that accepts a byte array. This service is a simple POJO that returns an ArrayList.

MessageHandler.java
package org.krams.tutorial.si;

import java.util.ArrayList;

import org.apache.log4j.Logger;

/**
 *  Accepts a byte array and converts it to an ArrayList
 *  
 *  @author Krams at {@link http://krams915@blogspot.com}
 */
public class MessageHandler {

 protected Logger logger = Logger.getLogger("integration");
  
 public ArrayList<String> handleMessage(byte[] data) {
  logger.debug("Received: " + new String(data));
  
  ArrayList<String> arrayList = new ArrayList<String>();
  arrayList.add(new String(data));
  
  return arrayList;
 }
 
}
This POJO accepts a byte array and returns an ArrayList. It returns an ArrayList because the next object in the graph, the Splitter, expects one.
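
One detail worth keeping in mind: new String(data) decodes the bytes with the JVM's platform default charset. The sketch below shows the same conversion with an explicit charset. It is only an illustration: the class name Utf8MessageHandler is hypothetical, and UTF-8 is an assumption about what the producer actually sends.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Sketch only: same idea as MessageHandler, but with an explicit charset.
// UTF-8 is an assumption about the producer's encoding.
class Utf8MessageHandler {

    List<String> handleMessage(byte[] data) {
        // Decode once, with a fixed charset, so the result does not depend
        // on the default charset of the machine running the backend
        String text = new String(data, StandardCharsets.UTF_8);

        // Wrap the single message in a list, as the original handler does
        List<String> messages = new ArrayList<String>();
        messages.add(text);
        return messages;
    }
}
```

If the producer and the backend always run with the same default charset, the original handler behaves the same way; the explicit charset just removes that dependency.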

Enabling DEBUG logging shows the following output:
SERVICE ACTIVATOR RECEIVES THE MESSAGE
[10:56:59] (AbstractMessageHandler.java:handleMessage:72) ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@15c458c] received message: [Payload=[B@1caefb0][Headers={timestamp=1299855419548, id=cc375bd3-5c17-4b0a-8c4b-0325c8ecdd27, keyword=SALES}]

PRINT OUT THE DATA
[10:56:59] (MessageHandler.java:handleMessage:17) Received: 1234567;Branch A;SALES;3000.50;Pending approval

SEND TO THE NEXT CHANNEL
[10:56:59] (AbstractReplyProducingMessageHandler.java:sendReplyMessage:157) handler 'ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@15c458c]' sending reply Message: [Payload=[1234567;Branch A;SALES;3000.50;Pending approval]][Headers={timestamp=1299855419562, id=a9ab2a35-5717-4c6a-8c04-d2efa614c46f, keyword=SALES}]

The Splitter

The Splitter's purpose is to split the list of messages returned by the Message Handler. We need to split the messages so that we can process each message individually. Remember that each message corresponds to a particular record type.
<chain input-channel="messageChannel" output-channel="filteredChannel">
  <splitter ref="splitterBean"/>
  ...
</chain>

<beans:bean id="splitterBean" class="org.krams.tutorial.si.MessageSplitter" />
We're referencing splitterBean that points to a custom MessageSplitter class:

MessageSplitter.java
package org.krams.tutorial.si;

import java.util.ArrayList;
import org.apache.log4j.Logger;
import org.springframework.integration.Message;
import org.springframework.integration.splitter.AbstractMessageSplitter;

/**
 *  Retrieves the message payload and returns it as a list of messages.
 *  
 *  @author Krams at {@link http://krams915@blogspot.com}
 */
public class MessageSplitter extends AbstractMessageSplitter {

 protected static Logger logger = Logger.getLogger("integration");

 @Override
 protected ArrayList<?> splitMessage(Message<?> message) {
  
  ArrayList<?> messages = (ArrayList<?>) message.getPayload();
  
  logger.debug("Total messages: " + messages.size());
  for (Object mess: messages) {
   logger.debug(mess.toString());
  }
  
  return messages;
 }

}
Notice that this Splitter implementation extends the base class AbstractMessageSplitter, which provides the standard splitting behavior.

What is AbstractMessageSplitter?
The API for performing splitting consists of one base class, AbstractMessageSplitter, which is a MessageHandler implementation, encapsulating features which are common to splitters, such as filling in the appropriate message headers CORRELATION_ID, SEQUENCE_SIZE, and SEQUENCE_NUMBER on the messages that are produced. This enables tracking down the messages and the results of their processing (in a typical scenario, these headers would be copied over to the messages that are produced by the various transforming endpoints), and use them, for example, in a Composed Message Processor scenario.

Source: Spring Integration 2 - 5.3 Splitter
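
To picture what those three headers look like for each split item, here is a plain-Java illustration. It is not Spring Integration code and does not call any Spring API: the class SplitHeadersSketch and its method are hypothetical, and the header names are simply written as they appear in the DEBUG output of this tutorial.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of the headers a splitter adds.
// Not Spring Integration code; class and method names are hypothetical.
class SplitHeadersSketch {

    // Returns one header map per item, all sharing the same correlation id.
    static List<Map<String, Object>> headersFor(String originalMessageId, List<String> items) {
        List<Map<String, Object>> result = new ArrayList<Map<String, Object>>();
        for (int i = 0; i < items.size(); i++) {
            Map<String, Object> headers = new LinkedHashMap<String, Object>();
            headers.put("correlationId", originalMessageId); // id of the message that was split
            headers.put("sequenceSize", items.size());       // how many parts exist in total
            headers.put("sequenceNumber", i + 1);            // 1-based position of this part
            result.add(headers);
        }
        return result;
    }
}
```

These headers are what allows a later step, such as an aggregator, to tell which parts belong together and whether all of them have arrived.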

Enabling DEBUG logging shows the following output:
SPLITTER RECEIVES THE MESSAGE
[10:56:59] (AbstractMessageHandler.java:handleMessage:72) splitterBean received message: [Payload=[1234567;Branch A;SALES;3000.50;Pending approval]][Headers={timestamp=1299855419562, id=a9ab2a35-5717-4c6a-8c04-d2efa614c46f, keyword=SALES}]

PRINT OUT TOTAL NUMBER OF MESSAGES
[10:56:59] (MessageSplitter.java:splitMessage:23) Total messages: 1

PRINT OUT DATA
[10:56:59] (MessageSplitter.java:splitMessage:25) 1234567;Branch A;SALES;3000.50;Pending approval

SEND TO THE NEXT CHANNEL
[10:56:59] (AbstractReplyProducingMessageHandler.java:sendReplyMessage:157) handler 'splitterBean' sending reply Message: [Payload=1234567;Branch A;SALES;3000.50;Pending approval][Headers={timestamp=1299855419566, id=1c34e950-7276-4c04-b407-946dfe8565c5, correlationId=a9ab2a35-5717-4c6a-8c04-d2efa614c46f, sequenceSize=1, keyword=SALES, sequenceNumber=1}]

The Filter

The Filter's purpose is to sift out invalid messages. These are messages that have invalid keywords. Remember valid keywords are SALES, INVENTORY, and ORDER. Valid messages will be sent to the next flow with a header property valid = true. Invalid ones will be redirected to a discard channel with a header property valid = false.
<chain input-channel="messageChannel" output-channel="filteredChannel">
  ...
  <filter ref="filterBean" method="filter"  discard-channel="rejectedMessagesChannel" />
</chain>

<beans:bean id="filterBean" class="org.krams.tutorial.si.ProductFilter" />

ProductFilter.java
package org.krams.tutorial.si;

import org.apache.log4j.Logger;
import org.springframework.integration.Message;

/**
 *  Filters messages based on their keyword. If an item is invalid,
 *  it gets dropped from the normal process.
 *  <p>
 *  Valid keywords are SALES, INVENTORY, ORDER
 *  
 *  @author Krams at {@link http://krams915@blogspot.com}
 */
public class ProductFilter {

 protected static Logger logger = Logger.getLogger("integration");
 
 public Boolean filter(Message<?> content) {
  logger.debug(content);
  
  // A message without a keyword header is treated as invalid
  // instead of failing with a NullPointerException
  Object header = content.getHeaders().get("keyword");
  if (header == null) {
   logger.debug("Invalid keyword found");
   return false;
  }
  String keyword = header.toString();
  
  if (keyword.equalsIgnoreCase(ApplicationConstants.TYPE_SALES)
    || keyword.equalsIgnoreCase(ApplicationConstants.TYPE_INVENTORY)
    || keyword.equalsIgnoreCase(ApplicationConstants.TYPE_ORDER)) {
   return true;
  }
  
  logger.debug("Invalid keyword found");
  return false;
 }
}

Enabling DEBUG logging shows the following output:
FILTER RECEIVES THE MESSAGE
[10:56:59] (AbstractMessageHandler.java:handleMessage:72) org.springframework.integration.filter.MessageFilter@1d4340c received message: [Payload=1234567;Branch A;SALES;3000.50;Pending approval][Headers={timestamp=1299855419566, id=1c34e950-7276-4c04-b407-946dfe8565c5, correlationId=a9ab2a35-5717-4c6a-8c04-d2efa614c46f, sequenceSize=1, keyword=SALES, sequenceNumber=1}]

PRINT OUT THE MESSAGE
[10:56:59] (ProductFilter.java:filter:19) [Payload=1234567;Branch A;SALES;3000.50;Pending approval][Headers={timestamp=1299855419566, id=1c34e950-7276-4c04-b407-946dfe8565c5, correlationId=a9ab2a35-5717-4c6a-8c04-d2efa614c46f, sequenceSize=1, keyword=SALES, sequenceNumber=1}]

SEND TO THE NEXT CHANNEL
[10:56:59] (AbstractReplyProducingMessageHandler.java:sendReplyMessage:157) handler 'org.springframework.integration.filter.MessageFilter@1d4340c' sending reply Message: [Payload=1234567;Branch A;SALES;3000.50;Pending approval][Headers={timestamp=1299855419566, id=1c34e950-7276-4c04-b407-946dfe8565c5, correlationId=a9ab2a35-5717-4c6a-8c04-d2efa614c46f, sequenceSize=1, keyword=SALES, sequenceNumber=1}]

The Content Transformer

The Transformer's purpose is to take the message content, a semicolon-delimited String, and convert it to a String array. For this application we have two types of transformer: a ContentTransformer and a Mapper. We'll discuss the ContentTransformer first.
<chain input-channel="filteredChannel">
  ...
  <transformer ref="contentTransformerBean" method="transform"/>
  ...
</chain>

<beans:bean id="contentTransformerBean" class="org.krams.tutorial.si.ContentTransformer" />

ContentTransformer.java
package org.krams.tutorial.si;

import org.apache.log4j.Logger;

/**
 *  Retrieves the <b>content</b> entry and returns it as an array of Strings. 
 *  This assumes the content is delimited by semicolon.
 *  
 *  @author Krams at {@link http://krams915@blogspot.com}
 */
public class ContentTransformer {

 protected static Logger logger = Logger.getLogger("integration");
 private static final String DELIMITER = ";";
 
 public String[] transform(String content) {
  logger.debug("Original data: " + content);
  
  String[] contentArray = content.split(DELIMITER);
  
  logger.debug("Tranformed data: " + contentArray);
  return contentArray;
 }
}

Enabling DEBUG logging shows the following output:
PRINT OUT ORIGINAL DATA
[10:56:59] (ContentTransformer.java:transform:17) Original data: 1234567;Branch A;SALES;3000.50;Pending approval

PRINT OUT TRANSFORMED DATA
[10:56:59] (ContentTransformer.java:transform:21) Tranformed data: [Ljava.lang.String;@e5f0d2
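
The transformed data shows up as [Ljava.lang.String;@e5f0d2 because concatenating an array with a String prints the array's identity, not its elements. If you'd rather see the elements in the log, java.util.Arrays.toString can be used. The class below is a hypothetical sketch of that idea, not part of the tutorial's source:

```java
import java.util.Arrays;

// Sketch: formatting a String array for a log line. Class name is hypothetical.
class ArrayLogSketch {

    static String describe(String[] contentArray) {
        // Arrays.toString renders the elements instead of the array's identity
        return "Transformed data: " + Arrays.toString(contentArray);
    }
}
```

The same applies to any other place where a String array is concatenated into a log message.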

The Router

The Router's purpose is to redirect each String array to a specific channel. Redirection is based on the keyword header carried by the message. For example, a message with the SALES keyword will be sent to the salesChannel.
<chain input-channel="filteredChannel">
  ...
  <router ref="productRouterBean" method="route" />
</chain>

<beans:bean id="productRouterBean" class="org.krams.tutorial.si.ProductRouter" />

ProductRouter.java
package org.krams.tutorial.si;

import org.apache.log4j.Logger;
import org.springframework.integration.Message;

/**
 *  Routes messages based on their keyword. Invalid entries
 *  are routed to unknownChannel
 *  
 *  @author Krams at {@link http://krams915@blogspot.com}
 */
public class ProductRouter {

 protected static Logger logger = Logger.getLogger("integration");
 
 public String route(Message<?> content) {
  
  if (content.getHeaders().get("keyword").toString().equalsIgnoreCase(ApplicationConstants.TYPE_SALES)) {
   logger.debug("Routing to salesChannel");
   return "salesChannel";
   
  } else if (content.getHeaders().get("keyword").toString().equalsIgnoreCase(ApplicationConstants.TYPE_INVENTORY)) {
   logger.debug("Routing to inventoryChannel");
   return "inventoryChannel";
   
  } else if (content.getHeaders().get("keyword").toString().equalsIgnoreCase(ApplicationConstants.TYPE_ORDER)) {
   logger.debug("Routing to orderChannel");
   return "orderChannel";
   
  } else {
   logger.debug("Routing to unknownChannel");
   return "unknownChannel";
  }
 }
}
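
The if/else chain in the router is essentially a lookup from keyword to channel name. As an alternative way to picture it, here is a minimal plain-Java sketch using a map. The class ChannelLookupSketch is hypothetical, and it assumes the ApplicationConstants values are the literal keywords SALES, INVENTORY, and ORDER, which the tutorial does not show.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Sketch of keyword-to-channel routing as a lookup table.
// Channel names come from the tutorial; the class itself is hypothetical.
class ChannelLookupSketch {

    private static final Map<String, String> CHANNELS = new HashMap<String, String>();
    static {
        // Assumed keyword values; ApplicationConstants is not shown in the tutorial
        CHANNELS.put("SALES", "salesChannel");
        CHANNELS.put("INVENTORY", "inventoryChannel");
        CHANNELS.put("ORDER", "orderChannel");
    }

    // Returns the channel name for a keyword. Unknown or missing keywords
    // fall back to "unknownChannel", like the else branch of the router.
    static String channelFor(Object keywordHeader) {
        if (keywordHeader == null) {
            return "unknownChannel";
        }
        String channel = CHANNELS.get(keywordHeader.toString().toUpperCase(Locale.ROOT));
        return channel != null ? channel : "unknownChannel";
    }
}
```

With this shape, supporting a new report type would mean adding one map entry rather than another branch.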

Enabling DEBUG logging shows the following output:
ROUTER RECEIVES MESSAGE
[10:56:59] (AbstractMessageHandler.java:handleMessage:72) org.springframework.integration.router.MethodInvokingRouter@18eabf6 received message: [Payload=[Ljava.lang.String;@e5f0d2][Headers={timestamp=1299855419571, id=24ac9450-65e3-4117-a9a8-6445a8846e2a, valid=true, correlationId=a9ab2a35-5717-4c6a-8c04-d2efa614c46f, sequenceSize=1, keyword=SALES, sequenceNumber=1}]

ROUTE TO APPROPRIATE CHANNEL
[10:56:59] (ProductRouter.java:route:19) Routing to salesChannel

SEND TO THE NEXT CHANNEL
[10:56:59] (AbstractMessageChannel.java:preSend:224) preSend on channel 'salesChannel', message: [Payload=[Ljava.lang.String;@e5f0d2][Headers={timestamp=1299855419571, id=24ac9450-65e3-4117-a9a8-6445a8846e2a, valid=true, correlationId=a9ab2a35-5717-4c6a-8c04-d2efa614c46f, sequenceSize=1, keyword=SALES, sequenceNumber=1}]

The Mapper Transformers

The purpose of the second set of Transformers is to map a String array to a Java object. We've declared an interface IMapper and provided three implementations, one for each of the recognized keywords.
<transformer input-channel="salesChannel" ref="salesMapper"
  method="map" output-channel="aggregateChannel" />
<transformer input-channel="inventoryChannel" ref="inventoryMapper"
  method="map" output-channel="aggregateChannel" />
<transformer input-channel="orderChannel" ref="orderMapper"
  method="map" output-channel="aggregateChannel" />

<beans:bean id="salesMapper" class="org.krams.tutorial.si.SalesMapper" />
<beans:bean id="inventoryMapper" class="org.krams.tutorial.si.InventoryMapper" />
<beans:bean id="orderMapper" class="org.krams.tutorial.si.OrderMapper" />

IMapper.java
package org.krams.tutorial.si;

/**
 *  Maps a String array to a Java object
 *  
 *  @author Krams at {@link http://krams915@blogspot.com}
 */
public interface IMapper {

 public Object map(String[] content);

}

SalesMapper.java
package org.krams.tutorial.si;

import org.apache.log4j.Logger;
import org.krams.tutorial.oxm.Sales;

/**
 *  Concrete class for mapping sales records
 *  
 *  @author Krams at {@link http://krams915@blogspot.com}
 */
public class SalesMapper implements IMapper {

 protected static Logger logger = Logger.getLogger("integration");
 
 public Object map(String[] content) {
  logger.debug("Mapping content: " + content);
  
  Sales sales = new Sales();
  sales.setId(content[0]);
  sales.setBranch(content[1]);
  sales.setKeyword(content[2]);
  sales.setAmount(Double.valueOf(content[3]));
  sales.setRemarks(content[4]);
  
  return sales;
 }
}
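
The mapper above trusts that the array always has five elements and that the amount is a valid number. If the mobile application ever sends a malformed line, it would fail with an ArrayIndexOutOfBoundsException or a NumberFormatException. Here is a minimal sketch of a more defensive mapping. SalesFields is a hypothetical stand-in for the generated Sales class, used only so the example is self-contained.

```java
// Sketch of input validation before mapping a sales record.
// SalesFields is a hypothetical stand-in for the generated Sales class.
class SalesFields {
    String id;
    String branch;
    String keyword;
    double amount;
    String remarks;
}

class SalesFieldsMapper {

    // Expects: id, branch, keyword, amount, remarks (five fields, as in the SALES example)
    static SalesFields map(String[] content) {
        if (content == null || content.length < 5) {
            throw new IllegalArgumentException("A sales record needs 5 fields");
        }
        SalesFields sales = new SalesFields();
        sales.id = content[0];
        sales.branch = content[1];
        sales.keyword = content[2];
        try {
            sales.amount = Double.parseDouble(content[3]);
        } catch (NumberFormatException e) {
            // Report the offending value instead of a bare parsing error
            throw new IllegalArgumentException("Amount is not a number: " + content[3], e);
        }
        sales.remarks = content[4];
        return sales;
    }
}
```

Whether a malformed record should raise an exception or be sent to a rejection channel is a design decision this sketch leaves open.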

Enabling DEBUG logging shows the following output:
TRANSFORMER RECEIVES THE MESSAGE
[10:56:59] (AbstractMessageHandler.java:handleMessage:72) org.springframework.integration.transformer.MessageTransformingHandler@cc5002 received message: [Payload=[Ljava.lang.String;@e5f0d2][Headers={timestamp=1299855419571, id=24ac9450-65e3-4117-a9a8-6445a8846e2a, valid=true, correlationId=a9ab2a35-5717-4c6a-8c04-d2efa614c46f, sequenceSize=1, keyword=SALES, sequenceNumber=1}]

MAP THE DATA
[10:56:59] (SalesMapper.java:map:16) Mapping content: [Ljava.lang.String;@e5f0d2

SEND TO THE NEXT CHANNEL
[10:56:59] (AbstractMessageChannel.java:preSend:224) preSend on channel 'aggregateChannel', message: [Payload=org.krams.tutorial.oxm.Sales@198ee2f][Headers={timestamp=1299855419575, id=8544b1dd-8265-46ff-964c-0ebde735c508, valid=true, correlationId=a9ab2a35-5717-4c6a-8c04-d2efa614c46f, sequenceSize=1, keyword=SALES, sequenceNumber=1}]

The Aggregator

The Aggregator's purpose is to collect and combine all the split and processed messages. At this point the original messages are Java objects instead of String arrays. Once all the messages have been collected, the aggregator creates an AddListRequest object and assigns all records to it. This AddListRequest object is the one that we'll send to an external web service.
<aggregator input-channel="aggregateChannel"
  output-channel="wsChannel" ref="productAggregatorBean" method="send"
  release-strategy="productAggregatorBean" release-strategy-method="release"
  correlation-strategy="productAggregatorBean" correlation-strategy-method="correlate"
  send-partial-result-on-expiry="false" />

<beans:bean id="productAggregatorBean" class="org.krams.tutorial.si.ProductAggregator" />

ProductAggregator.java
package org.krams.tutorial.si;

import java.util.List;
import org.apache.log4j.Logger;
import org.krams.tutorial.oxm.AddListRequest;
import org.krams.tutorial.oxm.Inventory;
import org.krams.tutorial.oxm.Order;
import org.krams.tutorial.oxm.Record;
import org.krams.tutorial.oxm.Sales;
import org.springframework.integration.Message;

/**
 *  Consolidates all messages before sending them to the next 
 *  process. In our case, the next process is a web service call.
 *  <p>
 *  This aggregator has the following behavior:
 *  <pre>
 *  1. Correlates messages based on the correlation id.
 *  The correlation id is auto-generated by Spring. 
 *  
 *  2. Messages are released once the number of collected messages has reached
 *  the sequence size. The sequence size is auto-generated by Spring. 
 *  
 *  3. Once released, the messages are added to a Record instance which
 *  is added to an instance of AddListRequest. This AddListRequest is the final
 *  message we send to the web service. JAXB will marshall this object to XML
 *  before sending to an external web service.
 *  </pre>
 *  @author Krams at {@link http://krams915@blogspot.com}
 */
public class ProductAggregator {

 protected static Logger logger = Logger.getLogger("integration");
 
 @SuppressWarnings("unchecked")
 public AddListRequest  send(List<Message> reqlist) {
  logger.debug("Total messages to send: " + reqlist.size()); 
  
  AddListRequest request = new AddListRequest();
  
  for (Message mess: reqlist) {
   // Verify first if record is valid. A filter is run for each record
   // Invalid records have the property valid = false
   if ( Boolean.valueOf(mess.getHeaders().get("valid").toString()) ) {
    Record record = new Record();
    
    if ( mess.getHeaders().get("keyword").toString().equalsIgnoreCase(ApplicationConstants.TYPE_SALES)) {
     record.setSales((Sales) mess.getPayload());
     request.getRecord().add(record);
     logger.debug("Added sales");
    }
    
    if ( mess.getHeaders().get("keyword").toString().equalsIgnoreCase(ApplicationConstants.TYPE_INVENTORY)) {
     record.setInventory((Inventory) mess.getPayload());
     request.getRecord().add(record);
     logger.debug("Added inventory");
    }
    
    if ( mess.getHeaders().get("keyword").toString().equalsIgnoreCase(ApplicationConstants.TYPE_ORDER)) {
     record.setOrder((Order) mess.getPayload());
     request.getRecord().add(record);
     logger.debug("Added order");
    }
   }
  }
  
  return request;
 }
 
 public boolean release(List<Message<?>> messages) {
  logger.debug("Message size: " + messages.size());
  logger.debug("Sequence size: " + messages.get(0).getHeaders().getSequenceSize());
  
  if (messages.size() == messages.get(0).getHeaders().getSequenceSize()) {
   logger.debug("Now releasing ...");
   return true;
  }
  
  logger.debug("Pending release ...");
  return false;
 }
 
 public String correlate(Message<?> message) {
  logger.debug("Correlate by: " + message.getHeaders().getCorrelationId().toString());
  logger.debug("Get keyword header: " + message.getHeaders().get("keyword").toString());
  return message.getHeaders().getCorrelationId().toString();
 }
}
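
The correlate and release methods work together: messages are grouped by correlation id, and a group is released once it holds as many messages as the sequence size. The following plain-Java sketch mimics that behavior without any Spring classes. ReleaseSketch and its add method are hypothetical names, and the real bookkeeping is done by Spring Integration, not by code like this.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of correlate-then-release.
// Not Spring Integration code; all names here are hypothetical.
class ReleaseSketch {

    private final Map<String, List<Object>> groups = new HashMap<String, List<Object>>();

    // Adds a payload to its group. Returns the completed group once the
    // number of collected payloads equals sequenceSize, otherwise null.
    List<Object> add(String correlationId, int sequenceSize, Object payload) {
        List<Object> group = groups.get(correlationId);
        if (group == null) {
            group = new ArrayList<Object>();
            groups.put(correlationId, group);
        }
        group.add(payload);

        if (group.size() == sequenceSize) {
            groups.remove(correlationId); // the group is complete, stop tracking it
            return group;
        }
        return null; // still waiting for more parts
    }
}
```

In the log output of this tutorial the sequence size is 1, so the group is released as soon as its first and only message arrives.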

Enabling DEBUG logging shows the following output:
AGGREGATOR RECEIVES THE MESSAGE
[10:56:59] (AbstractMessageHandler.java:handleMessage:72) org.springframework.integration.aggregator.CorrelatingMessageHandler#0 received message: [Payload=org.krams.tutorial.oxm.Sales@198ee2f][Headers={timestamp=1299855419575, id=8544b1dd-8265-46ff-964c-0ebde735c508, valid=true, correlationId=a9ab2a35-5717-4c6a-8c04-d2efa614c46f, sequenceSize=1, keyword=SALES, sequenceNumber=1}]

FIND KEYWORD HEADER
[10:56:59] (ProductAggregator.java:correlate:84) Get keyword header: SALES

PRINT OUT SEQUENCE SIZE
[10:56:59] (ProductAggregator.java:release:72) Sequence size: 1

RELEASE THE MESSAGES
[10:56:59] (ProductAggregator.java:release:75) Now releasing ...
[10:56:59] (CorrelatingMessageHandler.java:completeGroup:303) Completing group with correlationKey [a9ab2a35-5717-4c6a-8c04-d2efa614c46f]
[10:56:59] (ProductAggregator.java:send:37) Total messages to send: 1
[10:56:59] (ProductAggregator.java:send:50) Added sales

SEND TO THE NEXT CHANNEL
[10:56:59] (AbstractMessageChannel.java:preSend:224) preSend on channel 'wsChannel', message: [Payload=org.krams.tutorial.oxm.AddListRequest@352d87][Headers={timestamp=1299855419592, id=50b7c116-eaea-4481-9638-bfc18d992e6b, valid=true, correlationId=a9ab2a35-5717-4c6a-8c04-d2efa614c46f, keyword=SALES}]

The Outbound Web Service Adapter



The outbound WS adapter's purpose is to make a web service call. Before sending the message, the adapter will first marshall the objects using JAXB. After sending, it waits for a reply. The reply is then unmarshalled as an AddListResponse object.
<ws:outbound-gateway id="marshallingGateway"
  request-channel="wsChannel" uri="http://localhost:8080/spring-ws-si/krams/ws"
  marshaller="jaxbMarshaller" unmarshaller="jaxbMarshaller"
  reply-channel="replyChannel" />

Enabling DEBUG logging shows the following output:
CREATE HTTP CONNECTION
[10:56:59] (WebServiceAccessor.java:createConnection:110) Opening [org.springframework.ws.transport.http.HttpUrlConnection@77a748] to [http://localhost:8080/spring-ws-si/krams/ws]

SEND THE WEB SERVICE REQUEST
[10:56:59] (WebServiceTemplate.java:sendRequest:584) Sent request [SaajSoapMessage {http://krams915.blogspot.com/ws/schema/oss}addListRequest]

RECEIVED RESPONSE
[10:56:59] (WebServiceTemplate.java:logResponse:642) Received response [SaajSoapMessage {http://krams915.blogspot.com/ws/schema/oss}addListResponse] for request [SaajSoapMessage {http://krams915.blogspot.com/ws/schema/oss}addListRequest]

SEND TO THE NEXT CHANNEL
[10:56:59] (AbstractReplyProducingMessageHandler.java:sendReplyMessage:157) handler 'org.springframework.integration.ws.MarshallingWebServiceOutboundGateway#0' sending reply Message: [Payload=org.krams.tutorial.oxm.AddListResponse@e7fd03][Headers={timestamp=1299855419970, id=02cbc55c-9607-4d0e-96ae-fc99b746846b, valid=true, correlationId=a9ab2a35-5717-4c6a-8c04-d2efa614c46f, keyword=SALES}]

To verify that the integration backend has really sent the message, let's open the web service's front end at the following URL:
http://localhost:8080/spring-ws-si/krams/main/records

Here's a screenshot of the results:

It's a success! We've managed to submit the messages using Spring Integration.

Remarks about the Web Service
For the web service call to succeed, you need to declare a JAXB marshaller as follows:
<bean id="jaxbMarshaller" class="org.springframework.oxm.jaxb.Jaxb2Marshaller" 
  p:contextPath="org.krams.tutorial.oxm" />

You also need to generate the Java proxy classes (or JAX-WS portable artifacts) from the web service's WSDL. For this tutorial, I've already included the artifacts under the package org.krams.tutorial.oxm.


If you need to learn how to generate such artifacts from scratch, please visit the following tutorial: Spring WS - MVC: Implementing a Client Tutorial

The Whole Configuration

Let's post the whole configuration file:

spring-integration-context.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans 
 xmlns="http://www.springframework.org/schema/integration" 
 xmlns:beans="http://www.springframework.org/schema/beans"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xmlns:p="http://www.springframework.org/schema/p"
 xmlns:ws="http://www.springframework.org/schema/integration/ws"
 xmlns:stream="http://www.springframework.org/schema/integration/stream"
 xmlns:amqp="http://www.springframework.org/schema/integration/amqp"
 xsi:schemaLocation="http://www.springframework.org/schema/beans 
      http://www.springframework.org/schema/beans/spring-beans-3.0.xsd 
   http://www.springframework.org/schema/integration
   http://www.springframework.org/schema/integration/spring-integration-2.0.xsd 
   http://www.springframework.org/schema/integration/ws
            http://www.springframework.org/schema/integration/ws/spring-integration-ws-2.0.xsd              
            http://www.springframework.org/schema/integration/stream
            http://www.springframework.org/schema/integration/stream/spring-integration-stream-2.0.xsd      
            http://www.springframework.org/schema/integration/amqp
            http://www.springframework.org/schema/integration/amqp/spring-integration-amqp-2.0.xsd">

 <beans:bean id="ampqHandler" class="org.krams.tutorial.si.MessageHandler" />
 <beans:bean id="splitterBean" class="org.krams.tutorial.si.MessageSplitter" />
 <beans:bean id="contentTransformerBean" class="org.krams.tutorial.si.ContentTransformer" />
 <beans:bean id="productRouterBean" class="org.krams.tutorial.si.ProductRouter" />
 <beans:bean id="filterBean" class="org.krams.tutorial.si.ProductFilter" />
 <beans:bean id="productAggregatorBean" class="org.krams.tutorial.si.ProductAggregator" />
 <beans:bean id="salesMapper" class="org.krams.tutorial.si.SalesMapper" />
 <beans:bean id="inventoryMapper" class="org.krams.tutorial.si.InventoryMapper" />
 <beans:bean id="orderMapper" class="org.krams.tutorial.si.OrderMapper" />

 <amqp:inbound-channel-adapter channel="amqpInboundChannel"
  queue-name="mobile.queue" connection-factory="rabbitConnectionFactory" />
 
 <channel id="amqpInboundChannel">
  <interceptors>
   <wire-tap channel="logger" />
  </interceptors>
 </channel>

 <service-activator input-channel="amqpInboundChannel"
  ref="ampqHandler" method="handleMessage" output-channel="messageChannel" />
 
 <channel id="messageChannel">
  <interceptors>
   <wire-tap channel="logger" />
  </interceptors>
 </channel>

 <logging-channel-adapter id="logger" level="ERROR" />
 
 <chain input-channel="messageChannel" output-channel="filteredChannel">
  <splitter ref="splitterBean"/>
  <filter ref="filterBean" method="filter"  discard-channel="rejectedMessagesChannel" />
 </chain>

 <publish-subscribe-channel id="rejectedMessagesChannel" />
 
 <header-enricher input-channel="rejectedMessagesChannel" output-channel="aggregateChannel">
  <header name="valid" value="false" />
 </header-enricher>

 <channel id="filteredChannel">
  <queue capacity="10" />
 </channel>

 <chain input-channel="filteredChannel">
  <header-enricher>
   <header name="valid" value="true" />
  </header-enricher>
  <transformer ref="contentTransformerBean" method="transform"/>
  <router ref="productRouterBean" method="route" />
 </chain>

 <channel id="salesChannel">
  <queue capacity="10" />
 </channel>

 <channel id="inventoryChannel">
  <queue capacity="10" />
 </channel>

 <channel id="orderChannel">
  <queue capacity="10" />
 </channel>

 <channel id="unknownChannel">
  <interceptors>
   <wire-tap channel="logger" />
  </interceptors>
 </channel>

 <transformer input-channel="salesChannel" ref="salesMapper"
  method="map" output-channel="aggregateChannel" />
 <transformer input-channel="inventoryChannel" ref="inventoryMapper"
  method="map" output-channel="aggregateChannel" />
 <transformer input-channel="orderChannel" ref="orderMapper"
  method="map" output-channel="aggregateChannel" />

 <channel id="aggregateChannel">
  <queue capacity="10" />
 </channel>

 <aggregator input-channel="aggregateChannel"
  output-channel="wsChannel" ref="productAggregatorBean" method="send"
  release-strategy="productAggregatorBean" release-strategy-method="release"
  correlation-strategy="productAggregatorBean" correlation-strategy-method="correlate"
  send-partial-result-on-expiry="false" />

 <channel id="wsChannel">
  <queue capacity="10" />
 </channel>

 <ws:outbound-gateway id="marshallingGateway"
  request-channel="wsChannel" uri="http://localhost:8080/spring-ws-si/krams/ws"
  marshaller="jaxbMarshaller" unmarshaller="jaxbMarshaller"
  reply-channel="replyChannel" />

 <stream:stdout-channel-adapter id="replyChannel" />

 <poller id="poller" default="true" fixed-rate="1000" />

</beans:beans>

RabbitMQ Producer

Having discussed the integration backend thoroughly, let's take a quick look at the RabbitMQ producer. This is a simple Java project whose purpose is to publish messages.

Here's our client that publishes messages:

Client.java
package org.krams.tutorial.rabbit;

import org.springframework.amqp.core.MessageProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Sample client for sending messages to RabbitMQ
 * 
 *  @author Krams at {@link http://krams915.blogspot.com}
 */
public class Client {

 public static void main(String[] args) {
  ApplicationContext applicationContext =
    new ClassPathXmlApplicationContext("applicationContext.xml", Client.class);

  // Retrieve the MessageSender bean
  MessageSender sender = (MessageSender) applicationContext.getBean("messageSender");

  // Create a new MessageProperties
  // Assign custom header and content type
  MessageProperties properties = new MessageProperties();
  properties.setHeader("keyword", "SALES");
  properties.setContentType("text/plain");
  // Send the message
  sender.send("1234567;Branch A;SALES;3000.50;Pending approval", properties);

  // Create a new MessageProperties
  // Assign custom header and content type
  properties = new MessageProperties();
  properties.setHeader("keyword", "INVENTORY");
  properties.setContentType("text/plain");
  // Send the message
  sender.send("1234568;Branch A;INVENTORY;Printer;30;10", properties);

  // Create a new MessageProperties
  // Assign custom header and content type
  properties = new MessageProperties();
  properties.setHeader("keyword", "ORDER");
  properties.setContentType("text/plain");
  // Send the message
  sender.send("1234569;Branch B;ORDER;Keyboard;50", properties);
 }
}
Notice that the actual sending of the message is delegated to an instance of MessageSender, which in turn delegates to an instance of RabbitTemplate.

MessageSender.java
package org.krams.tutorial.rabbit;

import javax.annotation.Resource;

import org.apache.log4j.Logger;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

/**
 * Performs the actual sending of messages
 * 
 *  @author Krams at {@link http://krams915.blogspot.com}
 */
public class MessageSender {

 protected Logger logger = Logger.getLogger("client");

 @Resource(name="rabbitTemplate")
 private RabbitTemplate rabbitTemplate;

 @Resource(name="amqpAdmin")
 AmqpAdmin amqpAdmin;

 public static final String QUEUE_NAME = "mobile.queue";

 public void send(String text, MessageProperties properties) {
  // Create a durable queue definition
  Queue customQueue = new Queue(QUEUE_NAME);
  customQueue.setDurable(true);

  // Declare the queue on the broker
  amqpAdmin.declareQueue(customQueue);

  // Wrap our custom text and properties in a Message
  Message message = new Message(text.getBytes(), properties);

  // Send the Message object
  // The blank exchange name selects the default exchange,
  // which routes by queue name
  rabbitTemplate.send("", QUEUE_NAME, message);

  logger.debug("Message sent: " + text);
 }

}
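
One detail worth noting in MessageSender: text.getBytes() encodes with the JVM's default charset, so the producer and the consumer could disagree about the bytes if they run on differently configured machines. Here is a minimal sketch of encoding with an explicit charset instead, using only the JDK. The PayloadCodec class and its method names are hypothetical and are not part of the tutorial project:

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; not part of the tutorial project
final class PayloadCodec {

    private PayloadCodec() {}

    // Encode the text payload with a fixed charset instead of the platform default
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // Decode a received message body with the same charset
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String text = "1234567;Branch A;SALES;3000.50;Pending approval";
        byte[] body = encode(text);
        // The round trip gives back the original text
        System.out.println(decode(body).equals(text));
    }
}
```

In MessageSender this would amount to building the message as new Message(PayloadCodec.encode(text), properties), assuming the consumer decodes with the same charset.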

Run the Application


Set Up RabbitMQ

Before proceeding with the included applications, make sure you have a running installation of RabbitMQ! For details please visit the RabbitMQ site at http://www.rabbitmq.com/

Run the RabbitMQ producer

Run the client producer to start publishing messages. Our sample application publishes three messages. Feel free to modify it to publish more.

When you run the application, it prints out the following log statements:
[10:56:59] (MessageSender.java:send:44) Message sent: 1234567;Branch A;SALES;3000.50;Pending approval
[10:56:59] (MessageSender.java:send:44) Message sent: 1234568;Branch A;INVENTORY;Printer;30;10
[10:56:59] (MessageSender.java:send:44) Message sent: 1234569;Branch B;ORDER;Keyboard;50
This sends three messages to RabbitMQ.
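
Each payload is a semicolon-delimited record. Comparing the SALES message with the soapUI request shown in the web service section suggests the field order id;branch;keyword;amount;remarks. Here is a minimal sketch of splitting such a record under that assumption. SalesRecord and its parse method are hypothetical names for illustration, not the project's own transformer or mapper classes:

```java
import java.math.BigDecimal;

// Hypothetical value class for illustration only
final class SalesRecord {
    final String id;
    final String branch;
    final String keyword;
    final BigDecimal amount;
    final String remarks;

    private SalesRecord(String id, String branch, String keyword,
                        BigDecimal amount, String remarks) {
        this.id = id;
        this.branch = branch;
        this.keyword = keyword;
        this.amount = amount;
        this.remarks = remarks;
    }

    // Assumed field order: id;branch;keyword;amount;remarks
    static SalesRecord parse(String payload) {
        // A limit of -1 keeps trailing empty fields, so the count check stays strict
        String[] fields = payload.split(";", -1);
        if (fields.length != 5) {
            throw new IllegalArgumentException(
                "Expected 5 fields but got " + fields.length);
        }
        return new SalesRecord(fields[0].trim(), fields[1].trim(), fields[2].trim(),
            new BigDecimal(fields[3].trim()), fields[4].trim());
    }

    public static void main(String[] args) {
        SalesRecord record = parse("1234567;Branch A;SALES;3000.50;Pending approval");
        System.out.println(record.keyword + " " + record.amount); // prints "SALES 3000.50"
    }
}
```

The INVENTORY message carries six fields, so this sketch rejects it with an IllegalArgumentException; each record type would need its own parser.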

Run the web service

To run the web service I use a new instance of Tomcat on port 8080. If you prefer a different port, make sure to update the uri attribute of the outbound gateway in the tutorial application accordingly:

<ws:outbound-gateway id="marshallingGateway"
  request-channel="wsChannel" uri="http://localhost:8080/spring-ws-si/krams/ws"
  marshaller="jaxbMarshaller" unmarshaller="jaxbMarshaller"
  reply-channel="replyChannel" />

If you need to test the web service, I suggest using soapUI. The URI to the web service is:
http://localhost:8080/spring-ws-si/krams/ws

With soapUI you can send the following XML document to test the service:
<?xml version="1.0" encoding="UTF-8"?>
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:oss="http://krams915.blogspot.com/ws/schema/oss">
   <soapenv:Header/>
   <soapenv:Body>
      <oss:addListRequest>
         <oss:record>
            <oss:sales>
               <oss:id>1234567</oss:id>
               <oss:keyword>SALES</oss:keyword>
               <oss:branch>Branch A</oss:branch>
               <oss:amount>3000.50</oss:amount>
               <oss:remarks>Pending approval</oss:remarks>
            </oss:sales>
         </oss:record>
         <oss:record>
            <oss:inventory>
               <oss:id>1234568</oss:id>
               <oss:keyword>INVENTORY</oss:keyword>
               <oss:branch>Branch A</oss:branch>
               <oss:product>Printer</oss:product>
               <oss:beginning>30</oss:beginning>
               <oss:ending>10</oss:ending>
            </oss:inventory>
         </oss:record>
         <oss:record>
            <oss:order>
               <oss:id>1234569</oss:id>
               <oss:keyword>ORDER</oss:keyword>
               <oss:branch>Branch B</oss:branch>
               <oss:product>Keyboard</oss:product>
               <oss:quantity>50</oss:quantity>
            </oss:order>
         </oss:record>
      </oss:addListRequest>
   </soapenv:Body>
</soapenv:Envelope>
Since this tutorial is about Spring Integration, I won't discuss in detail how we've implemented the web service. With Spring Integration, all that matters is that we know the WSDL contract, since more often than not we'll be relying on a third-party web service.

If you need to learn how to create a web service provider and client from scratch, please check the Tutorials section. Or start with the following blog: Spring WS 2 and Spring 3 MVC Integration Tutorial

Run the integration backend

To run the integration backend I use another instance of Tomcat on port 8081. Once started, the application runs in the background. To see what's happening under the hood, I suggest running it within Eclipse or SpringSource Tool Suite. Alternatively, edit the logger properties to change the log directory.

Remarks
The system can certainly be improved and optimized. Unit and integration tests are either missing or incomplete, and exception handling is still in its infancy. These were left out to keep the tutorial simple.

The integration backend has been set up so that we can easily add Spring MVC support. You might notice some extra beans in the configuration file pertaining to MVC.

Issue with Filters
There's also an issue with Filters: a Filter does not adjust the sequence size header after it drops an invalid message. This only matters if the Aggregator's Release Strategy relies on the sequence size, and unfortunately that's exactly what ours does. As a workaround, we pass the dropped messages to the Aggregator so that the sequence can complete, as suggested by Mark Fisher. For more information, see the following post: Filter: Sequence Size is Not Modified.
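
To see why the dropped messages must still reach the Aggregator, consider a release strategy that fires only when the number of collected messages equals the sequence size. The plain-Java simulation below is a sketch of that idea only. It is not the tutorial's ProductAggregator, it uses no Spring classes, and the SequenceGroup class and its methods are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a correlated message group
final class SequenceGroup {
    // Fixed when the original message was split, before any filtering
    private final int sequenceSize;
    // One entry per arrived message; the value mirrors the "valid" header
    private final List<Boolean> validFlags = new ArrayList<>();

    SequenceGroup(int sequenceSize) {
        this.sequenceSize = sequenceSize;
    }

    // Record one arriving message
    void add(boolean valid) {
        validFlags.add(valid);
    }

    // Release only when every message of the original sequence has arrived
    boolean canRelease() {
        return validFlags.size() == sequenceSize;
    }

    // Number of messages that would actually be forwarded downstream
    long validCount() {
        return validFlags.stream().filter(v -> v).count();
    }

    public static void main(String[] args) {
        // Three messages were split; the filter silently discards one
        SequenceGroup discarded = new SequenceGroup(3);
        discarded.add(true);
        discarded.add(true);
        System.out.println(discarded.canRelease());  // false: the group stays incomplete

        // Same input, but the rejected message is forwarded with valid=false
        SequenceGroup forwarded = new SequenceGroup(3);
        forwarded.add(true);
        forwarded.add(true);
        forwarded.add(false);
        System.out.println(forwarded.canRelease());  // true
        System.out.println(forwarded.validCount());  // 2
    }
}
```

This mirrors the configuration above, where rejected messages get a valid=false header and are sent to aggregateChannel so that the group can complete while the invalid entries are skipped when building the request.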

Invalid Messages
What happens to invalid messages? We can't return them to the message broker because we'd just receive them again. One option is to log them and store them in an audit table. This isn't difficult with Spring Integration: we just use a JDBC outbound adapter. Of course, you have to make sure your audit table exists! For an example of this JDBC outbound adapter, please see Spring Integration: Integrating JDBC and WS System

Sandbox Warning
Note that the AMQP adapter we used here for Spring Integration is still in the sandbox, which means it hasn't been officially released. We had to build it from source. The project at the end of this tutorial includes all the files you'll need to run the application, including the AMQP adapter.

Conclusion

That's it. We've managed to build a simple integration backend using Spring Integration 2. We've successfully consumed messages from a message broker, RabbitMQ, and sent them to a third-party web service. We were able to retain and reuse our original pattern design without affecting the other components of the system.

Download the project
You can access the project site at Google's Project Hosting at http://code.google.com/p/spring-integration-2-tutorials/

You can download the project as a Maven build. Look for spring-si-amqp-ws.zip in the Downloads section.

For the RabbitMQ producer, click here: spring-rabbitmq-producer.zip
For the web service, click here: spring-ws-si.zip

You can run the project directly using an embedded server via Maven.
For Tomcat: mvn tomcat:run
For Jetty: mvn jetty:run

If you want to learn more about Spring MVC and integration with other technologies, feel free to read my other tutorials in the Tutorials section.