What is JMS Synchronous Invocation
The classic way of JMS messaging is one way invocation, which means message producer will send the message and forget about it. On the other side there will be a consumer to consume the message. An external software known as JMS provider (Broker) sits in the middle of message producer and consumer and enable them to communicate without knowing about each other and in an asynchronous way.
Two way JMS communication enables the producer to communicate not knowing about the state of consumer but expecting a response.
The endpoint for a JMS client on the message broker which is called a destination can be either a queue or a topic that reside on the JMS broker. When comes to two way JMS communication that is achieved by using a another queue or topic together with the JMSReplyTo header. Before sending the message out, another destination (which is either queue or a topic) is created and set that as the JMSReplyTo header. On the other side the message consumer(or the subscriber) will use JMSReplyTo property to send the response back to the response destination. The message producer will be waiting on the reply destination and will process the message. A natural question that comes is how to coordinate the request and response if we use the same response queue for more than one message, or if we use the response queue for concurrent messages ? In other words how to identify if this response is for that request message that was sent? That is done by using the JMSCorrelationID JMS header. A message consumer or a subscriber will be created using the JMSCorrelationID so that the response can be sent to the destination given by JMSReplyTo. A more sophisticated(select the message based on some properties or based on content etc..) request/reply can be achieved by using the message selector concepts in JMS when creating the JMS consumer.
Scenario Description
- There is an web application which generates messages out of SMS messages sent by clients.
- There is a service which can process the SMS messages automatically.
- SMS reply should be sent back to the client.
- If the SMS processing server is not available at the moment, sent SMS should be preserved and reply should be generated and sent as soon as the server available again.
Message Flow
Following article describes how to perform above JMS Two way communication using WSO2 ESB and WSO2 MB.
Configuring WSO2 Message Broker
There is nothing much to configure in the broker side. Here I have used WSO2 MB 3.0.0 Milestone 2 release which can be found here. As this setup uses a single machine, both ESB and MB servers are stetup together. Thus in order to avoid port conflicts, MB server should be offset, which can be done by editing <MB_HOME>/repository/conf/carbon.xml
Configuring WSO2 ESB
Configuring Axis2.xml
Navigate to <ESB_HOME>/repository/conf/axis2/axis2.xml and enable following parts under transportReceiver section.
<transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener"> <!--<parameter name="myTopicConnectionFactory" locked="false"> <parameter name="java.naming.factory.initial" locked="false">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter> <parameter name="java.naming.provider.url" locked="false">repository/conf/jndi.properties</parameter> <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">TopicConnectionFactory</parameter> <parameter name="transport.jms.ConnectionFactoryType" locked="false">topic</parameter> </parameter> --> <parameter name="myQueueConnectionFactory" locked="false"> <parameter name="java.naming.factory.initial" locked="false">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter> <parameter name="java.naming.provider.url" locked="false">repository/conf/jndi.properties</parameter> <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter> <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter> </parameter> <parameter name="default" locked="false"> <parameter name="java.naming.factory.initial" locked="false">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter> <parameter name="java.naming.provider.url" locked="false">repository/conf/jndi.properties</parameter> <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">QueueConnectionFactory</parameter> <parameter name="transport.jms.ConnectionFactoryType" locked="false">queue</parameter> </parameter> </transportReceiver>Enable JMS Transport Sender.
<transportSender name="jms" class="org.apache.axis2.transport.jms.JMSSender"/>
Configuring JNDI.properties file
Navigate to <ESB_HOME>/repository/conf/jndi.properties file and edit. It should look like below.
connectionfactory.QueueConnectionFactory =
amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5673' # register some queues in JNDI using the form # queue.[jndiName] = [physicalName] queue.SMSStore = SMSStore # register some topics in JNDI using the form # topic.[jndiName] = [physicalName] #topic.MyTopic = example.MyTopic
Note following.
- Default port for JMS messaging is 5672. As we have a offset=1 for MB now the port is 5673.
- SMSStore is the queue in which published messages are stored.
- All topic related params are commented out.
Copying necessary libraries
In order to communicate with WSO2 Message Broker, ESB needs following Java libraries those can be found at /client-lib folder.
Now, as all the necessary bits are in place, both ESB and MB nodes can be started. Make sure there are no error or warning logs mentioned in the logs (or console) during server startup.
Starting Back-end service
There should be a service that accepts messages (SMSes) and generate reply messages. For that SimpleStockQuoteService which comes with ESB can be used.
"/> property here because this not a fire-and-forget messaging scenario. Also we set SMSReceiveNotificationStore as the queue to which reply messages for sent messages are expected to be.
- Navigate to
/samples/axis2Server/src and execute "ant" to build the service. you should have ant installed. - Navigate to
/samples/axis2Server and execute following command to start the service - sh axis2server.sh if on Linux
- axis2server.bat if on Windows
- To test the service you can use SOAPUI with the WSDL url at http://localhost:9000/services/SimpleStockQuoteService?wsdl
Creating proxy services at ESB
Now necessary proxy services can be created at ESB to perform messaging. First, a proxy should be defined to route an incoming HTTP message to SMSStore queue at WSO2 MB (HTTP >> JMS). Let us call this SMSSenderProxy. Note that we DO NOT define<?xml version="1.0" encoding="UTF-8"?> <proxy xmlns="http://ws.apache.org/ns/synapse" name="SMSSenderProxy" transports="https,http" statistics="disable" trace="disable" startOnLoad="true"> <target> <inSequence> <property name="transport.jms.ContentTypeProperty" value="Content-Type" scope="axis2"/> </inSequence> <outSequence> <property name="TRANSPORT_HEADERS" scope="axis2" action="remove"/> <send/> </outSequence> <endpoint> <address uri="jms:/SMSStore?transport.jms.ConnectionFactoryJNDIName=QueueConnectionFactory&java.naming.factory.initial=org.wso2.andes.jndi.PropertiesFileInitialContextFactory&java.naming.provider.url=repository/conf/jndi.properties&transport.jms.DestinationType=queue&transport.jms.ReplyDestination=SMSReceiveNotificationStore"/> </endpoint> </target> <description/> </proxy>
After a message is sent to SMSStore queue, that message should be picked up and sent to the back-end service to process. For that a JMS listener should be configured. ESB can itself act as a JMS listener. Following proxy listens to JMS messages comes to SMSStore and forward them to SimpleStockQuoteService. Let us call it SMSForwardProxy.
<?xml version="1.0" encoding="UTF-8"?> <proxy xmlns="http://ws.apache.org/ns/synapse" name="SMSForwardProxy" transports="jms" statistics="disable" trace="disable" startOnLoad="true"> <target> <inSequence> <send> <endpoint> <address uri="http://localhost:9000/services/SimpleStockQuoteService"/> </endpoint> </send> </inSequence> <outSequence> <send/> </outSequence> </target> <parameter name="transport.jms.ContentType"> <rules> <jmsProperty>contentType</jmsProperty> <default>text/xml</default> </rules> </parameter> <parameter name="transport.jms.ConnectionFactory">myQueueConnectionFactory</parameter> <parameter name="transport.jms.DestinationType">queue</parameter> <parameter name="transport.jms.Destination">SMSStore</parameter> <description/> </proxy>
Testing the Scenario
- Launch SOAP UI. Create a project pointing the WSDL exposed by SMSSenderProxy (http://localhost:8280/services/SMSSenderProxy?wsdl)
- Set following payload as the message to be sent. This is a typical payload a SMS has in this scenario.
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:ser="http://services.samples" xmlns:xsd="http://services.samples/xsd"> <soapenv:Header/> <soapenv:Body> <ser:getQuote> <!--Optional:--> <ser:request> <!--Optional:--> <xsd:symbol>IBM</xsd:symbol> </ser:request> </ser:getQuote> </soapenv:Body> </soapenv:Envelope>
- Now shut down SimpleStockQuoteService service by shutting down the Axis2 server and send a message in the same way above. Now, the message is sent to the SMSStore queue by SMSSender proxy. But the SMS processing server is unavailable. Thus no reply would come to SMSReceiveNotificationStore queue. The internal listener will listen for sometime and it will be closed as no message came, and the client will timeout waiting for the reply.
- This sample SMSForward proxy can be enhanced using "transactions" so that, if the message could not be forwarded, it will not be removed from the queue. It will retry time to time until Back-End service becomes live. In such a scenario, you would see a state SMSStore has messages which are not processed still.
Good stuff.!!!
ReplyDeleteHi Hasitha.
ReplyDeleteThis scenario work OK with ActiveMQ using WSO2 EI 6.1.0 but if I use the internal MB I get a delay in the response over the 5s...
Check this logs:
Log with MB:
[2017-08-07 14:10:14,546] [] INFO - LogMediator ACCION: = Accediendo a inSequence de SenderProxyMB
[2017-08-07 14:10:17,020] [] INFO - LogMediator ACCION: = Accediendo a inSequence del Servicio FordwarProxyMB
[2017-08-07 14:10:17,028] [] INFO - LogMediator ACCION: = Accediendo a outSequence de FordwarProxyMB
[2017-08-07 14:10:19,961] [] INFO - LogMediator ACCION: = Accediendo a outSequence de SenderProxyMB
Log with ActiveMQ:
[2017-08-07 14:10:34,715] [] INFO - LogMediator ACCION: = Accediendo a inSequence de SenderProxy
[2017-08-07 14:10:35,026] [] INFO - LogMediator ACCION: = Accediendo a inSequence del Servicio FordwarProxy
[2017-08-07 14:10:35,116] [] INFO - LogMediator ACCION: = Accediendo a outSequence de FordwarProxy
[2017-08-07 14:10:35,395] [] INFO - LogMediator ACCION: = Accediendo a outSequence de SenderProxy
Any idea why?
Regards.