Message Re-sequencing using Azure Functions and Table storage

Message re-sequencing  is another integration pattern used to rearrange messages received out-of-order and back into the original sequence again. Well you may ask yourself why is this pattern is so important in todays world of messaging systems.

As the integration world is moving towards services hosted as Microservices in the cloud due to its scalability attributes and independent deployable components. This may require breaking messages up into smaller logical segments or de-batching the messages to allow the services to manage the data more efficiently. Using this pattern, makes it possible to add the smaller segments back together again after being broken up in its correct order.

image

Scenario

A typical use case is a Purchase Order with multiple line items which require processing asynchronously by a Microservice at an individual line level. This service may calculate the taxes, check stock levels and add the total line price. Some line items may take longer to process than others before returning the updated line item.

The approach would be to de-batch the PO at the line level and send each line item to the Microservice. The response message from the Microservice would then go through a Message Re-Sequencer component to put the order lines back in the correct sequence again before returning the updated PO response.

Another use case would be to re-sequence messages after a scatter gather pattern (https://connectedcircuits.blog/2017/10/08/integration-scatter-gather-pattern-using-azure-logic-apps-and-service-bus/) before forwarding them on.

Approach

For this re-sequencer pattern to work, we require 3 attributes to be passed with each message to the messaging re-sequencer service, the sequence number of the message, the total number of messages to expect and a batch number.

A storage repository  is also required to temporary hold the out-of-sequence messages. The repository will be based on an Azure Table Storage as it provides all the requirements of storing NoSQL data structures, a Partition key to group the related messages together and a Row key to identity each message by the sequence number. By using the combination of these two keys we can quickly find and retrieve a message from the store.

An Azure Function is used to manage the saving and retrieving of messages from Table storage and is designed to be agnostic of the message types. This function is actually the core component of this solution.

The logic app is primarily there to receive the message, de-batch the line items and pass it onto the function.  Then forward on the message when all de-batched line items have been received.

There are two possibilities available as when to forward the re-ordered messages out.

  • On arrival – as a message arrives we check if any messages in the repository can be sent out before this one is sent.
  • Last message arrival –  where we send all the messages out I one hit after the final message  has been received.

This design will focus on the last option, where we will send all messages out in one hit. My next blog will cover how to send it out using the On Arrival method.

Design

Creating the Azure Table Storage

First we need to add an Azure Storage account to our resource group to store the incoming messages.

clip_image002

Take a copy of the connection string from Settings/Access keys of Storage account as we will require this latter.

clip_image004

Building the Azure Function

Start by creating an Azure Functions project in Visual Studio 2017.

clip_image006

Now add a new C# class to the project called MsgEntity with the following properties and an overloaded constructor shown below. Also we need to inherent the TableEntity base class from the “Microsoft.WindowsAzure.Storage.Table” assembly.

clip_image007

This class will be used as a container to hold the message contents and sequence numbering attributes.

Now we need to add the following three methods to the function described below.

  • AzureTableStoreMsg – Stores the message in Azure Table Storage and returns the total number of messages received. If the final message in the sequence has been previously received, then the date/time it was received is returned also.
  • AzureTableRetrieveMsg – Retrieves all messages from the table storage using the batch number or an individual message using the message sequence number.
  • AzureTableRemoveMsg – deletes all the messages from table storage related to the batch number.

To add a function method to the project, right click on the project and select New Item. From the Add New Item popup window, select Azure Function

 

.clip_image009

Then select the “Generic WebHook” type. This will allow a Logic App to access the interface from the workflow designer using the Azure Functions action shape latter.

clip_image010

Add the following code to this new method “AzureTableStoreMsg”

   1: [FunctionName("AzureTableStoreMsg")]

   2: public static async Task<object> Run([HttpTrigger(WebHookType = "genericJson")]HttpRequestMessage req, TraceWriter log)

   3: {

   4:     log.Info($"Webhook was triggered!");

   5:

   6:     var lastMsgIndex = 0;

   7:     var msgCount = 0;

   8:     var eomDateTimeReceived = DateTime.MinValue;

   9:

  10:     try

  11:     {

  12:         //read the request headers              

  13:         var sequencenumber = Convert.ToInt32(req.Headers.GetValues("x-MsgReseqNumber").FirstOrDefault());

  14:         var totalMsgCount = Convert.ToInt32(req.Headers.GetValues("x-MsgReseqTotalCount").FirstOrDefault());

  15:         var batchnumber = req.Headers.GetValues("x-MsgReseqBatchId").FirstOrDefault();

  16:         var isEndMsg = totalMsgCount == sequencenumber ? true : false;

  17:

  18:         string jsonContent = await req.Content.ReadAsStringAsync();

  19:

  20:         var storageAccount = CloudStorageAccount.Parse(ConfigurationManager.AppSettings["StorageConnectionString"]);

  21:         // Create the table client.

  22:         CloudTableClient tableClient = storageAccount.CreateCloudTableClient();

  23:         // Retrieve a reference to the table.

  24:         CloudTable table = tableClient.GetTableReference("msgsequence");

  25:         // Create the table if it doesn't exist.

  26:         table.CreateIfNotExists();

  27:

  28:         // Get request body and initialise a MsgEntity object.

  29:         var requestBody = await req.Content.ReadAsAsync<object>();

  30:         var msgEntity = new MsgEntity(batchnumber, sequencenumber, isEndMsg);

  31:         msgEntity.Message = requestBody.ToString();

  32:

  33:         // Create the TableOperation object that inserts the message entity. Will rasie error if duplicate found.

  34:         TableOperation insertOperation = TableOperation.InsertOrReplace(msgEntity);

  35:

  36:         // Execute the insert operation.

  37:         await table.ExecuteAsync(insertOperation);

  38:

  39:         //iterate through all the messages for this partition check if the last message has been received and the number of messages

  40:         var queryMsgType = new TableQuery<MsgEntity>().Where(TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, batchnumber))

  41:                            .Select(new string[] { "RowKey", "Timestamp", "IsLastMessage" });

  42:

  43:         foreach (MsgEntity entity in table.ExecuteQuery(queryMsgType))

  44:         {

  45:             if (entity.IsLastMessage)

  46:             {

  47:                 lastMsgIndex = Convert.ToInt32(entity.RowKey);

  48:                 eomDateTimeReceived = entity.Timestamp.UtcDateTime;

  49:             }

  50:             msgCount++;

  51:         }

  52:

  53:     }

  54:     catch (Exception ex)

  55:     {

  56:         return req.CreateResponse(HttpStatusCode.InternalServerError, ex.Message);

  57:     }

  58:

  59:     // Return the number of messages when the completion message arrived.

  60:     return req.CreateResponse(HttpStatusCode.OK, new { EOMReceived = eomDateTimeReceived, EOMIndex = lastMsgIndex, TotalMessagesReceived = msgCount });

  61:

  62: }

The current message sequence number, total number of messages and the batch number are passed in as custom header values while the message is passed in the body payload. This function will create a table called “msgsequence” if it does not exist and will populate the custom object with the message properties and the message before storing the object in the Azure Table. The response message from this function returns the total number of messages received and if the last message in the sequence has been received. Below is an example of the response message showing the number of messages it has received so far and when the last message sequence was received.

image

Add another Azure Function as before called “AzureTableRetrieveMsg” for retrieving messages from the Table store and add the following code below.

   1: [FunctionName("AzureTableRetreiveMsg")]

   2:        public static async Task<object> Run([HttpTrigger(WebHookType = "genericJson")]HttpRequestMessage req, TraceWriter log)

   3:        {

   4:            log.Info($"Webhook was triggered!");

   5:

   6:            string jsonContent = await req.Content.ReadAsStringAsync();

   7:            Dictionary<string, string> data = JsonConvert.DeserializeObject<Dictionary<string,string>>(jsonContent);

   8:

   9:            var sequencenumber = data["MsgSequenceNUmber"];

  10:            var batchnumber = data["MsgBatchId"];

  11:

  12:

  13:            var storageAccount = Microsoft.WindowsAzure.Storage.CloudStorageAccount.Parse(ConfigurationManager.AppSettings["StorageConnectionString"]);

  14:            // Create the table client.

  15:            CloudTableClient tableClient = storageAccount.CreateCloudTableClient();

  16:            // Retrieve a reference to the table.

  17:            CloudTable table = tableClient.GetTableReference("msgsequence");

  18:

  19:            // Construct the query operation for all entities where PartitionKey equals msgtype.

  20:            TableQuery<MsgEntity> query = null;

  21:            if (string.IsNullOrEmpty(sequencenumber))

  22:            {

  23:                query = new TableQuery<MsgEntity>().Where(TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, batchnumber));

  24:            }

  25:            else

  26:            {

  27:                var sequenceFilter = TableQuery.GenerateFilterCondition("RowKey", QueryComparisons.Equal, string.Format(sequencenumber,"0000000000"));

  28:                var msgTypeFilter = TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, batchnumber);

  29:                query = new TableQuery<MsgEntity>().Where(TableQuery.CombineFilters(sequenceFilter, TableOperators.And, msgTypeFilter));

  30:            }

  31:

  32:            var list = new List<dynamic>();

  33:            foreach (MsgEntity entity in table.ExecuteQuery(query).OrderBy(x => x.RowKey))

  34:            {

  35:                list.Add(JsonConvert.DeserializeObject(entity.Message));

  36:            }

  37:

  38:            return req.CreateResponse(HttpStatusCode.OK, list);

  39:        }

This function will optionally return all records for the batch number or a single message matching the passed in sequence number. A sample request message is shown below which will return all records matching the batch number if the MsgSequenceNumber is left blank.

image

The last Azure Function method to add is “AzureTableRemoveMsg”.

   1: [FunctionName("AzureTableDeleteMsg")]

   2:        public static async Task<object> Run([HttpTrigger(WebHookType = "genericJson")]HttpRequestMessage req, TraceWriter log)

   3:        {

   4:            log.Info($"Webhook was triggered!");

   5:

   6:            string jsonContent = await req.Content.ReadAsStringAsync();

   7:            Dictionary<string, string> data = JsonConvert.DeserializeObject<Dictionary<string, string>>(jsonContent);

   8:

   9:            var batchnumber = data["MsgBatchId"];

  10:

  11:            var storageAccount = CloudStorageAccount.Parse(ConfigurationManager.AppSettings["StorageConnectionString"]);

  12:            // Create the table client.

  13:            CloudTableClient tableClient = storageAccount.CreateCloudTableClient();

  14:            // Retrieve a reference to the table.

  15:            CloudTable table = tableClient.GetTableReference("msgsequence");

  16:

  17:            // Construct the query operation for all entities where PartitionKey equals batchnumber.            

  18:            if (!string.IsNullOrEmpty(batchnumber))

  19:            {

  20:                TableQuery<MsgEntity> query = new TableQuery<MsgEntity>()

  21:                            .Where(TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, batchnumber))

  22:                            .Select(new string[] { "PartitionKey", "RowKey" });

  23:

  24:                if (table.ExecuteQuery(query).Count() == 0)

  25:                {

  26:                    return req.CreateResponse(HttpStatusCode.NotFound);

  27:                }

  28:                foreach (MsgEntity entity in table.ExecuteQuery(query).OrderBy(x => x.RowKey))

  29:                {

  30:                    TableOperation deleteOperation = TableOperation.Delete(entity);

  31:                    await table.ExecuteAsync(deleteOperation);

  32:                }

  33:                return req.CreateResponse(HttpStatusCode.OK);

  34:            }

  35:

  36:

  37:            return req.CreateResponse(HttpStatusCode.BadRequest);

  38:        }

This is to delete all the messages from the Table storage for a batch number. A sample request is shown below.

image

Next we need to add the Azure Table storage connection string to the local.settings.json file in the project.

clip_image012

Before publishing the function to Azure, be sure to create a Unit Test project and check its functionality.

Publishing the Function to Azure

Right click on the project and click Publish. This will take you to the publishing wizard form. Choose “Azure Function App” and “Create New”, then click the publish button.

clip_image014

In the next screen add a function name, subscription and hosting plans etc. Then click the create button to provision any new resources and to publish the function.

clip_image016

Once the function has been published, we need to log into the Azure portal and add the Storage Account connection string to the Application settings file. Double click on the azure function app in the Azure portal.

clip_image018

Then click on the Application settings link.

clip_image020

In the Application settings section, add a new key called “StorageConnectionString” and add the connection string that was used in the local.settings.json in the Visual Studio Function project. Then click the “Save” button at the top of the screen.

clip_image022

 

Logic App for message de-batching and re-sequencing

Next we will create the Logic App to de-batch the individual line items in a parallel fashion to send to an other Logic App for processing which is acting as a Microservice.  The response is then passed to the function we created above. Once all the messages have been received from the Microservice, we call another function method to return all the stored responses in the correct order before returning the result back to the consumer.

The message will be based on a PO (Purchase Order) shown below. We will be using the OrderNumber as the batch number and the LineNumber’s for re-sequencing the line items later.

image

Below is the collapsed view of the Logic App. It is triggered by a PO (Purchase Order) being posted to the HTTP endpoint with the schema is based on the PO message above.

image

Two variables are required for holding the total number of  line items and the PO lines from the Message Re-sequencer function.

image

The code view for the Initialize variable MaxLineItems is shown below, where we just take the length of the Lines collection.

image

After the variables are initialised, is the For each loop shape where we iterate through the PO lines of the message.

image

The POValidator action calls another Logic App passing the line item which simulates a Microservice to calculate the total line amount and could possibly check stock levels.

image

The response is then passed onto the AzureTableStoreMsg function method which was published to Azure in the previous steps.

Functions that have been created as generic webhooks can be selected from the Logic App designer by adding an action to the designer and choosing Azure Functions

.image

After selecting Azure functions, you should be able to see all your published functions. Click on the Azure function that was published above.

image

This will then display all the methods that we had created above. Make sure you select the AzureTableStoreMsg method from the list.

image

After the function method has been added, setup the properties as shown.

image

The body and header properties are set to the following values.

  • Request Body  – output body of the POValidator Logic App (Microservice)
  • x-MsgReseqBatchId  – the OrderNumber of the trigger message
  • x-MsgReseqNumber – the line number of the item in the loop
  • x-MsgReseqTotalCount – is the value of the MaxLineItems variable which was initiated at the start of the workflow.

And the code view for this action is shown here:

image

The step in the workflow is to check if we have received all the messages. This is done by checking the response message from the function method AzureTableStoreMsg and the two properties EOMIndex and TotalReceived to see if they are equal.

image

The syntax for the condition action “Check msg count” is as follows:

@equals(body(‘AzureTableStoreMsg’)[‘EOMIndex’], body(‘AzureTableStoreMsg’)[‘TotalMessagesReceived’])

If the condition is true, then the next step is to retrieve all the messages in the correct order from the Table Storage using the function method “AzureTableRetrieveMsg”. Follow the steps as before to add an action to call this function method. The only property to send in the request body is the MsgBatchId which is set to the PO Order number.

image

The code view for this shape looks like this.

image

We then set the variable LineItems to the output of the function method “AzureTableRetrieveMsg”.

image

Once we set the variable, we can then clear the table storage of this data by calling the function method “AzureTableDeleteMsg” and passing the PO Order number in the body.

image

When the “For each” shape completes we then compose the response message by replacing the existing line items with the items stored in the LineItem variable using the “SetProperty” expression.

image

Here is the code view of the Compose shape.

image

The final step is Response body to that of the Compose shape above.

image

That’s the development done for this integration pattern. Next we will create the POValidator logic app to simulate calling a Microservice. This Logic App just calculates the line total by multiplying the UnitPrice and Qty to crate a new property TotalPrice. I have also added a random Delay shape to simulate some line items taking longer to process.

Below is the workflow process for this Logic App. I won’t go into the detail of each action as this Logic App is just there to test the message re-sequencing.

image

Testing

To test this solution we are going to use one of my favourite tools PostMan. The following PO message will sent to the MessageResequencer Logic App. Notice the absence of the line item total, the POValidator Logic App will calculate this instead for us.

image

As you can see below, each line item has been sent to the POValidator Logic App and took different times to complete due  to the delay shape.

image

Here is the response message with the total price appended to each line item and in the correct sequence.

image

Now to prove that the line items do get re-ordered in the correct sequence, I will swap the line numbers around in the PO request message and with a different PO number.

image

Here is the response with the lines re-sequenced in the correct order.

image

Conclusion

Using Azure Functions and Table Storage makes it fairly easy to develop this integration pattern. To provide a more resilient repository for storing the messages temporary, you may want to consider using the Azure Cosmos Db.

Also the http connectors can also be swapped out for Service Bus connectors when communicating with services that are slow in responding.

You would have noticed that when you call the function method “AzureTableStoreMsg” it returns the following response message.

image

You can use the EOMReceived and TotalMessagesReceived to determine if an error has occurred or Microservice is taking too long to process the de-batched items. Under ideal conditions, the EOMIndex would be set on the last de-batched message and the TotalMessagesReceived equal the EOMIndex value.

Keep a watch out for my next blog where I will show you how re-sequence messages as they arrive.

Enjoy…

3 Replies to “Message Re-sequencing using Azure Functions and Table storage”

Leave a Reply

Your email address will not be published. Required fields are marked *