Itinerary based message routing using Azure Logic Apps and Service Bus Actions

This is another integration pattern used quite extensively in the integration world. It is used when a message is required to be routed to several endpoints in a particular order using some form of routing list. Depending on your business requirements, the message being routed may be enriched or replaced before sending it to the next service endpoint in the list.

image

This is probably one of the easiest integration patterns to implement using Logic Apps and Service Bus Actions. There are probably other methods to implement this pattern, but I wanted to abstract the routing logic away from the Logic App itself and leave it to focus on the business process and not worry about setting up the next routing endpoint.

By using the Actions feature of the service bus I can defined the routing order of the next service endpoint, or in this case the next service bus subscription by setting properties to match the next subscriber filter condition. A service bus action is executed after the filter condition has been met and is used to set the value of either a system or custom property before the message is consumed from the service bus. It is set in a similar way to the T-SQL “Set” command where you set a field to a value.

For this scenario I have a sales order message that is required to be passed to several logic apps in a particular sequence. There is a sales order header logic app, sales order line logic app, sale order payments logic app and finally a sales order completion logic app. The sales order message will look similar to this below.

image

Provisioning Azure Service Bus

The method involves creating a service bus topic called “salesorder” with the following subscriptions.

servicebus

The filter and action rules are setup as in the following table. There are 2 custom properties “MsgType” and “ItineraryLeg” used for the subscriptions. The MsgType is just used to group the messages and the ItineraryLeg  defines the order of the subscribers.

The action is used to specify the next subscriber to send the message to after the current process by setting the ItineraryLeg property.

Seq Subscription Name Rule Filter Rule Action
1 SOHeader MsgType=’salesorder’ and ItineraryLeg = ” set  ItineraryLeg = ‘solines’
2 SOLines MsgType=’salesorder’ and ItineraryLeg = ‘solines’ set  ItineraryLeg = ‘sopayments’
3 SOPayments MsgType=’salesorder’ and ItineraryLeg = ‘sopayments’ set  ItineraryLeg = ‘socompletion’
4 SOCompletion MsgType=’salesorder’ and ItineraryLeg = ‘socompletion’

A sample of the SOHeader rules are shown below:

image

Creating the Logic Apps

Next we will provision the 4 logic apps to create the SO header, process the line items, apply the payments and complete the sales order.

image

The logic apps (SalesOrderHeaderProcessor, SalesOrderLinesProcessor, SalesOrderPaymentsProcessor) are constructed in a similar manner as shown below and the only difference is the topic subscription names for the service bus trigger action.  The Delay action is where you would implement your business process logic on the received message.

image

Expanding the service bus trigger shape has the following properties set. The other logic apps will have different Topic subscription names set.

image

The real smarts of these workflows are the “Republish message” service bus action shape. You will need to add this shape to every logic app that is involved in processing the message. Here we set the ItineraryLog and MsgType property values to the same values in the Trigger service bus action shape. Also I am adding another custom  property called “Tracking”. This is used to track the name of logic app the message was routed to in a chorological order and can be useful for debugging. The content property is just set the received message from the service bus trigger. Depending on your business requirements, you may be required to publish a totally different message.

image

Below is the code behind for the Republish message shape showing how to setup the properties section.

“body”: {
“ContentData”: “@{triggerBody()?[‘ContentData’]}”,
“Properties”: {
“ItineraryLeg”: “@triggerBody()?[‘Properties’][‘ItineraryLeg’]”,
“MsgType”: “@triggerBody()?[‘Properties’][‘MsgType’]”,
“Tracking”: “@concat(coalesce(triggerBody()?[‘Properties’]?[‘Tracking’],’Begin’),’,’,workflow()?[‘name’])”
}
},

The other logic app SalesOrderCompletionProcessor simply pulls the message from the last subscription and sends the Tracking property value to RequestBin.

image

Messages are placed onto the service bus using the MessagePublisher logic app which accepts a json message and initialises the service bus custom properties “ItineraryLeg” and “MsgType”. Here the ItineraryLeg is set to an empty string and the MsgType being set to “salesorder”. The body of the HTTP request trigger is used as the message content for the Send message action shape.

image

Testing

Using PostMan we can POST a message to the MessagePublisher logic app. If you set the delays long enough in the logic apps, you can see the message being published and consumed by the subscribers in the correct order as defined in the service bus actions.

Here is the result of the message being posted to the MessagePublisher logic app and the output from the SalesOrderCompletionProcessor  logic app which sends it to RequestBin.

Begin,SalesOrderHeaderProcessor,SalesOrderLinesProcessor,SalesOrderPaymentsProcessor,SalesOrderCompletionProcessor

Now if I wanted to change the order of the message processing, say I wanted to process the payments before the sales order lines. Here I would simply update the actions on the service bus subscriptions as follows:

Seq Subscription Name Rule Filter Rule Action
1 SOHeader MsgType=’salesorder’ and ItineraryLeg = ” set  ItineraryLeg = ‘sopayments’
3 SOPaymentsSOLines MsgType=’salesorder’ and ItineraryLeg = ‘solines’ set  ItineraryLeg = ‘socompletion’
2 SOPayments MsgType=’salesorder’ and ItineraryLeg = ‘sopayments’ set  ItineraryLeg = ‘solines’
4 SOCompletion MsgType=’salesorder’ and ItineraryLeg = ‘socompletion’

Now the output of the SalesOrderCompletionProcessor looks like this below. You can clearly see the payments logic app being executed after the sales header process.

Begin,SalesOrderHeaderProcessor,SalesOrderPaymentsProcessor,SalesOrderLinesProcessor,SalesOrderCompletionProcessor

In Summary

Use service bus actions to manage the itinerary list to abstract the routing logic away from the normal business process of the logic apps.

Typically you would use Azure Resource templates to setup the service bus subscriptions and rules under TFS control.

Keep a watch out for my next article on Content Based Routing using Azure Logic Apps and Service Bus.

Enjoy…

Integration Scatter-Gather pattern using Azure Logic Apps and Service Bus

Recently I have taken a role as the Integration Architect and tech lead for a project to integrate messaging between MS Dynamics 365  AX/CRM and 3rd party systems. This was a huge solution that took over 10 months to design and develop.  It consisted of developing over 60 Logic Apps.

With a large integration project like this were multiple entities are required to be updated across multiple systems in a transactional manner, I required a process to correlate and aggregate all the responses into one composite message before proceeding onto the next task in the workflow. The scatter-gatherer pattern was chosen as a  good candidate for this type of scenario.

The next problem was how to implement this pattern using Logic Apps. Using what I had learnt from my previous blog on enforcing ordered delivery of messages  https://connectedcircuits.blog//wp-content/uploads/2017/08/26/enforcing-ordered-delivery-using-azure-logic-apps-and-service-bus/ I was able to design the solution below.

image

It is based on a single Logic App to publish the message onto a service bus topic and to retrieve all the responses from another service bus queue. I also wanted the solution to be flexible to decide which services the message should be scattered to by passing in a routing list which determines who to send the request message to.

The main component of the solution is the (Scatter/Gather)  logic app which sends the request message to the the service bus topic after setting subscription property values and a unique batch Id. In this example I have 3 sub-processor logic apps which will process the message in some form or another and then write a response message onto the service bus queue. Because sessions have been enabled on this queue and I am setting the session Id with the same BatchId that was sent with the message, we are able to correlate all the responses from to sub-processors within the same logic app instance that initially published the message onto the service bus topic.

After the Scatter/Gather logic app sends the message onto the service bus topic, it will go into a loop until all the response messages from the Process logic apps are received. It will then return the aggregated responses from the sub-process logic apps.

The whole solution can be broken down into 3 sections, setting up the Azure Service Bus, developing the Scatter-Gatherer Logic App workflow and the Sub-process Logic Apps subscribing to the topic.

Setting up the service bus

Lets start by setting up the Azure Service Bus. A Topic is provisioned to publish the request message with rules created for each subscriber. In this scenario I will have 3 subscriptions with default rules set to a property called MsgProcessor = ‘1’, MsgProcessor = ‘2’ and MsgProcessor = ‘3’ on each subscription respectively.

image

Next a Service Bus Queue is provisioned to receive all the response messages from the subscribers. Sessions must be enabled on this queue to allow a single consumer to process all messages with the same sessionId which in our case will be the Scatter/Gather Logic App.

image

Developing the Scatter/Gather Logic App

The next part to develop is the scatter-gather Logic App which is the real smarts of the solution. Below is the full workflow with all the actions collapsed. We will go through and expand each of the shapes.

image

This  workflow is triggered by a HTTP request, but can also be triggered by some other connectors.

image

Next is a series of variables required by the workflow listed below:

  • BatchId – This is used to group all the messages together and is used as the SessionId and is initialised with a random Guid.
  • CompositeMsg – This holds the aggregated response messages from the Processor logic apps.
  • IntemediateMsg – Used in the loop the temporary store the aggregated message.
  • ScatterCount – The number of sub-processors the request body was published to.
  • ResponseCount – Holds the current number of responses received from the Processing logic apps.

image

Once the variables have been setup, a parallel branch is used to scatter the HTTP Request body contents to the subscribing systems. For this demo I have an option to send the request body content to 3 different sub-processes.

image

To determine which sub-process to send the request body to, I pass a list of the process names to send the request body to as one of the HTTP Request Header properties as shown here:

“X-ScatterList”:”‘Process1′,’Process2’,’Process3′”

The filters on the parallel conditions are setup as below, where is checks if the list in the HTTP header contains the process name.

image

Expanding one of the condition tasks shows an action to send the request onto the Service Bus  Topic and to increment the ScatterCount variable by one.

image

Below is the “Send message processor 1”  service bus connector expanded to show the properties. The two important pieces of information are custom properties BatchId and MsgProcessor. The BatchId will be used to set the sessionId latter and the MsgProcessor is used by the subscription filter on the  service bus Topic.

image

The next step is to increment to Scatter count. This is how we keep track of the number sub-processors that the message was scattered to.

image

Once the request body content has been published to the service bus Topic, we cycle in a loop until we have received all the response messages from the service bus queue or the Until loop times out.

image

The first step in the loop is to get one new message at a time from the queue using the BatchId variable as the session Id. This is to ensure we only get messages off the queue matching this same Id.

image

Then we check if a message was found using the Condition action with this filter: @equals(length(body(‘Get_messages_from_a_queue_(peek-lock)’)), 1)

 

image

If a message was found on the service bus queue then the “If true” branch is executed and if no message was found the “If false” branch is executed setting a delay of a few seconds before iterating again.

Below are the actions inside of the “If true” branch.

image

First we check if this is the first message received off the queue using the following filter: @equals(variables(‘ResponseCount’), 0)

 

image

If it is, then we just set the variable “CompositeMsg” to the service bus queue content data.

image

As we are using the Service Bus Connector that is capable of returning multiple messages, we need to use an indexer to get to the first message. Below is the code view of the above action. Note when the sub-processor logic app puts a message onto the service bus, we base64 encode it, therefore we need to decode it back to string value.

image

Now if this was not the first message received of the service bus, we need to append it the other messages received. This is done with the following actions in the “If false” branch shown below.

image

First we need to copy the contents of the CompositeMsg variable into the “IntermediateMsg” variable. Then concatenate the message received of the service bus and the contents of the “IntemediateMsg” variable,  and then add them to the “CompositeMsg” variable. The syntax of the value for the Append intermediate to composite is shown below:

“value”: “@{concat(variables(‘IntermediateMsg’),’,’,base64ToString(body(‘Get_messages_from_a_queue_(peek-lock)’)?[0][‘ContentData’]))}”

After the message from the queue has been added or appended to the CompositeMsg variable, it is completed to remove it from the queue and the ResponseCount variable is incremented.

image

Once all the response messages have been received from the sub-process logic apps or the “Until loop” times out, we need to close the service bus queue session using the BatchId and return the composite response message.

image

Here I am just sending the composite response message to a RequestBin endpoint. Ideally you would place this onto another service bus and perhaps with the initial HTTP request body to tie everything together.

Sub-Processor Logic Apps

The last part of this solution is the sub-processor logic apps which subscribe to the service bus Topic and processes the message in some form or another before returning a response message onto the service bus queue. Using a separate logic app the manage the post processing of the initial request message provides scalability and separation from the  messaging orchestration components.

image

The properties of the trigger service bus connector is shown below which has a typical setup.

image

Next the delay task is there just to simulate processing of the message received from the topic. This where you would normally call another API endpoint or process the message in some other fashion. Remember the maximum lock duration is 5 minutes and if your process will take longer than this, you will need to renew the lock.

image

After the message has been processed, you will need to compose a response message to put back onto the service bus queue. The schema of the response message should be generic across all the sub-process logic apps to make it easier to parse latter on. For the demo a relatively simple schema will be used consisting of the received BatchId and the sub-processor logic app name.

image

Once the response message has been composed, it is placed onto the service bus queue ensuring the session Id is set to the BatchId that was sent with the message. Remember this queue has been provisioned with “Sessions Enabled”

image

The code view for the connector looks like this:

image

The last task of the workflow is to complete the topic subscription as shown here:

image

Testing the solution

We can use Postman to send the following request to the scatter-gather logic app. Also I am setting the scatter list header property to publish the message to all 3 sub-processor logic apps.

POST /workflows/…/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2FmanualHTTP/1.1
Host: prod-11.australiasoutheast.logic.azure.com:443
Content-Type: application/json
X-ScatterList: ‘Process1′,’Process2′,’Process3’
Cache-Control: no-cache
Postman-Token: 644c5cdb-fd60-4903-b3ec-2d3d6febfe7e

{
“OrderId” : “12”,
“Message” : “Hello1”
}

Using RequestBin we can see the aggregated response messages from all 3 sub-processors.

[{"BatchId":"5e1df244-2153-4529-aaba-12a82aa788fd","ResponseMsg":"Process1"},{"BatchId":"5e1df244-2153-4529-aaba-12a82aa788fd","ResponseMsg":"Process2"},{"BatchId":"5e1df244-2153-4529-aaba-12a82aa788fd","ResponseMsg":"Process3"}]

What’s next

Further enhancements can be made by adding flexibility for the timeout of the Until loop as some responses may take hours/days  to send a response back.  When waiting for very long periods for a response to return, you may need to extend the delay period in the “Until loop” to avoid reaching the 5000 iteration limit.

Keep a watch for my next blog where I will show you to implement the Itinerary based routing integration pattern using Azure Logic Apps and  Service Bus.

Enjoy…