Azure APIM Scatter-Gather Pattern Policy

Recently I have been reading about some of the advanced policies available in APIM, and the one that caught my eye was the “Wait” policy found here https://docs.microsoft.com/en-us/azure/api-management/api-management-advanced-policies. This policy executes its child policies in parallel and waits until either one or all of the child policies have completed before continuing.

I was pondering a good use case for this policy, and the first one that came to mind was the scatter-gather pattern shown below. I could easily create this pattern without having to provision any other Azure services.

 

image

The Scatter-Gather pattern is created by exposing a REST endpoint through APIM which accepts a JSON payload. When a request comes into APIM, a custom policy forwards the request onto the service endpoints defined in the policy and waits until they have all responded before continuing with the rest of the statements in the parent policy. Once all the destination endpoints have responded, the return status code of each service is checked for success or an error. If an error did occur, that error response is returned immediately; otherwise the message from each service is combined into a composite JSON message which is returned to the caller as the response message.

For this scenario, I will be sending a price request for a product to 3 suppliers simultaneously. In Azure APIM, I created a method which accepts a JSON price request message as shown below.

image

 

Then I added an Inbound processing policy to that method, which is broken down into two parts. Below is the first part of the policy, where it sends the inbound request to multiple endpoints in parallel using the <send-request> tag. Named values are used for the service endpoint addresses defined in the <set-url>{{serviceendpoint_xxx}}</set-url> tags. The response from each service call is stored in the variable defined by response-variable-name="response_xxx", which is used in the second part of this policy.

<set-variable name="requestPayload" value="@(context.Request.Body.As&lt;string>(true))" />
<wait for="all">
<send-request mode="copy" response-variable-name="response_1" timeout="120" ignore-error="false">
<set-url>{{serviceendpoint_1}}</set-url>
<set-method>POST</set-method>
<set-body>@(context.Variables.GetValueOrDefault<string>("requestPayload"))</set-body>
</send-request>
<send-request mode="copy" response-variable-name="response_2" timeout="120" ignore-error="false">
<set-url>{{serviceendpoint_2}}</set-url>
<set-method>POST</set-method>
<set-body>@(context.Variables.GetValueOrDefault<string>("requestPayload"))</set-body>
</send-request>
<send-request mode="copy" response-variable-name="response_3" timeout="120" ignore-error="false">
<set-url>{{serviceendpoint_3}}</set-url>
<set-method>POST</set-method>
<set-body>@(context.Variables.GetValueOrDefault<string>("requestPayload"))</set-body>
</send-request>
</wait>

After the closing </wait> tag, the response code from each of the services is checked inside a <choose> block. If it is not equal to 200 then that response is immediately returned, otherwise each of the responses is wrapped inside a JSON object and returned as the composite response message.
 
<choose>
<when condition="@((int)((IResponse)context.Variables["response_1"]).StatusCode != 200)">
<return-response response-variable-name="response_1" />
</when>
<when condition="@((int)((IResponse)context.Variables["response_2"]).StatusCode != 200)">
<return-response response-variable-name="response_2" />
</when>
<when condition="@((int)((IResponse)context.Variables["response_3"]).StatusCode != 200)">
<return-response response-variable-name="response_3" />
</when>
<otherwise>
<return-response>
<set-status code="200" reason="OK" />
<set-header name="Content-Type" exists-action="override">
<value>application/json</value>
</set-header>
<set-body>@(new JObject(
new JProperty("service_1",((IResponse)context.Variables["response_1"]).Body.As<JObject>()),
new JProperty("service_2",((IResponse)context.Variables["response_2"]).Body.As<JObject>()),
new JProperty("service_3",((IResponse)context.Variables["response_3"]).Body.As<JObject>())
).ToString())</set-body>
</return-response>
</otherwise>
</choose>
 
Below is the whole policy for the method. More service endpoints may be added to or removed from this policy as desired.
<policies>
<inbound>
<set-variable name="requestPayload" value="@(context.Request.Body.As&lt;string>(true))" />
<wait for="all">
<send-request mode="copy" response-variable-name="response_1" timeout="120" ignore-error="false">
<set-url>{{serviceendpoint_1}}</set-url>
<set-method>POST</set-method>
<set-body>@(context.Variables.GetValueOrDefault<string>("requestPayload"))</set-body>
</send-request>
<send-request mode="copy" response-variable-name="response_2" timeout="120" ignore-error="false">
<set-url>{{serviceendpoint_2}}</set-url>
<set-method>POST</set-method>
<set-body>@(context.Variables.GetValueOrDefault<string>("requestPayload"))</set-body>
</send-request>
<send-request mode="copy" response-variable-name="response_3" timeout="120" ignore-error="false">
<set-url>{{serviceendpoint_3}}</set-url>
<set-method>POST</set-method>
<set-body>@(context.Variables.GetValueOrDefault<string>("requestPayload"))</set-body>
</send-request>
</wait>
<choose>
<when condition="@((int)((IResponse)context.Variables["response_1"]).StatusCode != 200)">
<return-response response-variable-name="response_1" />
</when>
<when condition="@((int)((IResponse)context.Variables["response_2"]).StatusCode != 200)">
<return-response response-variable-name="response_2" />
</when>
<when condition="@((int)((IResponse)context.Variables["response_3"]).StatusCode != 200)">
<return-response response-variable-name="response_3" />
</when>
<otherwise>
<return-response>
<set-status code="200" reason="OK" />
<set-header name="Content-Type" exists-action="override">
<value>application/json</value>
</set-header>
<set-body>@(new JObject(
new JProperty("service_1",((IResponse)context.Variables["response_1"]).Body.As<JObject>()),
new JProperty("service_2",((IResponse)context.Variables["response_2"]).Body.As<JObject>()),
new JProperty("service_3",((IResponse)context.Variables["response_3"]).Body.As<JObject>())
).ToString())</set-body>
</return-response>
</otherwise>
</choose>
</inbound>
<backend>
<base />
</backend>
<outbound>
<base />
</outbound>
<on-error>
<base />
</on-error>
</policies>

 
For testing purposes, I just created 3 simple Logic Apps triggered by an HTTP request which returned some static data. Using Postman, I sent the following message.
image
 
The response message from APIM is shown below.
 
image
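If you would rather drive the same test from code instead of Postman, a small console snippet along the following lines works too. This is only a sketch (assuming the usual System.Net.Http and System.Text usings) — the endpoint URL, subscription key and payload fields are placeholders, not values from this demo.

private static async Task<string> SendPriceRequestAsync()
{
    // Placeholders - substitute your own APIM endpoint and subscription key.
    var endpoint = "https://<your-apim-instance>.azure-api.net/<api>/<operation>";
    var subscriptionKey = "<your-apim-subscription-key>";

    // A hypothetical price request payload; use whatever schema your method expects.
    var payload = "{ \"ProductCode\": \"PRD2043\", \"Quantity\": 10 }";

    var httpClient = new HttpClient();
    httpClient.DefaultRequestHeaders.Add("Ocp-Apim-Subscription-Key", subscriptionKey);

    var content = new StringContent(payload, Encoding.UTF8, "application/json");
    var result = await httpClient.PostAsync(endpoint, content);

    // On success this should be the composite JSON built by the policy (service_1/2/3 properties).
    return await result.Content.ReadAsStringAsync();
}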
 
Enjoy…

Scatter-Gather pattern using an Azure Event Grid

This is another version of the scatter-gather integration pattern which I blogged about some time ago. The previous version was based on using service bus topics and Logic Apps which polled for new messages. Whilst this was adequate for a messaging system that did not require immediate request/response times, it was not an ideal solution for another project I was working on. This project required a website to initiate the request to a scatter-gather pattern and to have low latency response times of a few seconds.

With the recently introduced Event Grid service in Azure https://azure.microsoft.com/en-us/services/event-grid/, I was able to achieve request/response times in seconds using the scatter-gather pattern. This is possible because Event Grid provides event-based notifications, unlike a service bus architecture which requires regular polling for new messages on the bus.

image

Using the design above enables the client to define how long to wait after the initial request before retrieving the responses from the services via the gatherer Logic App.

Note if you require durable message delivery, I would still recommend using a service bus, which provides peek-lock, FIFO, dead-lettering and transactional support.

Scenario

A website is required to validate a PO and display the customer's details, the current stock level and the price when requesting information about a product. The information is to be sourced from 3 different API services: the customer information from CRM, product stock levels from an Inventory service and the product price from a Pricing service.

Approach

The website will send a request to an HTTPS Event Grid endpoint. This will publish the message to the 3 Logic App subscriptions, which are all triggered by an HTTP request action.

One of the properties sent to each subscription is a message Id, which will be used as the session Id on the service bus when returning the API response messages.

Each Logic App will map the request message to the required API format and wait for the API response message to be placed onto the service bus.

When the website is ready to receive the aggregated responses from the service bus, it will make a HTTP Get request to another Logic App which will then read all the messages from the service bus matching the session Id key and combine them into a single composite message.

Design

API Proxy Logic Apps

To keep the demo simple, each of the proxy Logic Apps will compose a static response message using some of the properties found in the request message. In a practical solution you would map the request message to the correct format required by the API service after the Parse JSON action and add a HTTP Request/Response action after the map to call the API service.

image

The details of each action are shown below.

Take note of each HTTP POST URL address in the logic apps as this will be required when setting up the subscriptions in Event Grid. The Request Body JSON Schema can be left blank.

image

Note an indexer is required for the body contents as it is normally sent as a collection.

image

Here we are composing a static  CRM response message to send back. The contents will be different for each proxy Logic App.

image

The composed message is put onto the service bus using the Event Grid Id as the Session Id. This is to group the messages from the other proxy Logic Apps. A custom property is used to indicate the source of the message and is used later when aggregating the response messages from the other proxy Logic Apps.

image

The code view of the Send Message service bus connector is shown below. Note the custom property “ServiceName” and the value “CRM”.

image
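For comparison, the equivalent send outside of Logic Apps would look something like the sketch below using the Azure.Messaging.ServiceBus SDK. This is not what the connector generates — the connection string and queue name are placeholders — but it shows the two pieces that matter: the session Id and the ServiceName custom property.

private static async Task SendCrmResponseAsync(string eventGridId, string crmResponseJson)
{
    // Placeholders - your namespace connection string and response queue name.
    await using var client = new ServiceBusClient("<service-bus-connection-string>");
    var sender = client.CreateSender("<response-queue-name>");

    var message = new ServiceBusMessage(crmResponseJson)
    {
        // The Event Grid event Id groups all responses for one request via queue sessions.
        SessionId = eventGridId,
        ContentType = "application/json"
    };

    // Custom property read later by the gatherer Logic App to name each response.
    message.ApplicationProperties["ServiceName"] = "CRM";

    await sender.SendMessageAsync(message);
}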

The other 2 proxy Logic Apps for checking inventory and pricing are similar in design to this one. The only differences are the composing of the static response message and the value of the Service Name source property in the service bus connector.

Creating an Azure Event Grid

From the Azure Portal search for Event Grid and then click create.

image

Add a suitable name and the resource group.

image

Once the resource has been created, you will need to take a copy of the Access key and Topic Endpoint. These will be used later when posting messages to this endpoint.

image

After the resource has been created, subscriptions will need to be created for each of the proxy Logic Apps using the HTTP POST URL addresses for the web hook endpoints. This is done by clicking the Event Subscription icon at the top menu bar.

image

We will need to create 3 separate subscriptions, one for each of the proxy Logic Apps. They all share the same Event Type and Prefix Filter, the only difference between them is the Name and the Subscriber Endpoint URL.

image

Once completed, you should have the following 3 subscriptions created.

image

Service Bus Queue

Next a Service Bus queue is required to hold the response messages returned from the API services that were called in each of the Micro Service proxy Logic Apps. The Basic pricing tier may be used as we do not require any topics to be created.

image

Once the Service Bus has been created, we need to create a queue with sessions enabled. Sessions are required to group all the messages with the same Id so they can be processed together.

image
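If you prefer to provision the queue from code rather than the portal, a sketch like the one below (assuming the Azure.Messaging.ServiceBus.Administration package, which is not used in this post) creates it with sessions switched on.

private static async Task CreateResponseQueueAsync()
{
    // Placeholder connection string for the Service Bus namespace.
    var admin = new ServiceBusAdministrationClient("<service-bus-connection-string>");

    var options = new CreateQueueOptions("<response-queue-name>")
    {
        // Sessions let the gatherer pull every message sharing the same session Id.
        RequiresSession = true
    };

    await admin.CreateQueueAsync(options);
}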

Message Gatherer Logic App

The purpose of this Logic App is to retrieve all the messages from the service bus matching a session Id and then to aggregate the messages into a single composite message. This Logic App is triggered by the website when it is ready to retrieve the composite responses from the services.

image

Details of each action shape are described below.

The “When a HTTP request is received” trigger is a GET method with the message {id} in the URL as a resource.

image

This variable is used to append all the API response messages.

image

Here we get all the messages from the service bus using the id from the query string of the HTTP Get as the Session Id.

image

We then iterate through all the messages received from the service bus queue and append each message to the responseMsg variable before completing the message in the queue.

image

Code view for the “Append to service response” is shown below. I am using the service bus custom property “ServiceName” that was set in the API Proxy Logic App for the message name.

image

Code view for the “Complete the message in a queue” is shown below.

image

Once all the messages have completed, we close the session in the queue using the Id from the query string of the HTTP Get.

image

The last step is to return the aggregated message as a typed JSON message.

image

Below is code view for the Response Action.

image

Console Application

To keep things simple I will use a console application to mimic a website publishing a request to the scatter-gather service. The same code can be put into a helper project and used by a website.

When publishing events to the Event Grid, they must conform to the following schema. The message to send to the Event Grid is placed in the “data” element.
[
  {
    "topic": string,
    "subject": string,
    "id": string,
    "eventType": string,
    "eventTime": string,
    "data": {
      object-unique-to-each-publisher
    },
    "dataVersion": string,
    "metadataVersion": string
  }
]

More information about the schema and properties can be found here: https://docs.microsoft.com/en-us/azure/event-grid/event-schema

For the scatter-gather pattern I will be setting the “id” property to a Guid string value which will be used as the session key on the service bus. Both the eventType and subject properties will be used for the subscription filter.

I have created two custom objects to represent the Purchase Order and the EventGrid models.

Purchase Order Object
public class PurchaseOrder
{
    public string OrderNumber { get; set; }
    public string CustomerNumber { get; set; }
    public string ProductCode { get; set; }
}
Event Grid Object
public class GridEvent<T> where T : class
{
    public string Id { get; set; }
    public string Subject { get; set; }
    public string EventType { get; set; }
    public T Data { get; set; }
    public DateTime EventTime { get; set; }
}

The console application consists of 2 functions, one to send the request to the Event Grid and another to request the aggregated message from the Message Gatherer Logic App.

The “SendMsgAsync” function below accepts an event grid object as an argument and returns the status of the post action. It adds the parameter into a collection and serialises the object before posting it to the Event Grid. Remember to update the sasKey variable with the value from your Event Grid resource.

Send Message Func
private static async Task<string> SendMsgAsync(GridEvent<PurchaseOrder> gridEvent)
{
    var payloadList = new List<GridEvent<PurchaseOrder>>();
    payloadList.Add(gridEvent);
    string topicEndpoint = "https://scatter-eventgrid.westus2-1.eventgrid.azure.net/api/events";
    // add the event grid access key
    string sasKey = "<Event Grid sas key>";
    // Event grid expects event data as JSON
    var json = JsonConvert.SerializeObject(payloadList);
    // Create request which will be sent to the topic
    var content = new StringContent(json, Encoding.UTF8, "application/json");
    // Create a HTTP client which we will use to post to the Event Grid Topic
    var httpClient = new HttpClient();
    // Add key in the request headers
    httpClient.DefaultRequestHeaders.Add("aeg-sas-key", sasKey);
    // Send request
    Console.WriteLine("Sending event to Event Grid...");
    var result = await httpClient.PostAsync(topicEndpoint, content);
    return result.ReasonPhrase;
}

 

The “ReadMsgAsync” function below accepts the batchId as an argument. It is used to make an HTTP Get to the API Gatherer Logic App to retrieve the composite message. It uses the batchId as a resource location in the call-back URL of the logic app.

You will need to add the {0} token after the …invoke/ resource location as highlighted. https://prod-27.australiasoutheast.logic.azure.com/workflows/16…/triggers/manual/paths/invoke/{0}?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=k-5w..

Read Message Func
private static async Task<string> ReadMsgAsync(string batchId)
{
    var uri = string.Format("<API Gatherer Logic App Callback url>", batchId);
    var httpClient = new HttpClient();
    return await httpClient.GetStringAsync(uri);
}

 

In the main function below, we create a sample PO and an EventGrid message object. We then call the SendMsgAsync function to post the message to the EventGrid in Azure. Then after waiting for a second we call the ReadMsgAsync function to read the composite message from the Logic App. Note we are using the batchId as the EventGrid object id to group all the response messages together from the API services.

Console Main Func
static void Main(string[] args)
{
    // create id for service bus sessions
    var batchId = Guid.NewGuid().ToString();
    var payload = new GridEvent<PurchaseOrder>
    {
        Data = new PurchaseOrder { OrderNumber = "1234", CustomerNumber = "ABC100", ProductCode = "PRD2043" },
        EventTime = DateTime.UtcNow,
        EventType = "orders",
        Id = batchId,
        Subject = "po/validation"
    };
    var rc = SendMsgAsync(payload).Result;
    Console.WriteLine($"Event sent with result: {rc}");
    Thread.Sleep(1000);
    var msg = ReadMsgAsync(batchId).Result;
    Console.WriteLine("Response Message...");
    Console.WriteLine(msg);
    Console.ReadLine();
}

Testing

Executing the console application displays the successful post to the EventGrid and the results from the API Gatherer Logic App after a few seconds.

image

Calling ReadMsgAsync too quickly may not give the API services time to respond, and some messages may be left out, as shown below.

image

Conclusion

Using the Azure EventGrid is very useful for scenarios where you want to publish a message to several subscribers that are triggered by web-hooks. If the web-hook does not acknowledge receipt of the message, then EventGrid will retry several times using an exponential back-off period ranging from 10 seconds to an hour.

If you request the composite response message from the API services too soon, you may miss some of the responses from the Micro Services. It is advisable to check the composite message for any missing responses before continuing to process it.
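One way to guard against that in the console client is a quick completeness check before processing. The sketch below assumes the gatherer returns a JSON object keyed by the ServiceName values set in the proxy Logic Apps — only “CRM” is shown in this post, so the other two names here are illustrative.

private static bool HasAllResponses(string compositeJson)
{
    // Assumed ServiceName keys; adjust to whatever your proxy Logic Apps actually use.
    var expectedServices = new[] { "CRM", "Inventory", "Pricing" };

    var composite = JObject.Parse(compositeJson);

    // Report any service that has not responded yet so the caller can retry the read.
    var missing = expectedServices.Where(s => composite[s] == null).ToList();
    foreach (var service in missing)
        Console.WriteLine($"Missing response from {service}");

    return missing.Count == 0;
}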

The monthly running cost of this solution is minimal when compared to a service bus topic version of this pattern, which requires polling at regular intervals, where each poll incurs a trigger cost.

This same pattern can also be used to design an asynchronous request/response messaging system.

Enjoy…

Content based message routing using Azure Logic Apps, Function and Service Bus

Content Based Routing (CBR) is another pattern used in the integration world. The contents of the message determine the endpoint the message is routed to.

This article will describe one option to develop this pattern using an Azure Service Bus, an Azure Function and a Logic App.

Basically the service bus will be used to route the message to the correct endpoint using topics and subscriptions. The Azure Function is used to host and execute the business rules to inspect the message contents and set the routing properties. A logic app is used to accept the message and call the function passing the received message as an argument. Once the function executes the business rules, it will return the routing properties in the response body. The routing information is then used to set the properties on the service bus API connector in the Logic App before publishing the message onto the bus.

image

Scenario

To demonstrate a typical use-case of this pattern we have 2 message types, Sales Orders (SO) and Purchase Orders (PO). For the SO I want to send the order to a priority queue if the total sales amount is over a particular value. And for a PO, it should be sent to a pre-approval queue if the order value is under a specified amount.

Here is an example of a SO message to be routed:

image

And an example of a PO being sent:

image

Solution

The real smarts of this solution is the function which will return a common JSON response message to set the values for the Topic name and the custom properties on the service bus connector. The fields of the response message are described below.

  • TopicName – the name of the service bus topic to send the message to.
  • CBRFilter_1 – used by the subscription rule to filter on. Depending on your own requirements you may need more fields to filter at a more granular level.
  • RuleSetVersion – used by the subscription rule to filter on. It’s a good idea to have this field as you may have several versions of this rule in play at any one time.

Let’s start with provisioning the service bus topics and subscriptions for the 2 types of messages. First create 2 topics called purchaseorder and salesorder.

 

image

Now add the following subscriptions and rules for each of the topics.

Topic Name      Subscription Name     Rule
purchaseorder   Approved_V1.00        CBRFilter_1 = 'Approved' and RuleSetVersion = '1.00'
purchaseorder   NotApproved_V1.00     CBRFilter_1 = 'ApprovedNot' and RuleSetVersion = '1.00'
salesorder      HighPriority_V1.00    CBRFilter_1 = 'PriorityHigh' and RuleSetVersion = '1.00'
salesorder      LowPriority_V1.00     CBRFilter_1 = 'PriorityLow' and RuleSetVersion = '1.00'
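If you would rather script these subscriptions and rules than create them in the portal, a sketch along the following lines (using the Azure.Messaging.ServiceBus.Administration package, which this post does not actually use) sets up the first one; the other three follow the same shape.

private static async Task CreateApprovedSubscriptionAsync()
{
    // Placeholder connection string for the Service Bus namespace.
    var admin = new ServiceBusAdministrationClient("<service-bus-connection-string>");

    // Subscription on the purchaseorder topic that should only receive approved POs.
    var subscription = new CreateSubscriptionOptions("purchaseorder", "Approved_V1.00");

    // SQL filter over the custom properties set on the message by the Logic App.
    var rule = new CreateRuleOptions(
        "ApprovedRule",
        new SqlRuleFilter("CBRFilter_1 = 'Approved' AND RuleSetVersion = '1.00'"));

    await admin.CreateSubscriptionAsync(subscription, rule);
}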

Next is the development of the Azure function. This is best developed in Visual Studio, where you can include a Unit Test project for each of the rules. Add a new Azure Function project to your solution.

image

After the project has been created, right click on the project and click Add -> New Item. Choose Azure Function, give it a name and select the Http trigger option.

image

Below is the code for the HTTP trigger function, which includes the class definition for the RoutingProperties object. I am checking for the specific elements SalesOrderNumber and PurchaseOrderNumber in the JSON message to determine the type of message, which in turn determines which rule code block to execute. Each rule code block will first set the TopicName and RuleSetVersion properties.

public static class CBRRule
{
    [FunctionName("CBRRule")]
    public static async Task<HttpResponseMessage> Run([HttpTrigger(AuthorizationLevel.Function, "post", Route = null)]HttpRequestMessage req, TraceWriter log)
    {
        log.Info("C# HTTP trigger function processed a request.");
        var routingProperties = new RoutingProperties();

        // Get request body
        JObject data = await req.Content.ReadAsAsync<JObject>();

        // Is this a sales order message type
        if (data != null && data["SalesOrderNumber"] != null)
        {
            routingProperties.CBRFilter_1 = "PriorityLow";
            routingProperties.RuleSetVersion = "1.00";
            routingProperties.TopicName = "SalesOrder";

            var lineItems = data["Lines"];
            var totalSaleAmount = lineItems.Sum(x => (decimal)x["UnitPrice"] * (decimal)x["Qty"]);

            // if the total sales is greater than $1000 send the message to the high priority queue
            if (totalSaleAmount > 1000)
                routingProperties.CBRFilter_1 = "PriorityHigh";
        }

        // Is this a purchase order message type
        if (data != null && data["PurchaseOrderNumber"] != null)
        {
            routingProperties.CBRFilter_1 = "ApprovedNot";
            routingProperties.RuleSetVersion = "1.00";
            routingProperties.TopicName = "PurchaseOrder";

            var lineItems = data["Lines"];
            var totalSaleAmount = lineItems.Sum(x => (decimal)x["UnitPrice"] * (decimal)x["Qty"]);

            // Approve PO if the total order price is less than $500
            if (totalSaleAmount < 500)
                routingProperties.CBRFilter_1 = "Approved";
        }

        return req.CreateResponse(HttpStatusCode.OK, routingProperties);
    }

    /// <summary>
    /// Response message to set the custom routing properties of the service bus
    /// </summary>
    public class RoutingProperties
    {
        public string TopicName { get; set; }
        public string CBRFilter_1 { get; set; }
        public string RuleSetVersion { get; set; }

        public RoutingProperties()
        {
            this.CBRFilter_1 = "Unknown";
            this.RuleSetVersion = "Unknown";
            this.TopicName = "Unknown";
        }
    }
}

The business rule for a SO aggregates all the line items and checks if the total amount is greater than $1000; if it is, the property CBRFilter_1 is set to “PriorityHigh”.

The business rule for a PO also aggregates all the line items and checks if the total amount is less than $500; if it is, the property CBRFilter_1 is set to “Approved”.

With the following input message sent to the function:

clip_image001

The output of the function should look similar to this below:

clip_image001[6]

Now we need to publish the function from Visual Studio to your Azure resource group using the publishing wizard.

clip_image002

The last component of this solution is the Logic App which is triggered by an HTTP Request API and then calls the Azure function created above. The basic flow looks like this below.

clip_image004

The HTTP Request trigger has no request body JSON schema created. The trigger must accept any type of message.

clip_image006

Add an Azure Function action after the trigger and select the method called “CBRRule”.

clip_image008

Set the Request Body to the trigger body content and the Method to “POST”.

clip_image010

Next add a Service Bus action and set the properties as shown. Both the Queue/Topic name and Properties are set from the function response message.

clip_image012

Here is the code view of the Send Message action showing how the properties are set.

clip_image014
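For reference, the equivalent send in plain C# would look roughly like the sketch below (using the Azure.Messaging.ServiceBus SDK, which is an assumption — the Logic App connector generates its own definition). The point is that the custom properties on the message are the same ones the subscription SQL rules filter on.

private static async Task RouteMessageAsync(string messageJson, CBRRule.RoutingProperties routing)
{
    // Placeholder connection string for the Service Bus namespace.
    await using var client = new ServiceBusClient("<service-bus-connection-string>");

    // The topic name comes from the function response (purchaseorder or salesorder).
    var sender = client.CreateSender(routing.TopicName);

    var message = new ServiceBusMessage(messageJson) { ContentType = "application/json" };

    // Custom properties the subscription rules filter on.
    message.ApplicationProperties["CBRFilter_1"] = routing.CBRFilter_1;
    message.ApplicationProperties["RuleSetVersion"] = routing.RuleSetVersion;

    await sender.SendMessageAsync(message);
}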

Testing

Using PostMan we can send sample messages to the Logic App and then see them sitting on the service bus waiting to be processed by some other method.

At the moment the service bus should have no messages waiting to be processed, as shown below.

clip_image015

Using PostMan to send the following PO, we should see this message end up in the purchaseorder/NotApproved subscription.

clip_image016

All going well, the message will arrive at the correct subscription waiting to be processed as shown below using Service Bus Explorer.

clip_image018

Sending the following SO will also route the message to the correct subscriber.

clip_image019

clip_image021

Conclusion

CBR can easily be achieved using an Azure Function to execute your business rules on a message and set the routing properties. Taking this a step further, I would abstract the business rules for each message type into its own class for manageability.

Also it is advisable to set up a Unit Test project for each of the classes holding the business rules to ensure 100% code coverage.

Enjoy…

Integration Scatter-Gather pattern using Azure Logic Apps and Service Bus

Recently I have taken on a role as the Integration Architect and tech lead for a project to integrate messaging between MS Dynamics 365 AX/CRM and 3rd party systems. This was a huge solution that took over 10 months to design and develop. It consisted of developing over 60 Logic Apps.

With a large integration project like this, where multiple entities are required to be updated across multiple systems in a transactional manner, I required a process to correlate and aggregate all the responses into one composite message before proceeding onto the next task in the workflow. The scatter-gather pattern was chosen as a good candidate for this type of scenario.

The next problem was how to implement this pattern using Logic Apps. Using what I had learnt from my previous blog on enforcing ordered delivery of messages  https://connectedcircuits.blog//wp-content/uploads/2017/08/26/enforcing-ordered-delivery-using-azure-logic-apps-and-service-bus/ I was able to design the solution below.

image

It is based on a single Logic App which publishes the message onto a service bus topic and then retrieves all the responses from another service bus queue. I also wanted the flexibility to decide which services the message should be scattered to, by passing in a routing list that determines who to send the request message to.

The main component of the solution is the Scatter/Gather logic app, which sends the request message to the service bus topic after setting subscription property values and a unique batch Id. In this example I have 3 sub-processor logic apps which will process the message in some form or another and then write a response message onto the service bus queue. Because sessions have been enabled on this queue and I am setting the session Id with the same BatchId that was sent with the message, we are able to correlate all the responses from the sub-processors within the same logic app instance that initially published the message onto the service bus topic.

After the Scatter/Gather logic app sends the message onto the service bus topic, it will go into a loop until all the response messages from the Process logic apps are received. It will then return the aggregated responses from the sub-process logic apps.

The whole solution can be broken down into 3 sections, setting up the Azure Service Bus, developing the Scatter-Gatherer Logic App workflow and the Sub-process Logic Apps subscribing to the topic.

Setting up the service bus

Let's start by setting up the Azure Service Bus. A Topic is provisioned to publish the request message to, with rules created for each subscriber. In this scenario I will have 3 subscriptions with default rules set to a property called MsgProcessor = '1', MsgProcessor = '2' and MsgProcessor = '3' respectively.

image

Next a Service Bus Queue is provisioned to receive all the response messages from the subscribers. Sessions must be enabled on this queue to allow a single consumer to process all messages with the same sessionId which in our case will be the Scatter/Gather Logic App.

image

Developing the Scatter/Gather Logic App

The next part to develop is the scatter-gather Logic App which is the real smarts of the solution. Below is the full workflow with all the actions collapsed. We will go through and expand each of the shapes.

image

This  workflow is triggered by a HTTP request, but can also be triggered by some other connectors.

image

Next is a series of variables required by the workflow listed below:

  • BatchId – This is used to group all the messages together and is used as the SessionId. It is initialised with a random Guid.
  • CompositeMsg – This holds the aggregated response messages from the Processor logic apps.
  • IntermediateMsg – Used in the loop to temporarily store the aggregated message.
  • ScatterCount – The number of sub-processors the request body was published to.
  • ResponseCount – Holds the current number of responses received from the Processing logic apps.

image

Once the variables have been setup, a parallel branch is used to scatter the HTTP Request body contents to the subscribing systems. For this demo I have an option to send the request body content to 3 different sub-processes.

image

To determine which sub-processes to send the request body to, I pass a list of the process names as one of the HTTP Request Header properties as shown here:

"X-ScatterList": "'Process1','Process2','Process3'"

The filters on the parallel conditions are set up as below, where it checks if the list in the HTTP header contains the process name.

image

Expanding one of the condition tasks shows an action to send the request onto the Service Bus  Topic and to increment the ScatterCount variable by one.

image

Below is the “Send message processor 1” service bus connector expanded to show the properties. The two important pieces of information are the custom properties BatchId and MsgProcessor. The BatchId will be used to set the sessionId later and the MsgProcessor is used by the subscription filter on the service bus Topic.

image

The next step is to increment the Scatter count. This is how we keep track of the number of sub-processors that the message was scattered to.

image

Once the request body content has been published to the service bus Topic, we cycle in a loop until we have received all the response messages from the service bus queue or the Until loop times out.

image

The first step in the loop is to get one new message at a time from the queue using the BatchId variable as the session Id. This is to ensure we only get messages off the queue matching this same Id.

image

Then we check if a message was found using the Condition action with this filter: @equals(length(body('Get_messages_from_a_queue_(peek-lock)')), 1)

 

image

If a message was found on the service bus queue then the “If true” branch is executed and if no message was found the “If false” branch is executed setting a delay of a few seconds before iterating again.

Below are the actions inside of the “If true” branch.

image

First we check if this is the first message received off the queue using the following filter: @equals(variables('ResponseCount'), 0)

 

image

If it is, then we just set the variable “CompositeMsg” to the service bus queue content data.

image

As we are using the Service Bus Connector that is capable of returning multiple messages, we need to use an indexer to get to the first message. Below is the code view of the above action. Note that when the sub-processor logic app puts a message onto the service bus, it is base64 encoded, so we need to decode it back to a string value.

image

Now if this was not the first message received off the service bus, we need to append it to the other messages received. This is done with the following actions in the “If false” branch shown below.

image

First we need to copy the contents of the CompositeMsg variable into the “IntermediateMsg” variable. Then we concatenate the message received off the service bus with the contents of the “IntermediateMsg” variable, and add the result back to the “CompositeMsg” variable. The syntax of the value for the Append intermediate to composite action is shown below:

"value": "@{concat(variables('IntermediateMsg'),',',base64ToString(body('Get_messages_from_a_queue_(peek-lock)')?[0]['ContentData']))}"

After the message from the queue has been added or appended to the CompositeMsg variable, it is completed to remove it from the queue and the ResponseCount variable is incremented.

image

Once all the response messages have been received from the sub-process logic apps or the “Until loop” times out, we need to close the service bus queue session using the BatchId and return the composite response message.

image

Here I am just sending the composite response message to a RequestBin endpoint. Ideally you would place this onto another service bus, perhaps together with the initial HTTP request body to tie everything together.

Sub-Processor Logic Apps

The last part of this solution is the sub-processor logic apps, which subscribe to the service bus Topic and process the message in some form or another before returning a response message onto the service bus queue. Using a separate logic app to manage the post processing of the initial request message provides scalability and separation from the messaging orchestration components.

image

The properties of the trigger service bus connector are shown below; it is a fairly typical setup.

image

Next, the delay task is there just to simulate processing of the message received from the topic. This is where you would normally call another API endpoint or process the message in some other fashion. Remember the maximum lock duration is 5 minutes, and if your process will take longer than this, you will need to renew the lock.

image

After the message has been processed, you will need to compose a response message to put back onto the service bus queue. The schema of the response message should be generic across all the sub-process logic apps to make it easier to parse later on. For the demo a relatively simple schema will be used, consisting of the received BatchId and the sub-processor logic app name.

image

Once the response message has been composed, it is placed onto the service bus queue, ensuring the session Id is set to the BatchId that was sent with the message. Remember this queue has been provisioned with “Sessions Enabled”.

image

The code view for the connector looks like this:

image

The last task of the workflow is to complete the topic subscription as shown here:

image

Testing the solution

We can use Postman to send the following request to the scatter-gather logic app. Also I am setting the scatter list header property to publish the message to all 3 sub-processor logic apps.

POST /workflows/…/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual HTTP/1.1
Host: prod-11.australiasoutheast.logic.azure.com:443
Content-Type: application/json
X-ScatterList: 'Process1','Process2','Process3'
Cache-Control: no-cache
Postman-Token: 644c5cdb-fd60-4903-b3ec-2d3d6febfe7e

{
"OrderId" : "12",
"Message" : "Hello1"
}

Using RequestBin we can see the aggregated response messages from all 3 sub-processors.

[{"BatchId":"5e1df244-2153-4529-aaba-12a82aa788fd","ResponseMsg":"Process1"},{"BatchId":"5e1df244-2153-4529-aaba-12a82aa788fd","ResponseMsg":"Process2"},{"BatchId":"5e1df244-2153-4529-aaba-12a82aa788fd","ResponseMsg":"Process3"}]

What’s next

Further enhancements can be made by adding flexibility for the timeout of the Until loop, as some responses may take hours or days to come back. When waiting for very long periods for a response to return, you may need to extend the delay period in the “Until loop” to avoid reaching the 5000 iteration limit.

Keep a watch for my next blog where I will show you how to implement the Itinerary based routing integration pattern using Azure Logic Apps and Service Bus.

Enjoy…

Enforcing Ordered Delivery using Azure Logic Apps and Service Bus

When consuming messages from an Azure service bus the order may not be guaranteed, due to the brokered messaging scheme where multiple consumers can consume messages from the bus. Sure, you can force the Logic App to execute as a single instance, but then you sacrifice performance and scalability. You can also use ReceiveAndDelete, but then you lose the transactional nature of the bus. Ultimately, to ensure a message is consumed in the correct order while keeping the transactional nature of the bus, you would add a sequence number to each message and use this to enforce the ordering.

To achieve ordered delivery using Logic Apps, you need to ensure all related messages are consumed by the same Logic App instance, and for this we use the session Id property on the service bus. Below is the full workflow process to force ordered delivery using Logic Apps and session Ids on the service bus subscription.

image

This scenario is based on a financial institution which requires all monetary transfers to be processed in an ordered fashion.  The key is choosing a suitable session identifier and with this in mind, the account number was the most suitable candidate as we want a single consumer to process all the transactions for a particular account number.

Here we have created a subscription for a topic called AccountTransfers. Note the Enable sessions option is checked.

image

Once the service bus has been configured, we can now dissect the workflow to see how we can achieve ordered delivery.

The workflow is initiated by a polling Service Bus Connector. The properties of this connector are shown below. The key point here is to set the Session id to “Next Available”. This forces the Logic App to create a new instance for each unique session id value found on the service bus.

image

The next action, “ProcessSBMessage”, is used to call another logic app which does the processing of the message found on the bus. Here I am just passing the raw base64 encoded message from the Service Bus Trigger action. Using the “separation of concerns” pattern moves the business logic away from the process of ensuring ordered delivery.

image

Once the message has been sent to the chained Logic App and a response has been returned, we can complete the message from the bus with the following action.

image

Next we go into a loop until the exit condition has been satisfied. I am going to use a counter that is incremented if no messages are found on the service bus. If no more messages are found on the service bus after 30 seconds, the loop will exit.

image

Inside the loop we start with another service bus connector action which gets the messages from the topic subscription. Here we only want to retrieve one message at a time from the service bus, using a peek-lock action and the Session Id from the initial service bus trigger “When a message is received in a topic subscription”. We then check if a message is found in the output body using the expression @not(equals(length(body('Get_messages_from_a_topic_subscription_(peek-lock)')), 0))

image

If a message is found, the “If True” branch is executed, which again calls the same Logic App as before to process the message. Note the indexer to get to the content data, as the service bus connector action above returns a collection.

image

Once a successful response is received from the ProcessSBMessage Logic App, the message is completed and the LoopCounter variable is reset to zero. Note the lock token is from the service bus connector action within the loop, and the Session Id is from the initial service bus connector which started the workflow.

image

Below is the code view for setting the lockToken and SessionId of the “Complete the message” action inside the loop.  Take note of the indexer “[0]” before the LockToken element.

image

If no messages are found on the service bus, the False branch is then executed. This simply has a delay action so as not to poll too quickly, and increments the LoopCounter.

image

The last step is to close the session when the Until loop exits, using the Session Id from the initial service bus connector trigger which started the workflow.

image

Now you are ready to send messages into the service bus. You should see a Logic App instance spin up for each unique session Id. Remember to set the session Id property on the service bus message to some value before sending it.
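As an example, a test sender along the lines of the sketch below (the connection string and topic name are placeholders, and it uses the Azure.Messaging.ServiceBus SDK rather than anything shown in this post) stamps each transfer with the account number as the session Id:

private static async Task SendTransferAsync(string accountNumber, string transferJson)
{
    // Placeholders - your namespace connection string and the AccountTransfers topic.
    await using var client = new ServiceBusClient("<service-bus-connection-string>");
    var sender = client.CreateSender("AccountTransfers");

    var message = new ServiceBusMessage(transferJson)
    {
        // All transfers for the same account share a session, so a single Logic App
        // instance processes them in order.
        SessionId = accountNumber,
        ContentType = "application/json"
    };

    await sender.SendMessageAsync(message);
}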

Enjoy…

Robust Cloud Integration with Azure

For the last year I have been busy co-authoring this book on Azure Cloud Integration with my fellow co-authors Abhishek Kumar, Martin Abbott, Gyanendra Kumar Gautam, James Corbould and Ashish Bhambhani.

image

It is available on the Packt website here: https://www.packtpub.com/virtualization-and-cloud/robust-cloud-integration-azure

This book will teach you how to design and implement cloud integration using Microsoft Azure. It starts by showing you how to build, deploy, and secure the API app. Next, it introduces you to Logic Apps and helps you quickly start building your integration applications. We’ll then go through the different connectors available for Logic Apps to build your automated business process workflows. It’s packed with a lot of information, spanning just under 700 pages.

Don’t forget to check out another publication I co-authored back in 2015 with Mark Brimble, Johann Cooper and Colin Dijkgraaf called SOA Patterns with BizTalk Server 2013 and Microsoft Azure.

SOA Patterns with BizTalk Server 2013 and Microsoft Azure - Second Edition Book Cover

And it is still available from the Packt website here: https://www.packtpub.com/networking-and-servers/soa-patterns-biztalk-server-2013-second-edition

Hope you enjoy reading it, just as I enjoyed writing the content.