JMS Synchronous Invocations : Dual Channel HTTP-to-JMS - WSO2 ESB + WSO2 Message Broker

What is JMS  Synchronous Invocation


The classic way of JMS messaging is one way invocation, which means message producer will send the message and forget about it. On the other side there will be a consumer to consume the message. An external software known as JMS provider (Broker) sits in the middle of message producer and consumer and enable them to communicate without knowing about each other and in an asynchronous way. 

Two way JMS communication enables the producer to communicate not knowing about the state of consumer but expecting a response. 

The endpoint for a JMS client on the message broker which is called a destination can be either a queue or a topic that reside on the JMS broker. When comes to two way JMS communication that is achieved by using a another queue or topic together with the JMSReplyTo header. Before sending the message out, another destination (which is either queue or a topic) is created and set that as the JMSReplyTo header. On the other side the message consumer(or the subscriber) will use JMSReplyTo property to send the response back to the response destination. The message producer will be waiting on the reply destination and will process the message. A natural question that comes is how to coordinate the request and response if we use the same response queue for more than one message, or if we use the response queue for concurrent messages ? In other words how to identify if this response is for that request message that was sent? That is done by using the JMSCorrelationID JMS header. A message consumer or a subscriber will be created using the JMSCorrelationID so that the response can be sent to the destination given by JMSReplyTo. A more sophisticated(select the message based on some properties or based on content etc..) request/reply can be achieved by using the message selector concepts in JMS when creating the JMS consumer.

Scenario Description 


  1. There is an web application which generates messages out of SMS messages sent by clients. 
  2. There is a service which can process the SMS messages automatically. 
  3. SMS reply should be sent back to the client. 
  4. If the SMS processing server is not available at the moment, sent SMS should be preserved and reply should be generated and sent as soon as the server available again. 

Message Flow 


Following article describes how to perform above JMS Two way communication using WSO2 ESB and WSO2 MB

Configuring WSO2 Message Broker


There is nothing much to configure in the broker side. Here I have used WSO2 MB 3.0.0 Milestone 2 release which can be found here As this setup uses a single machine, both ESB and MB servers are stetup together. Thus in order to avoid port conflicts, MB server should be offset, which can be done by editing <MB_HOME>/repository/conf/carbon.xml 

Configuring WSO2 ESB 


Configuring Axis2.xml 

Navigate to <ESB_HOME>/repository/conf/axis2/axis2.xml and enable following parts under transportReceiver section. 
    <transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener">
            <!--<parameter name="myTopicConnectionFactory" locked="false">
               <parameter name="java.naming.factory.initial" locked="false">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter>
                <parameter name="java.naming.provider.url" locked="false">repository/conf/jndi.properties</parameter>
                <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">TopicConnectionFactory</parameter>
                <parameter name="transport.jms.ConnectionFactoryType" locked="false">topic</parameter>
            </parameter> -->
    
            <parameter name="myQueueConnectionFactory" locked="false">
                <parameter name="java.naming.factory.initial" locked="false">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter>
                <parameter name="java.naming.provider.url" locked="false">repository/conf/jndi.properties</parameter>
                <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
               <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
            </parameter>
    
            <parameter name="default" locked="false">
                <parameter name="java.naming.factory.initial" locked="false">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter>
                <parameter name="java.naming.provider.url" locked="false">repository/conf/jndi.properties</parameter>
                <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
                <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
            </parameter>
        </transportReceiver>
    
    
    
Enable JMS Transport Sender. 
 <transportSender name="jms" class="org.apache.axis2.transport.jms.JMSSender"/>


Configuring JNDI.properties file

Navigate to <ESB_HOME>/repository/conf/jndi.properties file and edit. It should look like below. 

connectionfactory.QueueConnectionFactory =
 amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5673'

# register some queues in JNDI using the form
# queue.[jndiName] = [physicalName]
queue.SMSStore = SMSStore

# register some topics in JNDI using the form
# topic.[jndiName] = [physicalName]
#topic.MyTopic = example.MyTopic

Note following. 

  • Default port for JMS messaging is 5672. As we have a offset=1 for MB now the port is 5673. 
  • SMSStore is the queue in which published messages are stored. 
  • All topic related params are commented out. 

Copying necessary libraries 

In order to communicate with WSO2 Message Broker, ESB needs following Java libraries those can be found at /client-lib folder. 

Now, as all the necessary bits are in place, both ESB and MB nodes can be started. Make sure there are no error or warning logs mentioned in the logs (or console) during server startup. 

Starting Back-end service 


There should be a service that accepts messages (SMSes) and generate reply messages. For that SimpleStockQuoteService which comes with ESB can be used. 


  • Navigate to /samples/axis2Server/src and execute "ant" to build the service. you should have ant installed. 
  • Navigate to /samples/axis2Server and execute following command to start the service 
    •  sh axis2server.sh if on Linux
    • axis2server.bat if on Windows
  • To test the service you can use SOAPUI with the  WSDL url at http://localhost:9000/services/SimpleStockQuoteService?wsdl


Creating proxy services at ESB 

Now necessary proxy services can be created at ESB to perform messaging. First, a proxy should be defined to route an incoming HTTP message to SMSStore queue at WSO2 MB (HTTP >> JMS). Let us call this SMSSenderProxy. Note that we DO NOT define "/>  property here because this not a fire-and-forget messaging scenario. Also we set SMSReceiveNotificationStore  as the queue to which reply messages for sent messages are expected to be. 

 <?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="SMSSenderProxy"
       transports="https,http"
       statistics="disable"
       trace="disable"
       startOnLoad="true">
   <target>
      <inSequence>
         <property name="transport.jms.ContentTypeProperty"
                   value="Content-Type"
                   scope="axis2"/>
      </inSequence>
      <outSequence>
         <property name="TRANSPORT_HEADERS" scope="axis2" action="remove"/>
         <send/>
      </outSequence>
      <endpoint>
         <address uri="jms:/SMSStore?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&amp;java.naming.factory.initial=org.wso2.andes.jndi.PropertiesFileInitialContextFactory&amp;java.naming.provider.url=repository/conf/jndi.properties&amp;transport.jms.DestinationType=queue&amp;transport.jms.ReplyDestination=SMSReceiveNotificationStore"/>
      </endpoint>
   </target>
   <description/>
</proxy>

After a message is sent to SMSStore queue, that message should be picked up and sent to the back-end service to process. For that a JMS listener should be configured. ESB can itself act as a JMS listener. Following proxy listens to JMS messages comes to SMSStore and forward them to SimpleStockQuoteService. Let us call it SMSForwardProxy. 

<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="SMSForwardProxy"
       transports="jms"
       statistics="disable"
       trace="disable"
       startOnLoad="true">
   <target>
      <inSequence>
         <send>
            <endpoint>
               <address uri="http://localhost:9000/services/SimpleStockQuoteService"/>
            </endpoint>
         </send>
      </inSequence>
      <outSequence>
         <send/>
      </outSequence>
   </target>
   <parameter name="transport.jms.ContentType">
      <rules>
         <jmsProperty>contentType</jmsProperty>
         <default>text/xml</default>
      </rules>
   </parameter>
   <parameter name="transport.jms.ConnectionFactory">myQueueConnectionFactory</parameter>
   <parameter name="transport.jms.DestinationType">queue</parameter>
   <parameter name="transport.jms.Destination">SMSStore</parameter>
   <description/>
</proxy>
                                

Testing the Scenario 


  • Launch SOAP UI. Create a project pointing the WSDL exposed by SMSSenderProxy (http://localhost:8280/services/SMSSenderProxy?wsdl)
  • Set following payload as the message to be sent. This is a typical payload a SMS has in this scenario. 
  •    
    <soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:ser="http://services.samples" xmlns:xsd="http://services.samples/xsd">
        <soapenv:Header/>
        <soapenv:Body>
           <ser:getQuote>
              <!--Optional:-->
              <ser:request>
                 <!--Optional:-->
                 <xsd:symbol>IBM</xsd:symbol>
              </ser:request>
           </ser:getQuote>
        </soapenv:Body>
     </soapenv:Envelope>
    
    
    
  • As soon as the message is sent, you will see following lines at WSO2 Message Broker console. This means, a message listener is registered to the ReplyTo destination specified and ESB waits until a message is received there. 



  • BackEnd service will process the message and send the reply to ESB. You should see following line at Axis2 Server console. 




  • A reply SMS for the sent SMS is received as below to the SOAPUI. 


  • Now shut down SimpleStockQuoteService service by shutting down the Axis2 server and send a message in the same way above. Now, the message is sent to the SMSStore queue by SMSSender proxy. But the SMS processing server is unavailable. Thus no reply would come to SMSReceiveNotificationStore queue. The internal listener will listen for sometime and it will be closed as no message came, and the client will timeout waiting for the reply. 
  • This sample SMSForward proxy can be enhanced using "transactions" so that, if the message could not be forwarded, it will not be removed from the queue. It will retry time to time until Back-End service becomes live. In such a scenario, you would see a state SMSStore has messages which are not processed still. 



Hasitha Hiranya

2 comments:

  1. Hi Hasitha.
    This scenario work OK with ActiveMQ using WSO2 EI 6.1.0 but if I use the internal MB I get a delay in the response over the 5s...

    Check this logs:


    Log with MB:
    [2017-08-07 14:10:14,546] [] INFO - LogMediator ACCION: = Accediendo a inSequence de SenderProxyMB
    [2017-08-07 14:10:17,020] [] INFO - LogMediator ACCION: = Accediendo a inSequence del Servicio FordwarProxyMB
    [2017-08-07 14:10:17,028] [] INFO - LogMediator ACCION: = Accediendo a outSequence de FordwarProxyMB
    [2017-08-07 14:10:19,961] [] INFO - LogMediator ACCION: = Accediendo a outSequence de SenderProxyMB

    Log with ActiveMQ:
    [2017-08-07 14:10:34,715] [] INFO - LogMediator ACCION: = Accediendo a inSequence de SenderProxy
    [2017-08-07 14:10:35,026] [] INFO - LogMediator ACCION: = Accediendo a inSequence del Servicio FordwarProxy
    [2017-08-07 14:10:35,116] [] INFO - LogMediator ACCION: = Accediendo a outSequence de FordwarProxy
    [2017-08-07 14:10:35,395] [] INFO - LogMediator ACCION: = Accediendo a outSequence de SenderProxy

    Any idea why?

    Regards.

    ReplyDelete

Instagram