WSO2 ESB uses Apache Synapse as its mediation engine. Synapse has a layered architecture to mediate messages through it.
Following is a high level component diagram of Synapse design.
This post walks you through main steps done in synapse to mediate messages. It is more of a code reference that gives a high level understanding of message flow so that one can debug the code and understand.
You can clone the code from GitHub from here. Let us look at the message flow inside synapse core.
Project can be built using Maven.
Following is a high level component diagram of Synapse design.
This post walks you through main steps done in synapse to mediate messages. It is more of a code reference that gives a high level understanding of message flow so that one can debug the code and understand.
Contents
Where is the code?
Proxy Configuration Used for Debugging
Inflow of a message
Entry point
Creating synapse mc out of axis2 mc
Check if message is coming from Inbound Endpoint
Set service level log (per proxy logs)
Honour isSoapConversionRequired parameter
Execute synapse handlers
Check for mandatory sequence
Start mediating message with SequenceMediator
Building the message
Iterate through mediators and exit where necessary
Forwarding message to the Back-end in in-sequence
Send message to endpoint (send mediator debug)
Axis2SynapseEnvironment.send() Method (to send out any message from synapse)
Axis2Sender.sendOn(endpoint, synCtx) method (send message to an endpoint)
Axis2FlexibleMEPClient.send() method
Register the callback for response
Write to wire so that message goes to BE service
Outflow of a message
SynapseCallbackReceiver
handleMessage() method
Inject message to synapse environment (Axis2SynapseEnvironment)
Using send mediator in out sequence
Dispatching message to Axis2 to send back
Where is the code?
You can clone the code from GitHub from here. Let us look at the message flow inside synapse core.
Project can be built using Maven.
Proxy Configuration Used for Debugging
<?xml version="1.0" encoding="UTF-8"?> <proxy xmlns="http://ws.apache.org/ns/synapse" name="debugProxy" transports="http https" startOnLoad="true" trace="disable"> <description/> <target inSequence="backend_call_seq"> <outSequence> <send/> </outSequence> <faultSequence> <log> <property name="FAULT_OCCURRED" value="BBB"/> </log> </faultSequence> </target> </proxy>
<?xml version="1.0" encoding="UTF-8"?> <sequence xmlns="http://ws.apache.org/ns/synapse" name="backend_call_seq"> <log level="full"> <property name="INCOMING_MSG" value="AAA"/> </log> <send> <endpoint> <address uri="http://localhost:9000/services/SimpleStockQuoteService"/> </endpoint> </send> </sequence>
Inflow of a message
Entry point
The entry points of the message into synapse engine from Axis2 layer are
1. ProxyServiceMessageReceiver - when a message hits a proxy service
2. SynapseMessageReceiver - when a message hits a ESB API/main sequence
In this example ProxyServiceMessageReceiver is hit as a proxy is invoked.
public void receive(org.apache.axis2.context.MessageContext mc) throws AxisFault {..}
Creating synapse mc out of axis2 mc
MessageContext synCtx = MessageContextCreatorForAxis2.getSynapseMessageContext(mc);
return new Axis2MessageContext(axisMsgCtx, synapseConfiguration, synapseEnvironment)
Here synapseEnvironment is received by Axis2MessageContext —> axis2Config —> synapseEnv
Check if message is coming from Inbound Endpoint
proxy.getParameterMap().get(SynapseConstants.INBOUND_PROXY_SERVICE_PARAM);
Object inboundMsgCtxParam = mc.getProperty(SynapseConstants.IS_INBOUND);
Set service level log (per proxy logs)
Log serviceLog = LogFactory.getLog(SynapseConstants.SERVICE_LOGGER_PREFIX + name);
((Axis2MessageContext) synCtx).setServiceLog(serviceLog);
Honour isSoapConversionRequired parameter
/**
* Keeps the synapse property "soap.format.conversion.enable" value.
*/
private static boolean isSoapConversionRequired = SOAPUtils.isSoapConversionRequired();
if (isSoapConversionRequired) {
synCtx.setProperty(SynapseConstants.IS_CLIENT_DOING_REST, mc.isDoingREST());
synCtx.setProperty(SynapseConstants.IS_CLIENT_DOING_SOAP11, mc.isSOAP11());
}
Execute synapse handlers
There are four extension points when message gets into synapse and going out of synapse in inflow and outflow to intercept a message.
Check for mandatory sequence
mandatory sequence is a sequence that get executed for every incoming message.
Mediator mandatorySeq = synCtx.getConfiguration().getMandatorySequence();
Start mediating message with SequenceMediator
/**
* If this mediator refers to another named Sequence, execute that. Else
* execute the list of mediators (children) contained within this. If a referenced
* named sequence mediator instance cannot be found at runtime, an exception is
* thrown. This may occur due to invalid configuration of an erroneous runtime
* change of the synapse configuration. It is the responsibility of the
* SynapseConfiguration builder to ensure that dead references are not present.
*
* @param synCtx the synapse message
* @return as per standard mediator result
*/
public boolean mediate(MessageContext synCtx) {..}
We keep a stack to trace which sequence is being executed. This is done by ContinuationStackManager.
// Add a new SeqContinuationState as we branched to new Sequence.
boolean skipAddition = ContinuationStackManager.isSkipSeqContinuationStateAddition(synCtx);
if (!skipAddition) {
if (dynamic && registryKey != null) {
ContinuationStackManager.addSeqContinuationState(synCtx, registryKey, sequenceType);
} else {
ContinuationStackManager.addSeqContinuationState(synCtx, name, sequenceType);
}
}Then message goes to super.mediate(). Means AbstractListMediator.mediate()
Building the message
If in sequence has at least one mediator which is content aware, message is build. Content aware property is set at the sequence init, looking at the list of mediators it bares.
if (contentAware) {
try {
if (synLog.isTraceOrDebugEnabled()) {
synLog.traceOrDebug("Building message. Sequence <" + getType() + "> is content aware");
}
RelayUtils.buildMessage(((Axis2MessageContext) synCtx).getAxis2MessageContext(),false);
} catch (Exception e) {
handleException("Error while building message", e, synCtx);
}
}
Following is what happens inside axis2 when above method is called.
Pipe pipe = (Pipe)messageContext.getProperty("pass-through.pipe”);
InputStream in = pipe.getInputStream();
try {
element = messageBuilder.getDocument(messageContext, (InputStream)(bufferedInputStream != null?bufferedInputStream:in));
} catch (Exception var6) {
consumeAndDiscardMessage(messageContext);
messageContext.setProperty("message.builder.invoked", Boolean.TRUE);
handleException("Error while building Passthrough stream", var6);
}
if(element != null) {
messageContext.setEnvelope(TransportUtils.createSOAPEnvelope(element));
messageContext.setProperty("__RELAY_FORMATTERS_MAP", messageBuilder.getFormatters());
messageContext.setProperty("message.builder.invoked", Boolean.TRUE);
earlyBuild = messageContext.getProperty("relay_early_build") != null?((Boolean)messageContext.getProperty("relay_early_build")).booleanValue():earlyBuild;
if(!earlyBuild) {
processAddressing(messageContext);
}
}
Iterate through mediators and exit where necessary
for (int i = mediatorPosition; i < mediators.size(); i++) {
// ensure correct trace state after each invocation of a mediator
synCtx.setTracingState(myEffectiveTraceState);
if (!mediators.get(i).mediate(synCtx)) {
returnVal = false;
break;
}
}
Every mediator returns a boolean. If flow should stop there, it will return false and rest of mediators even defined will not get executed. After executing all mediators in a seq we do this.
if (result && !skipAddition) {
// if flow completed remove the previously added SeqContinuationState
ContinuationStackManager.removeSeqContinuationState(synCtx, sequenceType);
}
Pop error handler from the faultStack also. Result (true/false) is ultimately returned to ProxyServiceMessageReceiver.
Forwarding message to the Back-end in in-sequence
After ProxyServiceMessageReceiver receives the result of mediation in insequence, if success, it forwards the message to back-end specified at "target".
// if inSequence returns true, forward message to endpoint
if(inSequenceResult) {
if (proxy.getTargetEndpoint() != null) {
Endpoint endpoint = synCtx.getEndpoint(proxy.getTargetEndpoint());
if (endpoint != null) {
traceOrDebug(traceOn, "Forwarding message to the endpoint : "
+ proxy.getTargetEndpoint());
endpoint.send(synCtx);
} else {
handleException("Unable to find the endpoint specified : " +
proxy.getTargetEndpoint(), synCtx);
}
} else if (proxy.getTargetInLineEndpoint() != null) {
traceOrDebug(traceOn, "Forwarding the message to the anonymous " +
"endpoint of the proxy service");
proxy.getTargetInLineEndpoint().send(synCtx);
}
}
If no EP is defined
Axi2Engine’s InvocationResponse receive(MessageContext msgContext) completes with
InvocationResponse.CONTINUE;
If a EP is defined from here on it is sending message out to an endpoint. Following details are code points when send mediator is debug.
Send message to endpoint (send mediator debug)
reset states
if (keySet != null) {
keySet.remove(SynapseConstants.RECEIVING_SEQUENCE);
keySet.remove(SynapseConstants.CONTINUATION_CALL);
keySet.remove(EndpointDefinition.DYNAMIC_URL_VALUE);
}
set receiving sequence
synCtx.setProperty(SynapseConstants.RECEIVING_SEQUENCE,
receivingSequence.evaluateValue(synCtx));
if no endpoints are defined, send where implicitly stated
synCtx.getEnvironment().send(null, synCtx);
if an EP is defined, send to it
endpoint.send(synCtx);
this calls
AbstractEndpoint.send()
Add fault handler to the stack
In Endpoints also, we build the message is endpoint is contentaware
if (contentAware) {
try {
RelayUtils.buildMessage(((Axis2MessageContext) synCtx).getAxis2MessageContext(),false);
axis2Ctx.setProperty(RelayConstants.FORCE_RESPONSE_EARLY_BUILD, Boolean.TRUE);
if(forceBuildMC){
((Axis2MessageContext) synCtx).getAxis2MessageContext().getEnvelope().build();
}
} catch (Exception e) {
handleException("Error while building message", e);
}
}
// Send the message through this endpoint
synCtx.getEnvironment().send(definition (EndpointDefinition), synCtx);
Axis2SynapseEnvironment.send() Method (to send out any message from synapse)
If it is response we need to send back. Otherwise we are sending a request to a BE.
//if it is response we need to send back
if (synCtx.isResponse()) {
//if an endpoint is defined
if (endpoint != null) {
//if transport is different from http, we need to build looking at content type
if (isTransportSwitching(synCtx, endpoint)) {
buildMessage(synCtx);
}
//send the message to EP
Axis2Sender.sendOn(endpoint, synCtx);
} else {
String proxyName = (String) synCtx.getProperty(SynapseConstants.PROXY_SERVICE);
boolean serviceModuleEngaged = false;
if (proxyName != null) {
ProxyService proxyService = synapseConfig.getProxyService(proxyName);
serviceModuleEngaged = proxyService.isModuleEngaged();
}
if (serviceModuleEngaged || isTransportSwitching(synCtx, null)) {
buildMessage(synCtx);
}
//Build message in the case of inbound jms dual channel
Boolean isInboundJMS = (Boolean)synCtx.getProperty(SynapseConstants.INBOUND_JMS_PROTOCOL);
if (isInboundJMS != null && isInboundJMS) {
buildMessage(synCtx);
}
//this is where response is sent back to client
Axis2Sender.sendBack(synCtx);
}
//we are sending a request to a BE
} else {
// If this request is related to session affinity endpoints - For client initiated session
Dispatcher dispatcher =
(Dispatcher) synCtx.getProperty(
SynapseConstants.PROP_SAL_ENDPOINT_CURRENT_DISPATCHER);
if (dispatcher != null) {
if (!dispatcher.isServerInitiatedSession()) {
dispatcher.updateSession(synCtx);
}
}
synCtx.setProperty(SynapseConstants.SENDING_REQUEST, true);
if (endpoint == null || isTransportSwitching(synCtx, endpoint)) {
buildMessage(synCtx);
}
Axis2Sender.sendOn(endpoint, synCtx);
}
Axis2Sender.sendOn(endpoint, synCtx) method (send message to an endpoint)
>>invoke synapse handlers
// Invoke Synapse Handlers
Iterator iterator =
synapseInMessageContext.getEnvironment().getSynapseHandlers().iterator();
while (iterator.hasNext()) {
SynapseHandler handler = iterator.next();
if (!handler.handleRequestOutFlow(synapseInMessageContext)) {
return;
}
}
>> Following is where we give out message for axis2 to send out
Axis2FlexibleMEPClient.send(
// The endpoint where we are sending to
endpoint,
// The Axis2 Message context of the Synapse MC
synapseInMessageContext);
Axis2FlexibleMEPClient.send() method
This is the place where message is dispatched out from synapse into axis2 layer. This can be considered as a wrapped method in synapse to use axis2 to send messages.
save the original message context without altering it, so we can tie the response
MessageContext originalInMsgCtx
= ((Axis2MessageContext) synapseOutMessageContext).getAxis2MessageContext();
create a new MessageContext to be sent out as this should not corrupt the original. we need to create the response to the original message later on
MessageContext axisOutMsgCtx = cloneForSend(originalInMsgCtx, preserveAddressingProperty);
set all the details of the endpoint only to the cloned message context so that we can use the original message context for resending through different endpoints. Following is the outline of the method.
if (SynapseConstants.FORMAT_POX.equals(endpoint.getFormat())) {
} else if (SynapseConstants.FORMAT_GET.equals(endpoint.getFormat())) {
} else if (SynapseConstants.FORMAT_SOAP11.equals(endpoint.getFormat())) {
} else if (SynapseConstants.FORMAT_SOAP12.equals(endpoint.getFormat())) {
} else if (SynapseConstants.FORMAT_REST.equals(endpoint.getFormat())) {
} else {
processWSDL2RESTRequestMessageType(originalInMsgCtx, axisOutMsgCtx);
}
wsAddressingEnabled
preserveHeaderProperty
outOnlyMessage
wsSecurityEnabled
Register the callback for response
This is VERY important as this one is the one that gets when a response comes for the request by Backend Service.
// always set a callback as we decide if the send it blocking or non blocking within
// the MEP client. This does not cause an overhead, as we simply create a 'holder'
// object with a reference to the outgoing synapse message context
// synapseOutMessageContext
AsyncCallback callback = new AsyncCallback(axisOutMsgCtx, synapseOutMessageContext);
if (!outOnlyMessage) {
if (endpoint != null) {
// set the timeout time and the timeout action to the callback, so that the
// TimeoutHandler can detect timed out callbacks and take appropriate action.
if (!endpoint.isDynamicTimeoutEndpoint()) {
long endpointTimeout = endpoint.getEffectiveTimeout();
callback.setTimeOutOn(System.currentTimeMillis() + endpointTimeout);
callback.setTimeOutAction(endpoint.getTimeoutAction());
callback.setTimeoutDuration(endpointTimeout);
} else {
long endpointTimeout = endpoint.evaluateDynamicEndpointTimeout(synapseOutMessageContext);
callback.setTimeOutOn(System.currentTimeMillis() + endpointTimeout);
callback.setTimeOutAction(endpoint.getTimeoutAction());
callback.setTimeoutDuration(endpointTimeout);
}
} else {
long globalTimeout = synapseOutMessageContext.getEnvironment().getGlobalTimeout();
callback.setTimeOutOn(System.currentTimeMillis() + globalTimeout);
callback.setTimeoutDuration(globalTimeout);
}
}
//if not rest
axisOutMsgCtx.setTo(new EndpointReference(address));
}
mepClient.setCallback(callback);
Write to wire so that message goes to BE service
mepClient.execute(true);
Following diagram depicts above flow in summery (Thanks Pamod Silvester for the diagram!)
Outflow of a message
When a message is sent to an endpoint using send mediator or call mediator, if ESB expects a response it registers a callback when it is sent out (please refer "Register the callback for response" section above). When response comes into ESB it is handled by this callback.
SynapseCallbackReceiver
When response comes from the BE, it is handled by SynapseCallbackReceiver.
/**
* This is the message receiver that receives the responses for outgoing messages sent out
* by Synapse. It holds a callbackStore that maps the [unique] messageID of each message to
* a callback object that gets executed on timeout or when a response is received (before timeout)
*
* The AnonymousServiceFactory uses this MessageReceiver for all Anonymous services created by it.
* This however - effectively - is a singleton class
*/
public class SynapseCallbackReceiver extends CallbackReceiver {..}
public void receive(MessageContext messageCtx) throws AxisFault {
handleMessage(messageID, messageCtx, ((AsyncCallback) callback).getSynapseOutMsgCtx(),
(AsyncCallback)callback);
}
handleMessage() method
handleMessage() {
Object messageType = axisOutMsgCtx.getProperty(
org.apache.axis2.Constants.Configuration.MESSAGE_TYPE);
if (!HTTPConstants.MEDIA_TYPE_X_WWW_FORM.equals(messageType)) {
// copy the message type property that's used by the out message to the
// response message
response.setProperty(org.apache.axis2.Constants.Configuration.MESSAGE_TYPE,
messageType);
}
// create the synapse message context for the response
Axis2MessageContext synapseInMessageContext =
new Axis2MessageContext(
response,
synapseOutMsgCtx.getConfiguration(),
synapseOutMsgCtx.getEnvironment());
synapseInMessageContext.setResponse(true);
//error handling
if (Constants.VALUE_TRUE.equals(errorOnSOAPFault) && successfulEndpoint != null) {
//build the message
try {
RelayUtils.buildMessage(((Axis2MessageContext) synapseInMessageContext).getAxis2MessageContext(),true);
} catch (Exception e) {
// handleException("Error while building message", e, synapseInMessageContext);
}
//see if it is SOAP fault and take action
if ((synapseInMessageContext.getEnvelope() != null) && synapseInMessageContext.getEnvelope().hasFault()) {
…
}
}
}
Inject message to synapse environment (Axis2SynapseEnvironment)
synapseOutMsgCtx.getEnvironment().injectMessage(synapseInMessageContext);
public boolean injectMessage(final MessageContext synCtx) {
if isContinuationCall - in call() mediator responses, execute mediators after the call mediator
else if receivingSequence - if there is a receive sequence, dispatch message there
else if outSequence - if out sequence execute mediators there
else if - just send back using info in synCtx
}
Using send mediator in out sequence
Just assume we have a send mediator in out seq. Then call stack would be
Ultimately it hits following section in send mediator
// if no endpoints are defined, send where implicitly stated
if (endpoint == null) {
synCtx.getEnvironment().send(null, synCtx);
}
Here is it is response for inbound transport message, it uses InboundResponseSender to send back (looked up using synapseContext)
Dispatching message to Axis2 to send back
If the request arrives through a conventional transport listener
AxisEngine.send(messageContext);
Following diagram depicts above flow in summery (Thanks Pamod Silvester for the diagram!)
Very informative. Thanks for sharing
ReplyDelete