Integration Scatter-Gather pattern using Azure Logic Apps and Service Bus

Recently I have taken a role as the Integration Architect and tech lead for a project to integrate messaging between MS Dynamics 365  AX/CRM and 3rd party systems. This was a huge solution that took over 10 months to design and develop.  It consisted of developing over 60 Logic Apps.

With a large integration project like this were multiple entities are required to be updated across multiple systems in a transactional manner, I required a process to correlate and aggregate all the responses into one composite message before proceeding onto the next task in the workflow. The scatter-gatherer pattern was chosen as a  good candidate for this type of scenario.

The next problem was how to implement this pattern using Logic Apps. Using what I had learnt from my previous blog on enforcing ordered delivery of messages  https://connectedcircuits.blog//wp-content/uploads/2017/08/26/enforcing-ordered-delivery-using-azure-logic-apps-and-service-bus/ I was able to design the solution below.

image

It is based on a single Logic App to publish the message onto a service bus topic and to retrieve all the responses from another service bus queue. I also wanted the solution to be flexible to decide which services the message should be scattered to by passing in a routing list which determines who to send the request message to.

The main component of the solution is the (Scatter/Gather)  logic app which sends the request message to the the service bus topic after setting subscription property values and a unique batch Id. In this example I have 3 sub-processor logic apps which will process the message in some form or another and then write a response message onto the service bus queue. Because sessions have been enabled on this queue and I am setting the session Id with the same BatchId that was sent with the message, we are able to correlate all the responses from to sub-processors within the same logic app instance that initially published the message onto the service bus topic.

After the Scatter/Gather logic app sends the message onto the service bus topic, it will go into a loop until all the response messages from the Process logic apps are received. It will then return the aggregated responses from the sub-process logic apps.

The whole solution can be broken down into 3 sections, setting up the Azure Service Bus, developing the Scatter-Gatherer Logic App workflow and the Sub-process Logic Apps subscribing to the topic.

Setting up the service bus

Lets start by setting up the Azure Service Bus. A Topic is provisioned to publish the request message with rules created for each subscriber. In this scenario I will have 3 subscriptions with default rules set to a property called MsgProcessor = ‘1’, MsgProcessor = ‘2’ and MsgProcessor = ‘3’ on each subscription respectively.

image

Next a Service Bus Queue is provisioned to receive all the response messages from the subscribers. Sessions must be enabled on this queue to allow a single consumer to process all messages with the same sessionId which in our case will be the Scatter/Gather Logic App.

image

Developing the Scatter/Gather Logic App

The next part to develop is the scatter-gather Logic App which is the real smarts of the solution. Below is the full workflow with all the actions collapsed. We will go through and expand each of the shapes.

image

This  workflow is triggered by a HTTP request, but can also be triggered by some other connectors.

image

Next is a series of variables required by the workflow listed below:

  • BatchId – This is used to group all the messages together and is used as the SessionId and is initialised with a random Guid.
  • CompositeMsg – This holds the aggregated response messages from the Processor logic apps.
  • IntemediateMsg – Used in the loop the temporary store the aggregated message.
  • ScatterCount – The number of sub-processors the request body was published to.
  • ResponseCount – Holds the current number of responses received from the Processing logic apps.

image

Once the variables have been setup, a parallel branch is used to scatter the HTTP Request body contents to the subscribing systems. For this demo I have an option to send the request body content to 3 different sub-processes.

image

To determine which sub-process to send the request body to, I pass a list of the process names to send the request body to as one of the HTTP Request Header properties as shown here:

“X-ScatterList”:”‘Process1′,’Process2’,’Process3′”

The filters on the parallel conditions are setup as below, where is checks if the list in the HTTP header contains the process name.

image

Expanding one of the condition tasks shows an action to send the request onto the Service Bus  Topic and to increment the ScatterCount variable by one.

image

Below is the “Send message processor 1”  service bus connector expanded to show the properties. The two important pieces of information are custom properties BatchId and MsgProcessor. The BatchId will be used to set the sessionId latter and the MsgProcessor is used by the subscription filter on the  service bus Topic.

image

The next step is to increment to Scatter count. This is how we keep track of the number sub-processors that the message was scattered to.

image

Once the request body content has been published to the service bus Topic, we cycle in a loop until we have received all the response messages from the service bus queue or the Until loop times out.

image

The first step in the loop is to get one new message at a time from the queue using the BatchId variable as the session Id. This is to ensure we only get messages off the queue matching this same Id.

image

Then we check if a message was found using the Condition action with this filter: @equals(length(body(‘Get_messages_from_a_queue_(peek-lock)’)), 1)

 

image

If a message was found on the service bus queue then the “If true” branch is executed and if no message was found the “If false” branch is executed setting a delay of a few seconds before iterating again.

Below are the actions inside of the “If true” branch.

image

First we check if this is the first message received off the queue using the following filter: @equals(variables(‘ResponseCount’), 0)

 

image

If it is, then we just set the variable “CompositeMsg” to the service bus queue content data.

image

As we are using the Service Bus Connector that is capable of returning multiple messages, we need to use an indexer to get to the first message. Below is the code view of the above action. Note when the sub-processor logic app puts a message onto the service bus, we base64 encode it, therefore we need to decode it back to string value.

image

Now if this was not the first message received of the service bus, we need to append it the other messages received. This is done with the following actions in the “If false” branch shown below.

image

First we need to copy the contents of the CompositeMsg variable into the “IntermediateMsg” variable. Then concatenate the message received of the service bus and the contents of the “IntemediateMsg” variable,  and then add them to the “CompositeMsg” variable. The syntax of the value for the Append intermediate to composite is shown below:

“value”: “@{concat(variables(‘IntermediateMsg’),’,’,base64ToString(body(‘Get_messages_from_a_queue_(peek-lock)’)?[0][‘ContentData’]))}”

After the message from the queue has been added or appended to the CompositeMsg variable, it is completed to remove it from the queue and the ResponseCount variable is incremented.

image

Once all the response messages have been received from the sub-process logic apps or the “Until loop” times out, we need to close the service bus queue session using the BatchId and return the composite response message.

image

Here I am just sending the composite response message to a RequestBin endpoint. Ideally you would place this onto another service bus and perhaps with the initial HTTP request body to tie everything together.

Sub-Processor Logic Apps

The last part of this solution is the sub-processor logic apps which subscribe to the service bus Topic and processes the message in some form or another before returning a response message onto the service bus queue. Using a separate logic app the manage the post processing of the initial request message provides scalability and separation from the  messaging orchestration components.

image

The properties of the trigger service bus connector is shown below which has a typical setup.

image

Next the delay task is there just to simulate processing of the message received from the topic. This where you would normally call another API endpoint or process the message in some other fashion. Remember the maximum lock duration is 5 minutes and if your process will take longer than this, you will need to renew the lock.

image

After the message has been processed, you will need to compose a response message to put back onto the service bus queue. The schema of the response message should be generic across all the sub-process logic apps to make it easier to parse latter on. For the demo a relatively simple schema will be used consisting of the received BatchId and the sub-processor logic app name.

image

Once the response message has been composed, it is placed onto the service bus queue ensuring the session Id is set to the BatchId that was sent with the message. Remember this queue has been provisioned with “Sessions Enabled”

image

The code view for the connector looks like this:

image

The last task of the workflow is to complete the topic subscription as shown here:

image

Testing the solution

We can use Postman to send the following request to the scatter-gather logic app. Also I am setting the scatter list header property to publish the message to all 3 sub-processor logic apps.

POST /workflows/…/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2FmanualHTTP/1.1
Host: prod-11.australiasoutheast.logic.azure.com:443
Content-Type: application/json
X-ScatterList: ‘Process1′,’Process2′,’Process3’
Cache-Control: no-cache
Postman-Token: 644c5cdb-fd60-4903-b3ec-2d3d6febfe7e

{
“OrderId” : “12”,
“Message” : “Hello1”
}

Using RequestBin we can see the aggregated response messages from all 3 sub-processors.

[{"BatchId":"5e1df244-2153-4529-aaba-12a82aa788fd","ResponseMsg":"Process1"},{"BatchId":"5e1df244-2153-4529-aaba-12a82aa788fd","ResponseMsg":"Process2"},{"BatchId":"5e1df244-2153-4529-aaba-12a82aa788fd","ResponseMsg":"Process3"}]

What’s next

Further enhancements can be made by adding flexibility for the timeout of the Until loop as some responses may take hours/days  to send a response back.  When waiting for very long periods for a response to return, you may need to extend the delay period in the “Until loop” to avoid reaching the 5000 iteration limit.

Keep a watch for my next blog where I will show you to implement the Itinerary based routing integration pattern using Azure Logic Apps and  Service Bus.

Enjoy…

2 Replies to “Integration Scatter-Gather pattern using Azure Logic Apps and Service Bus”

Leave a Reply

Your email address will not be published. Required fields are marked *