Message Re-sequencing using Azure Functions and Table storage

Message re-sequencing is another integration pattern, used to rearrange messages that were received out of order back into their original sequence. You may well ask why this pattern is so important in today’s world of messaging systems.

The integration world is moving towards services hosted as Microservices in the cloud because of their scalability and independently deployable components. This may require breaking messages up into smaller logical segments, or de-batching them, so that the services can manage the data more efficiently. This pattern makes it possible to put those smaller segments back together in the correct order after they have been processed.

image

Scenario

A typical use case is a Purchase Order with multiple line items which require processing asynchronously by a Microservice at an individual line level. This service may calculate the taxes, check stock levels and add the total line price. Some line items may take longer to process than others before returning the updated line item.

The approach would be to de-batch the PO at the line level and send each line item to the Microservice. The response message from the Microservice would then go through a Message Re-Sequencer component to put the order lines back in the correct sequence again before returning the updated PO response.

Another use case would be to re-sequence messages after a scatter gather pattern (https://connectedcircuits.blog/2017/10/08/integration-scatter-gather-pattern-using-azure-logic-apps-and-service-bus/) before forwarding them on.

Approach

For this re-sequencer pattern to work, three attributes must be passed with each message to the re-sequencer service: the sequence number of the message, the total number of messages to expect, and a batch number.

A storage repository is also required to temporarily hold the out-of-sequence messages. The repository is based on Azure Table Storage because it meets all the requirements: it stores NoSQL data structures, a Partition key groups the related messages together, and a Row key identifies each message by its sequence number. By combining these two keys we can quickly find and retrieve a message from the store.
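Table Storage keys are strings, so a numeric sequence number only sorts correctly as a Row key if it is padded to a fixed width. The following is a minimal sketch of that idea. The `SequenceKey` helper is hypothetical, and the 10-digit width is an assumption taken from the "0000000000" format string that appears in the retrieve function later in this post.

```csharp
using System;
using System.Globalization;
using System.Linq;

public static class SequenceKey
{
    // Zero-pad the sequence number so that string order matches numeric order.
    public static string ToRowKey(int sequenceNumber) =>
        sequenceNumber.ToString("D10", CultureInfo.InvariantCulture);

    // Convert a padded row key back to its sequence number.
    public static int FromRowKey(string rowKey) =>
        int.Parse(rowKey, CultureInfo.InvariantCulture);

    // Sorts the keys as strings, which is how a string-keyed store would order them.
    public static string[] SortKeys(int[] sequenceNumbers, bool padded) =>
        sequenceNumbers
            .Select(n => padded ? ToRowKey(n) : n.ToString(CultureInfo.InvariantCulture))
            .OrderBy(k => k, StringComparer.Ordinal)
            .ToArray();
}
```

Without padding, sequence number 10 sorts before 2 because the comparison is character by character. With padding, the string order and the numeric order agree.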

An Azure Function is used to manage the saving and retrieving of messages from Table storage and is designed to be agnostic of the message types. This function is actually the core component of this solution.

The Logic App is primarily there to receive the message, de-batch the line items and pass each one on to the function. It then forwards the message on once all de-batched line items have been received.

There are two options for when to forward the re-ordered messages.

  • On arrival – as each message arrives, we check whether any messages in the repository can be sent out before this one is sent.
  • Last message arrival – we send all the messages out in one hit after the final message has been received.

This design focuses on the second option, where we send all messages out in one hit. My next blog will cover how to send them out using the On Arrival method.
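To make the "last message arrival" option concrete, here is a small in-memory sketch of the idea. It is not the Azure implementation described in this post: the `InMemoryResequencer` class is hypothetical, a dictionary stands in for Table Storage, and the batch is released when the stored count reaches the expected total.

```csharp
using System.Collections.Generic;
using System.Linq;

// In-memory stand-in for the table store: batch id -> (sequence number -> message).
public class InMemoryResequencer
{
    private readonly Dictionary<string, SortedDictionary<int, string>> _batches =
        new Dictionary<string, SortedDictionary<int, string>>();

    // Stores one message. Returns the whole batch in sequence order once all
    // expected messages have arrived, otherwise returns null.
    public IReadOnlyList<string> Add(string batchId, int sequenceNumber, int totalCount, string message)
    {
        if (!_batches.TryGetValue(batchId, out var batch))
        {
            batch = new SortedDictionary<int, string>();
            _batches[batchId] = batch;
        }

        // A repeated sequence number overwrites the earlier message.
        batch[sequenceNumber] = message;

        if (batch.Count < totalCount)
        {
            return null; // still waiting for more messages
        }

        _batches.Remove(batchId);     // clear the store for this batch
        return batch.Values.ToList(); // SortedDictionary enumerates in key order
    }
}
```

Messages can be added in any order. Only the call that completes the batch gets the ordered list back, which mirrors sending everything out in one hit.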

Design

Creating the Azure Table Storage

First we need to add an Azure Storage account to our resource group to store the incoming messages.

clip_image002

Take a copy of the connection string from Settings/Access keys of the Storage account, as we will need it later.

clip_image004

Building the Azure Function

Start by creating an Azure Functions project in Visual Studio 2017.

clip_image006

Now add a new C# class to the project called MsgEntity with the properties and overloaded constructor shown below. The class also needs to inherit from the TableEntity base class in the “Microsoft.WindowsAzure.Storage.Table” namespace.

clip_image007

This class will be used as a container to hold the message contents and sequence numbering attributes.
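For readers who cannot see the screenshot, the class might look roughly like the sketch below. This is an assumption rather than the original code: the members are inferred from how the functions in this post use MsgEntity (the three-argument constructor, IsLastMessage and Message), and the zero-padded RowKey format is a guess based on the "0000000000" format string in the retrieve function.

```csharp
using Microsoft.WindowsAzure.Storage.Table;

// Assumed shape of MsgEntity, inferred from its usage in the functions.
public class MsgEntity : TableEntity
{
    // Parameterless constructor, needed so the table client can create instances from query results.
    public MsgEntity() { }

    public MsgEntity(string batchId, int sequenceNumber, bool isLastMessage)
    {
        PartitionKey = batchId;                  // groups all messages of one batch
        RowKey = sequenceNumber.ToString("D10"); // assumed width; padded so string order matches numeric order
        IsLastMessage = isLastMessage;
    }

    // True for the message whose sequence number equals the expected total.
    public bool IsLastMessage { get; set; }

    // The message payload stored as a JSON string.
    public string Message { get; set; }
}
```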

Now we need to add the following three methods to the function, as described below.

  • AzureTableStoreMsg – stores the message in Azure Table Storage and returns the total number of messages received. If the final message in the sequence has already been received, the date/time it was received is returned as well.
  • AzureTableRetrieveMsg – retrieves all messages from table storage using the batch number, or an individual message using the message sequence number.
  • AzureTableDeleteMsg – deletes all the messages related to the batch number from table storage.

To add a function method to the project, right click on the project and select New Item. From the Add New Item popup window, select Azure Function.

clip_image009

Then select the “Generic WebHook” type. This will allow a Logic App to access the interface from the workflow designer using the Azure Functions action shape later.

clip_image010

Add the following code to this new method “AzureTableStoreMsg”

using System;
using System.Configuration;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Table;

public static class AzureTableStoreMsg
{
    [FunctionName("AzureTableStoreMsg")]
    public static async Task<object> Run([HttpTrigger(WebHookType = "genericJson")]HttpRequestMessage req, TraceWriter log)
    {
        log.Info("Webhook was triggered!");

        var lastMsgIndex = 0;
        var msgCount = 0;
        var eomDateTimeReceived = DateTime.MinValue;

        try
        {
            // Read the sequencing attributes from the request headers.
            var sequencenumber = Convert.ToInt32(req.Headers.GetValues("x-MsgReseqNumber").FirstOrDefault());
            var totalMsgCount = Convert.ToInt32(req.Headers.GetValues("x-MsgReseqTotalCount").FirstOrDefault());
            var batchnumber = req.Headers.GetValues("x-MsgReseqBatchId").FirstOrDefault();
            // The message whose sequence number equals the expected total is the final one.
            var isEndMsg = totalMsgCount == sequencenumber;

            var storageAccount = CloudStorageAccount.Parse(ConfigurationManager.AppSettings["StorageConnectionString"]);
            // Create the table client.
            CloudTableClient tableClient = storageAccount.CreateCloudTableClient();
            // Retrieve a reference to the table.
            CloudTable table = tableClient.GetTableReference("msgsequence");
            // Create the table if it doesn't exist.
            await table.CreateIfNotExistsAsync();

            // Get request body and initialise a MsgEntity object.
            var requestBody = await req.Content.ReadAsAsync<object>();
            var msgEntity = new MsgEntity(batchnumber, sequencenumber, isEndMsg);
            msgEntity.Message = requestBody.ToString();

            // InsertOrReplace overwrites any message already stored with the same batch and sequence number.
            TableOperation insertOperation = TableOperation.InsertOrReplace(msgEntity);

            // Execute the insert operation.
            await table.ExecuteAsync(insertOperation);

            // Count the messages stored for this batch and note whether the final message has arrived.
            var queryMsgType = new TableQuery<MsgEntity>().Where(TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, batchnumber))
                               .Select(new string[] { "RowKey", "Timestamp", "IsLastMessage" });

            foreach (MsgEntity entity in table.ExecuteQuery(queryMsgType))
            {
                if (entity.IsLastMessage)
                {
                    lastMsgIndex = Convert.ToInt32(entity.RowKey);
                    eomDateTimeReceived = entity.Timestamp.UtcDateTime;
                }
                msgCount++;
            }
        }
        catch (Exception ex)
        {
            return req.CreateResponse(HttpStatusCode.InternalServerError, ex.Message);
        }

        // Return the message count and, if it has arrived, when the final message was received.
        return req.CreateResponse(HttpStatusCode.OK, new { EOMReceived = eomDateTimeReceived, EOMIndex = lastMsgIndex, TotalMessagesReceived = msgCount });
    }
}

The current message sequence number, the total number of messages and the batch number are passed in as custom header values, while the message itself is passed in the body payload. The function creates a table called “msgsequence” if it does not exist, populates the custom object with the message properties and the message, and then stores the object in the Azure Table. The response from this function returns the total number of messages received so far and, if the last message in the sequence has been received, when it arrived. Below is an example of the response message showing the number of messages received so far and when the last message in the sequence was received.
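For callers other than a Logic App, the request could be assembled as in the sketch below. The header names are the ones the function reads. The `StoreRequestBuilder` helper is hypothetical, and the function URL is a placeholder for the URL (including any key) of your own published function.

```csharp
using System.Net.Http;
using System.Text;

public static class StoreRequestBuilder
{
    // Builds the POST that AzureTableStoreMsg expects:
    // sequencing data in custom headers, the message itself in the JSON body.
    public static HttpRequestMessage Build(string functionUrl, string batchId, int sequenceNumber, int totalCount, string jsonBody)
    {
        var request = new HttpRequestMessage(HttpMethod.Post, functionUrl)
        {
            Content = new StringContent(jsonBody, Encoding.UTF8, "application/json")
        };
        request.Headers.Add("x-MsgReseqBatchId", batchId);
        request.Headers.Add("x-MsgReseqNumber", sequenceNumber.ToString());
        request.Headers.Add("x-MsgReseqTotalCount", totalCount.ToString());
        return request;
    }
}
```

The returned request can then be sent with `HttpClient.SendAsync`, and the EOMIndex and TotalMessagesReceived values read from the JSON response.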

image

Add another Azure Function as before called “AzureTableRetrieveMsg” for retrieving messages from the Table store and add the following code below.

using System.Collections.Generic;
using System.Configuration;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Table;
using Newtonsoft.Json;

public static class AzureTableRetrieveMsg
{
    [FunctionName("AzureTableRetrieveMsg")]
    public static async Task<object> Run([HttpTrigger(WebHookType = "genericJson")]HttpRequestMessage req, TraceWriter log)
    {
        log.Info("Webhook was triggered!");

        string jsonContent = await req.Content.ReadAsStringAsync();
        Dictionary<string, string> data = JsonConvert.DeserializeObject<Dictionary<string, string>>(jsonContent);

        // MsgSequenceNumber is optional, so a missing key is treated the same as a blank value.
        data.TryGetValue("MsgSequenceNumber", out var sequencenumber);
        data.TryGetValue("MsgBatchId", out var batchnumber);

        var storageAccount = CloudStorageAccount.Parse(ConfigurationManager.AppSettings["StorageConnectionString"]);
        // Create the table client.
        CloudTableClient tableClient = storageAccount.CreateCloudTableClient();
        // Retrieve a reference to the table.
        CloudTable table = tableClient.GetTableReference("msgsequence");

        // Construct the query for all entities where PartitionKey equals the batch number,
        // optionally narrowed to a single sequence number.
        TableQuery<MsgEntity> query = null;
        if (string.IsNullOrEmpty(sequencenumber))
        {
            query = new TableQuery<MsgEntity>().Where(TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, batchnumber));
        }
        else
        {
            // Pad the sequence number to 10 digits; this assumes MsgEntity formats its RowKey the same way.
            var rowKey = int.Parse(sequencenumber).ToString("0000000000");
            var sequenceFilter = TableQuery.GenerateFilterCondition("RowKey", QueryComparisons.Equal, rowKey);
            var msgTypeFilter = TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, batchnumber);
            query = new TableQuery<MsgEntity>().Where(TableQuery.CombineFilters(sequenceFilter, TableOperators.And, msgTypeFilter));
        }

        // RowKey is compared as a string, so the ordering relies on the keys being zero-padded.
        var list = new List<object>();
        foreach (MsgEntity entity in table.ExecuteQuery(query).OrderBy(x => x.RowKey))
        {
            list.Add(JsonConvert.DeserializeObject(entity.Message));
        }

        return req.CreateResponse(HttpStatusCode.OK, list);
    }
}

This function will optionally return all records for the batch number or a single message matching the passed in sequence number. A sample request message is shown below which will return all records matching the batch number if the MsgSequenceNumber is left blank.

image

The last Azure Function method to add is “AzureTableDeleteMsg”.

using System.Collections.Generic;
using System.Configuration;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Table;
using Newtonsoft.Json;

public static class AzureTableDeleteMsg
{
    [FunctionName("AzureTableDeleteMsg")]
    public static async Task<object> Run([HttpTrigger(WebHookType = "genericJson")]HttpRequestMessage req, TraceWriter log)
    {
        log.Info("Webhook was triggered!");

        string jsonContent = await req.Content.ReadAsStringAsync();
        Dictionary<string, string> data = JsonConvert.DeserializeObject<Dictionary<string, string>>(jsonContent);

        // A missing or empty MsgBatchId is a bad request.
        if (data == null || !data.TryGetValue("MsgBatchId", out var batchnumber) || string.IsNullOrEmpty(batchnumber))
        {
            return req.CreateResponse(HttpStatusCode.BadRequest);
        }

        var storageAccount = CloudStorageAccount.Parse(ConfigurationManager.AppSettings["StorageConnectionString"]);
        // Create the table client.
        CloudTableClient tableClient = storageAccount.CreateCloudTableClient();
        // Retrieve a reference to the table.
        CloudTable table = tableClient.GetTableReference("msgsequence");

        // Query all entities where PartitionKey equals batchnumber; only the keys are needed to delete.
        TableQuery<MsgEntity> query = new TableQuery<MsgEntity>()
                    .Where(TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, batchnumber))
                    .Select(new string[] { "PartitionKey", "RowKey" });

        // Run the query once and reuse the results for both the check and the delete loop.
        var entities = table.ExecuteQuery(query).ToList();
        if (entities.Count == 0)
        {
            return req.CreateResponse(HttpStatusCode.NotFound);
        }

        foreach (MsgEntity entity in entities)
        {
            TableOperation deleteOperation = TableOperation.Delete(entity);
            await table.ExecuteAsync(deleteOperation);
        }

        return req.CreateResponse(HttpStatusCode.OK);
    }
}

This function deletes all the messages in Table storage for a batch number. A sample request is shown below.

image

Next we need to add the Azure Table storage connection string to the local.settings.json file in the project.

clip_image012

Before publishing the function to Azure, be sure to create a Unit Test project and check its functionality.

Publishing the Function to Azure

Right click on the project and click Publish. This will take you to the publishing wizard form. Choose “Azure Function App” and “Create New”, then click the publish button.

clip_image014

In the next screen add a function name, subscription and hosting plans etc. Then click the create button to provision any new resources and to publish the function.

clip_image016

Once the function has been published, we need to log into the Azure portal and add the Storage Account connection string to the Application settings file. Double click on the azure function app in the Azure portal.

clip_image018

Then click on the Application settings link.

clip_image020

In the Application settings section, add a new key called “StorageConnectionString” and add the connection string that was used in the local.settings.json in the Visual Studio Function project. Then click the “Save” button at the top of the screen.

clip_image022

 

Logic App for message de-batching and re-sequencing

Next we will create the Logic App that de-batches the individual line items in parallel and sends each one to another Logic App, which acts as a Microservice, for processing. The response is then passed to the function we created above. Once all the messages have been received from the Microservice, we call another function method to return all the stored responses in the correct order before returning the result to the consumer.

The message will be based on the PO (Purchase Order) shown below. We will use the OrderNumber as the batch number and the LineNumber values for re-sequencing the line items later.

image

Below is the collapsed view of the Logic App. It is triggered by a PO (Purchase Order) being posted to the HTTP endpoint, with the schema based on the PO message above.

image

Two variables are required for holding the total number of  line items and the PO lines from the Message Re-sequencer function.

image

The code view for the Initialize variable MaxLineItems is shown below, where we just take the length of the Lines collection.

image

After the variables are initialised comes the For each loop shape, where we iterate through the PO lines of the message.

image

The POValidator action calls another Logic App passing the line item which simulates a Microservice to calculate the total line amount and could possibly check stock levels.

image

The response is then passed onto the AzureTableStoreMsg function method which was published to Azure in the previous steps.

Functions that have been created as generic webhooks can be selected from the Logic App designer by adding an action to the designer and choosing Azure Functions.

image

After selecting Azure functions, you should be able to see all your published functions. Click on the Azure function that was published above.

image

This will then display all the methods that we had created above. Make sure you select the AzureTableStoreMsg method from the list.

image

After the function method has been added, set up the properties as shown.

image

The body and header properties are set to the following values.

  • Request Body  – output body of the POValidator Logic App (Microservice)
  • x-MsgReseqBatchId  – the OrderNumber of the trigger message
  • x-MsgReseqNumber – the line number of the item in the loop
  • x-MsgReseqTotalCount – the value of the MaxLineItems variable, which was initialised at the start of the workflow.

And the code view for this action is shown here:

image

The next step in the workflow is to check whether we have received all the messages. This is done by comparing two properties in the response message from the function method AzureTableStoreMsg, EOMIndex and TotalMessagesReceived, to see if they are equal.

image

The syntax for the condition action “Check msg count” is as follows:

@equals(body('AzureTableStoreMsg')['EOMIndex'], body('AzureTableStoreMsg')['TotalMessagesReceived'])

If the condition is true, then the next step is to retrieve all the messages in the correct order from the Table Storage using the function method “AzureTableRetrieveMsg”. Follow the steps as before to add an action to call this function method. The only property to send in the request body is the MsgBatchId which is set to the PO Order number.

image

The code view for this shape looks like this.

image

We then set the variable LineItems to the output of the function method “AzureTableRetrieveMsg”.

image

Once we set the variable, we can then clear the table storage of this data by calling the function method “AzureTableDeleteMsg” and passing the PO Order number in the body.

image

When the “For each” shape completes we then compose the response message by replacing the existing line items with the items stored in the LineItem variable using the “SetProperty” expression.

image

Here is the code view of the Compose shape.

image

The final step is to set the Response body to the output of the Compose shape above.

image

That’s the development done for this integration pattern. Next we will create the POValidator Logic App to simulate calling a Microservice. This Logic App just calculates the line total by multiplying the UnitPrice and Qty to create a new property, TotalPrice. I have also added a random Delay shape to simulate some line items taking longer to process.

Below is the workflow process for this Logic App. I won’t go into the detail of each action as this Logic App is just there to test the message re-sequencing.

image

Testing

To test this solution we are going to use one of my favourite tools, PostMan. The following PO message will be sent to the MessageResequencer Logic App. Notice the absence of the line item total; the POValidator Logic App will calculate this for us.

image

As you can see below, each line item has been sent to the POValidator Logic App and took a different time to complete due to the Delay shape.

image

Here is the response message with the total price appended to each line item and in the correct sequence.

image

Now to prove that the line items do get re-ordered in the correct sequence, I will swap the line numbers around in the PO request message and with a different PO number.

image

Here is the response with the lines re-sequenced in the correct order.

image

Conclusion

Using Azure Functions and Table Storage makes it fairly easy to develop this integration pattern. To provide a more resilient repository for temporarily storing the messages, you may want to consider Azure Cosmos DB.

The HTTP connectors can also be swapped out for Service Bus connectors when communicating with services that are slow to respond.

You would have noticed that when you call the function method “AzureTableStoreMsg” it returns the following response message.

image

You can use EOMReceived and TotalMessagesReceived to determine whether an error has occurred or the Microservice is taking too long to process the de-batched items. Under ideal conditions, EOMIndex would be set by the last de-batched message to arrive and TotalMessagesReceived would equal the EOMIndex value.

Keep a watch out for my next blog, where I will show you how to re-sequence messages as they arrive.
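One way such a check could be written is sketched below. The `BatchMonitor` class, the `BatchStatus` values and the `maxWait` threshold are all hypothetical. The sketch assumes EOMReceived is still DateTime.MinValue until the final message has been stored, which is how the store function initialises it.

```csharp
using System;

public enum BatchStatus { Waiting, Complete, Stalled }

public static class BatchMonitor
{
    // Classifies a batch from the values returned by AzureTableStoreMsg.
    public static BatchStatus Evaluate(DateTime eomReceivedUtc, int eomIndex, int totalMessagesReceived,
                                       DateTime nowUtc, TimeSpan maxWait)
    {
        // The final message has not been stored yet, so nothing can be concluded.
        if (eomReceivedUtc == DateTime.MinValue) return BatchStatus.Waiting;

        // Every message up to the final index has been stored.
        if (eomIndex == totalMessagesReceived) return BatchStatus.Complete;

        // The final message arrived, but earlier ones are still missing.
        return nowUtc - eomReceivedUtc > maxWait ? BatchStatus.Stalled : BatchStatus.Waiting;
    }
}
```

A stalled batch is one where the final message arrived some time ago but the count never caught up, which would suggest a lost message or a Microservice call that is taking too long.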

Enjoy…

Content based message routing using Azure Logic Apps, Function and Service Bus

Content Based Routing (CBR) is another pattern used in the integration world, in which the contents of a message determine its endpoint.

This article will describe one option to develop this pattern using an Azure Service Bus, an Azure Function and a Logic App.

Basically the service bus will be used to route the message to the correct endpoint using topics and subscriptions. The Azure Function is used to host and execute the business rules to inspect the message contents and set the routing properties. A logic app is used to accept the message and call the function passing the received message as an argument. Once the function executes the business rules, it will return the routing properties in the response body. The routing information is then used to set the properties on the service bus API connector in the Logic App before publishing the message onto the bus.

image

Scenario

To demonstrate a typical use-case of this pattern we have 2 message types, Sales Orders (SO) and Purchase Orders (PO). For the SO I want to send the order to a priority queue if the total sales amount is over a particular value. And for a PO, it should be sent to a pre-approval queue if the order value is under a specified amount.

Here is an example of a SO message to be routed:

image

And an example of a PO being sent:

image

Solution

The real smarts of this solution is the function which will return a common JSON response message to set the values for the Topic name and the custom properties on the service bus connector. The fields of the response message are described below.

  • TopicName – the name of the service bus topic to send the message to.
  • CBRFilter_1 – the value the subscription rule filters on. Depending on your requirements, you may need more fields for more granular filtering.
  • RuleSetVersion – also used by the subscription rule as a filter value. It’s a good idea to have this field, as you may have several versions of a rule in play at any one time.

Let’s start with provisioning the service bus topics and subscriptions for the 2 types of messages. First create 2 topics called purchaseorder and salesorder.

 

image

Now add the following subscriptions and rules for each of the topics.

Topic Name    | Subscription Name  | Rule
purchaseorder | Approved_V1.00     | CBRFilter_1 = 'Approved' and RuleSetVersion = '1.00'
purchaseorder | NotApproved_V1.00  | CBRFilter_1 = 'ApprovedNot' and RuleSetVersion = '1.00'
salesorder    | HighPriority_V1.00 | CBRFilter_1 = 'PriorityHigh' and RuleSetVersion = '1.00'
salesorder    | LowPriority_V1.00  | CBRFilter_1 = 'PriorityLow' and RuleSetVersion = '1.00'
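To see how these rules decide where a message lands, the matching can be simulated in plain C#. This is only an illustration of the rule table above, not Service Bus code. The `SubscriptionRules` class is hypothetical, and comparing topic names case-insensitively is an assumption made here because the function in this post returns names such as "PurchaseOrder" while the topics are created in lower case.

```csharp
using System;
using System.Linq;

public static class SubscriptionRules
{
    private class Rule
    {
        public string Topic, Subscription, Filter, Version;
    }

    // One entry per row of the subscription table.
    private static readonly Rule[] Rules =
    {
        new Rule { Topic = "purchaseorder", Subscription = "Approved_V1.00",     Filter = "Approved",     Version = "1.00" },
        new Rule { Topic = "purchaseorder", Subscription = "NotApproved_V1.00",  Filter = "ApprovedNot",  Version = "1.00" },
        new Rule { Topic = "salesorder",    Subscription = "HighPriority_V1.00", Filter = "PriorityHigh", Version = "1.00" },
        new Rule { Topic = "salesorder",    Subscription = "LowPriority_V1.00",  Filter = "PriorityLow",  Version = "1.00" },
    };

    // Returns the subscriptions whose rule matches the routing properties of a message.
    public static string[] Match(string topicName, string cbrFilter1, string ruleSetVersion) =>
        Rules.Where(r => string.Equals(r.Topic, topicName, StringComparison.OrdinalIgnoreCase)
                      && r.Filter == cbrFilter1
                      && r.Version == ruleSetVersion)
             .Select(r => r.Subscription)
             .ToArray();
}
```

A message whose properties match no rule is not delivered to any of these subscriptions, so it may be worth adding a catch-all subscription for the "Unknown" case.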

Next is the development of the Azure function. This is best developed in Visual Studio, where you can include a Unit Test project for each of the rules. Add a new Azure Function project to your solution.

image

After the project has been created, right click on the function and click Add -> New Item. Choose Azure Function, give it a name and select the Http trigger option.

image

Below is the code for the HTTP trigger function, which includes the class definition for the RoutingProperties object. The function checks for the specific elements SalesOrderNumber and PurchaseOrderNumber in the JSON message to determine the message type, which in turn determines which rule code block to execute. Each rule code block first sets the TopicName and RuleSetVersion properties.

using System.Linq;
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Azure.WebJobs.Host;
using Newtonsoft.Json.Linq;

public static class CBRRule
{
    [FunctionName("CBRRule")]
    public static async Task<HttpResponseMessage> Run([HttpTrigger(AuthorizationLevel.Function, "post", Route = null)]HttpRequestMessage req, TraceWriter log)
    {
        log.Info("C# HTTP trigger function processed a request.");
        var routingProperties = new RoutingProperties();

        // Get request body
        JObject data = await req.Content.ReadAsAsync<JObject>();

        // Is this a sales order message type?
        if (data != null && data["SalesOrderNumber"] != null)
        {
            routingProperties.CBRFilter_1 = "PriorityLow";
            routingProperties.RuleSetVersion = "1.00";
            routingProperties.TopicName = "SalesOrder";

            var lineItems = data["Lines"];
            var totalSaleAmount = lineItems.Sum(x => (decimal)x["UnitPrice"] * (decimal)x["Qty"]);

            // If the total sales is greater than $1000, send the message to the high priority queue.
            if (totalSaleAmount > 1000)
                routingProperties.CBRFilter_1 = "PriorityHigh";
        }
        // Is this a purchase order message type?
        else if (data != null && data["PurchaseOrderNumber"] != null)
        {
            routingProperties.CBRFilter_1 = "ApprovedNot";
            routingProperties.RuleSetVersion = "1.00";
            routingProperties.TopicName = "PurchaseOrder";

            var lineItems = data["Lines"];
            var totalSaleAmount = lineItems.Sum(x => (decimal)x["UnitPrice"] * (decimal)x["Qty"]);

            // Approve the PO if the total order price is less than $500.
            if (totalSaleAmount < 500)
                routingProperties.CBRFilter_1 = "Approved";
        }

        // Unrecognised messages keep the default "Unknown" routing values.
        return req.CreateResponse(HttpStatusCode.OK, routingProperties);
    }

    /// <summary>
    /// Response message to set the custom routing properties of the service bus
    /// </summary>
    public class RoutingProperties
    {
        public string TopicName { get; set; }
        public string CBRFilter_1 { get; set; }
        public string RuleSetVersion { get; set; }

        public RoutingProperties()
        {
            this.CBRFilter_1 = "Unknown";
            this.RuleSetVersion = "Unknown";
            this.TopicName = "Unknown";
        }
    }
}

The business rule for a SO aggregates all the line items and checks whether the total amount is greater than 1000. If it is, the property CBRFilter_1 is set to “PriorityHigh”.

The business rule for a PO also aggregates all the line items and checks whether the total amount is less than 500. If it is, the property CBRFilter_1 is set to “Approved”.

With the following input message sent to the function:

clip_image001

The output of the function should look similar to this:

clip_image001[6]

Now we need to publish the function from Visual Studio to your Azure resource group using the publishing wizard.

clip_image002

The last component of this solution is the Logic App which is triggered by an HTTP Request API and then calls the Azure function created above. The basic flow looks like this below.

clip_image004

The HTTP Request trigger has no request body JSON schema created. The trigger must accept any type of message.

clip_image006

Add an Azure Function after the trigger action and select the method called “CBRRule”

clip_image008

Set the Request Body to the trigger body content and the Method to “POST”

clip_image010

Next add a Service Bus action and set the properties as shown. Both the Queue/Topic name and Properties are set from the function response message.

clip_image012

Here is the code view of the Send Message action showing how the properties are set.

clip_image014

Testing

Using PostMan we can send sample messages to the Logic App and then see them sitting on the service bus waiting to be processed by some other method.

At the moment the service bus should have no messages waiting to be processed, as shown.

clip_image015

Using PostMan to send the following PO, we should see this message end up in the purchaseorder/NotApproved subscription.

clip_image016

All going well, the message will arrive at the correct subscription waiting to be processed as shown below using Service Bus Explorer.

clip_image018

Sending the following SO will also route the message to the correct subscriber.

clip_image019

clip_image021

Conclusion

CBR is easily achieved by using an Azure Function to execute your business rules on a message and set up the routing properties. Taking this a step further, I would abstract the business rules for each message type into their own class for manageability.

It is also advisable to set up a Unit Test project for each of the classes holding the business rules to ensure 100% code coverage.
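As a rough sketch of that refactoring, each message type could get its own rule class behind a small interface. The names `IRoutingRule`, `OrderLine`, `SalesOrderRule` and `PurchaseOrderRule` are hypothetical. The thresholds and filter values are the ones used in the function above.

```csharp
using System.Collections.Generic;
using System.Linq;

public class OrderLine
{
    public decimal UnitPrice { get; set; }
    public decimal Qty { get; set; }
}

public interface IRoutingRule
{
    string TopicName { get; }

    // Returns the CBRFilter_1 value for the given order lines.
    string Evaluate(IEnumerable<OrderLine> lines);
}

public class SalesOrderRule : IRoutingRule
{
    public string TopicName => "SalesOrder";

    // Sales orders over 1000 in total are high priority.
    public string Evaluate(IEnumerable<OrderLine> lines) =>
        lines.Sum(l => l.UnitPrice * l.Qty) > 1000m ? "PriorityHigh" : "PriorityLow";
}

public class PurchaseOrderRule : IRoutingRule
{
    public string TopicName => "PurchaseOrder";

    // Purchase orders under 500 in total are pre-approved.
    public string Evaluate(IEnumerable<OrderLine> lines) =>
        lines.Sum(l => l.UnitPrice * l.Qty) < 500m ? "Approved" : "ApprovedNot";
}
```

Because the rule classes have no dependency on the Functions runtime, a Unit Test project can exercise them directly, including the boundary values of exactly 1000 and exactly 500.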

Enjoy…