Synapse Mediation Engine - Message Flow Outlined

WSO2 ESB uses Apache Synapse as its mediation engine. Synapse has a layered architecture to mediate messages through it.

The following is a high-level component diagram of the Synapse design.


This post walks you through the main steps Synapse performs to mediate a message. It is mostly a code reference that gives a high-level understanding of the message flow, so that you can debug the code and follow what happens at each step.

Contents

Where is the code?
Proxy Configuration Used for Debugging 
Inflow of a message
    Entry point
    Creating synapse mc out of axis2 mc
    Check if message is coming from Inbound Endpoint 
    Set service level log (per proxy logs)
    Honour isSoapConversionRequired parameter
    Execute synapse handlers
    Check for mandatory sequence
    Start mediating message with SequenceMediator
    Building the message
    Iterate through mediators and exit where necessary
    Forwarding message to the Back-end in in-sequence
    Send message to endpoint (send mediator debug)
    Axis2SynapseEnvironment.send() Method (to send out any message from synapse)
    Axis2Sender.sendOn(endpoint, synCtx) method (send message to an endpoint)
    Axis2FlexibleMEPClient.send() method 
    Register the callback for response
    Write to wire so that message goes to BE service

Outflow of a message

    SynapseCallbackReceiver
    handleMessage() method
    Inject message to synapse environment (Axis2SynapseEnvironment)
    Using send mediator in out sequence 
    Dispatching message to Axis2 to send back

Where is the code?


You can clone the code from GitHub. Let us look at the message flow inside the Synapse core.
The project can be built using Maven.


Proxy Configuration Used for Debugging 

<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="debugProxy"
       transports="http https"
       startOnLoad="true"
       trace="disable">
   <description/>
   <target inSequence="backend_call_seq">
      <outSequence>
         <send/>
      </outSequence>
      <faultSequence>
         <log>
            <property name="FAULT_OCCURRED" value="BBB"/>
         </log>
      </faultSequence>
   </target>
</proxy>


<?xml version="1.0" encoding="UTF-8"?>
<sequence xmlns="http://ws.apache.org/ns/synapse" name="backend_call_seq">
   <log level="full">
      <property name="INCOMING_MSG" value="AAA"/>
   </log>
   <send>
      <endpoint>
         <address uri="http://localhost:9000/services/SimpleStockQuoteService"/>
      </endpoint>
   </send>
</sequence>


Inflow of a message

Entry point


The entry points of a message into the Synapse engine from the Axis2 layer are:

1. ProxyServiceMessageReceiver - when a message hits a proxy service
2. SynapseMessageReceiver - when a message hits an ESB API or the main sequence

In this example ProxyServiceMessageReceiver is hit, because a proxy service is invoked.

public void receive(org.apache.axis2.context.MessageContext mc) throws AxisFault {..}

Creating synapse mc out of axis2 mc


MessageContext synCtx = MessageContextCreatorForAxis2.getSynapseMessageContext(mc);

// inside getSynapseMessageContext():
return new Axis2MessageContext(axisMsgCtx, synapseConfiguration, synapseEnvironment);

Here synapseEnvironment is looked up through the Axis2 message context: Axis2MessageContext -> axis2Config -> synapseEnv

Check if message is coming from Inbound Endpoint 


        Object inboundServiceParam =
                proxy.getParameterMap().get(SynapseConstants.INBOUND_PROXY_SERVICE_PARAM);
        Object inboundMsgCtxParam = mc.getProperty(SynapseConstants.IS_INBOUND);


Set service level log (per proxy logs)

Log serviceLog = LogFactory.getLog(SynapseConstants.SERVICE_LOGGER_PREFIX + name);
         ((Axis2MessageContext) synCtx).setServiceLog(serviceLog);


Honour isSoapConversionRequired parameter


    /**
     * Keeps the synapse property "soap.format.conversion.enable" value.
     */
    private static boolean isSoapConversionRequired = SOAPUtils.isSoapConversionRequired();

        if (isSoapConversionRequired) {
            synCtx.setProperty(SynapseConstants.IS_CLIENT_DOING_REST, mc.isDoingREST());
            synCtx.setProperty(SynapseConstants.IS_CLIENT_DOING_SOAP11, mc.isSOAP11());
        }


Execute synapse handlers

Synapse handlers give four extension points for intercepting a message: when a request enters Synapse, when that request leaves Synapse, when the response enters Synapse, and when the response leaves Synapse.

List<SynapseHandler> handlers = synCtx.getEnvironment().getSynapseHandlers();
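
To make the handler chain a little more concrete, here is a minimal sketch of how such a chain can behave. FlowHandler and HandlerChainSketch are hypothetical stand-ins written for this post, not Synapse classes; the only behaviour borrowed from the Synapse code is that a handler returning false stops further processing.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class HandlerChainSketch {

    // Hypothetical stand-in for a handler; not the real Synapse interface.
    interface FlowHandler {
        boolean handleRequestInFlow(Map<String, Object> ctx);
    }

    // Runs the handlers in order and stops at the first one that returns false.
    static boolean invokeAll(List<FlowHandler> handlers, Map<String, Object> ctx) {
        for (FlowHandler handler : handlers) {
            if (!handler.handleRequestInFlow(ctx)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<FlowHandler> handlers = new ArrayList<>();
        handlers.add(c -> { c.put("audited", Boolean.TRUE); return true; });
        handlers.add(c -> false); // this handler vetoes the message
        handlers.add(c -> { c.put("neverSet", Boolean.TRUE); return true; });

        Map<String, Object> ctx = new HashMap<>();
        System.out.println("continue = " + invokeAll(handlers, ctx));
        System.out.println("context  = " + ctx);
    }
}
```

In this sketch the third handler never runs, because the second one returned false.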


Check for mandatory sequence


The mandatory sequence is a sequence that gets executed for every incoming message.

Mediator mandatorySeq = synCtx.getConfiguration().getMandatorySequence();


Start mediating message with SequenceMediator


    /**
     * If this mediator refers to another named Sequence, execute that. Else
     * execute the list of mediators (children) contained within this. If a referenced
     * named sequence mediator instance cannot be found at runtime, an exception is
     * thrown. This may occur due to invalid configuration of an erroneous runtime
     * change of the synapse configuration. It is the responsibility of the
     * SynapseConfiguration builder to ensure that dead references are not present.
     *
     * @param synCtx the synapse message
     * @return as per standard mediator result
     */
    public boolean mediate(MessageContext synCtx) {..}


We keep a stack to trace which sequence is being executed. This is done by ContinuationStackManager.


// Add a new SeqContinuationState as we branched to new Sequence.
                boolean skipAddition = ContinuationStackManager.isSkipSeqContinuationStateAddition(synCtx);
                if (!skipAddition) {
                    if (dynamic && registryKey != null) {
                        ContinuationStackManager.addSeqContinuationState(synCtx, registryKey, sequenceType);
                    } else {
                        ContinuationStackManager.addSeqContinuationState(synCtx, name, sequenceType);
                    }
                }

The message then goes to super.mediate(), which is AbstractListMediator.mediate().


Building the message

If the in-sequence has at least one content-aware mediator, the message is built. The content-aware flag is set when the sequence is initialised, by looking at the list of mediators it contains.


 if (contentAware) {
                try {
                    if (synLog.isTraceOrDebugEnabled()) {
                        synLog.traceOrDebug("Building message. Sequence <" + getType() + "> is content aware");
                    }
                    RelayUtils.buildMessage(((Axis2MessageContext) synCtx).getAxis2MessageContext(),false);
                } catch (Exception e) {
                    handleException("Error while building message", e, synCtx);
                }

            }

The following is what happens inside Axis2 when the above method is called.



Pipe pipe = (Pipe)messageContext.getProperty("pass-through.pipe");
InputStream in = pipe.getInputStream();

       try {
            element = messageBuilder.getDocument(messageContext, (InputStream)(bufferedInputStream != null?bufferedInputStream:in));
        } catch (Exception var6) {
            consumeAndDiscardMessage(messageContext);
            messageContext.setProperty("message.builder.invoked", Boolean.TRUE);
            handleException("Error while building Passthrough stream", var6);
        }

        if(element != null) {
            messageContext.setEnvelope(TransportUtils.createSOAPEnvelope(element));
            messageContext.setProperty("__RELAY_FORMATTERS_MAP", messageBuilder.getFormatters());
            messageContext.setProperty("message.builder.invoked", Boolean.TRUE);
            earlyBuild = messageContext.getProperty("relay_early_build") != null?((Boolean)messageContext.getProperty("relay_early_build")).booleanValue():earlyBuild;
            if(!earlyBuild) {
                processAddressing(messageContext);
            }
        }
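
Going back to how the content-aware flag is derived at sequence init, the idea can be sketched as follows. SimpleMediator and Sequence here are hypothetical, simplified stand-ins and not the Synapse classes; the sketch only assumes that each mediator can say whether it needs the message content.

```java
import java.util.Arrays;
import java.util.List;

final class ContentAwareSketch {

    // Hypothetical stand-in for a mediator.
    interface SimpleMediator {
        boolean isContentAware();
    }

    // Hypothetical stand-in for a sequence; the flag is computed once, at init.
    static final class Sequence {
        private final boolean contentAware;

        Sequence(List<SimpleMediator> children) {
            boolean aware = false;
            for (SimpleMediator child : children) {
                if (child.isContentAware()) {
                    aware = true; // one content-aware child is enough
                    break;
                }
            }
            this.contentAware = aware;
        }

        // True when the message has to be built before mediation starts.
        boolean needsBuild() {
            return contentAware;
        }
    }

    public static void main(String[] args) {
        Sequence passThrough = new Sequence(Arrays.<SimpleMediator>asList(() -> false));
        Sequence withLogFull = new Sequence(Arrays.<SimpleMediator>asList(() -> false, () -> true));
        System.out.println(passThrough.needsBuild());
        System.out.println(withLogFull.needsBuild());
    }
}
```

A sequence whose mediators never touch the payload can therefore leave the message as an unread stream.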

Iterate through mediators and exit where necessary


  for (int i = mediatorPosition; i < mediators.size(); i++) {
                // ensure correct trace state after each invocation of a mediator
                synCtx.setTracingState(myEffectiveTraceState);
                if (!mediators.get(i).mediate(synCtx)) {
                    returnVal = false;
                    break;
                }
            }


Every mediator returns a boolean. If the flow should stop at a mediator, it returns false, and the remaining mediators are not executed even though they are defined. After all mediators in a sequence have been executed, the following is done.


if (result && !skipAddition) {
                    // if flow completed remove the previously added SeqContinuationState
                    ContinuationStackManager.removeSeqContinuationState(synCtx, sequenceType);
                }


The error handler is also popped from the fault stack. The result (true/false) is ultimately returned to ProxyServiceMessageReceiver.
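
The interplay between the mediator loop and the continuation stack can be sketched like this. It is a simplified model, not the Synapse implementation: mediators are modelled as plain predicates on the message, and the continuation stack as a Deque of sequence names. All names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

final class SequenceFlowSketch {

    // Pushes a state for the sequence, runs the mediators in order, and removes
    // the state again only when every mediator returned true.
    static boolean mediate(String sequenceName,
                           List<Predicate<StringBuilder>> mediators,
                           StringBuilder message,
                           Deque<String> continuationStack) {
        continuationStack.push(sequenceName);
        for (Predicate<StringBuilder> mediator : mediators) {
            if (!mediator.test(message)) {
                return false; // flow stops here; later mediators are skipped
            }
        }
        continuationStack.pop(); // flow completed, state no longer needed
        return true;
    }

    public static void main(String[] args) {
        List<Predicate<StringBuilder>> mediators = new ArrayList<>();
        mediators.add(m -> { m.append("[log]"); return true; });
        mediators.add(m -> false); // e.g. a mediator that ends the flow
        mediators.add(m -> { m.append("[never]"); return true; });

        Deque<String> stack = new ArrayDeque<>();
        StringBuilder message = new StringBuilder();
        System.out.println(mediate("backend_call_seq", mediators, message, stack));
        System.out.println(message + " stack=" + stack);
    }
}
```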


Forwarding message to the Back-end in in-sequence

After ProxyServiceMessageReceiver receives the result of mediation in the in-sequence, and if it was successful, it forwards the message to the back-end specified at "target".


            // if inSequence returns true, forward message to endpoint
            if(inSequenceResult) {
                if (proxy.getTargetEndpoint() != null) {
                    Endpoint endpoint = synCtx.getEndpoint(proxy.getTargetEndpoint());

                    if (endpoint != null) {
                        traceOrDebug(traceOn, "Forwarding message to the endpoint : "
                            + proxy.getTargetEndpoint());
                        endpoint.send(synCtx);

                    } else {
                        handleException("Unable to find the endpoint specified : " +
                            proxy.getTargetEndpoint(), synCtx);
                    }

                } else if (proxy.getTargetInLineEndpoint() != null) {
                    traceOrDebug(traceOn, "Forwarding the message to the anonymous " +
                        "endpoint of the proxy service");
                    proxy.getTargetInLineEndpoint().send(synCtx);
                }
            }


If no EP is defined:

AxisEngine's InvocationResponse receive(MessageContext msgContext) completes with
InvocationResponse.CONTINUE;

If an EP is defined, from here on the message is sent out to an endpoint. The following are the code points hit when debugging the send mediator.


Send message to endpoint (send mediator debug)

reset states


if (keySet != null) {
    keySet.remove(SynapseConstants.RECEIVING_SEQUENCE);
    keySet.remove(SynapseConstants.CONTINUATION_CALL);
    keySet.remove(EndpointDefinition.DYNAMIC_URL_VALUE);
}

set receiving sequence


synCtx.setProperty(SynapseConstants.RECEIVING_SEQUENCE,
                    receivingSequence.evaluateValue(synCtx));


if no endpoints are defined, send where implicitly stated

synCtx.getEnvironment().send(null, synCtx);



if an EP is defined, send to it

endpoint.send(synCtx);


this calls 

AbstractEndpoint.send()


Add fault handler to the stack


In endpoints too, the message is built if the endpoint is content aware.


 if (contentAware) {
            try {
                RelayUtils.buildMessage(((Axis2MessageContext) synCtx).getAxis2MessageContext(),false);
                axis2Ctx.setProperty(RelayConstants.FORCE_RESPONSE_EARLY_BUILD, Boolean.TRUE);
                if(forceBuildMC){
                 ((Axis2MessageContext) synCtx).getAxis2MessageContext().getEnvelope().build();
                }
            } catch (Exception e) {
                handleException("Error while building message", e);
            }
        }



// Send the message through this endpoint (definition is the EndpointDefinition)
        synCtx.getEnvironment().send(definition, synCtx);



Axis2SynapseEnvironment.send() Method (to send out any message from synapse)


If the message is a response, it needs to be sent back. Otherwise a request is being sent to a BE.



//if it is response we need to send back 
if (synCtx.isResponse()) {
            
             //if an endpoint is defined 
            if (endpoint != null) {

                //if transport is different from http, we need to build looking at content type
                if (isTransportSwitching(synCtx, endpoint)) {
                    buildMessage(synCtx);
                }

                //send the message to EP
                Axis2Sender.sendOn(endpoint, synCtx);
            } else {
                String proxyName = (String) synCtx.getProperty(SynapseConstants.PROXY_SERVICE);
                boolean serviceModuleEngaged = false;
                if (proxyName != null) {
                    ProxyService proxyService = synapseConfig.getProxyService(proxyName);
                    serviceModuleEngaged = proxyService.isModuleEngaged();
                }

                if (serviceModuleEngaged || isTransportSwitching(synCtx, null)) {
                    buildMessage(synCtx);
                }

                //Build message in the case of inbound jms dual channel
                Boolean isInboundJMS = (Boolean)synCtx.getProperty(SynapseConstants.INBOUND_JMS_PROTOCOL);
                if (isInboundJMS != null && isInboundJMS) {
                    buildMessage(synCtx);
                }

                //this is where response is sent back to client
                Axis2Sender.sendBack(synCtx);

            }
           
        //we are sending a request to a BE
        } else {
            // If this request is related to session affinity endpoints - For client initiated session
            Dispatcher dispatcher =
                    (Dispatcher) synCtx.getProperty(
                            SynapseConstants.PROP_SAL_ENDPOINT_CURRENT_DISPATCHER);
            if (dispatcher != null) {
                if (!dispatcher.isServerInitiatedSession()) {
                    dispatcher.updateSession(synCtx);
                }
            }

            synCtx.setProperty(SynapseConstants.SENDING_REQUEST, true);
            if (endpoint == null || isTransportSwitching(synCtx, endpoint)) {
                buildMessage(synCtx);
            }
            Axis2Sender.sendOn(endpoint, synCtx);
        }
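
The branching above boils down to two small decisions, sketched below. SendRoutingSketch and its Route enum are hypothetical names used only for illustration; the rules are read off the code shown above and leave out the module-engaged and inbound JMS cases.

```java
final class SendRoutingSketch {

    enum Route { SEND_ON_TO_ENDPOINT, SEND_BACK_TO_CLIENT }

    // A response with no endpoint goes back to the client;
    // everything else is sent on to an endpoint.
    static Route route(boolean isResponse, boolean endpointDefined) {
        if (isResponse && !endpointDefined) {
            return Route.SEND_BACK_TO_CLIENT;
        }
        return Route.SEND_ON_TO_ENDPOINT;
    }

    // On the request path the message is built when there is no endpoint
    // or when the transport changes between the two sides.
    static boolean requestNeedsBuild(boolean endpointDefined, boolean transportSwitching) {
        return !endpointDefined || transportSwitching;
    }

    public static void main(String[] args) {
        System.out.println(route(false, true));  // request to a back-end
        System.out.println(route(true, false));  // response to the client
        System.out.println(requestNeedsBuild(true, false));
    }
}
```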



Axis2Sender.sendOn(endpoint, synCtx) method (send message to an endpoint)



>>invoke synapse handlers

            // Invoke Synapse Handlers
            Iterator<SynapseHandler> iterator =
                    synapseInMessageContext.getEnvironment().getSynapseHandlers().iterator();
            while (iterator.hasNext()) {
                SynapseHandler handler = iterator.next();
                if (!handler.handleRequestOutFlow(synapseInMessageContext)) {
                    return;
                }
            }

>> Following is where we give out message for axis2 to send out

            Axis2FlexibleMEPClient.send(
                    // The endpoint where we are sending to
                    endpoint,
                    // The Axis2 Message context of the Synapse MC
                    synapseInMessageContext); 



 Axis2FlexibleMEPClient.send() method 


This is where the message is dispatched from Synapse into the Axis2 layer. It can be thought of as a wrapper method in Synapse that uses Axis2 to send messages.

Save the original message context without altering it, so that the response can be tied to it:


MessageContext originalInMsgCtx
                = ((Axis2MessageContext) synapseOutMessageContext).getAxis2MessageContext();


Create a new MessageContext to be sent out, so that the original is not corrupted. The response to the original message has to be created later on.


 MessageContext axisOutMsgCtx = cloneForSend(originalInMsgCtx, preserveAddressingProperty);


Set all the details of the endpoint only on the cloned message context, so that the original message context can be reused for resending through different endpoints. Following is the outline of the method.


if (SynapseConstants.FORMAT_POX.equals(endpoint.getFormat())) {
} else if (SynapseConstants.FORMAT_GET.equals(endpoint.getFormat())) {
} else if (SynapseConstants.FORMAT_SOAP11.equals(endpoint.getFormat())) {
} else if (SynapseConstants.FORMAT_SOAP12.equals(endpoint.getFormat())) {
} else if (SynapseConstants.FORMAT_REST.equals(endpoint.getFormat())) {
} else {
                processWSDL2RESTRequestMessageType(originalInMsgCtx, axisOutMsgCtx);
}

wsAddressingEnabled
preserveHeaderProperty
outOnlyMessage
wsSecurityEnabled

Register the callback for response


This step is very important: the callback registered here is the one that gets invoked when the response from the back-end service arrives for this request.


// always set a callback as we decide if the send is blocking or non blocking within
        // the MEP client. This does not cause an overhead, as we simply create a 'holder'
        // object with a reference to the outgoing synapse message context
        // synapseOutMessageContext
        AsyncCallback callback = new AsyncCallback(axisOutMsgCtx, synapseOutMessageContext);
        if (!outOnlyMessage) {
            if (endpoint != null) {
                // set the timeout time and the timeout action to the callback, so that the
                // TimeoutHandler can detect timed out callbacks and take appropriate action.
                if (!endpoint.isDynamicTimeoutEndpoint()) {
                    long endpointTimeout = endpoint.getEffectiveTimeout();
                    callback.setTimeOutOn(System.currentTimeMillis() + endpointTimeout);
                    callback.setTimeOutAction(endpoint.getTimeoutAction());
                    callback.setTimeoutDuration(endpointTimeout);
                } else {
                    long endpointTimeout = endpoint.evaluateDynamicEndpointTimeout(synapseOutMessageContext);
                    callback.setTimeOutOn(System.currentTimeMillis() + endpointTimeout);
                    callback.setTimeOutAction(endpoint.getTimeoutAction());
                    callback.setTimeoutDuration(endpointTimeout);
                }
            } else {
                long globalTimeout = synapseOutMessageContext.getEnvironment().getGlobalTimeout();
                callback.setTimeOutOn(System.currentTimeMillis() + globalTimeout);
                callback.setTimeoutDuration(globalTimeout);
            }

        }

              //if not rest
              axisOutMsgCtx.setTo(new EndpointReference(address));
  }


        mepClient.setCallback(callback);
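
The timeout bookkeeping above can be reduced to a small sketch. The Callback holder and method names below are hypothetical, and the expiry check is an assumption about how a timeout handler could use the stored deadline; it is not taken from the Synapse source.

```java
final class CallbackTimeoutSketch {

    // Hypothetical holder mirroring the two values set on the callback above.
    static final class Callback {
        long timeOutOn;       // absolute deadline, in epoch milliseconds
        long timeoutDuration; // configured duration, in milliseconds
    }

    // Uses the endpoint timeout when an endpoint is given (non-null),
    // otherwise falls back to the global timeout.
    static Callback prepare(long nowMs, Long endpointTimeoutMs, long globalTimeoutMs) {
        long duration = (endpointTimeoutMs != null) ? endpointTimeoutMs : globalTimeoutMs;
        Callback callback = new Callback();
        callback.timeOutOn = nowMs + duration;
        callback.timeoutDuration = duration;
        return callback;
    }

    // Assumed check a periodic timeout task could perform.
    static boolean isExpired(Callback callback, long nowMs) {
        return nowMs >= callback.timeOutOn;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        Callback withEndpoint = prepare(now, 30_000L, 120_000L);
        Callback globalOnly = prepare(now, null, 120_000L);
        System.out.println(withEndpoint.timeoutDuration);
        System.out.println(globalOnly.timeoutDuration);
        System.out.println(isExpired(withEndpoint, now + 30_000L));
    }
}
```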

Write to wire so that message goes to BE service

mepClient.execute(true);



The following diagram depicts the above flow in summary (Thanks Pamod Silvester for the diagram!)


Outflow of a message


When a message is sent to an endpoint using the send mediator or the call mediator, and the ESB expects a response, it registers a callback as the message is sent out (please refer to the "Register the callback for response" section above). When the response comes into the ESB, it is handled by this callback.

SynapseCallbackReceiver


When response comes from the BE, it is handled by SynapseCallbackReceiver.


/**
* This is the message receiver that receives the responses for outgoing messages sent out
* by Synapse. It holds a callbackStore that maps the [unique] messageID of each message to
* a callback object that gets executed on timeout or when a response is received (before timeout)
*
* The AnonymousServiceFactory uses this MessageReceiver for all Anonymous services created by it.
* This however - effectively - is a singleton class
*/
public class SynapseCallbackReceiver extends CallbackReceiver {..}



public void receive(MessageContext messageCtx) throws AxisFault {
    handleMessage(messageID, messageCtx,
            ((AsyncCallback) callback).getSynapseOutMsgCtx(), (AsyncCallback) callback);
}
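
The callback store idea from the class comment can be sketched as below. This is a simplified model with hypothetical names: callbacks are plain Consumer objects keyed by message ID, and removing the entry when the response arrives is an assumption made for the sketch.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

final class CallbackStoreSketch {

    // Maps the unique message ID of each outgoing request to its callback.
    private final Map<String, Consumer<String>> callbackStore = new ConcurrentHashMap<>();

    // Called when the request is sent out.
    void register(String messageId, Consumer<String> callback) {
        callbackStore.put(messageId, callback);
    }

    // Called when a response arrives. Returns false when no callback is
    // waiting, for example because it already timed out and was removed.
    boolean receive(String messageId, String responseBody) {
        Consumer<String> callback = callbackStore.remove(messageId);
        if (callback == null) {
            return false;
        }
        callback.accept(responseBody);
        return true;
    }

    public static void main(String[] args) {
        CallbackStoreSketch store = new CallbackStoreSketch();
        store.register("urn:uuid:1", body -> System.out.println("response: " + body));
        System.out.println(store.receive("urn:uuid:1", "<quote/>"));
        System.out.println(store.receive("urn:uuid:1", "<late/>"));
    }
}
```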

 handleMessage() method


handleMessage() {

Object messageType = axisOutMsgCtx.getProperty(
                    org.apache.axis2.Constants.Configuration.MESSAGE_TYPE);
            if (!HTTPConstants.MEDIA_TYPE_X_WWW_FORM.equals(messageType)) {
                 // copy the message type property that's used by the out message to the
                 // response message
                response.setProperty(org.apache.axis2.Constants.Configuration.MESSAGE_TYPE,
                    messageType);
            }

            // create the synapse message context for the response
            Axis2MessageContext synapseInMessageContext =
                    new Axis2MessageContext(
                            response,
                            synapseOutMsgCtx.getConfiguration(),
                            synapseOutMsgCtx.getEnvironment());
            synapseInMessageContext.setResponse(true);

//error handling
if (Constants.VALUE_TRUE.equals(errorOnSOAPFault) && successfulEndpoint != null) {
          //build the message 
                try {
                    RelayUtils.buildMessage(((Axis2MessageContext) synapseInMessageContext).getAxis2MessageContext(),true);
                } catch (Exception e) {
                   // handleException("Error while building message", e, synapseInMessageContext);
                }

//see if it is SOAP fault and take action 
if ((synapseInMessageContext.getEnvelope() != null) && synapseInMessageContext.getEnvelope().hasFault()) {
}

}



Inject message to synapse environment (Axis2SynapseEnvironment)



synapseOutMsgCtx.getEnvironment().injectMessage(synapseInMessageContext);


public boolean injectMessage(final MessageContext synCtx) {

if isContinuationCall - in call() mediator responses, execute mediators after the call mediator
else if receivingSequence - if there is a receive sequence, dispatch message there
else if outSequence - if out sequence execute mediators there
else - just send back using info in synCtx

}
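
The order of those checks can be written down as a small decision function. The sketch below uses hypothetical names and plain parameters in place of the message context; it only captures the order of precedence listed above.

```java
final class InjectDispatchSketch {

    enum Target { CONTINUE_AFTER_CALL, RECEIVING_SEQUENCE, OUT_SEQUENCE, SEND_BACK }

    // Decides where an injected response goes, in the order listed above.
    static Target choose(boolean continuationCall, String receivingSequence, boolean hasOutSequence) {
        if (continuationCall) {
            return Target.CONTINUE_AFTER_CALL; // resume after the call mediator
        }
        if (receivingSequence != null) {
            return Target.RECEIVING_SEQUENCE;  // dispatch to the receive sequence
        }
        if (hasOutSequence) {
            return Target.OUT_SEQUENCE;        // run the proxy's out sequence
        }
        return Target.SEND_BACK;               // nothing to mediate, just reply
    }

    public static void main(String[] args) {
        // The debugProxy in this post has an out sequence and no receive sequence.
        System.out.println(choose(false, null, true));
    }
}
```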

Using send mediator in out sequence 



Assume there is a send mediator in the out sequence. The call stack would then be:


Ultimately it hits the following section in the send mediator:


// if no endpoints are defined, send where implicitly stated
if (endpoint == null) {
    synCtx.getEnvironment().send(null, synCtx);
}


Here, if it is a response to a message that came in through an inbound endpoint, InboundResponseSender is used to send it back (looked up using synapseContext).

Dispatching message to Axis2 to send back



If the request arrives through a conventional transport listener

AxisEngine.send(messageContext); 


The following diagram depicts the above flow in summary (Thanks Pamod Silvester for the diagram!)


Hasitha Hiranya
