Get JMS Local Transactions To Work With WSO2 ESB+WSO2 Message Broker Setup


What are JMS Transactions?


The theories and concepts behind JMS Transactions can be found in my earlier post titled "

JMS TRANSACTIONS, DELIVERY RELIABILITY AND REAL WORLD USE CASE"

This post is like a continuation of that post.


A Real World Scenario 

I would like to highlight the scenario to use JMS transactions described in my previous post above as an entering to this article. 

In real world messaging systems there can be bulk message processing part triggered around time that there is a less number of "user inputs" to the system (maybe at mid-night :D).

  1. Get  user records from database collected within the day.
  2.  Process them chunk by chunk.
  3. Write processed data to a database.
  4. There cannot be message duplication as one record should be processed only once and written to the database. Otherwise it would seem user has requested same thing twice (or many times).
  5. There cannot be a message lost because what you process is the user requests. If a lost happens it is analogous to that user has not made any request. 

In such a situation we need a JMS broker in the middle and also JMS transactions to handle message rollback if the database write or processing failure happened. To make the system more robust we need to use "Broker Failover" allowing the system to use a backup JMS broker if current broker is crashed or unreachable.

We will see how to Setup WSO2 ESB for a situation like above using WSO2 Message Broker as the JMS provider. But following example does not involve all above, rather it introduce the core configurations. 


Setup WSO2 Message Broker

  1. Download WSO2 MB 2.1.0 and extract it to a folder. That folder is referred as <MB_Home>  from here on.
  2. Navigate to <MB_Home&gt/repository/conf folder and open carbon.xml file.
  3. Define an offset 1. (We do this as we hope to start WSO2 ESB on the default port)
        
            <Ports>
            <!-- Ports offset. This entry will set the value of the ports defined below to
             the define value + Offset.
             e.g. Offset=2 and HTTPS port=9443 will set the effective HTTPS port to 9445
             -->
             <Offset>0</Offset>
    
  4. Now start WSO2 MB using wso2server.sh on Linux or wso2server.bat on Windows.


Setup WSO2 ESB

  1. Download WSO2 ESB 4.6.0 from here and extract it to a folder. That folder is referred as <ESB_Home> from here on. 
  2. Naviage to <ESB_Home> /repository/conf/axis2 folder and edit axis2.xml file. 
    • Edit and add JMS Sender parameters
      <transportSender name="jms" class="org.apache.axis2.transport.jms.JMSSender">
          <parameter name="default" locked="false">
       <parameter name="java.naming.factory.initial" locked="false">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter>
       <parameter name="java.naming.provider.url" locked="false">repository/conf/jndi.properties</parameter>
       <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">ConnectionFactory</parameter>
          </parameter>
      </transportSender>
      
    • Edit and add JMS Receiver parameters
      <transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener">
          <parameter name="myQueueConnectionFactory" locked="false">
              <parameter name="java.naming.factory.initial" locked="false">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter>
              <parameter name="java.naming.provider.url" locked="false">repository/conf/jndi.properties</parameter>
       <parameter name="transport.jms.SessionTransacted">true</parameter>
              <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter>
              <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter>
          </parameter>
      </transportReceiver>
      
      Note that we have following parameter which says WSO2 ESB to create a "transacted session to the Message Broker".
      <parameter name="transport.jms.SessionTransacted">true</parameter>
      
  3. Navigate to <ESB_Home> /repository/conf/ folfer and edit jndi.properties file. We will point to the MB instance we are running and also define a topic to be used here.
    # register some connection factories
    # connectionfactory.[jndiname] = [ConnectionURL]
    connectionfactory.ConnectionFactory = amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5673'
    connectionfactory.QueueConnectionFactory = amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5673'
    
    # register some queues in JNDI using the form
    # queue.[jndiName] = [physicalName]
    queue.myQueue = myQueue
    
    # register some topics in JNDI using the form
    # topic.[jndiName] = [physicalName]
    #topic.MyTopic = example.MyTopic
    
  4. Put client libs needed to connect to WSO2 MB to the folder <ESB_Home>/repository/components/lib folder. Get following two jars from <MB_Home>/client-libs and copy.
    • andes-client-0.13.wso2v7
    • geronimo-jms_1.1_spec-1.1.0.wso2v1
  5. Start WSO2 ESB by navigating to <ESB_Home>/bin and typing wso2server.sh on Linux or wso2server.bat on Windows.
  6. Using ESB management console create a proxy for sending messages to the queue This is called "StockQuoteProxy".
    <proxy name="StockQuoteProxy" transports="http" startOnLoad="true">
    <target>
        <endpoint>
            <address uri="jms:/myQueue?&amp;transport.jms.DestinationType=queue"/>
        </endpoint>
        <inSequence>
            <log level="custom">
                <property name="STATE" value="message is sent to queue"/>
            </log>
            <property name="OUT_ONLY" value="true"/>
            <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>
        </inSequence>
        <outSequence/>
    </target>
    </proxy>
    
  7. The next step is to create a listener proxy for the JMS queue we use. As we defined transaction property in axis2.xml file there is no need to define it here in service level. When the message is received we send it to SimpleStockQuoteService service.
    <proxy name="JMSQueueListenerProxy" transports="jms" startOnLoad="true">
    <target>
        <inSequence onError="CustomFaultSequence">
            <property name="OUT_ONLY" value="true"/>
            <log level="custom">
                <property name="Transaction Action" value="Committed"/>
            </log>
            <send>
                <endpoint>
                    <address uri="http://localhost:9000/services/SimpleStockQuoteService" format="pox"/>
                </endpoint>
            </send>
        </inSequence>
        <outSequence/>
    </target>
    <parameter name="transport.jms.ContentType">
        <rules>
            <jmsProperty>contentType</jmsProperty>
            <default>application/xml</default>
        </rules>
    </parameter>
    <parameter name="transport.jms.ConnectionFactory">myQueueConnectionFactory</parameter>
    <parameter name="transport.jms.DestinationType">queue</parameter>
    <parameter name="transport.jms.Destination">myQueue</parameter>
    </proxy>
    

    Now you will see a log getting printed in MB's console indicating a subscription was created. Note that in the inSequence of the listener proxy we have a fault sequence called "CustomFaultSequence". The rationale is if something happens during mediation in inSequence this gets called and there we rollback the transaction. Following is the CustomFaultSequence.
    <sequence xmlns="http://ws.apache.org/ns/synapse" name="CustomFaultSequence">
        <log level="custom" separator=",">
            <property name="text" value="***FAult Handler***"/>
        </log>
        <property name="SET_ROLLBACK_ONLY" value="true" scope="axis2"/>
        <drop/>
    </sequence>
    

    Note the "SET_ROLLBACK_ONLY" property. This is mandatory for transactional rollback.
  8. Now go to <ESB_Home>/samples/axis2Server/src/SimpleStockQuoteService and run ant to build SimpleStockQuoteService service. Then you can start the service by navigating to <ESB_Home>/samples/axis2Server/ and typing axis2server.sh (or axis2server.bat for Windows).
  9. Now our setup is ready.


Test "Commit" Scenario

  1. Using SOAP UI send following message to "StockQuoteProxy". You will see a log saying that message  was sent to the queue in ESB's console.
    <soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:ser="http://services.samples" xmlns:xsd="http://services.samples/xsd">
       <soapenv:Header/>
       <soapenv:Body>
          <ser:getQuote>
             <!--Optional:-->
             <ser:request>
                <!--Optional:-->
                <xsd:symbol>IBM</xsd:symbol>
             </ser:request>
          </ser:getQuote>
       </soapenv:Body>
    </soapenv:Envelope>
    
  2. As soon as the message is sent, it will be picked up by the listener proxy and forward the message to "SimpleStockQuoteService". If you log into MB's management console and check the message count it will be zero, as the sent message is consumed instantly. A log will get printed at SimpleStockQuoteService console as well.


Test "Rollback" Scenario

  1. In order to test this we have to make a synapse exception in in-sequence and trigger "CustomFaultSequence". For demo purpose we can explicitly call CustomFaultSequence within in-sequence of JMSQueueListenerProxy.
     <sequence key="CustomFaultSequence"/>
    
  2. Now send a message to "StockQuoteProxy" usin SOAP UI as described above.
  3. CustomFaultSequence sequence will get called now and message will be rollbacked. If you check the message count of the queue it will be still '1'.


Will the roll-backed Message get redelivered?

There might be a requirement to get roll-backed message to be redelivered again so that ESB can try to process the message again. Those messages will have "redelivered" JMS header value. WSO2 MB 2.1.0 supports this feature. You can configure how many times broker should try to redeliver the message before it is dropped (dead lettered) using qipd-config file of Message Broker.
Navigate to <MB_Home>/repository/conf/advanced folder and edit "qpid-config" file to have desired number of message redelivers.
 
<!--Broker will drop the message after the configured number of delivery attempts for each message.-->
     <maximumNumberOfMessageDeliveryAttempts>10</maximumNumberOfMessageDeliveryAttempts>


 You can adjust this demo to suit your messaging requirement, and use WSO2 Message Broker's JMS transaction capability to build a robust messaging system without message duplications and without message lost.

Hasitha Hiranya

2 comments:

  1. HI ,
    I have one query same like above ,could you please look into this..
    http://stackoverflow.com/questions/19860431/in-wso2-esb-4-7-0-can-we-do-jms-rollback-in-receiving-sequenc

    ReplyDelete

Instagram