You might ask the obvious question: where is WSO2 MB 2.1.0? For now it is on the way; it will be released within the next two or three months.
What are durable topic subscriptions
Durable queues keep messages around persistently for any suitable consumer to consume them. Durable queues do not need to concern themselves with which consumer is going to consume the messages at some point in the future. There is just one copy of a message that any consumer in the future can consume.
Durable topics however are different as they must logically persist an instance of each suitable message for every durable consumer - since each durable consumer gets their own copy of the message.
In summary:
A durable topic subscription is kept by the broker on behalf of the client while the client is offline. Put simply, the broker persistently stores the messages published to the topic until each durable subscriber has consumed its copy. Durable topic subscribers are used when an application needs to receive messages that are published even while the application is inactive.
We will see how to set up WSO2 ESB as a client that creates durable topic subscriptions on WSO2 MB. The added advantage is that if the subscriber goes down (maybe the ESB instance that consumes messages from the topic is temporarily down), you will not lose the messages that are published to that topic in the meantime (maybe from a CEP event stream).
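To make the difference between the two storage models concrete, here is a minimal in-memory sketch. It is not WSO2 MB code; the class names `DurableQueue` and `DurableTopic` and the subscriber names are made up for illustration, and real brokers add persistence, acknowledgements and much more.

```python
from collections import deque


class DurableQueue:
    """One stored copy per message; whichever consumer asks first takes it."""

    def __init__(self):
        self._messages = deque()

    def publish(self, message):
        self._messages.append(message)

    def consume(self):
        # Returns None when nothing is waiting.
        return self._messages.popleft() if self._messages else None


class DurableTopic:
    """One stored copy per message for every durable subscription."""

    def __init__(self):
        self._backlogs = {}  # subscription name -> messages not yet consumed

    def subscribe(self, name):
        self._backlogs.setdefault(name, deque())

    def publish(self, message):
        # Every registered durable subscription gets its own copy.
        for backlog in self._backlogs.values():
            backlog.append(message)

    def consume(self, name):
        backlog = self._backlogs[name]
        return backlog.popleft() if backlog else None


queue = DurableQueue()
queue.publish("quote-1")
first = queue.consume()   # the single copy goes to the first consumer
second = queue.consume()  # nothing is left for a second consumer

topic = DurableTopic()
topic.subscribe("subA")
topic.subscribe("subB")
topic.publish("quote-1")
copy_a = topic.consume("subA")
copy_b = topic.consume("subB")  # subB still has its own copy
```

With the queue, the second consumer finds nothing because the only copy is gone. With the topic, both subscriptions hold a copy, which is why the broker has to store more for durable topics.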
Setup WSO2 Message Broker
- Download WSO2 MB 2.1.0 and extract it to a folder. That folder is referred to as <MB_Home> from here on.
- Navigate to the <MB_Home>/repository/conf folder and open the carbon.xml file.
- Define an offset of 1. (We do this because we intend to start WSO2 ESB on the default ports.)
<Ports>
    <!-- Ports offset. This entry will set the value of the ports defined below to
         the define value + Offset.
         e.g. Offset=2 and HTTPS port=9443 will set the effective HTTPS port to 9445 -->
    <Offset>1</Offset>
- Now start WSO2 MB using wso2server.sh on Linux or wso2server.bat on Windows.
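The offset arithmetic is simple, but it matters later because the ESB has to connect to the shifted port. A small sketch, assuming 5672 is the broker's default AMQP port (the 9443 HTTPS default comes from the carbon.xml comment; the function and dictionary names are only illustrative):

```python
# Default ports before any offset is applied.
# 9443 is taken from the carbon.xml comment; 5672 is an assumption
# (the usual AMQP default) and is not stated in this post.
DEFAULT_PORTS = {"https": 9443, "amqp": 5672}


def effective_port(default_port: int, offset: int) -> int:
    """Return the port a server listens on once <Offset> is applied."""
    if offset < 0:
        raise ValueError("offset must not be negative")
    return default_port + offset


def effective_ports(offset: int) -> dict:
    """Apply the same offset to every default port."""
    return {name: effective_port(port, offset) for name, port in DEFAULT_PORTS.items()}


mb_ports = effective_ports(1)  # MB started with <Offset>1</Offset>
```

Under that assumption, an offset of 1 moves the AMQP listener to 5673, which is the port used in the ESB's jndi.properties in the next section.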
Setup WSO2 ESB
- Download WSO2 ESB 4.6.0 from here and extract it to a folder. That folder is referred to as <ESB_Home> from here on.
- Navigate to the <ESB_Home>/repository/conf/axis2 folder and edit the axis2.xml file.
- Edit and add JMS Sender parameters
<transportSender name="jms" class="org.apache.axis2.transport.jms.JMSSender">
    <parameter name="default" locked="false">
        <parameter name="java.naming.factory.initial" locked="false">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter>
        <parameter name="java.naming.provider.url" locked="false">repository/conf/jndi.properties</parameter>
        <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">ConnectionFactory</parameter>
    </parameter>
</transportSender>
- Edit and add JMS Receiver parameters
<transportReceiver name="jms" class="org.apache.axis2.transport.jms.JMSListener">
    <parameter name="myTopicConnectionFactory" locked="false">
        <parameter name="java.naming.factory.initial" locked="false">org.wso2.andes.jndi.PropertiesFileInitialContextFactory</parameter>
        <parameter name="java.naming.provider.url" locked="false">repository/conf/jndi.properties</parameter>
        <parameter name="transport.jms.ConnectionFactoryJNDIName" locked="false">TopicConnectionFactory</parameter>
        <parameter name="transport.jms.ConnectionFactoryType" locked="false">topic</parameter>
    </parameter>
</transportReceiver>
- Navigate to the <ESB_Home>/repository/conf/ folder and edit the jndi.properties file. Here we point to the MB instance we are running and also define the topic to be used.
# register some connection factories
# connectionfactory.[jndiname] = [ConnectionURL]
connectionfactory.ConnectionFactory = amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5673'
connectionfactory.TopicConnectionFactory = amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5673'

# register some queues in JNDI using the form
# queue.[jndiName] = [physicalName]
#queue.JMSMS=JMSMS

# register some topics in JNDI using the form
# topic.[jndiName] = [physicalName]
topic.myTopic = myTopic
- Put the client libraries needed to connect to WSO2 MB into the <ESB_Home>/repository/components/lib folder. Copy the following two jars from <MB_Home>/client-libs.
- andes-client-0.13.wso2v7
- geronimo-jms_1.1_spec-1.1.0.wso2v1
- Start WSO2 ESB by navigating to <ESB_Home>/bin and typing wso2server.sh on Linux or wso2server.bat on Windows.
- Using the ESB management console, create a proxy for sending messages to the topic.
<proxy xmlns="http://ws.apache.org/ns/synapse" name="StockQuoteProxy" transports="http" startOnLoad="true">
    <target>
        <endpoint>
            <address uri="jms:/myTopic?&amp;transport.jms.DestinationType=topic"/>
        </endpoint>
        <inSequence>
            <log level="custom">
                <property name="STATE" value="message is sent to queue"/>
            </log>
            <property name="OUT_ONLY" value="true"/>
            <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>
        </inSequence>
        <outSequence/>
    </target>
</proxy>
- Using the ESB management console, create a proxy for consuming messages from the topic.
<proxy xmlns="http://ws.apache.org/ns/synapse" name="JMSQueueListenerProxy" startOnLoad="true">
    <target>
        <inSequence>
            <property name="OUT_ONLY" value="true"/>
            <log level="full"/>
            <drop/>
        </inSequence>
        <outSequence/>
    </target>
    <parameter name="transport.jms.ContentType">
        <rules>
            <jmsProperty>contentType</jmsProperty>
            <default>application/xml</default>
        </rules>
    </parameter>
    <parameter name="transport.jms.ConnectionFactory">myTopicConnectionFactory</parameter>
    <parameter name="transport.jms.DestinationType">topic</parameter>
    <parameter name="transport.jms.Destination">myTopic</parameter>
</proxy>
- You will see the following log messages on the MB console.
[2013-04-21 09:20:36,531] INFO {org.wso2.andes.server.store.CassandraMessageStore} - Created Topic : myTopic
[2013-04-21 09:20:36,539] INFO {org.wso2.andes.server.store.CassandraMessageStore} - Registered Subscription TopicNodeQueue_1 for Topic myTopic
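The connection URLs registered in jndi.properties during the ESB setup pack several settings into one string: user, password, client ID, virtual host and broker address. The sketch below pulls them apart, which can help when checking that the port matches the MB offset. It handles only the single-broker form used in this post; the pattern and the function name `parse_connection_url` are illustrative and not part of any WSO2 library.

```python
import re

# Matches only the URL form used in this post:
# amqp://<user>:<password>@<clientID>/<virtualHost>?brokerlist='tcp://<host>:<port>'
CONNECTION_URL = re.compile(
    r"^amqp://(?P<user>[^:@/]+):(?P<password>[^@/]+)@(?P<client_id>[^/]*)"
    r"/(?P<virtual_host>[^?]*)\?brokerlist='tcp://(?P<host>[^:']+):(?P<port>\d+)'$"
)


def parse_connection_url(url: str) -> dict:
    """Split a connection URL into its named parts; the port becomes an int."""
    match = CONNECTION_URL.match(url.strip())
    if match is None:
        raise ValueError(f"unrecognised connection URL: {url!r}")
    parts = match.groupdict()
    parts["port"] = int(parts["port"])
    return parts


parts = parse_connection_url(
    "amqp://admin:admin@clientID/carbon?brokerlist='tcp://localhost:5673'"
)
```

Here `parts["client_id"]` is the literal string `clientID` and `parts["port"]` is 5673. The client ID is worth noticing, since a durable subscription is identified by the client together with the subscriber name.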
Testing the setup
- Send the following message to StockQuoteProxy in the ESB using SoapUI.
<soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:ser="http://services.samples" xmlns:xsd="http://services.samples/xsd">
    <soapenv:Header/>
    <soapenv:Body>
        <ser:getQuote>
            <!--Optional:-->
            <ser:request>
                <!--Optional:-->
                <xsd:symbol>IBM</xsd:symbol>
            </ser:request>
        </ser:getQuote>
    </soapenv:Body>
</soapenv:Envelope>
- You will see the message being consumed immediately by the listener proxy.
[2013-04-21 09:38:54,548] INFO - LogMediator To: , WSAction: "urn:getQuote", SOAPAction: "urn:getQuote", MessageID: ID:e1835e06-a8ba-3630-8124-31920227607a, Direction: request, Envelope: <?xml version='1.0' encoding='utf-8'?><soapenv:Envelope xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/"><soapenv:Body><soapenv:Envelope xmlns:xsd="http://services.samples/xsd" xmlns:ser="http://services.samples"><soapenv:Body> <ser:getQuote> <!--Optional:--> <ser:request> <!--Optional:--> <xsd:symbol>IBM</xsd:symbol> </ser:request> </ser:getQuote> </soapenv:Body></soapenv:Envelope></soapenv:Body></soapenv:Envelope>
- Now we will make the subscription durable by adding the following two parameters to JMSQueueListenerProxy. Here transport.jms.DurableSubscriberName is the name used to identify the durable topic subscriber.
<parameter name="transport.jms.SubscriptionDurable">true</parameter>
<parameter name="transport.jms.DurableSubscriberName">mySub1</parameter>
The following log message will appear on the MB console:
[2013-04-21 09:34:55,508] INFO {org.wso2.andes.server.cassandra.DefaultClusteringEnabledSubscriptionManager} - Binding Subscription 1 to queue clientID:mySub1
- Now send two more messages as before. You will observe that it still behaves like a normal topic subscription: as soon as you publish a message, the consumer consumes it.
- Now "cut" JMSQueueListenerProxy from the source view and paste it into a text file for later. Save the modified source.
- Now there is no subscriber listening for messages published to the topic. But do not worry, the messages will not be lost, because we made the subscription durable before closing it. Now send three more messages to StockQuoteProxy using SoapUI.
[2013-04-21 09:40:05,205] INFO - LogMediator STATE = message is sent to queue
[2013-04-21 09:40:06,366] INFO - LogMediator STATE = message is sent to queue
[2013-04-21 09:40:07,710] INFO - LogMediator STATE = message is sent to queue
- Then we will register the topic subscriber again. Paste the proxy back into the source view from the file where you saved it. The important thing to note here is that you must use exactly the same "DurableSubscriberName" as before.
- You will see that the 3 messages we sent while the listener proxy was absent are consumed immediately. You have not lost a single message.
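The test above can be replayed with a toy model to show why the subscriber name has to match. This is only a sketch of the behaviour, not how WSO2 MB is implemented. The `TopicBroker` class and the message names are hypothetical, and the `clientID:mySub1` key simply mirrors the queue name printed in the MB log.

```python
class TopicBroker:
    """Toy broker: keeps a backlog per durable subscription, keyed by clientID:name."""

    def __init__(self):
        self.backlogs = {}  # durable key -> messages stored while the subscriber is offline
        self.online = {}    # durable key -> list receiving live deliveries

    def subscribe(self, client_id, name):
        key = f"{client_id}:{name}"
        # Hand over whatever piled up while this subscription was offline.
        inbox = self.backlogs.pop(key, [])
        self.online[key] = inbox
        return inbox

    def disconnect(self, client_id, name):
        key = f"{client_id}:{name}"
        del self.online[key]
        self.backlogs[key] = []  # the subscription itself survives the disconnect

    def publish(self, message):
        for inbox in self.online.values():
            inbox.append(message)
        for backlog in self.backlogs.values():
            backlog.append(message)


broker = TopicBroker()
inbox = broker.subscribe("clientID", "mySub1")
broker.publish("m1")
broker.publish("m2")                        # delivered live, like a normal topic

broker.disconnect("clientID", "mySub1")     # the listener proxy is removed
for message in ("m3", "m4", "m5"):
    broker.publish(message)                 # stored for the absent subscriber

other = broker.subscribe("clientID", "someOtherName")  # a brand new subscription
replayed = broker.subscribe("clientID", "mySub1")      # same name as before
```

Reconnecting as `mySub1` returns the three stored messages, while `someOtherName` starts empty because the broker treats it as a new subscription with no history.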
Great post Hasitha. I am trying to do something similar as a pilot. The question I have is how third-party consumers can subscribe to the messages. Is it possible to use the ESB listener proxy, or should they go directly against the message broker?
Thanks