Ensuring Ordered Delivery of Messages using Azure Functions

Typically, when Azure Functions are used to consume messages from a Service Bus (SB), the processing order is not guaranteed even though the SB itself is First-In-First-Out (FIFO). This is due to competing consumers, where multiple instances of the function compete for messages on the service bus.

An example of out-of-order processing is when one function instance takes longer to process a message than the other instances, which changes the order in which the work completes. This is represented in the sequence diagram below, where function instance 1 took longer to update the same record in a database than instance 2.

image

One option to enforce ordered delivery is to configure the Azure Function to spin up only one instance. The only problem with this solution is that it won’t scale very well. A more scalable option is to use sessions. This allows you to have multiple instances of a function executing, giving you a higher message throughput.

To enforce message ordering, several properties must be set. The property Requires Session must be enabled on the SB queues and topic subscriptions. Messages sent onto the SB must set the SessionId property to a value that is shared by related messages and differs from that of unrelated messages. Some examples of a session Id could be the account number, customer number, batch Id, etc. Azure Functions need to have the IsSessionsEnabled property set to true on the SB input binding.

Support for SB sessions in Azure Functions only became generally available (GA) in mid 2019. Enabling sessions on the Azure Function places a lock on all messages that have the same session Id, so the locked messages are consumed only by the function instance that placed the lock.
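The effect of the session lock can be pictured with a small, self-contained sketch. It does not call Service Bus at all; StatusMessage and GroupBySession are hypothetical names used only to show that once messages are grouped by session Id and each group is handled by a single consumer, the arrival order inside a session is preserved no matter how the sessions interleave.

```csharp
using System;
using System.Collections.Generic;

public static class SessionOrderingSketch
{
    // Hypothetical message shape for illustration; not the Service Bus Message class.
    public sealed class StatusMessage
    {
        public StatusMessage(string sessionId, string status)
        {
            SessionId = sessionId;
            Status = status;
        }

        public string SessionId { get; }
        public string Status { get; }
    }

    // Groups messages by session Id while keeping the arrival order inside each group.
    // Each group stands in for the messages locked by one function instance.
    public static Dictionary<string, List<string>> GroupBySession(IEnumerable<StatusMessage> arrived)
    {
        var sessions = new Dictionary<string, List<string>>();
        foreach (var message in arrived)
        {
            if (!sessions.TryGetValue(message.SessionId, out var statuses))
            {
                statuses = new List<string>();
                sessions[message.SessionId] = statuses;
            }
            statuses.Add(message.Status);
        }
        return sessions;
    }
}
```

Feeding interleaved messages for two orders into GroupBySession yields two lists, each still in the order the statuses were sent.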

Typical Scenario

A warehouse needs to track the progress of an order from when it is first received to when it gets dispatched. Throughout each stage (Ordered, Picked, Packaged, Dispatched) of the ordering process, the status of the order must be updated. This involves placing a new message onto the service bus every time the order status needs to be updated. An Azure function will then pull the messages from the service bus and update the order status in a database where the customer can view the current state of their order.

To simulate the warehouse tracking system, a console app will be used to create messages for each status change (Ordered, Picked, Packaged, Dispatched) for 100 orders. The session Id of each status message will be set to the order number. The app will then send the messages to a SB Topic which has two subscriptions, one with sessions enabled and the other with sessions disabled. This is so we can compare the ordering of messages being received with and without sessions enabled.

Order message generator
    using System;
    using System.Collections.Generic;
    using System.Configuration;
    using System.Text;
    using System.Threading.Tasks;
    using Microsoft.Azure.ServiceBus;
    using Microsoft.Azure.ServiceBus.Core;

    class Program
    {
        private static string connectionString = ConfigurationManager.AppSettings["ServiceBusConnectionString"];
        private static string topicName = ConfigurationManager.AppSettings["TopicName"];
        private static int orders = 100;
        private static int messagePerSession = 4;

        static async Task Main(string[] args)
        {
            Console.WriteLine("Creating Service Bus sender....");
            var taskList = new List<Task>();
            var sender = new MessageSender(connectionString, topicName);

            //create an order
            for (int order = 0; order < orders; order++)
            {
                var orderNumber = $"OrderId-{order}";
                var messageList = new List<Message>();

                //simulate a status update in the correct order
                for (int m = 0; m < messagePerSession; m++)
                {
                    var status = string.Empty;
                    switch (m)
                    {
                        case 0:
                            status = "1 - Ordered";
                            break;
                        case 1:
                            status = "2 - Picked";
                            break;
                        case 2:
                            status = "3 - Packaged";
                            break;
                        case 3:
                            status = "4 - Dispatched";
                            break;
                    }

                    var message = new Message(Encoding.UTF8.GetBytes($"Status-{status}"))
                    {
                        //set the service bus SessionId property to the current order number
                        SessionId = orderNumber
                    };
                    messageList.Add(message);
                }

                //send the list of status update messages for the order to the service bus
                taskList.Add(sender.SendAsync(messageList));
            }

            Console.WriteLine("Sending all messages...");
            await Task.WhenAll(taskList);
            Console.WriteLine("All messages sent.");
        }
    }

Two Azure functions will be created, where one has sessions enabled and the other disabled. The functions will have a random delay created from 1 to 10 seconds to simulate some business logic which may be calling out to an external service before updating the order status. Instead of the function writing to a database, each status update message received will be written to an Azure Table storage to create an audit log of when a status update message was processed.

Below is the source code for the function which will process the messages on the service bus using sessions. Note the IsSessionsEnabled property is set to true on the ServiceBusTrigger input binding. The randomiser simulates some business logic that could vary in the time it takes to process a message.

Azure Function using sessions
    using System;
    using System.Text;
    using Microsoft.Azure.ServiceBus;
    using Microsoft.Azure.WebJobs;
    using Microsoft.Extensions.Logging;

    public static class MsgOrderingSessions
    {
        [FunctionName("MsgOrderingSessions")]
        [return: Table("OrdersSession", Connection = "StorageConnectionAppSetting")]
        public static OrderEntity Run(
            [ServiceBusTrigger("orders", "OrdersSession", Connection = "SbConnStr", IsSessionsEnabled = true)]
            Message sbMessage, ILogger log)
        {
            log.LogInformation($"C# ServiceBus topic trigger function processed message: {Encoding.UTF8.GetString(sbMessage.Body)}");
            //simulate business logic that takes between 1 and 10 seconds
            Random random = new Random();
            int randNumb = random.Next(1000, 10000);
            System.Threading.Thread.Sleep(randNumb);
            //OrderEntity is the table entity class for the audit log (not shown)
            return new OrderEntity { PartitionKey = $"{sbMessage.SessionId} - {DateTime.Now.Ticks}", RowKey = Guid.NewGuid().ToString(), Text = Encoding.UTF8.GetString(sbMessage.Body) };
        }
    }

Below is the source code for the function which does not use sessions. Here the IsSessionsEnabled property is set to false.

Azure Function no sessions
    using System;
    using System.Text;
    using Microsoft.Azure.ServiceBus;
    using Microsoft.Azure.WebJobs;
    using Microsoft.Extensions.Logging;

    public static class MsgOrderingNoSession
    {
        [FunctionName("MsgOrderingNoSessions")]
        [return: Table("OrdersNoSession", Connection = "StorageConnectionAppSetting")]
        public static OrderEntity Run(
            [ServiceBusTrigger("orders", "OrdersNoSession", Connection = "SbConnStr", IsSessionsEnabled = false)]
            Message sbMessage, ILogger log)
        {
            log.LogInformation($"C# ServiceBus topic trigger function processed message: {Encoding.UTF8.GetString(sbMessage.Body)}");
            //simulate business logic that takes between 1 and 10 seconds
            Random random = new Random();
            int randNumb = random.Next(1000, 10000);
            System.Threading.Thread.Sleep(randNumb);
            //OrderEntity is the table entity class for the audit log (not shown)
            return new OrderEntity { PartitionKey = $"{sbMessage.SessionId} - {DateTime.Now.Ticks}", RowKey = Guid.NewGuid().ToString(), Text = Encoding.UTF8.GetString(sbMessage.Body) };
        }
    }

Below are the settings for the service bus topic, which has 2 subscriptions, one of which has Requires Session checked.

image

Running the console app creates 400 messages on each subscription: 4 status update messages for each of the 100 orders.

image

Conclusion

The Azure Function whose ServiceBusTrigger had IsSessionsEnabled = false inserted the rows out of order, because multiple function instances were competing for the next message on the service bus.

image

The Azure Function which had IsSessionsEnabled = true, and which read from a service bus subscription that also had the Requires Session flag enabled, processed the messages in the same sequence as they were placed onto the service bus.

image

When using sessions, there is a slight performance hit depending on the number of function instances executing. In this example both functions were running under the consumption plan, which spun up 6 instances. As the number of messages waiting on each of the subscriptions below shows, the subscription which had sessions disabled was processed a lot faster.

When sessions are used, each function instance places a lock on all messages having the same session Id, and these are processed one after another. As there were only 6 instances available, a maximum of six orders could be processed at one time.

image

Enjoy…

Always subscribe to Dead-lettered messages when using an Azure Service Bus

I typically come across solutions that incorporate a service bus where monitoring of the dead-letter queue (DLQ) is simply ignored. When I ask why, the usual excuse is, quote, “I don’t require it because I have added exception handling, therefore no messages will end up on the DLQ”. From experience, there will be scenarios where the exception logic does not capture an edge case and the message ends up on the DLQ without anyone knowing about it.

A simple solution is to have a single process that monitors all the DLQ messages and raises an alert when one occurs. Below is one of my building blocks which I typically incorporate when there is a service bus involved in a solution.

image

To use the DLQ building block to centrally capture any DLQ message, simply set the “ForwardDeadLetteredMessagesTo” property on each of the messaging queues to a common DLQ handler queue as shown below.

image

Now when a message gets dead-lettered, it will end up in this common queue, which is monitored by an Azure Function. Note that the DeadLetterSource property is available in the current NuGet version (v3.0.4) of the ServiceBus DLL but is not currently available in the Logic App Service Bus connector. The function writes the DLQ metadata, any custom properties and the message payload to a blob store file. By using an Azure Storage Account V2 for the blob store, a new blob creation event will be fired to any interested subscribers, which in this case is a Logic App.

Azure Function Code

Below is the code for the Azure Function. Here I am using the Binder class (which implements the IBinder interface) to set the folder path and file name imperatively. The connection strings (ServiceBusConn, StorageAccountConn) are defined in the Application settings of the Azure App Service.

Code Snippet
    using Microsoft.Azure.ServiceBus;
    using Microsoft.Azure.WebJobs;
    using Microsoft.Extensions.Logging;
    using Newtonsoft.Json;
    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Threading.Tasks;

    namespace ServiceBusDLQMonitoring
    {
        public static class Function1
        {
            [FunctionName("Function1")]
            public static async Task RunAsync(
                [ServiceBusTrigger("dlq-processor", Connection = "ServiceBusConn")] Message dlqQueue,
                Binder blobBinder,
                ILogger log)
            {
                log.LogInformation($"C# ServiceBus queue trigger function processed message: {dlqQueue.MessageId}");

                //set the filename and blob path
                var blobFile = Guid.NewGuid().ToString() + ".json";
                var path = string.Concat("dlq/", DateTime.UtcNow.ToString("yyyy/MM/dd"), "/", dlqQueue.SystemProperties.DeadLetterSource, "/", blobFile);

                //read the dead-letter reason defensively in case the property is missing
                dlqQueue.UserProperties.TryGetValue("DeadLetterReason", out var deadLetterReason);

                var dlqMsq = new DLQMessage
                {
                    DeadletterSource = dlqQueue.SystemProperties.DeadLetterSource,
                    MessageId = dlqQueue.MessageId,
                    SequenceNumber = dlqQueue.SystemProperties.SequenceNumber,
                    SessionId = dlqQueue.SessionId,
                    UserProperties = dlqQueue.UserProperties,
                    DeadLetterReason = deadLetterReason?.ToString(),
                    EnqueuedDttmUTC = dlqQueue.SystemProperties.EnqueuedTimeUtc,
                    ContentType = dlqQueue.ContentType,
                    DeliveryCount = dlqQueue.SystemProperties.DeliveryCount,
                    Label = dlqQueue.Label,
                    MsgBase64Encoded = Convert.ToBase64String(dlqQueue.Body, Base64FormattingOptions.None)
                };

                //bind to the blob path and storage account at runtime
                var blobAttributes = new Attribute[]
                {
                    new BlobAttribute(path),
                    new StorageAccountAttribute("StorageAccountConn")
                };

                using (var writer = await blobBinder.BindAsync<TextWriter>(blobAttributes))
                {
                    writer.Write(JsonConvert.SerializeObject(dlqMsq, Formatting.Indented));
                }
            }
        }

        public class DLQMessage
        {
            public string DeadletterSource { get; set; }
            public long SequenceNumber { get; set; }
            public string MessageId { get; set; }
            public string SessionId { get; set; }
            public string Label { get; set; }
            public string DeadLetterReason { get; set; }
            public DateTime EnqueuedDttmUTC { get; set; }
            public int DeliveryCount { get; set; }
            public string ContentType { get; set; }
            public IDictionary<string, object> UserProperties { get; set; }
            public string MsgBase64Encoded { get; set; }
        }
    }

 

Logic App Implementation

A Logic App is used to retrieve the message from the blob store when it is triggered by the EventGrid HTTP webhook. The basic workflow is shown below and can be expanded to suit your own requirements.

image

The expression for the ‘Get blob content using path’ action is @{replace(triggerBody()[0]['data']['url'],'https://dqlmessages.blob.core.windows.net/','')}. Here I am just replacing the domain name with an empty string as I only want the resource location.
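For readers more at home in C#, the same string manipulation can be sketched as below. BlobPathSketch and ToBlobPath are hypothetical names, and the blob file name in the usage note is made up; only the account URL comes from the expression above.

```csharp
using System;

public static class BlobPathSketch
{
    // Removes the storage account base URL so that only "container/folder/file" remains,
    // which is what the replace() expression in the Logic App does.
    public static string ToBlobPath(string blobUrl, string accountBaseUrl)
    {
        return blobUrl.Replace(accountBaseUrl, string.Empty);
    }
}
```

Calling ToBlobPath with "https://dqlmessages.blob.core.windows.net/dlq/2019/07/01/orders/a.json" and the base URL "https://dqlmessages.blob.core.windows.net/" leaves "dlq/2019/07/01/orders/a.json".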

The Parse JSON action has the following schema. This makes it easier to reference the properties downstream.

{
    "properties": {
        "ContentType": {
            "type": [
                "string",
                "null"
            ]
        },
        "DeadLetterReason": {
            "type": "string"
        },
        "DeadletterSource": {
            "type": "string"
        },
        "DeliveryCount": {
            "type": "integer"
        },
        "EnqueuedDttmUTC": {
            "type": "string"
        },
        "Label": {
            "type": [
                "string",
                "null"
            ]
        },
        "MessageId": {
            "type": "string"
        },
        "MsgBase64Encoded": {
            "type": [
                "string",
                "null"
            ]
        },
        "SequenceNumber": {
            "type": "integer"
        },
        "SessionId": {
            "type": [
                "string",
                "null"
            ]
        },
        "UserProperties": {
            "type": "any"
        }
    },
    "type": "object"
}

 

The last action ‘Set variable MsgBody’ has the value set to: "@{base64ToString(body('Parse_JSON')?['MsgBase64Encoded'])}"
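The decoding step is the inverse of the Convert.ToBase64String call in the Azure Function. A minimal C# sketch of it is shown here, assuming the original payload was UTF-8 text; DlqPayloadSketch and DecodeBody are hypothetical names, and the empty-string fallback for a null value is my own choice, made because the schema allows MsgBase64Encoded to be null.

```csharp
using System;
using System.Text;

public static class DlqPayloadSketch
{
    // Decodes the MsgBase64Encoded value back into the original message text.
    // Assumes the payload was UTF-8 text; returns an empty string for null or empty input.
    public static string DecodeBody(string msgBase64Encoded)
    {
        if (string.IsNullOrEmpty(msgBase64Encoded))
        {
            return string.Empty;
        }
        return Encoding.UTF8.GetString(Convert.FromBase64String(msgBase64Encoded));
    }
}
```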

Blob creation event configuration

The next step is to set up a subscription to the Blob creation event. Click on Events under the Storage account for the DLQ messages as shown below.

image

Then click on the +Event Subscription to add a new subscription.

image

Set up the subscription Basic details with a suitable subscription name and the blob storage account resource name. Uncheck ‘Subscribe to all event types’ and select the Blob Created event. Set the endpoint type to Web Hook and the URL to the Logic App HTTP trigger endpoint address.

image

Under Filters, enable subject filtering. Add the prefix ‘/blobServices/default/containers/’ to the name of the DLQ container (in this example it is called ‘dlq’) and enter the result in the ‘Subject Begins With’ textbox. Then set ‘Subject Ends With’ to the filename extension ‘.json’. Now click the Create button at the bottom of the page to create the subscription.

image
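To see which blobs the filter lets through, the two checks can be sketched in C# as below. SubjectFilterSketch and Matches are hypothetical names, the sample subject is made up, and the case-insensitive comparison is an assumption about how the filter is configured.

```csharp
using System;

public static class SubjectFilterSketch
{
    // Mirrors the 'Subject Begins With' and 'Subject Ends With' checks.
    // Case-insensitive comparison is an assumption about the filter settings.
    public static bool Matches(string subject, string beginsWith, string endsWith)
    {
        return subject.StartsWith(beginsWith, StringComparison.OrdinalIgnoreCase)
            && subject.EndsWith(endsWith, StringComparison.OrdinalIgnoreCase);
    }
}
```

A blob created event for a file in the dlq container carries a subject such as "/blobServices/default/containers/dlq/blobs/2019/07/01/orders/a.json", which passes both checks, whereas a blob in another container or with a different extension does not.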

Sample Logic App output

Once everything is wired up and if there are any messages that have been placed onto the DLQ, you should see some logic app runs. Below is an example of the outputs from the last two actions.

image

All you need to do now is extend the Logic App to deliver the DLQ alert to wherever it needs to go.

Enjoy…

Providing DR By Using An Azure Service Bus In Two Regions

This is a Disaster Recovery option I used on one of my projects. A client required a DR solution which guaranteed not to lose any messages during the DR failover process. In essence, no messages were allowed to be lost in mid flight whilst the DR process was taking place.

Solution

The approach I took was to use an Azure Service Bus in a primary Azure region and another service bus in a secondary Azure DR region. The publisher would then send the same message to both service bus endpoints. For my solution I used Azure APIM to post the same message to both primary and secondary regions, thereby simplifying the code for the publisher.

image
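If the dual send were done in publisher code rather than in APIM, it could look roughly like the sketch below. This is not how my solution did it; DualPublishSketch, PublishAsync and the two delegates are hypothetical stand-ins for whatever client sends to each region.

```csharp
using System;
using System.Threading.Tasks;

public static class DualPublishSketch
{
    // Sends the same body to both regions and completes only when both sends have finished.
    // The delegates are hypothetical wrappers around the real senders for each region.
    public static Task PublishAsync(
        string body,
        Func<string, Task> sendToPrimary,
        Func<string, Task> sendToSecondary)
    {
        return Task.WhenAll(sendToPrimary(body), sendToSecondary(body));
    }
}
```

Because the returned task faults if either send fails, the caller can treat a message as published only when both regions have accepted it.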

The primary region would simply process the messages as per normal using a Logic App to poll for new messages on the service bus. However, in the DR region, the equivalent Logic App would be set in a disabled state. The messages in the secondary region would remain on the bus until the Time-To-Live (TTL) threshold is reached, and would then either be moved onto the dead-letter queue or, if the EnableDeadLetteringOnMessageExpiration property is set to false, be dropped from the queue altogether. This technique provides automatic pruning of the older messages that would definitely have been processed by the primary region before failing over.

The value chosen for the TTL property is determined by how long it takes to detect an issue in the primary region plus how long it takes to fail over to the DR region.
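As a worked example of that sizing, the sketch below adds the two durations together with a safety margin. TtlSketch and EstimateTtl are hypothetical names, the safety margin is my own addition, and the figures in the usage note are purely illustrative.

```csharp
using System;

public static class TtlSketch
{
    // TTL = time to detect the outage + time to complete the failover + a safety margin.
    // The safety margin is an assumption added for illustration.
    public static TimeSpan EstimateTtl(TimeSpan detection, TimeSpan failover, TimeSpan safetyMargin)
    {
        return detection + failover + safetyMargin;
    }
}
```

With 15 minutes to detect an outage, 30 minutes to fail over and a 15 minute margin, EstimateTtl returns one hour, so messages older than an hour would be pruned from the secondary queue.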

Failing over would simply involve enabling the Logic App in the DR region, and failing back would involve disabling this Logic App again.

Pros

  • No messages are lost during the failover process.
  • Low monetary transaction cost by the DR environment as no Logic Apps are being triggered during the normal process flow through the primary region.
  • Simple DR design which involves just another queue.
  • Simple failover process.

Cons

  • There is a delay before any new messages are processed while the older messages on the service bus are reprocessed first.
  • The backend system processing the messages must be idempotent, meaning the same message may be processed by the backend system multiple times and still produce the same outcome.
  • Requires the publisher to send the same message to 2 different endpoints. This may be mitigated by using APIM to manage sending the messages to both service bus endpoints.
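One common way to get the idempotent behaviour mentioned in the cons is to remember which message Ids have already been applied. The sketch below is a minimal in-memory version, assuming every message carries a stable Id; IdempotentHandlerSketch is a hypothetical name, and a real backend would need a durable store rather than a HashSet.

```csharp
using System;
using System.Collections.Generic;

public sealed class IdempotentHandlerSketch
{
    // In-memory record of processed message Ids; a durable store is assumed in practice.
    private readonly HashSet<string> _processedIds = new HashSet<string>();
    private readonly Action<string> _apply;

    public IdempotentHandlerSketch(Action<string> apply)
    {
        _apply = apply;
    }

    // Returns true when the message was applied, false when it was a duplicate.
    public bool Handle(string messageId, string body)
    {
        if (!_processedIds.Add(messageId))
        {
            return false;
        }
        _apply(body);
        return true;
    }
}
```

After a failover, messages that the primary region had already processed would be skipped by Handle instead of being applied a second time.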

Enjoy…

Scatter-Gather pattern using an Azure Event Grid

This is another version of the scatter-gather integration pattern which I blogged about some time ago. The previous version was based on using service bus topics and Logic Apps which polled for new messages. Whilst this was adequate for a messaging system that did not require immediate request/response times, it was not an ideal solution for another project I was working on. This project required a website to initiate the request to a scatter-gather pattern and to have low latency response times of a few seconds.

With the recently introduced Event Grid service in Azure https://azure.microsoft.com/en-us/services/event-grid/, I was able to achieve request/responses in seconds using the scatter-gather pattern. This is possible because Event Grid provides event based notifications, unlike a service bus architecture which requires regular polling for new messages on the bus.

image

Using the design above enables the client to define how long to wait after the initial request before retrieving the responses from the services via the gatherer Logic App.

Note that if you require durable message delivery, I would still recommend using a service bus, which provides peek-lock, FIFO, dead-lettering and transactional support.

Scenario

A website is required to validate a PO and display the customer’s details, the current stock level and the price when requesting information about a product. The information is to be sourced from 3 different API services: the customer information from CRM, product stock levels from an Inventory service and the product price from a Pricing service.

Approach

The website will send a request to an HTTPS Event Grid endpoint. This will publish the message to the 3 Logic App subscriptions, which are all triggered by an HTTP request.

One of the properties sent to each subscriber is a message Id, which will be used as the session Id for the service bus when returning the API response messages.

Each Logic App will map the request message to the required API format and wait for the API response message to be placed onto the service bus.

When the website is ready to receive the aggregated responses from the service bus, it will make a HTTP Get request to another Logic App which will then read all the messages from the service bus matching the session Id key and combine them into a single composite message.
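The gather step boils down to folding the individual responses into one object keyed by the service that produced them. A minimal C# sketch of that idea follows; GathererSketch and Combine are hypothetical names, the sample bodies in the usage note are made up, and the sketch assumes each response body is already valid JSON and that service names need no escaping.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class GathererSketch
{
    // Combines (service name, JSON body) pairs into a single composite JSON object.
    // Bodies are assumed to be valid JSON and are embedded as-is.
    public static string Combine(IEnumerable<KeyValuePair<string, string>> responses)
    {
        var members = responses.Select(r => "\"" + r.Key + "\":" + r.Value);
        return "{" + string.Join(",", members) + "}";
    }
}
```

Passing a CRM response of {"name":"A"} and an Inventory response of {"stock":5} produces {"CRM":{"name":"A"},"Inventory":{"stock":5}}.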

Design

API Proxy Logic Apps

To keep the demo simple, each of the proxy Logic Apps will compose a static response message using some of the properties found in the request message. In a practical solution you would map the request message to the correct format required by the API service after the Parse JSON action and add a HTTP Request/Response action after the map to call the API service.

image

The details of each action are shown below.

Take note of each HTTP POST URL address in the logic apps as this will be required when setting up the subscriptions in Event Grid. The Request Body JSON Schema can be left blank.

image

Note an indexer is required for the body contents as it is normally sent as a collection.

image

Here we are composing a static CRM response message to send back. The contents will be different for each proxy Logic App.

image

The composed message is put onto the service bus using the EventGrid Id as the Session Id. This is to group it with the messages from the other proxy Logic Apps. A custom property is used to indicate the source of the message and is used later when aggregating the response messages from the other proxy Logic Apps.

image

The code view of the Send Message service bus connector is shown below. Note the custom property “ServiceName” and the value “CRM”.

image

The other 2 proxy Logic Apps for checking inventory and pricing are similar in design to this one. The only differences are the composing of the static response message and the value of the Service Name source property in the service bus connector.

Creating an Azure Event Grid

From the Azure Portal search for Event Grid and then click create.

image

Add a suitable name and the resource group.

image

Once the resource has been created, you will need to take a copy of the Access key and Topic Endpoint. These will be used later when posting messages to this endpoint.

image

After the resource has been created, subscriptions will need to be created for each of the proxy Logic Apps using the HTTP POST URL addresses for the web hook endpoints. This is done by clicking the Event Subscription icon at the top menu bar.

image

We will need to create 3 separate subscriptions, one for each of the proxy Logic Apps. They all share the same Event Type and Prefix Filter, the only difference between them is the Name and the Subscriber Endpoint URL.

image

Once completed, you should have the following 3 subscriptions created.

image

Service Bus Queue

Next, a SB queue is required to hold the response messages returned from the API services that were called in each of the Micro Service proxy Logic Apps. The Basic pricing tier may be used as we do not require any topics to be created.

image

Once the Service Bus has been created, we need to create a queue with sessions enabled. Sessions are required to group all the messages with the same Id so they can be processed together.

image

Message Gatherer Logic App

The purpose of this Logic App is to retrieve all the messages from the service bus matching a session Id and then to aggregate the messages into a single composite message. This Logic App is triggered by the website when it is ready to retrieve the composite responses from the services.

image

Details of each action shape are described below.

The “When a HTTP request is received” trigger is a GET method with the message {id} in the URL as a resource.

image

This variable is used to append all the API response messages.

image

Here we get all the messages from the service bus using the id from the URL of the HTTP Get as the Session Id.

image

We then iterate through all the messages received from the service bus queue and append each message to the responseMsg variable before completing the message in the queue.

image

Code view for the “Append to service response” is shown below. I am using the service bus custom property “ServiceName” that was set in the API Proxy Logic App for the message name.

image

Code view for the “Complete the message in a queue” is shown below.

image

Once all the messages have been completed, we close the session in the queue using the id from the URL of the HTTP Get.

image

The last step is to return the aggregated message as a typed JSON message.

image

Below is the code view for the Response Action.

image

Console Application

To keep things simple I will use a console application to mimic a website publishing a request to the scatter-gather service. The same code can be put into a helper project and used by a website.

When publishing events to the Event Grid, they must conform to the following schema. The message to send to the Event Grid is placed in the “data” element.
[
  {
    "topic": string,
    "subject": string,
    "id": string,
    "eventType": string,
    "eventTime": string,
    "data": {
      object-unique-to-each-publisher
    },
    "dataVersion": string,
    "metadataVersion": string
  }
]

More information about the schema and properties can be found here:- https://docs.microsoft.com/en-us/azure/event-grid/event-schema

For the scatter-gather pattern I will be setting the “id” property to a Guid string value which will be used as the session key on the service bus. Both the eventType and subject properties will be used for the subscription filter.

I have created two custom objects to represent the Purchase Order and the EventGrid models.

Purchase Order Object
    public class PurchaseOrder
    {
        public string OrderNumber { get; set; }
        public string CustomerNumber { get; set; }
        public string ProductCode { get; set; }
    }

Event Grid Object
    public class GridEvent<T> where T : class
    {
        public string Id { get; set; }
        public string Subject { get; set; }
        public string EventType { get; set; }
        public T Data { get; set; }
        public DateTime EventTime { get; set; }
    }

The console application consists of 2 functions, one to send the request to the Event Grid and another to request the aggregated message from the Message Gatherer Logic App.

The “SendMsgAsync” function below accepts an event grid object as an argument and returns the status of the post action. It adds the parameter to a collection and serialises the collection before posting it to the EventGrid. Remember to update the sasKey variable with the value from your EventGrid resource.

Send Message Func
    private static async Task<string> SendMsgAsync(GridEvent<PurchaseOrder> gridEvent)
    {
        // Event Grid expects a collection of events, even when sending just one
        var payloadList = new List<GridEvent<PurchaseOrder>>();
        payloadList.Add(gridEvent);
        string topicEndpoint = "https://scatter-eventgrid.westus2-1.eventgrid.azure.net/api/events";
        //add the event grid access key
        string sasKey = "<Event Grid sas key>";
        // Event grid expects event data as JSON
        var json = JsonConvert.SerializeObject(payloadList);
        // Create request which will be sent to the topic
        var content = new StringContent(json, Encoding.UTF8, "application/json");
        // Create a HTTP client which we will use to post to the Event Grid Topic
        var httpClient = new HttpClient();
        // Add key in the request headers
        httpClient.DefaultRequestHeaders.Add("aeg-sas-key", sasKey);
        // Send request
        Console.WriteLine("Sending event to Event Grid...");
        var result = await httpClient.PostAsync(topicEndpoint, content);
        return result.ReasonPhrase;
    }

 

The “ReadMsgAsync” function below accepts the batchId as an argument. It is used to make a HTTP Get to the API Gatherer Logic App to retrieve the composite message. It uses the batchId as a resource location in the call-back URL of the logic app.

You will need to add the {0} token after the …invoke/ resource location as highlighted. https://prod-27.australiasoutheast.logic.azure.com/workflows/16…/triggers/manual/paths/invoke/{0}?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=k-5w..

Read Message Func
    private static async Task<string> ReadMsgAsync(string batchId)
    {
        // the callback URL must contain the {0} token where the batchId belongs
        var uri = string.Format("<API Gatherer Logic App Callback url>", batchId);
        var httpClient = new HttpClient();
        return await httpClient.GetStringAsync(uri);
    }

 

In the main function below, we create a sample PO and an EventGrid message object. We then call the SendMsgAsync function to post the message to the EventGrid in Azure. Then after waiting for a second we call the ReadMsgAsync function to read the composite message from the Logic App. Note we are using the batchId as the EventGrid object id to group all the response messages together from the API services.

Console Main Func
    static async Task Main(string[] args)
    {
        //create id for service bus sessions
        var batchId = Guid.NewGuid().ToString();
        var payload = new GridEvent<PurchaseOrder>
        {
            Data = new PurchaseOrder { OrderNumber = "1234", CustomerNumber = "ABC100", ProductCode = "PRD2043" },
            EventTime = DateTime.UtcNow,
            EventType = "orders",
            Id = batchId,
            Subject = "po/validation"
        };
        var rc = await SendMsgAsync(payload);
        Console.WriteLine($"Event sent with result: {rc}");
        //give the proxy Logic Apps a second to respond
        await Task.Delay(1000);
        var msg = await ReadMsgAsync(batchId);
        Console.WriteLine("Response Message...");
        Console.WriteLine(msg);
        Console.ReadLine();
    }

Testing

Executing the console application displays the successful post to the EventGrid and the results from the API Gatherer Logic App after a few seconds.

image

Calling ReadMsgAsync too quickly may not allow the API services to respond in time, and some messages may be left out as shown below.

image

Conclusion

Azure EventGrid is very useful for scenarios where you want to publish a message to several subscribers that are triggered by web-hooks. If the web-hook does not acknowledge receipt of the message, EventGrid will try again several times using an exponential back-off period from 10 seconds to an hour.

If you request the composite response message from the API services too soon, you may miss some of the responses from the Micro Services. It is advisable to check the composite message for any missing responses before continuing to process it.
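Such a check can be as simple as comparing the service names found in the composite message against the ones you expect. The sketch below assumes the names have already been extracted from the composite message; CompositeCheckSketch and MissingServices are hypothetical names, and “Inventory” and “Pricing” are assumed values for the ServiceName property (only “CRM” appears in the screenshots above).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class CompositeCheckSketch
{
    // Returns the expected service names that have no response in the composite message.
    public static IReadOnlyList<string> MissingServices(
        IEnumerable<string> expected,
        IEnumerable<string> received)
    {
        var seen = new HashSet<string>(received, StringComparer.OrdinalIgnoreCase);
        return expected.Where(name => !seen.Contains(name)).ToList();
    }
}
```

If only the CRM and Pricing responses have arrived, MissingServices reports Inventory, and the caller can decide whether to wait and call the gatherer Logic App again.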

The monthly running cost of this solution is minimal when compared to a service bus topic version of this pattern, which requires polling at regular intervals where each poll incurs a trigger cost.

This same pattern can also be used to design an asynchronous request/response messaging system.

Enjoy…