Azure APIM Scatter-Gather Pattern Policy

Recently I have been reading about some of the advanced policies available in APIM and the one that caught my eye was the “Wait” policy found here https://docs.microsoft.com/en-us/azure/api-management/api-management-advanced-policies. This policy will execute child policies in parallel and will wait until either one or all of the child polices have completed before continuing. 

I was pondering on a good use case for this policy and the first one came to mind was a scatter-gather pattern shown below. I could easily create this pattern without having to provision any other required Azure services.

 

image

The Scatter-Gather pattern is created by exposing a REST endpoint through APIM which will accept a JSON payload. When a request comes into APIM, a custom policy will forward this request onto the service endpoints defined in the policy and wait until they have all responded before continuing on with the rest of the statements in the parent policy. Once all the destination endpoints have responded, the return status code on each service is then checked for success or an error. If an error does occur, then the error is returned immediately otherwise the message from each service is combined into a composite JSON message which is returned back to the caller as the response message.

For the scenario, I will be sending a price request for a product to 3 suppliers simultaneously. In Azure APIM, I created a method which will accept a JSON price request message as shown below.

image

 

Then I added an Inbound processing policy to that method which is broken down into two parts. Below is the first part of the policy where it sends the inbound request to multiple endpoints in parallel using the <send-request> tag. Named values are used for the service endpoint address defined in the <set-url>{{serviceendpoint_xxx}}</set-url> tags. The response from each service call is inserted into the variable response-variable-name=”response_xxx” which is used in the second part of this policy.

<set-variable name="requestPayload" value="@(context.Request.Body.As&lt;string>(true))" />
<wait for="all">
<send-request mode="copy" response-variable-name="response_1" timeout="120" ignore-error="false">
<set-url>{{serviceendpoint_1}}</set-url>
<set-method>POST</set-method>
<set-body>@(context.Variables.GetValueOrDefault<string>("requestPayload"))</set-body>
</send-request>
<send-request mode="copy" response-variable-name="response_2" timeout="120" ignore-error="false">
<set-url>{{serviceendpoint_2}}</set-url>
<set-method>POST</set-method>
<set-body>@(context.Variables.GetValueOrDefault<string>("requestPayload"))</set-body>
</send-request>
<send-request mode="copy" response-variable-name="response_3" timeout="120" ignore-error="false">
<set-url>{{serviceendpoint_3}}</set-url>
<set-method>POST</set-method>
<set-body>@(context.Variables.GetValueOrDefault<string>("requestPayload"))</set-body>
</send-request>
</wait>

After the closing </wait> tag,  the response code from each of the services is checked inside a <choose> block. If its not equal to 200 then the response is immediately returned, otherwise each of the responses is wrap inside a JSON object and returned as the composite response message.
 
<choose>
<when condition="@((int)((IResponse)context.Variables["response_1"]).StatusCode != 200)">
<return-response response-variable-name="response_1" />
</when>
<when condition="@((int)((IResponse)context.Variables["response_2"]).StatusCode != 200)">
<return-response response-variable-name="response_2" />
</when>
<when condition="@((int)((IResponse)context.Variables["response_3"]).StatusCode != 200)">
<return-response response-variable-name="response_3" />
</when>
<otherwise>
<return-response>
<set-status code="200" reason="OK" />
<set-header name="Content-Type" exists-action="override">
<value>application/json</value>
</set-header>
<set-body>@(new JObject(
new JProperty("service_1",((IResponse)context.Variables["response_1"]).Body.As<JObject>()),
new JProperty("service_2",((IResponse)context.Variables["response_2"]).Body.As<JObject>()),
new JProperty("service_3",((IResponse)context.Variables["response_3"]).Body.As<JObject>())
).ToString())</set-body>
</return-response>
</otherwise>
</choose>
 
Below is the whole policy for the method. More service endpoints may be added or removed from this policy as desired.
<policies>
<inbound>
<set-variable name="requestPayload" value="@(context.Request.Body.As&lt;string>(true))" />
<wait for="all">
<send-request mode="copy" response-variable-name="response_1" timeout="120" ignore-error="false">
<set-url>{{serviceendpoint_1}}</set-url>
<set-method>POST</set-method>
<set-body>@(context.Variables.GetValueOrDefault<string>("requestPayload"))</set-body>
</send-request>
<send-request mode="copy" response-variable-name="response_2" timeout="120" ignore-error="false">
<set-url>{{serviceendpoint_2}}</set-url>
<set-method>POST</set-method>
<set-body>@(context.Variables.GetValueOrDefault<string>("requestPayload"))</set-body>
</send-request>
<send-request mode="copy" response-variable-name="response_3" timeout="120" ignore-error="false">
<set-url>{{serviceendpoint_3}}</set-url>
<set-method>POST</set-method>
<set-body>@(context.Variables.GetValueOrDefault<string>("requestPayload"))</set-body>
</send-request>
</wait>
<choose>
<when condition="@((int)((IResponse)context.Variables["response_1"]).StatusCode != 200)">
<return-response response-variable-name="response_1" />
</when>
<when condition="@((int)((IResponse)context.Variables["response_2"]).StatusCode != 200)">
<return-response response-variable-name="response_2" />
</when>
<when condition="@((int)((IResponse)context.Variables["response_3"]).StatusCode != 200)">
<return-response response-variable-name="response_3" />
</when>
<otherwise>
<return-response>
<set-status code="200" reason="OK" />
<set-header name="Content-Type" exists-action="override">
<value>application/json</value>
</set-header>
<set-body>@(new JObject(
new JProperty("service_1",((IResponse)context.Variables["response_1"]).Body.As<JObject>()),
new JProperty("service_2",((IResponse)context.Variables["response_2"]).Body.As<JObject>()),
new JProperty("service_3",((IResponse)context.Variables["response_3"]).Body.As<JObject>())
).ToString())</set-body>
</return-response>
</otherwise>
</choose>
</inbound>
<backend>
<base />
</backend>
<outbound>
<base />
</outbound>
<on-error>
<base />
</on-error>
</policies>

 
For testing purposes, I just created 3 simple logic apps triggered by a HTTP request which returned some static data. Using Postman, I sent the following message.
image
 
The response message from APIM is shown below.
 
image
 
Enjoy…

Scatter-Gather pattern using an Azure Event Grid

This is another version of the scatter-gather integration pattern which I previously blogged about some time ago.  The pervious version was based on using  service bus topics and Logic Apps which polled for new messages. Whilst this was adequate for a messaging system that did not require immediate request/response times, it was not an ideal solution for another project I was working on. This project required a website to initiate the request to a scatter-gather pattern and to have low latency response times of a few seconds.

With the recently introduced Event Grid service in Azure https://azure.microsoft.com/en-us/services/event-grid/, I was able to achieve request/responses in seconds using the scatter-gather pattern. This is possible because Event Grid provides event based notifications, unlike a service bus architecture which requires regular polling for new messages on the bus.

image

Using the design above enables the client to define how long to wait after the initial request before retrieving the responses from the services via the gatherer Logic App.

Note if you require a durable message delivery, I would still recommend using a service bus which provides peek-lock, FIFO, dead-lettering, transactional support.

Scenario

A website is required to validate a PO and display the customers details, the current stock level and the price when requesting information about a product. The information is to be sourced from 3 different API services, the customer information from CRM, product stock levels from an Inventory service and the product price from a Pricing service.

Approach

The website will send a request to a HTTPS Event Grid endpoint. This will publish the message to the 3 Logic App subscriptions which are all triggered by a HTTP request action.

One of the properties sent with each subscription is a message Id which will be used as the session Id for the service bus when returning the API response messages.

Each Logic App will map the request message to the required API format and wait for the API response message to be placed onto the service bus.

When the website is ready to receive the aggregated responses from the service bus, it will make a HTTP Get request to another Logic App which will then read all the messages from the service bus matching the session Id key and combine them into a single composite message.

Design

API Proxy Logic Apps

To keep the demo simple, each of the proxy Logic Apps will compose a static response message using some of the properties found in the request message. In a practical solution you would map the request message to the correct format required by the API service after the Parse JSON action and add a HTTP Request/Response action after the map to call the API service.

image

The details of each action is shown below.

Take note of each HTTP POST URL address in the logic apps as this will be required when setting up the subscriptions in Event Grid. The Request Body JSON Schema can be left blank.

image

Note an indexer is required for the body contents as it is normally sent as a collection.

image

Here we are composing a static  CRM response message to send back. The contents will be different for each proxy Logic App.

image

The composed message is put onto the service bus using the EventGrid Id as the Session Id. This is to group the messages from the other proxy Logic Apps. A custom property is used to indicate the source of the message and is used latter when aggregating the other response messages from other proxy Logic Apps.

image

The code view of the Send Message service bus connector shown below. Note the custom property “ServiceName”and the value “CRM”

image

The other 2 proxy Logic Apps for checking inventory and pricing are similar in design to this one. The only differences are the composing of the static response message and the value of the Service Name source property in the service bus connector.

Creating an Azure Event Grid

From the Azure Portal search for Event Grid and then click create.

image

Add a suitable name and the resource group.

image

Once the resource has been created, you will need to take a copy of the Access key and Topic Endpoint. This will be used latter when posting messages to this endpoint.

image

After the resource has been created, subscriptions will need to be created for each of the proxy Logic Apps using the HTTP POST URL addresses for the web hook endpoints. This is done by clicking the Event Subscription icon at the top menu bar.

image

We will need to create 3 separate subscriptions, one for each of the proxy Logic Apps. They all share the same Event Type and Prefix Filter, the only difference between them is the Name and the Subscriber Endpoint URL.

image

Once completed, you should have the following 3 subscriptions created.

image

Service Bus Queue

Next a SB queue is required to place the response messages returned from the API services that where called in each of the Micro Service proxy Logic Apps. The Basic pricing tier may be used as we do not require any topics to be created

.image

Once the Service Bus has been created, we need to create a queue with sessions enabled. Sessions are required to group all the messages with the same Id so they can processed together.

image

Message Gatherer Logic App

The purpose of this Logic App is to retrieve all the messages from the service bus matching a session Id and then to aggregate the messages into a single composite message. This Logic App is triggered by the website when it is ready to retrieve the composite responses from the services.

image

Details of each action shape is described below.

The “When a HTTP request is received” trigger is a GET method with the message {id} in the URL as a resource.

image

This variable is used to append all the API response messages.

image

Here we get all the messages from the service bus using the id from the query string of the HTTP Get as the Session Id.

image

We then iterate through all the messages received from the service bus queue and append each message to the responseMsg variable before completing the message in the queue.

image

Code view for the “Append to service response” is shown below. I am using the service bus custom property “ServiceName” that was set in the API Proxy Logic App for the message name.

image

Code view for the “Complete the message in a queue” is shown below.

image

Once all the messages have completed, we close the session in the queue using the Id from the query string of the HTTP Get.

image

The last step is to return the aggregated message as a typed JSON message.

image

Below is code view for the Response Action.

image

Console Application

To keep things simple I will use a console application to mimic a website publishing a request to the scatter-gather service. The same code can be put into a helper project and used by a website.

When publishing events to the Event Grid, it must conform to the following schema. The message to send to the Event Grid is placed in the “data” element.
{
“topic”: string,
“subject”: string,
“id”: string,
“eventType”: string,
“eventTime”: string,
“data”:{
object-unique-to-each-publisher
},
“dataVersion”: string,
“metadataVersion”: string
}
]

More information about the schema and properties can be found here:- https://docs.microsoft.com/en-us/azure/event-grid/event-schema

For the scatter-gather pattern I will be setting the “id” property to a Guid string value which will be used as the session key on the service bus. Both the eventType and subject properties will be used for the subscription filter.

I have created two custom objects to represent the Purchase Order and the EventGrid models.

Purchase Order Object
  1. public class PurchaseOrder
  2. {
  3.     public string OrderNumber { get; set; }
  4.     public string CustomerNumber { get; set; }
  5.     public string ProductCode { get; set; }
  6. }
Event Grid Object
  1. public class GridEvent<T> where T : class
  2.     {
  3.         public string Id { get; set; }
  4.         public string Subject { get; set; }
  5.         public string EventType { get; set; }
  6.         public T Data { get; set; }
  7.         public DateTime EventTime { get; set; }
  8.     }

The console application consists of 2 functions, one to send the request to the Event Grid and another to request the aggregated message from the Message Gatherer Logic App.

The “SendMsgAsync” function below accepts a event grid object as an argument and returns the status of the post action. It adds the parameter into a collection and serialises the object before posting it to the EventGrid. Remember to update the sasKey variable with the value from your EventGrid resource.

Send Message Func
  1. private static async Task<string> SendMsgAsync(GridEvent<PurchaseOrder> gridEvent)
  2.         {
  3.             var payloadList = new List<GridEvent<PurchaseOrder>>();
  4.             payloadList.Add(gridEvent);
  5.             string topicEndpoint = “https://scatter-eventgrid.westus2-1.eventgrid.azure.net/api/events”;
  6.             //add the event grid access key
  7.             string sasKey = “<Event Gris sas key>”;
  8.             // Event grid expects event data as JSON
  9.             var json = JsonConvert.SerializeObject(payloadList);
  10.             // Create request which will be sent to the topic
  11.             var content = new StringContent(json, Encoding.UTF8, “application/json”);
  12.             // Create a HTTP client which we will use to post to the Event Grid Topic
  13.             var httpClient = new HttpClient();
  14.             // Add key in the request headers
  15.             httpClient.DefaultRequestHeaders.Add(“aeg-sas-key”, sasKey);
  16.             // Send request
  17.             Console.WriteLine(“Sending event to Event Grid…”);
  18.             var result = await httpClient.PostAsync(topicEndpoint, content);
  19.             return result.ReasonPhrase;
  20.         }

 

The “ReadMsgAsync” function below accepts the batchId as an argument. It is used to make a HTTP Get to the API Gatherer Logic App to retrieve the composite message. It uses the batchId as a resource location in the call-back URL of the logic app.

You will need to add the {0} token after the …invoke/ resource location as highlighted. https://prod-27.australiasoutheast.logic.azure.com/workflows/16…/triggers/manual/paths/invoke/{0}?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=k-5w..

Read Message Func
  1. private static async Task<string> ReadMsgAsync( string batchId)
  2. {
  3.     var uri = string.Format(“<API Gatherer Logic App Callback url>”, batchId);
  4.     var httpClient = new HttpClient();
  5.     return await httpClient.GetStringAsync(uri);
  6. }

 

In the main function below, we create a sample PO and an EventGrid message object. We then call the SendMsgAsync function to post the message to the EventGrid in Azure. Then after waiting for a second we call the ReadMsgAsync function to read the composite message from the Logic App. Note we are using the batchId as the EventGrid object id to group all the response messages together from the API services.

Console Main Func
  1. static void Main(string[] args)
  2. {
  3.     //create id for for service bus sessions
  4.     var batchId = Guid.NewGuid().ToString();
  5.     var payload = new GridEvent<PurchaseOrder>
  6.     {
  7.         Data = new PurchaseOrder { OrderNumber = “1234”, CustomerNumber = “ABC100”, ProductCode = “PRD2043” },
  8.         EventTime = DateTime.UtcNow,
  9.         EventType = “orders”,
  10.         Id = batchId,
  11.         Subject = “po/validation”
  12.     };
  13.     var rc = SendMsgAsync(payload).Result;
  14.     Console.WriteLine($”Event sent with result:” + rc);
  15.     Thread.Sleep(1000);
  16.     var msg = ReadMsgAsync(batchId).Result;
  17.     Console.WriteLine(“Response Message…”);
  18.     Console.WriteLine(msg);
  19.     Console.ReadLine();
  20. }

Testing

Executing the console application displays the successful post to the EventGrid and the results from the API Gatherer Logic App after a few seconds.

image

Calling the ReadMsgAsync too quickly may not allow the API services to respond in time and some messages may be left out as shown here below.

image

Conclusion

Using the Azure EventGrid is very useful for scenarios where you want to publish a message to several subscribers that are triggered by web-hooks. If the web-hook does not acknowledge receipt of the message, then EventGrid will try again several times using an exponential back off  period from 10 seconds to an hour.

If you request the composite response message from the API services too soon, you may miss some of the responses from the Micro Services. It is advisable to check the composite message for any missing responses before continuing to process the composite message

The monthly running cost of this solution is very minimal when compared to a service bus topic version of this pattern which requires polling at regulars intervals. Where each poll incorporates a trigger cost.

This same pattern can also be used to design an asynchronous request/response messaging system also.

Enjoy…