Scatter-Gather pattern using an Azure Event Grid

This is another version of the scatter-gather integration pattern which I previously blogged about some time ago.  The pervious version was based on using  service bus topics and Logic Apps which polled for new messages. Whilst this was adequate for a messaging system that did not require immediate request/response times, it was not an ideal solution for another project I was working on. This project required a website to initiate the request to a scatter-gather pattern and to have low latency response times of a few seconds.

With the recently introduced Event Grid service in Azure https://azure.microsoft.com/en-us/services/event-grid/, I was able to achieve request/responses in seconds using the scatter-gather pattern. This is possible because Event Grid provides event based notifications, unlike a service bus architecture which requires regular polling for new messages on the bus.

image

Using the design above enables the client to define how long to wait after the initial request before retrieving the responses from the services via the gatherer Logic App.

Note if you require a durable message delivery, I would still recommend using a service bus which provides peek-lock, FIFO, dead-lettering, transactional support.

Scenario

A website is required to validate a PO and display the customers details, the current stock level and the price when requesting information about a product. The information is to be sourced from 3 different API services, the customer information from CRM, product stock levels from an Inventory service and the product price from a Pricing service.

Approach

The website will send a request to a HTTPS Event Grid endpoint. This will publish the message to the 3 Logic App subscriptions which are all triggered by a HTTP request action.

One of the properties sent with each subscription is a message Id which will be used as the session Id for the service bus when returning the API response messages.

Each Logic App will map the request message to the required API format and wait for the API response message to be placed onto the service bus.

When the website is ready to receive the aggregated responses from the service bus, it will make a HTTP Get request to another Logic App which will then read all the messages from the service bus matching the session Id key and combine them into a single composite message.

Design

API Proxy Logic Apps

To keep the demo simple, each of the proxy Logic Apps will compose a static response message using some of the properties found in the request message. In a practical solution you would map the request message to the correct format required by the API service after the Parse JSON action and add a HTTP Request/Response action after the map to call the API service.

image

The details of each action is shown below.

Take note of each HTTP POST URL address in the logic apps as this will be required when setting up the subscriptions in Event Grid. The Request Body JSON Schema can be left blank.

image

Note an indexer is required for the body contents as it is normally sent as a collection.

image

Here we are composing a static  CRM response message to send back. The contents will be different for each proxy Logic App.

image

The composed message is put onto the service bus using the EventGrid Id as the Session Id. This is to group the messages from the other proxy Logic Apps. A custom property is used to indicate the source of the message and is used latter when aggregating the other response messages from other proxy Logic Apps.

image

The code view of the Send Message service bus connector shown below. Note the custom property “ServiceName”and the value “CRM”

image

The other 2 proxy Logic Apps for checking inventory and pricing are similar in design to this one. The only differences are the composing of the static response message and the value of the Service Name source property in the service bus connector.

Creating an Azure Event Grid

From the Azure Portal search for Event Grid and then click create.

image

Add a suitable name and the resource group.

image

Once the resource has been created, you will need to take a copy of the Access key and Topic Endpoint. This will be used latter when posting messages to this endpoint.

image

After the resource has been created, subscriptions will need to be created for each of the proxy Logic Apps using the HTTP POST URL addresses for the web hook endpoints. This is done by clicking the Event Subscription icon at the top menu bar.

image

We will need to create 3 separate subscriptions, one for each of the proxy Logic Apps. They all share the same Event Type and Prefix Filter, the only difference between them is the Name and the Subscriber Endpoint URL.

image

Once completed, you should have the following 3 subscriptions created.

image

Service Bus Queue

Next a SB queue is required to place the response messages returned from the API services that where called in each of the Micro Service proxy Logic Apps. The Basic pricing tier may be used as we do not require any topics to be created

.image

Once the Service Bus has been created, we need to create a queue with sessions enabled. Sessions are required to group all the messages with the same Id so they can processed together.

image

Message Gatherer Logic App

The purpose of this Logic App is to retrieve all the messages from the service bus matching a session Id and then to aggregate the messages into a single composite message. This Logic App is triggered by the website when it is ready to retrieve the composite responses from the services.

image

Details of each action shape is described below.

The “When a HTTP request is received” trigger is a GET method with the message {id} in the URL as a resource.

image

This variable is used to append all the API response messages.

image

Here we get all the messages from the service bus using the id from the query string of the HTTP Get as the Session Id.

image

We then iterate through all the messages received from the service bus queue and append each message to the responseMsg variable before completing the message in the queue.

image

Code view for the “Append to service response” is shown below. I am using the service bus custom property “ServiceName” that was set in the API Proxy Logic App for the message name.

image

Code view for the “Complete the message in a queue” is shown below.

image

Once all the messages have completed, we close the session in the queue using the Id from the query string of the HTTP Get.

image

The last step is to return the aggregated message as a typed JSON message.

image

Below is code view for the Response Action.

image

Console Application

To keep things simple I will use a console application to mimic a website publishing a request to the scatter-gather service. The same code can be put into a helper project and used by a website.

When publishing events to the Event Grid, it must conform to the following schema. The message to send to the Event Grid is placed in the “data” element.
{
“topic”: string,
“subject”: string,
“id”: string,
“eventType”: string,
“eventTime”: string,
“data”:{
object-unique-to-each-publisher
},
“dataVersion”: string,
“metadataVersion”: string
}
]

More information about the schema and properties can be found here:- https://docs.microsoft.com/en-us/azure/event-grid/event-schema

For the scatter-gather pattern I will be setting the “id” property to a Guid string value which will be used as the session key on the service bus. Both the eventType and subject properties will be used for the subscription filter.

I have created two custom objects to represent the Purchase Order and the EventGrid models.

Purchase Order Object
  1. public class PurchaseOrder
  2. {
  3.     public string OrderNumber { get; set; }
  4.     public string CustomerNumber { get; set; }
  5.     public string ProductCode { get; set; }
  6. }
Event Grid Object
  1. public class GridEvent<T> where T : class
  2.     {
  3.         public string Id { get; set; }
  4.         public string Subject { get; set; }
  5.         public string EventType { get; set; }
  6.         public T Data { get; set; }
  7.         public DateTime EventTime { get; set; }
  8.     }

The console application consists of 2 functions, one to send the request to the Event Grid and another to request the aggregated message from the Message Gatherer Logic App.

The “SendMsgAsync” function below accepts a event grid object as an argument and returns the status of the post action. It adds the parameter into a collection and serialises the object before posting it to the EventGrid. Remember to update the sasKey variable with the value from your EventGrid resource.

Send Message Func
  1. private static async Task<string> SendMsgAsync(GridEvent<PurchaseOrder> gridEvent)
  2.         {
  3.             var payloadList = new List<GridEvent<PurchaseOrder>>();
  4.             payloadList.Add(gridEvent);
  5.             string topicEndpoint = “https://scatter-eventgrid.westus2-1.eventgrid.azure.net/api/events”;
  6.             //add the event grid access key
  7.             string sasKey = “<Event Gris sas key>”;
  8.             // Event grid expects event data as JSON
  9.             var json = JsonConvert.SerializeObject(payloadList);
  10.             // Create request which will be sent to the topic
  11.             var content = new StringContent(json, Encoding.UTF8, “application/json”);
  12.             // Create a HTTP client which we will use to post to the Event Grid Topic
  13.             var httpClient = new HttpClient();
  14.             // Add key in the request headers
  15.             httpClient.DefaultRequestHeaders.Add(“aeg-sas-key”, sasKey);
  16.             // Send request
  17.             Console.WriteLine(“Sending event to Event Grid…”);
  18.             var result = await httpClient.PostAsync(topicEndpoint, content);
  19.             return result.ReasonPhrase;
  20.         }

 

The “ReadMsgAsync” function below accepts the batchId as an argument. It is used to make a HTTP Get to the API Gatherer Logic App to retrieve the composite message. It uses the batchId as a resource location in the call-back URL of the logic app.

You will need to add the {0} token after the …invoke/ resource location as highlighted. https://prod-27.australiasoutheast.logic.azure.com/workflows/16…/triggers/manual/paths/invoke/{0}?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=k-5w..

Read Message Func
  1. private static async Task<string> ReadMsgAsync( string batchId)
  2. {
  3.     var uri = string.Format(“<API Gatherer Logic App Callback url>”, batchId);
  4.     var httpClient = new HttpClient();
  5.     return await httpClient.GetStringAsync(uri);
  6. }

 

In the main function below, we create a sample PO and an EventGrid message object. We then call the SendMsgAsync function to post the message to the EventGrid in Azure. Then after waiting for a second we call the ReadMsgAsync function to read the composite message from the Logic App. Note we are using the batchId as the EventGrid object id to group all the response messages together from the API services.

Console Main Func
  1. static void Main(string[] args)
  2. {
  3.     //create id for for service bus sessions
  4.     var batchId = Guid.NewGuid().ToString();
  5.     var payload = new GridEvent<PurchaseOrder>
  6.     {
  7.         Data = new PurchaseOrder { OrderNumber = “1234”, CustomerNumber = “ABC100”, ProductCode = “PRD2043” },
  8.         EventTime = DateTime.UtcNow,
  9.         EventType = “orders”,
  10.         Id = batchId,
  11.         Subject = “po/validation”
  12.     };
  13.     var rc = SendMsgAsync(payload).Result;
  14.     Console.WriteLine($”Event sent with result:” + rc);
  15.     Thread.Sleep(1000);
  16.     var msg = ReadMsgAsync(batchId).Result;
  17.     Console.WriteLine(“Response Message…”);
  18.     Console.WriteLine(msg);
  19.     Console.ReadLine();
  20. }

Testing

Executing the console application displays the successful post to the EventGrid and the results from the API Gatherer Logic App after a few seconds.

image

Calling the ReadMsgAsync too quickly may not allow the API services to respond in time and some messages may be left out as shown here below.

image

Conclusion

Using the Azure EventGrid is very useful for scenarios where you want to publish a message to several subscribers that are triggered by web-hooks. If the web-hook does not acknowledge receipt of the message, then EventGrid will try again several times using an exponential back off  period from 10 seconds to an hour.

If you request the composite response message from the API services too soon, you may miss some of the responses from the Micro Services. It is advisable to check the composite message for any missing responses before continuing to process the composite message

The monthly running cost of this solution is very minimal when compared to a service bus topic version of this pattern which requires polling at regulars intervals. Where each poll incorporates a trigger cost.

This same pattern can also be used to design an asynchronous request/response messaging system also.

Enjoy…