Scatter-Gather pattern using an Azure Event Grid

This is another version of the scatter-gather integration pattern which I blogged about some time ago. The previous version was based on service bus topics and Logic Apps which polled for new messages. Whilst this was adequate for a messaging system that did not require immediate request/response times, it was not an ideal solution for another project I was working on. This project required a website to initiate the request to a scatter-gather pattern and to receive the responses with low latency, within a few seconds.

With the recently introduced Event Grid service in Azure https://azure.microsoft.com/en-us/services/event-grid/, I was able to achieve request/responses in seconds using the scatter-gather pattern. This is possible because Event Grid pushes event-based notifications to its subscribers, unlike the service bus approach, which relies on regular polling for new messages on the bus.

image

Using the design above enables the client to define how long to wait after the initial request before retrieving the responses from the services via the gatherer Logic App.

Note that if you require durable message delivery, I would still recommend using a service bus, which provides peek-lock, FIFO, dead-lettering and transactional support.

Scenario

A website is required to validate a PO and display the customer's details, the current stock level and the price when requesting information about a product. The information is to be sourced from 3 different API services: the customer information from CRM, the product stock levels from an Inventory service and the product price from a Pricing service.

Approach

The website will send a request to an HTTPS Event Grid endpoint. Event Grid will publish the message to the 3 Logic App subscriptions, each of which is triggered by an HTTP request.

One of the properties sent with each event is a message Id, which will be used as the session Id on the service bus when returning the API response messages.

Each Logic App will map the request message to the required API format, call the API and place the API response message onto the service bus.

When the website is ready to receive the aggregated responses from the service bus, it will make an HTTP GET request to another Logic App, which will then read all the messages from the service bus matching the session Id and combine them into a single composite message.

Design

API Proxy Logic Apps

To keep the demo simple, each of the proxy Logic Apps will compose a static response message using some of the properties found in the request message. In a practical solution you would map the request message to the format required by the API service after the Parse JSON action, and add an HTTP action after the map to call the API service.

image

The details of each action are shown below.

Take note of each HTTP POST URL address in the logic apps as this will be required when setting up the subscriptions in Event Grid. The Request Body JSON Schema can be left blank.

image

Note that an indexer is required to read the body contents, as the events are normally sent as a collection.

image

Here we are composing a static CRM response message to send back. The contents will be different for each proxy Logic App.

image

The composed message is put onto the service bus using the Event Grid Id as the Session Id. This groups it with the messages from the other proxy Logic Apps. A custom property is used to indicate the source of the message and is used later when aggregating the response messages from all the proxy Logic Apps.

image

The code view of the Send Message service bus connector is shown below. Note the custom property “ServiceName” and the value “CRM”.

image

The other 2 proxy Logic Apps for checking inventory and pricing are similar in design to this one. The only differences are the composing of the static response message and the value of the Service Name source property in the service bus connector.

Creating an Azure Event Grid

From the Azure Portal search for Event Grid and then click create.

image

Add a suitable name and the resource group.

image

Once the resource has been created, you will need to take a copy of the Access key and Topic Endpoint. These will be used later when posting messages to this endpoint.

image

After the resource has been created, subscriptions will need to be created for each of the proxy Logic Apps using the HTTP POST URL addresses for the web hook endpoints. This is done by clicking the Event Subscription icon at the top menu bar.

image

We will need to create 3 separate subscriptions, one for each of the proxy Logic Apps. They all share the same Event Type and Prefix Filter; the only differences between them are the Name and the Subscriber Endpoint URL.

image

Once completed, you should have the following 3 subscriptions created.

image

Service Bus Queue

Next a service bus queue is required to hold the response messages returned from the API services that were called in each of the Micro Service proxy Logic Apps. We do not require any topics, but the queue does need sessions, which the Basic pricing tier does not support, so select the Standard tier or higher.

image

Once the Service Bus has been created, we need to create a queue with sessions enabled. Sessions are required to group all the messages with the same Id so they can be processed together.

image

Message Gatherer Logic App

The purpose of this Logic App is to retrieve all the messages from the service bus matching a session Id and then to aggregate the messages into a single composite message. This Logic App is triggered by the website when it is ready to retrieve the composite responses from the services.

image

Details of each action shape are described below.

The “When a HTTP request is received” trigger is a GET method with the message {id} in the URL as a resource.

image

This variable is used to append all the API response messages.

image

Here we get all the messages from the service bus using the {id} from the URL of the HTTP GET as the Session Id.

image

We then iterate through all the messages received from the service bus queue and append each message to the responseMsg variable before completing the message in the queue.

image

Code view for the “Append to service response” is shown below. I am using the service bus custom property “ServiceName” that was set in the API Proxy Logic App for the message name.

image

Code view for the “Complete the message in a queue” is shown below.

image

Once all the messages have been completed, we close the session in the queue using the {id} from the URL of the HTTP GET.

image

The last step is to return the aggregated message as a typed JSON message.

image

Below is code view for the Response Action.

image
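As the code views above are only available as screenshots, the aggregation step can also be sketched in C#. This is a minimal illustration under stated assumptions, not the Logic App definition itself: the Gatherer class and Aggregate method are hypothetical names, the message bodies are assumed to be JSON, and System.Text.Json.Nodes (.NET 6 or later) stands in for the Logic App expressions. Each message body is added to the composite message under the value of its “ServiceName” custom property.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json.Nodes;

public static class Gatherer
{
    // Combines the messages read from one service bus session into a single
    // composite JSON object, keyed by the "ServiceName" custom property.
    // (Hypothetical helper; the real solution does this with Logic App actions.)
    public static string Aggregate(IEnumerable<(string ServiceName, string Body)> messages)
    {
        var composite = new JsonObject();
        foreach (var (serviceName, body) in messages)
        {
            // Parse the body so it is embedded as JSON rather than as an escaped string.
            composite[serviceName] = JsonNode.Parse(body);
        }
        return composite.ToJsonString();
    }
}
```

Calling Gatherer.Aggregate with the CRM, Inventory and Pricing messages for one session would yield one JSON document with a property per service.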

Console Application

To keep things simple I will use a console application to mimic a website publishing a request to the scatter-gather service. The same code can be put into a helper project and used by a website.

When publishing events to the Event Grid, it must conform to the following schema. The message to send to the Event Grid is placed in the “data” element.
[
  {
    "topic": string,
    "subject": string,
    "id": string,
    "eventType": string,
    "eventTime": string,
    "data": {
      object-unique-to-each-publisher
    },
    "dataVersion": string,
    "metadataVersion": string
  }
]

More information about the schema and properties can be found here:- https://docs.microsoft.com/en-us/azure/event-grid/event-schema

For the scatter-gather pattern I will be setting the “id” property to a Guid string value which will be used as the session key on the service bus. Both the eventType and subject properties will be used for the subscription filter.

I have created two custom objects to represent the Purchase Order and the EventGrid models.

Purchase Order Object

    public class PurchaseOrder
    {
        public string OrderNumber { get; set; }
        public string CustomerNumber { get; set; }
        public string ProductCode { get; set; }
    }

Event Grid Object

    using System;

    // Generic envelope matching the Event Grid schema; T is the payload placed in "data".
    public class GridEvent<T> where T : class
    {
        public string Id { get; set; }
        public string Subject { get; set; }
        public string EventType { get; set; }
        public T Data { get; set; }
        public DateTime EventTime { get; set; }
    }

The console application consists of 2 functions, one to send the request to the Event Grid and another to request the aggregated message from the Message Gatherer Logic App.

The “SendMsgAsync” function below accepts an Event Grid object as an argument and returns the status of the post action. It adds the parameter to a collection and serialises the collection before posting it to the Event Grid. Remember to update the sasKey variable with the value from your Event Grid resource.

Send Message Func

    // Requires: System.Collections.Generic, System.Net.Http, System.Text,
    // System.Threading.Tasks and Newtonsoft.Json
    private static async Task<string> SendMsgAsync(GridEvent<PurchaseOrder> gridEvent)
    {
        // Event Grid expects an array of events, so wrap the single event in a list
        var payloadList = new List<GridEvent<PurchaseOrder>> { gridEvent };
        string topicEndpoint = "https://scatter-eventgrid.westus2-1.eventgrid.azure.net/api/events";
        // Add the Event Grid access key
        string sasKey = "<Event Grid sas key>";
        // Event Grid expects event data as JSON
        var json = JsonConvert.SerializeObject(payloadList);
        // Create the request which will be sent to the topic
        var content = new StringContent(json, Encoding.UTF8, "application/json");
        // Create an HTTP client which we will use to post to the Event Grid topic
        using (var httpClient = new HttpClient())
        {
            // Add the key to the request headers
            httpClient.DefaultRequestHeaders.Add("aeg-sas-key", sasKey);
            // Send the request
            Console.WriteLine("Sending event to Event Grid…");
            var result = await httpClient.PostAsync(topicEndpoint, content);
            return result.ReasonPhrase;
        }
    }

 

The “ReadMsgAsync” function below accepts the batchId as an argument. It is used to make an HTTP GET to the Message Gatherer Logic App to retrieve the composite message. It uses the batchId as a resource location in the call-back URL of the Logic App.

You will need to add the {0} token after the …invoke/ resource location, as in the following URL: https://prod-27.australiasoutheast.logic.azure.com/workflows/16…/triggers/manual/paths/invoke/{0}?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual%2Frun&sv=1.0&sig=k-5w..

Read Message Func

    private static async Task<string> ReadMsgAsync(string batchId)
    {
        // The {0} token in the call-back URL is replaced with the batchId
        var uri = string.Format("<API Gatherer Logic App Callback url>", batchId);
        using (var httpClient = new HttpClient())
        {
            return await httpClient.GetStringAsync(uri);
        }
    }

 

In the main function below, we create a sample PO and an EventGrid message object. We then call the SendMsgAsync function to post the message to the EventGrid in Azure. Then after waiting for a second we call the ReadMsgAsync function to read the composite message from the Logic App. Note we are using the batchId as the EventGrid object id to group all the response messages together from the API services.

Console Main Func

    static void Main(string[] args)
    {
        // Create an id for the service bus session
        var batchId = Guid.NewGuid().ToString();
        var payload = new GridEvent<PurchaseOrder>
        {
            Data = new PurchaseOrder { OrderNumber = "1234", CustomerNumber = "ABC100", ProductCode = "PRD2043" },
            EventTime = DateTime.UtcNow,
            EventType = "orders",
            Id = batchId,
            Subject = "po/validation"
        };
        var rc = SendMsgAsync(payload).Result;
        Console.WriteLine($"Event sent with result: {rc}");
        // Give the proxy Logic Apps time to respond before gathering the results
        Thread.Sleep(1000);
        var msg = ReadMsgAsync(batchId).Result;
        Console.WriteLine("Response Message…");
        Console.WriteLine(msg);
        Console.ReadLine();
    }

Testing

Executing the console application displays the successful post to the EventGrid and the results from the API Gatherer Logic App after a few seconds.

image

Calling the ReadMsgAsync too quickly may not allow the API services to respond in time and some messages may be left out as shown here below.

image

Conclusion

Azure Event Grid is very useful for scenarios where you want to publish a message to several subscribers that are triggered by web-hooks. If the web-hook does not acknowledge receipt of the message, then Event Grid will try again several times using an exponential back-off period from 10 seconds to an hour.

If you request the composite response message from the API services too soon, you may miss some of the responses from the Micro Services. It is advisable to check the composite message for any missing responses before continuing to process it.
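A minimal sketch of such a check is shown below. It assumes the composite message is a JSON object with one property per service, named after the “ServiceName” custom property; “CRM” comes from the proxy Logic App above, while “Inventory” and “Pricing” in the usage note are assumed names for the other two. CompositeCheck and MissingServices are hypothetical names, and System.Text.Json is used here rather than Json.NET.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text.Json;

public static class CompositeCheck
{
    // Returns the names of the expected services that have no entry
    // in the composite message returned by the gatherer.
    public static IReadOnlyList<string> MissingServices(string compositeJson, IEnumerable<string> expected)
    {
        using var doc = JsonDocument.Parse(compositeJson);
        var root = doc.RootElement;

        // Anything other than a JSON object cannot contain any service responses.
        if (root.ValueKind != JsonValueKind.Object)
        {
            return expected.ToList();
        }

        return expected.Where(name => !root.TryGetProperty(name, out _)).ToList();
    }
}
```

For example, MissingServices(msg, new[] { "CRM", "Inventory", "Pricing" }) returns an empty list when all three services have responded. Bear in mind that the gatherer completes the messages it reads, so a second call for the same id would presumably return only the late arrivals and the caller would need to merge the two results.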

The monthly running cost of this solution is minimal when compared to a service bus topic version of this pattern, which requires polling at regular intervals, where each poll incurs a trigger cost.

This same pattern can also be used to design an asynchronous request/response messaging system.

Enjoy…

Message Re-sequencing using Azure Functions and Table storage

Message re-sequencing is another integration pattern, used to rearrange messages received out-of-order back into their original sequence. You may well ask why this pattern is so important in today's world of messaging systems.

The integration world is moving towards services hosted as Microservices in the cloud, because of their scalability and independently deployable components. This may require breaking messages up into smaller logical segments, or de-batching the messages, to allow the services to manage the data more efficiently. Using this pattern makes it possible to put the smaller segments back together in their correct order after they have been broken up.

image

Scenario

A typical use case is a Purchase Order with multiple line items which require processing asynchronously by a Microservice at an individual line level. This service may calculate the taxes, check stock levels and add the total line price. Some line items may take longer to process than others before returning the updated line item.

The approach would be to de-batch the PO at the line level and send each line item to the Microservice. The response message from the Microservice would then go through a Message Re-Sequencer component to put the order lines back in the correct sequence again before returning the updated PO response.

Another use case would be to re-sequence messages after a scatter gather pattern (https://connectedcircuits.blog/2017/10/08/integration-scatter-gather-pattern-using-azure-logic-apps-and-service-bus/) before forwarding them on.

Approach

For this re-sequencer pattern to work, we require 3 attributes to be passed with each message to the message re-sequencer service: the sequence number of the message, the total number of messages to expect and a batch number.

A storage repository is also required to temporarily hold the out-of-sequence messages. The repository will be based on Azure Table Storage, as it provides everything required: storage of NoSQL data structures, a Partition key to group the related messages together and a Row key to identify each message by its sequence number. By using the combination of these two keys we can quickly find and retrieve a message from the store.

An Azure Function is used to manage the saving and retrieving of messages from Table storage and is designed to be agnostic of the message types. This function is actually the core component of this solution.

The Logic App is primarily there to receive the message, de-batch the line items and pass them on to the function. It then forwards on the message when all the de-batched line items have been received.

There are two possibilities available as when to forward the re-ordered messages out.

  • On arrival – as a message arrives we check if any messages in the repository can be sent out before this one is sent.
  • Last message arrival – where we send all the messages out in one hit after the final message has been received.

This design will focus on the last option, where we will send all messages out in one hit. My next blog will cover how to send it out using the On Arrival method.
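Before building it with Azure services, the “last message arrival” option can be illustrated with a small in-memory sketch. The Resequencer class below is hypothetical and only demonstrates the idea: it holds messages per batch, keyed by sequence number, and releases the whole batch in order once the expected number of messages has arrived. The actual solution uses Table Storage instead of a dictionary.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public class Resequencer
{
    // batch id -> (sequence number -> message); SortedDictionary keeps the
    // messages ordered by sequence number regardless of arrival order.
    private readonly Dictionary<string, SortedDictionary<int, string>> _batches =
        new Dictionary<string, SortedDictionary<int, string>>();

    // Stores a message. Returns the complete batch in sequence order once
    // totalCount messages have arrived, otherwise returns null.
    public IReadOnlyList<string> Add(string batchId, int sequenceNumber, int totalCount, string message)
    {
        if (!_batches.TryGetValue(batchId, out var batch))
        {
            batch = new SortedDictionary<int, string>();
            _batches[batchId] = batch;
        }

        // A repeated sequence number overwrites the earlier message.
        batch[sequenceNumber] = message;

        if (batch.Count < totalCount)
        {
            return null; // still waiting for more messages
        }

        _batches.Remove(batchId); // the batch is complete, so clear the store
        return batch.Values.ToList();
    }
}
```

Adding messages 3, 1 and 2 of a batch of three returns null twice, then the three messages in the order 1, 2, 3.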

Design

Creating the Azure Table Storage

First we need to add an Azure Storage account to our resource group to store the incoming messages.

clip_image002

Take a copy of the connection string from Settings/Access keys of the Storage account, as we will require this later.

clip_image004

Building the Azure Function

Start by creating an Azure Functions project in Visual Studio 2017.

clip_image006

Now add a new C# class to the project called MsgEntity, with the properties and the overloaded constructor shown below. The class also needs to inherit from the TableEntity base class in the “Microsoft.WindowsAzure.Storage.Table” assembly.

clip_image007

This class will be used as a container to hold the message contents and sequence numbering attributes.
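In case the screenshot is not available, here is a hedged reconstruction of what MsgEntity might look like, inferred only from how the functions further down use it (a three-argument constructor, the Message and IsLastMessage properties, and a zero-padded numeric RowKey). The exact member layout and the padding format are assumptions. The small TableEntity class is a stand-in so the sketch compiles on its own; in the Functions project you would delete it and inherit from Microsoft.WindowsAzure.Storage.Table.TableEntity instead.

```csharp
using System;

// Stand-in for Microsoft.WindowsAzure.Storage.Table.TableEntity (assumption:
// only the members used in this post are reproduced here).
public class TableEntity
{
    public string PartitionKey { get; set; }
    public string RowKey { get; set; }
    public DateTimeOffset Timestamp { get; set; }
}

public class MsgEntity : TableEntity
{
    // A parameterless constructor is needed so query results can be materialised.
    public MsgEntity() { }

    public MsgEntity(string batchNumber, int sequenceNumber, bool isLastMessage)
    {
        // The batch number groups related messages into one partition.
        PartitionKey = batchNumber;
        // Zero-pad the sequence number so that string ordering of RowKey
        // matches numeric ordering (assumed 10-digit format).
        RowKey = sequenceNumber.ToString("0000000000");
        IsLastMessage = isLastMessage;
    }

    // The message contents, stored as a string.
    public string Message { get; set; }

    // True when this message is the final one in the batch.
    public bool IsLastMessage { get; set; }
}
```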

Now we need to add the following three function methods to the project, as described below.

  • AzureTableStoreMsg – Stores the message in Azure Table Storage and returns the total number of messages received. If the final message in the sequence has been previously received, then the date/time it was received is returned also.
  • AzureTableRetrieveMsg – Retrieves all messages from the table storage using the batch number or an individual message using the message sequence number.
  • AzureTableRemoveMsg – deletes all the messages from table storage related to the batch number.

To add a function method to the project, right click on the project and select New Item. From the Add New Item popup window, select Azure Function.

clip_image009

Then select the “Generic WebHook” type. This will allow a Logic App to access the interface from the workflow designer using the Azure Functions action shape later.

clip_image010

Add the following code to this new method “AzureTableStoreMsg”:

    [FunctionName("AzureTableStoreMsg")]
    public static async Task<object> Run([HttpTrigger(WebHookType = "genericJson")]HttpRequestMessage req, TraceWriter log)
    {
        log.Info("Webhook was triggered!");

        var lastMsgIndex = 0;
        var msgCount = 0;
        var eomDateTimeReceived = DateTime.MinValue;

        try
        {
            // Read the re-sequencing attributes from the request headers.
            var sequencenumber = Convert.ToInt32(req.Headers.GetValues("x-MsgReseqNumber").FirstOrDefault());
            var totalMsgCount = Convert.ToInt32(req.Headers.GetValues("x-MsgReseqTotalCount").FirstOrDefault());
            var batchnumber = req.Headers.GetValues("x-MsgReseqBatchId").FirstOrDefault();
            // The message whose sequence number equals the total count is the last one in the batch.
            var isEndMsg = totalMsgCount == sequencenumber;

            var storageAccount = CloudStorageAccount.Parse(ConfigurationManager.AppSettings["StorageConnectionString"]);
            // Create the table client.
            CloudTableClient tableClient = storageAccount.CreateCloudTableClient();
            // Retrieve a reference to the table.
            CloudTable table = tableClient.GetTableReference("msgsequence");
            // Create the table if it doesn't exist.
            table.CreateIfNotExists();

            // Get the request body and initialise a MsgEntity object.
            var requestBody = await req.Content.ReadAsAsync<object>();
            var msgEntity = new MsgEntity(batchnumber, sequencenumber, isEndMsg);
            msgEntity.Message = requestBody.ToString();

            // Create the TableOperation object that stores the message entity.
            // InsertOrReplace overwrites an existing entity with the same keys rather than raising an error.
            TableOperation insertOperation = TableOperation.InsertOrReplace(msgEntity);

            // Execute the insert operation.
            await table.ExecuteAsync(insertOperation);

            // Iterate through all the messages for this partition to count them
            // and to check whether the last message has been received.
            var queryMsgType = new TableQuery<MsgEntity>().Where(TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, batchnumber))
                               .Select(new string[] { "RowKey", "Timestamp", "IsLastMessage" });

            foreach (MsgEntity entity in table.ExecuteQuery(queryMsgType))
            {
                if (entity.IsLastMessage)
                {
                    lastMsgIndex = Convert.ToInt32(entity.RowKey);
                    eomDateTimeReceived = entity.Timestamp.UtcDateTime;
                }
                msgCount++;
            }
        }
        catch (Exception ex)
        {
            return req.CreateResponse(HttpStatusCode.InternalServerError, ex.Message);
        }

        // Return the number of messages received so far and, if it has arrived, details of the last message.
        return req.CreateResponse(HttpStatusCode.OK, new { EOMReceived = eomDateTimeReceived, EOMIndex = lastMsgIndex, TotalMessagesReceived = msgCount });
    }

The current message sequence number, total number of messages and the batch number are passed in as custom header values while the message is passed in the body payload. This function will create a table called “msgsequence” if it does not exist and will populate the custom object with the message properties and the message before storing the object in the Azure Table. The response message from this function returns the total number of messages received and if the last message in the sequence has been received. Below is an example of the response message showing the number of messages it has received so far and when the last message sequence was received.

image

Add another Azure Function as before called “AzureTableRetrieveMsg” for retrieving messages from the Table store and add the following code below.

    [FunctionName("AzureTableRetrieveMsg")]
    public static async Task<object> Run([HttpTrigger(WebHookType = "genericJson")]HttpRequestMessage req, TraceWriter log)
    {
        log.Info("Webhook was triggered!");

        string jsonContent = await req.Content.ReadAsStringAsync();
        // Use a case-insensitive dictionary so the property names in the request body match regardless of casing.
        var data = new Dictionary<string, string>(
            JsonConvert.DeserializeObject<Dictionary<string, string>>(jsonContent),
            StringComparer.OrdinalIgnoreCase);

        // MsgSequenceNumber is optional; MsgBatchId is required.
        data.TryGetValue("MsgSequenceNumber", out var sequencenumber);
        var batchnumber = data["MsgBatchId"];

        var storageAccount = CloudStorageAccount.Parse(ConfigurationManager.AppSettings["StorageConnectionString"]);
        // Create the table client.
        CloudTableClient tableClient = storageAccount.CreateCloudTableClient();
        // Retrieve a reference to the table.
        CloudTable table = tableClient.GetTableReference("msgsequence");

        // Construct the query operation for all entities where PartitionKey equals batchnumber.
        TableQuery<MsgEntity> query = null;
        if (string.IsNullOrEmpty(sequencenumber))
        {
            query = new TableQuery<MsgEntity>().Where(TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, batchnumber));
        }
        else
        {
            // Pad the sequence number to 10 digits; this assumes MsgEntity stores its RowKey with the same zero padding.
            var rowKey = Convert.ToInt32(sequencenumber).ToString("0000000000");
            var sequenceFilter = TableQuery.GenerateFilterCondition("RowKey", QueryComparisons.Equal, rowKey);
            var msgTypeFilter = TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, batchnumber);
            query = new TableQuery<MsgEntity>().Where(TableQuery.CombineFilters(sequenceFilter, TableOperators.And, msgTypeFilter));
        }

        // Return the stored messages ordered by their sequence number.
        var list = new List<dynamic>();
        foreach (MsgEntity entity in table.ExecuteQuery(query).OrderBy(x => x.RowKey))
        {
            list.Add(JsonConvert.DeserializeObject(entity.Message));
        }

        return req.CreateResponse(HttpStatusCode.OK, list);
    }

This function will optionally return all records for the batch number or a single message matching the passed in sequence number. A sample request message is shown below which will return all records matching the batch number if the MsgSequenceNumber is left blank.

image
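Because RowKey is a string, ordering by it only gives the right sequence if the sequence numbers are zero-padded to a fixed width. The short sketch below illustrates this point in isolation; RowKeyOrdering and SortAsRowKeys are hypothetical names, and the 10-digit format is taken from the "0000000000" format string used in the function above.

```csharp
using System;
using System.Linq;

public static class RowKeyOrdering
{
    // Converts sequence numbers to row keys (optionally zero-padded to
    // 10 digits) and sorts them as strings, the way RowKey values compare.
    public static string[] SortAsRowKeys(int[] sequenceNumbers, bool pad)
    {
        return sequenceNumbers
            .Select(n => pad ? n.ToString("0000000000") : n.ToString())
            .OrderBy(key => key, StringComparer.Ordinal)
            .ToArray();
    }
}
```

With the input { 10, 2, 1 }, the unpadded keys sort as "1", "10", "2", which is the wrong sequence, whereas the padded keys sort as "0000000001", "0000000002", "0000000010".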

The last Azure Function method to add is “AzureTableRemoveMsg”. Note that its FunctionName attribute is “AzureTableDeleteMsg”, which is the name used when calling it from the Logic App later.

    [FunctionName("AzureTableDeleteMsg")]
    public static async Task<object> Run([HttpTrigger(WebHookType = "genericJson")]HttpRequestMessage req, TraceWriter log)
    {
        log.Info("Webhook was triggered!");

        string jsonContent = await req.Content.ReadAsStringAsync();
        Dictionary<string, string> data = JsonConvert.DeserializeObject<Dictionary<string, string>>(jsonContent);

        // A missing MsgBatchId leaves batchnumber null and results in a BadRequest response.
        data.TryGetValue("MsgBatchId", out var batchnumber);

        var storageAccount = CloudStorageAccount.Parse(ConfigurationManager.AppSettings["StorageConnectionString"]);
        // Create the table client.
        CloudTableClient tableClient = storageAccount.CreateCloudTableClient();
        // Retrieve a reference to the table.
        CloudTable table = tableClient.GetTableReference("msgsequence");

        // Construct the query operation for all entities where PartitionKey equals batchnumber.
        if (!string.IsNullOrEmpty(batchnumber))
        {
            TableQuery<MsgEntity> query = new TableQuery<MsgEntity>()
                        .Where(TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, batchnumber))
                        .Select(new string[] { "PartitionKey", "RowKey" });

            // Run the query once and keep the results, rather than querying the table twice.
            var entities = table.ExecuteQuery(query).ToList();
            if (entities.Count == 0)
            {
                return req.CreateResponse(HttpStatusCode.NotFound);
            }
            foreach (MsgEntity entity in entities)
            {
                TableOperation deleteOperation = TableOperation.Delete(entity);
                await table.ExecuteAsync(deleteOperation);
            }
            return req.CreateResponse(HttpStatusCode.OK);
        }

        return req.CreateResponse(HttpStatusCode.BadRequest);
    }

This function deletes all the messages in Table storage for a batch number. A sample request is shown below.

image

Next we need to add the Azure Table storage connection string to the local.settings.json file in the project.

clip_image012

Before publishing the function to Azure, be sure to create a Unit Test project and check its functionality.

Publishing the Function to Azure

Right click on the project and click Publish. This will take you to the publishing wizard form. Choose “Azure Function App” and “Create New”, then click the publish button.

clip_image014

In the next screen add a function name, subscription and hosting plans etc. Then click the create button to provision any new resources and to publish the function.

clip_image016

Once the function has been published, we need to log into the Azure portal and add the Storage Account connection string to the Application settings file. Double click on the azure function app in the Azure portal.

clip_image018

Then click on the Application settings link.

clip_image020

In the Application settings section, add a new key called “StorageConnectionString” and add the connection string that was used in the local.settings.json in the Visual Studio Function project. Then click the “Save” button at the top of the screen.

clip_image022

 

Logic App for message de-batching and re-sequencing

Next we will create the Logic App to de-batch the individual line items and send them in parallel to another Logic App, which acts as a Microservice, for processing. Each response is then passed to the function we created above. Once all the messages have been received from the Microservice, we call another function method to return all the stored responses in the correct order before returning the result to the consumer.

The message will be based on the PO (Purchase Order) shown below. We will be using the OrderNumber as the batch number and the LineNumber values for re-sequencing the line items later.

image

Below is the collapsed view of the Logic App. It is triggered by a PO (Purchase Order) being posted to the HTTP endpoint, whose schema is based on the PO message above.

image

Two variables are required for holding the total number of line items and the PO lines from the Message Re-sequencer function.

image

The code view for the Initialize variable MaxLineItems is shown below, where we just take the length of the Lines collection.

image

After the variables are initialised comes the For each loop shape, where we iterate through the PO lines of the message.

image

The POValidator action calls another Logic App passing the line item which simulates a Microservice to calculate the total line amount and could possibly check stock levels.

image

The response is then passed onto the AzureTableStoreMsg function method which was published to Azure in the previous steps.

Functions that have been created as generic webhooks can be selected from the Logic App designer by adding an action to the designer and choosing Azure Functions.

image

After selecting Azure functions, you should be able to see all your published functions. Click on the Azure function that was published above.

image

This will then display all the methods that we had created above. Make sure you select the AzureTableStoreMsg method from the list.

image

After the function method has been added, setup the properties as shown.

image

The body and header properties are set to the following values.

  • Request Body  – output body of the POValidator Logic App (Microservice)
  • x-MsgReseqBatchId  – the OrderNumber of the trigger message
  • x-MsgReseqNumber – the line number of the item in the loop
  • x-MsgReseqTotalCount – the value of the MaxLineItems variable which was initialised at the start of the workflow.

And the code view for this action is shown here:

image
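The same request can also be built in C#, which may be handy when exercising the function from a unit test. The sketch below is an illustration only: StoreMsgRequest and Build are hypothetical names and the function URL in the usage note is a placeholder. The header names are the ones read by AzureTableStoreMsg.

```csharp
using System;
using System.Linq;
using System.Net.Http;
using System.Text;

public static class StoreMsgRequest
{
    // Builds the POST request expected by AzureTableStoreMsg: the line item
    // in the body and the three re-sequencing attributes in custom headers.
    public static HttpRequestMessage Build(string functionUrl, string batchId, int lineNumber, int totalLines, string lineJson)
    {
        var request = new HttpRequestMessage(HttpMethod.Post, functionUrl)
        {
            Content = new StringContent(lineJson, Encoding.UTF8, "application/json")
        };
        request.Headers.Add("x-MsgReseqBatchId", batchId);                     // the PO OrderNumber
        request.Headers.Add("x-MsgReseqNumber", lineNumber.ToString());        // this line's number
        request.Headers.Add("x-MsgReseqTotalCount", totalLines.ToString());    // the MaxLineItems value
        return request;
    }
}
```

For example, StoreMsgRequest.Build("https://example.invalid/api/AzureTableStoreMsg", "PO1234", 2, 3, lineJson) produces a request that could be sent with HttpClient.SendAsync.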

The next step in the workflow is to check if we have received all the messages. This is done by checking the response message from the function method AzureTableStoreMsg and comparing the two properties EOMIndex and TotalMessagesReceived to see if they are equal.

image

The syntax for the condition action “Check msg count” is as follows:

@equals(body('AzureTableStoreMsg')['EOMIndex'], body('AzureTableStoreMsg')['TotalMessagesReceived'])
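The same comparison can be expressed in C# as a minimal sketch, assuming the response body carries the EOMIndex and TotalMessagesReceived properties returned by AzureTableStoreMsg. BatchStatus and IsComplete are hypothetical names, and System.Text.Json is used for brevity.

```csharp
using System;
using System.Text.Json;

public static class BatchStatus
{
    // Returns true when the last message has been stored (EOMIndex is set)
    // and the number of stored messages equals its sequence number.
    public static bool IsComplete(string storeMsgResponseJson)
    {
        using var doc = JsonDocument.Parse(storeMsgResponseJson);
        var root = doc.RootElement;
        int eomIndex = root.GetProperty("EOMIndex").GetInt32();
        int totalReceived = root.GetProperty("TotalMessagesReceived").GetInt32();
        return eomIndex == totalReceived;
    }
}
```

EOMIndex stays at 0 until the final message has been stored, while TotalMessagesReceived is at least 1 after any store, so the two values cannot match early. The check does rely on the line numbers starting at 1 and being contiguous, since the function treats the message whose sequence number equals the total count as the last one.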

If the condition is true, then the next step is to retrieve all the messages in the correct order from the Table Storage using the function method “AzureTableRetrieveMsg”. Follow the steps as before to add an action to call this function method. The only property to send in the request body is the MsgBatchId which is set to the PO Order number.

image

The code view for this shape looks like this.

image

We then set the variable LineItems to the output of the function method “AzureTableRetrieveMsg”.

image

Once we set the variable, we can then clear the table storage of this data by calling the function method “AzureTableDeleteMsg” and passing the PO Order number in the body.

image
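The store, check, retrieve and delete steps above can be sketched in a few lines. This is only an in-memory illustration, not the actual function code: the dictionary stands in for Table Storage, the Python function names are hypothetical, and it assumes EOMIndex is simply the total count passed in the x-MsgReseqTotalCount header.

```python
from collections import defaultdict

# In-memory stand-in for the Table Storage table, keyed by batch id (the PO number).
_store = defaultdict(dict)

def store_msg(batch_id, seq_number, total_count, body):
    """Mimics AzureTableStoreMsg: save one line item and report progress."""
    _store[batch_id][seq_number] = body
    return {
        "EOMIndex": total_count,  # assumption: index of the last expected message
        "TotalMessagesReceived": len(_store[batch_id]),
    }

def retrieve_msgs(batch_id):
    """Mimics AzureTableRetrieveMsg: return the items ordered by sequence number."""
    ordered = sorted(_store[batch_id].items(), key=lambda item: item[0])
    return [body for _, body in ordered]

def delete_msgs(batch_id):
    """Mimics AzureTableDeleteMsg: clear the batch once it has been read."""
    _store.pop(batch_id, None)

def resequence(batch_id, arrivals, total_count):
    """arrivals is a list of (seq_number, body) in the order responses came back."""
    for seq_number, body in arrivals:
        status = store_msg(batch_id, seq_number, total_count, body)
        # Same test as the "Check msg count" condition in the workflow.
        if status["EOMIndex"] == status["TotalMessagesReceived"]:
            items = retrieve_msgs(batch_id)
            delete_msgs(batch_id)
            return items
    return None  # the batch is still incomplete
```

Calling resequence("PO-1", [(3, "c"), (1, "a"), (2, "b")], 3) returns the three items in line-number order, regardless of the order in which they arrived.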

When the “For each” shape completes, we compose the response message by replacing the existing line items with the items stored in the LineItems variable using the “setProperty” expression.

image

Here is the code view of the Compose shape.

image

The final step is to set the Response body to the output of the Compose shape above.

image

That’s the development done for this integration pattern. Next we will create the POValidator Logic App to simulate calling a Microservice. This Logic App just calculates the line total by multiplying the UnitPrice and Qty to create a new property, TotalPrice. I have also added a random Delay shape to simulate some line items taking longer to process.

Below is the workflow process for this Logic App. I won’t go into the detail of each action as this Logic App is just there to test the message re-sequencing.

image

Testing

To test this solution we are going to use one of my favourite tools, Postman. The following PO message will be sent to the MessageResequencer Logic App. Notice the absence of the line item total; the POValidator Logic App will calculate this for us.

image

As you can see below, each line item has been sent to the POValidator Logic App and took a different time to complete due to the Delay shape.

image

Here is the response message with the total price appended to each line item and in the correct sequence.

image

Now to prove that the line items do get re-ordered in the correct sequence, I will swap the line numbers around in the PO request message and with a different PO number.

image

Here is the response with the lines re-sequenced in the correct order.

image

Conclusion

Using Azure Functions and Table Storage makes it fairly easy to develop this integration pattern. To provide a more resilient repository for temporarily storing the messages, you may want to consider using Azure Cosmos DB.

The HTTP connectors can also be swapped out for Service Bus connectors when communicating with services that are slow in responding.

You would have noticed that when you call the function method “AzureTableStoreMsg” it returns the following response message.

image

You can use the EOMReceived and TotalMessagesReceived properties to determine if an error has occurred or a Microservice is taking too long to process the de-batched items. Under ideal conditions, the EOMIndex would be set on the last de-batched message and TotalMessagesReceived would equal the EOMIndex value.

Keep a watch out for my next blog where I will show you how to re-sequence messages as they arrive.

Enjoy…

Content based message routing using Azure Logic Apps, Function and Service Bus

Content Based Routing (CBR) is another pattern used in the integration world. The contents of the message determine the endpoint the message is sent to.

This article will describe one option to develop this pattern using an Azure Service Bus, an Azure Function and a Logic App.

Basically the service bus will be used to route the message to the correct endpoint using topics and subscriptions. The Azure Function is used to host and execute the business rules to inspect the message contents and set the routing properties. A logic app is used to accept the message and call the function passing the received message as an argument. Once the function executes the business rules, it will return the routing properties in the response body. The routing information is then used to set the properties on the service bus API connector in the Logic App before publishing the message onto the bus.

image

Scenario

To demonstrate a typical use-case of this pattern we have 2 message types, Sales Orders (SO) and Purchase Orders (PO). For the SO I want to send the order to a priority queue if the total sales amount is over a particular value. And for a PO, it should be sent to a pre-approval queue if the order value is under a specified amount.

Here is an example of a SO message to be routed:

image

And an example of a PO being sent:

image

Solution

The real smarts of this solution is the function which will return a common JSON response message to set the values for the Topic name and the custom properties on the service bus connector. The fields of the response message are described below.

  • TopicName – the name of service bus topic to send the message to.
  • CBRFilter_1 – used by the subscription rule to filter the value on. Depending on your own requirements you may need more fields for more granular filtering.
  • RuleSetVersion – used by the subscription rule to filter the value on. It’s a good idea to have this field as you may have several versions of this rule in play at any one time.

Let’s start with provisioning the service bus topics and subscriptions for the 2 types of messages. First create 2 topics called purchaseorder and salesorder.

 

image

Now add the following subscriptions and rules for each of the topics.

Topic Name    | Subscription Name  | Rule
purchaseorder | Approved_V1.00     | CBRFilter_1 = 'Approved' and RuleSetVersion = '1.00'
purchaseorder | NotApproved_V1.00  | CBRFilter_1 = 'ApprovedNot' and RuleSetVersion = '1.00'
salesorder    | HighPriority_V1.00 | CBRFilter_1 = 'PriorityHigh' and RuleSetVersion = '1.00'
salesorder    | LowPriority_V1.00  | CBRFilter_1 = 'PriorityLow' and RuleSetVersion = '1.00'
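To make the effect of these rules concrete, here is a small sketch of how a message's custom properties select a subscription. It is a simplified model only: the rules are treated as plain equality checks rather than real SQL filter expressions, the function name is hypothetical, and lower-casing the topic name is an assumption made so that "PurchaseOrder" lines up with the purchaseorder topic.

```python
# Subscription rules from the table above, expressed as required property values.
SUBSCRIPTIONS = {
    "purchaseorder": {
        "Approved_V1.00": {"CBRFilter_1": "Approved", "RuleSetVersion": "1.00"},
        "NotApproved_V1.00": {"CBRFilter_1": "ApprovedNot", "RuleSetVersion": "1.00"},
    },
    "salesorder": {
        "HighPriority_V1.00": {"CBRFilter_1": "PriorityHigh", "RuleSetVersion": "1.00"},
        "LowPriority_V1.00": {"CBRFilter_1": "PriorityLow", "RuleSetVersion": "1.00"},
    },
}

def matching_subscriptions(topic, properties):
    """Return the subscriptions whose rule matches the message's custom properties."""
    rules = SUBSCRIPTIONS.get(topic.lower(), {})  # assumption: topic names compared case-insensitively
    return [
        name
        for name, rule in rules.items()
        if all(properties.get(key) == value for key, value in rule.items())
    ]
```

A message published with a different RuleSetVersion, for example '2.00', matches none of these subscriptions, which is what allows several rule versions to run side by side.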

Next is the development of the Azure Function. This is best developed in Visual Studio where you can include a Unit Test project for each of the rules. Add a new Azure Function project to your solution.

image

After the project has been created, right click on the function and click Add -> New Item. Choose Azure Function, give it a name and select the Http trigger option.

image

Below is the code for the HTTP trigger function, which includes the class definition for the RoutingProperties object. I am checking for the specific elements SalesOrderNumber and PurchaseOrderNumber in the JSON message to determine the type of message, which in turn determines which rule code block to execute. Each rule code block first sets the TopicName and RuleSetVersion properties.

using System.Linq;
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Azure.WebJobs.Host;
using Newtonsoft.Json.Linq;

public static class CBRRule
{
    [FunctionName("CBRRule")]
    public static async Task<HttpResponseMessage> Run([HttpTrigger(AuthorizationLevel.Function, "post", Route = null)]HttpRequestMessage req, TraceWriter log)
    {
        log.Info("C# HTTP trigger function processed a request.");

        // Defaults to "Unknown" for every property until a rule matches
        var routingProperties = new RoutingProperties();

        // Get request body
        JObject data = await req.Content.ReadAsAsync<JObject>();

        // Is this a sales order message type?
        if (data != null && data["SalesOrderNumber"] != null)
        {
            routingProperties.CBRFilter_1 = "PriorityLow";
            routingProperties.RuleSetVersion = "1.00";
            routingProperties.TopicName = "SalesOrder";

            var lineItems = data["Lines"];
            var totalSaleAmount = lineItems.Sum(x => (decimal)x["UnitPrice"] * (decimal)x["Qty"]);

            // If the total sales is greater than $1000 send the message to the high priority queue
            if (totalSaleAmount > 1000)
                routingProperties.CBRFilter_1 = "PriorityHigh";
        }

        // Is this a purchase order message type?
        if (data != null && data["PurchaseOrderNumber"] != null)
        {
            routingProperties.CBRFilter_1 = "ApprovedNot";
            routingProperties.RuleSetVersion = "1.00";
            routingProperties.TopicName = "PurchaseOrder";

            var lineItems = data["Lines"];
            var totalSaleAmount = lineItems.Sum(x => (decimal)x["UnitPrice"] * (decimal)x["Qty"]);

            // Approve PO if the total order price is less than $500
            if (totalSaleAmount < 500)
                routingProperties.CBRFilter_1 = "Approved";
        }

        return req.CreateResponse(HttpStatusCode.OK, routingProperties);
    }

    /// <summary>
    /// Response message to set the custom routing properties of the service bus
    /// </summary>
    public class RoutingProperties
    {
        public string TopicName { get; set; }
        public string CBRFilter_1 { get; set; }
        public string RuleSetVersion { get; set; }

        public RoutingProperties()
        {
            this.CBRFilter_1 = "Unknown";
            this.RuleSetVersion = "Unknown";
            this.TopicName = "Unknown";
        }
    }
}

The business rule for a SO aggregates all the line items and checks if the total amount is greater than 1000. If it is, the property CBRFilter_1 is set to “PriorityHigh”.

The business rule for a PO also aggregates all the line items and checks if the total amount is less than 500. If it is, the property CBRFilter_1 is set to “Approved”.

With the following input message sent to the function:

clip_image001

The output of the function should look similar to this:

clip_image001[6]

Now we need to publish the function from Visual Studio to your Azure resource group using the publishing wizard.

clip_image002

The last component of this solution is the Logic App which is triggered by an HTTP Request API and then calls the Azure function created above. The basic flow looks like this below.

clip_image004

The HTTP Request trigger has no request body JSON schema created. The trigger must accept any type of message.

clip_image006

Add an Azure Function after the trigger action and select the method called “CBRRule”

clip_image008

Set the Request Body to the trigger body content and the Method to “POST”

clip_image010

Next add a Service Bus action and set the properties as shown. Both the Queue/Topic name and Properties are set from the function response message.

clip_image012

Here is the code view of the Send Message action showing how the properties are set.

clip_image014

Testing

Using Postman we can send sample messages to the Logic App and then see them sitting on the service bus waiting to be processed by some other method.

At the moment the service bus should have no messages waiting to be processed, as shown.

clip_image015

Using Postman to send the following PO, we should see this message end up in the purchaseorder/NotApproved_V1.00 subscription.

clip_image016

All going well, the message will arrive at the correct subscription waiting to be processed as shown below using Service Bus Explorer.

clip_image018

Sending the following SO will also route the message to the correct subscriber.

clip_image019

clip_image021

Conclusion

CBR is easily achievable using an Azure Function to execute your business rules on a message and set up the routing properties. Taking this a step further, I would abstract the business rules for each message type into its own class for manageability.

It is also advisable to set up a Unit Test project for each of the classes holding the business rules to ensure 100% code testing coverage.
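As a rough illustration of that suggestion, the sketch below separates the rules by message type so each can be tested on its own. It is written in Python purely for brevity and is not the function shown earlier: the names order_total, RULES and route are hypothetical, and unlike the C# version it stops at the first message type that matches.

```python
from decimal import Decimal

def order_total(message):
    """Sum UnitPrice * Qty over all line items (zero when there are no lines)."""
    return sum(
        Decimal(str(line["UnitPrice"])) * Decimal(str(line["Qty"]))
        for line in message.get("Lines", [])
    )

def sales_order_rule(message):
    # High priority when the total sales amount is greater than 1000.
    priority = "PriorityHigh" if order_total(message) > 1000 else "PriorityLow"
    return {"TopicName": "SalesOrder", "CBRFilter_1": priority, "RuleSetVersion": "1.00"}

def purchase_order_rule(message):
    # Approved when the total order price is less than 500.
    status = "Approved" if order_total(message) < 500 else "ApprovedNot"
    return {"TopicName": "PurchaseOrder", "CBRFilter_1": status, "RuleSetVersion": "1.00"}

# One rule per message type, selected by the element that identifies the type.
RULES = {
    "SalesOrderNumber": sales_order_rule,
    "PurchaseOrderNumber": purchase_order_rule,
}

def route(message):
    """Return the routing properties for a message, or 'Unknown' values if no rule applies."""
    for identifying_key, rule in RULES.items():
        if message.get(identifying_key) is not None:
            return rule(message)
    return {"TopicName": "Unknown", "CBRFilter_1": "Unknown", "RuleSetVersion": "Unknown"}
```

With the rules isolated like this, boundary cases such as a sales order totalling exactly 1000 or a purchase order totalling exactly 500 become simple one-line unit tests.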

Enjoy…

Itinerary based message routing using Azure Logic Apps and Service Bus Actions

This is another integration pattern used quite extensively in the integration world. It is used when a message is required to be routed to several endpoints in a particular order using some form of routing list. Depending on your business requirements, the message being routed may be enriched or replaced before sending it to the next service endpoint in the list.

image

This is probably one of the easiest integration patterns to implement using Logic Apps and Service Bus Actions. There are probably other methods to implement this pattern, but I wanted to abstract the routing logic away from the Logic App itself and leave it to focus on the business process and not worry about setting up the next routing endpoint.

By using the Actions feature of the service bus I can define the routing order of the next service endpoint, or in this case the next service bus subscription, by setting properties to match the next subscriber's filter condition. A service bus action is executed after the filter condition has been met and is used to set the value of either a system or custom property before the message is consumed from the service bus. It is set in a similar way to the T-SQL “Set” command where you set a field to a value.

For this scenario I have a sales order message that is required to be passed to several logic apps in a particular sequence. There is a sales order header logic app, a sales order line logic app, a sales order payments logic app and finally a sales order completion logic app. The sales order message will look similar to this below.

image

Provisioning Azure Service Bus

The method involves creating a service bus topic called “salesorder” with the following subscriptions.

servicebus

The filter and action rules are setup as in the following table. There are 2 custom properties “MsgType” and “ItineraryLeg” used for the subscriptions. The MsgType is just used to group the messages and the ItineraryLeg  defines the order of the subscribers.

The action is used to specify the next subscriber to send the message to after the current process by setting the ItineraryLeg property.

Seq | Subscription Name | Rule Filter                                               | Rule Action
1   | SOHeader          | MsgType = 'salesorder' and ItineraryLeg = ''              | set ItineraryLeg = 'solines'
2   | SOLines           | MsgType = 'salesorder' and ItineraryLeg = 'solines'       | set ItineraryLeg = 'sopayments'
3   | SOPayments        | MsgType = 'salesorder' and ItineraryLeg = 'sopayments'    | set ItineraryLeg = 'socompletion'
4   | SOCompletion      | MsgType = 'salesorder' and ItineraryLeg = 'socompletion'  |

A sample of the SOHeader rules is shown below:

image

Creating the Logic Apps

Next we will provision the 4 logic apps to create the SO header, process the line items, apply the payments and complete the sales order.

image

The logic apps (SalesOrderHeaderProcessor, SalesOrderLinesProcessor, SalesOrderPaymentsProcessor) are constructed in a similar manner as shown below and the only difference is the topic subscription names for the service bus trigger action.  The Delay action is where you would implement your business process logic on the received message.

image

Expanding the service bus trigger shape has the following properties set. The other logic apps will have different Topic subscription names set.

image

The real smarts of these workflows is the “Republish message” service bus action shape. You will need to add this shape to every logic app that is involved in processing the message. Here we set the ItineraryLeg and MsgType property values to the same values received in the Trigger service bus action shape. I am also adding another custom property called “Tracking”. This is used to track the names of the logic apps the message was routed to in chronological order and can be useful for debugging. The content property is just set to the received message from the service bus trigger. Depending on your business requirements, you may be required to publish a totally different message.

image

Below is the code behind for the Republish message shape showing how to setup the properties section.

"body": {
    "ContentData": "@{triggerBody()?['ContentData']}",
    "Properties": {
        "ItineraryLeg": "@triggerBody()?['Properties']['ItineraryLeg']",
        "MsgType": "@triggerBody()?['Properties']['MsgType']",
        "Tracking": "@concat(coalesce(triggerBody()?['Properties']?['Tracking'],'Begin'),',',workflow()?['name'])"
    }
},

The other logic app SalesOrderCompletionProcessor simply pulls the message from the last subscription and sends the Tracking property value to RequestBin.

image

Messages are placed onto the service bus using the MessagePublisher logic app, which accepts a JSON message and initialises the service bus custom properties “ItineraryLeg” and “MsgType”. Here the ItineraryLeg is set to an empty string and the MsgType is set to “salesorder”. The body of the HTTP request trigger is used as the message content for the Send message action shape.

image

Testing

Using PostMan we can POST a message to the MessagePublisher logic app. If you set the delays long enough in the logic apps, you can see the message being published and consumed by the subscribers in the correct order as defined in the service bus actions.

Here is the result of the message being posted to the MessagePublisher logic app and the output from the SalesOrderCompletionProcessor  logic app which sends it to RequestBin.

Begin,SalesOrderHeaderProcessor,SalesOrderLinesProcessor,SalesOrderPaymentsProcessor,SalesOrderCompletionProcessor

Now say I wanted to change the order of the message processing so that the payments are processed before the sales order lines. I would simply update the actions on the service bus subscriptions as follows:

Seq | Subscription Name | Rule Filter                                               | Rule Action
1   | SOHeader          | MsgType = 'salesorder' and ItineraryLeg = ''              | set ItineraryLeg = 'sopayments'
3   | SOLines           | MsgType = 'salesorder' and ItineraryLeg = 'solines'       | set ItineraryLeg = 'socompletion'
2   | SOPayments        | MsgType = 'salesorder' and ItineraryLeg = 'sopayments'    | set ItineraryLeg = 'solines'
4   | SOCompletion      | MsgType = 'salesorder' and ItineraryLeg = 'socompletion'  |

Now the output of the SalesOrderCompletionProcessor looks like this below. You can clearly see the payments logic app being executed after the sales header process.

Begin,SalesOrderHeaderProcessor,SalesOrderPaymentsProcessor,SalesOrderLinesProcessor,SalesOrderCompletionProcessor
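The way the filters and actions chain together can be traced with a small simulation. This is only a model of the behaviour described above, not Service Bus code: each subscription is reduced to the ItineraryLeg value its filter expects and the value its action sets, and the function and variable names are hypothetical.

```python
def run_itinerary(subscriptions, max_hops=10):
    """Follow a message through the subscriptions and return the Tracking value."""
    leg = ""              # MessagePublisher initialises ItineraryLeg to an empty string
    tracking = ["Begin"]  # same default as the coalesce() in the Republish message shape
    for _ in range(max_hops):
        # The rule filter: find the subscription expecting the current leg.
        match = next((s for s in subscriptions if s["leg"] == leg), None)
        if match is None:
            break
        tracking.append(match["processor"])
        if match["next_leg"] is None:  # SOCompletion has no action, so routing ends here
            break
        leg = match["next_leg"]        # the rule action: set ItineraryLeg = ...
    return ",".join(tracking)

ORIGINAL = [
    {"leg": "", "next_leg": "solines", "processor": "SalesOrderHeaderProcessor"},
    {"leg": "solines", "next_leg": "sopayments", "processor": "SalesOrderLinesProcessor"},
    {"leg": "sopayments", "next_leg": "socompletion", "processor": "SalesOrderPaymentsProcessor"},
    {"leg": "socompletion", "next_leg": None, "processor": "SalesOrderCompletionProcessor"},
]

# Payments before lines: only the actions change, the filters stay the same.
REORDERED = [
    {"leg": "", "next_leg": "sopayments", "processor": "SalesOrderHeaderProcessor"},
    {"leg": "solines", "next_leg": "socompletion", "processor": "SalesOrderLinesProcessor"},
    {"leg": "sopayments", "next_leg": "solines", "processor": "SalesOrderPaymentsProcessor"},
    {"leg": "socompletion", "next_leg": None, "processor": "SalesOrderCompletionProcessor"},
]
```

Running run_itinerary over ORIGINAL and REORDERED reproduces the two Tracking strings shown above, which highlights that the logic apps themselves never change when the itinerary is re-ordered.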

In Summary

Use service bus actions to manage the itinerary list to abstract the routing logic away from the normal business process of the logic apps.

Typically you would use Azure Resource templates to setup the service bus subscriptions and rules under TFS control.

Keep a watch out for my next article on Content Based Routing using Azure Logic Apps and Service Bus.

Enjoy…

Integration Scatter-Gather pattern using Azure Logic Apps and Service Bus

Recently I have taken a role as the Integration Architect and tech lead for a project to integrate messaging between MS Dynamics 365  AX/CRM and 3rd party systems. This was a huge solution that took over 10 months to design and develop.  It consisted of developing over 60 Logic Apps.

With a large integration project like this, where multiple entities are required to be updated across multiple systems in a transactional manner, I required a process to correlate and aggregate all the responses into one composite message before proceeding onto the next task in the workflow. The scatter-gather pattern was chosen as a good candidate for this type of scenario.

The next problem was how to implement this pattern using Logic Apps. Using what I had learnt from my previous blog on enforcing ordered delivery of messages  https://connectedcircuits.blog//wp-content/uploads/2017/08/26/enforcing-ordered-delivery-using-azure-logic-apps-and-service-bus/ I was able to design the solution below.

image

It is based on a single Logic App to publish the message onto a service bus topic and to retrieve all the responses from another service bus queue. I also wanted the solution to be flexible to decide which services the message should be scattered to by passing in a routing list which determines who to send the request message to.

The main component of the solution is the Scatter/Gather logic app, which sends the request message to the service bus topic after setting subscription property values and a unique batch Id. In this example I have 3 sub-processor logic apps which will process the message in some form or another and then write a response message onto the service bus queue. Because sessions have been enabled on this queue and I am setting the session Id to the same BatchId that was sent with the message, we are able to correlate all the responses from the sub-processors within the same logic app instance that initially published the message onto the service bus topic.

After the Scatter/Gather logic app sends the message onto the service bus topic, it will go into a loop until all the response messages from the Process logic apps are received. It will then return the aggregated responses from the sub-process logic apps.

The whole solution can be broken down into 3 sections, setting up the Azure Service Bus, developing the Scatter-Gatherer Logic App workflow and the Sub-process Logic Apps subscribing to the topic.

Setting up the service bus

Let’s start by setting up the Azure Service Bus. A Topic is provisioned to publish the request message with rules created for each subscriber. In this scenario I will have 3 subscriptions with default rules set to a property called MsgProcessor = ‘1’, MsgProcessor = ‘2’ and MsgProcessor = ‘3’ on each subscription respectively.

image

Next a Service Bus Queue is provisioned to receive all the response messages from the subscribers. Sessions must be enabled on this queue to allow a single consumer to process all messages with the same sessionId which in our case will be the Scatter/Gather Logic App.

image

Developing the Scatter/Gather Logic App

The next part to develop is the scatter-gather Logic App which is the real smarts of the solution. Below is the full workflow with all the actions collapsed. We will go through and expand each of the shapes.

image

This  workflow is triggered by a HTTP request, but can also be triggered by some other connectors.

image

Next is a series of variables required by the workflow listed below:

  • BatchId – This is used to group all the messages together and is used as the SessionId and is initialised with a random Guid.
  • CompositeMsg – This holds the aggregated response messages from the Processor logic apps.
  • IntermediateMsg – Used in the loop to temporarily store the aggregated message.
  • ScatterCount – The number of sub-processors the request body was published to.
  • ResponseCount – Holds the current number of responses received from the Processing logic apps.

image

Once the variables have been setup, a parallel branch is used to scatter the HTTP Request body contents to the subscribing systems. For this demo I have an option to send the request body content to 3 different sub-processes.

image

To determine which sub-process to send the request body to, I pass a list of the process names to send the request body to as one of the HTTP Request Header properties as shown here:

"X-ScatterList":"'Process1','Process2','Process3'"

The filters on the parallel conditions are set up as below, where each one checks if the list in the HTTP header contains the process name.

image
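The effect of these parallel conditions can be sketched as a simple "contains" check per process name. The function name and the default list of known processes are hypothetical, and matching on the quoted name is an assumption made so that one name cannot accidentally match as a prefix of another.

```python
def scatter_targets(header_value, known_processes=("Process1", "Process2", "Process3")):
    """Return the processes named in the X-ScatterList header value."""
    # One check per parallel branch: does the header list contain this quoted name?
    return [name for name in known_processes if f"'{name}'" in header_value]

targets = scatter_targets("'Process1','Process3'")
scatter_count = len(targets)  # mirrors the ScatterCount variable in the workflow
```

Here only Process1 and Process3 would receive the request, so the workflow would later wait for two responses.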

Expanding one of the condition tasks shows an action to send the request onto the Service Bus  Topic and to increment the ScatterCount variable by one.

image

Below is the “Send message processor 1” service bus connector expanded to show the properties. The two important pieces of information are the custom properties BatchId and MsgProcessor. The BatchId will be used to set the sessionId later and the MsgProcessor is used by the subscription filter on the service bus Topic.

image

The next step is to increment the ScatterCount variable. This is how we keep track of the number of sub-processors that the message was scattered to.

image

Once the request body content has been published to the service bus Topic, we cycle in a loop until we have received all the response messages from the service bus queue or the Until loop times out.

image

The first step in the loop is to get one new message at a time from the queue using the BatchId variable as the session Id. This is to ensure we only get messages off the queue matching this same Id.

image

Then we check if a message was found using the Condition action with this filter: @equals(length(body('Get_messages_from_a_queue_(peek-lock)')), 1)

 

image

If a message was found on the service bus queue then the “If true” branch is executed and if no message was found the “If false” branch is executed setting a delay of a few seconds before iterating again.

Below are the actions inside of the “If true” branch.

image

First we check if this is the first message received off the queue using the following filter: @equals(variables('ResponseCount'), 0)

 

image

If it is, then we just set the variable “CompositeMsg” to the service bus queue content data.

image

As we are using the Service Bus Connector that is capable of returning multiple messages, we need to use an indexer to get to the first message. Below is the code view of the above action. Note when the sub-processor logic app puts a message onto the service bus, we base64 encode it, therefore we need to decode it back to string value.

image

Now if this was not the first message received off the service bus, we need to append it to the other messages received. This is done with the following actions in the “If false” branch shown below.

image

First we need to copy the contents of the CompositeMsg variable into the “IntermediateMsg” variable. Then we concatenate the contents of the “IntermediateMsg” variable with the message received off the service bus and assign the result to the “CompositeMsg” variable. The syntax of the value for the Append intermediate to composite action is shown below:

"value": "@{concat(variables('IntermediateMsg'),',',base64ToString(body('Get_messages_from_a_queue_(peek-lock)')?[0]['ContentData']))}"
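The decode-and-append logic of the two branches can be expressed as one small function. This is a sketch only: the function name is hypothetical, and wrapping the final string in square brackets to form a JSON array is an assumption based on the aggregated output shown later in the testing section.

```python
import base64
import json

def append_response(composite_msg, response_count, content_data_b64):
    """Decode one queue message and add it to the comma separated composite string."""
    decoded = base64.b64decode(content_data_b64).decode("utf-8")
    if response_count == 0:
        return decoded                    # "If true" branch: first message sets CompositeMsg
    return composite_msg + "," + decoded  # "If false" branch: append via IntermediateMsg

def encode(obj):
    """Helper for the example: what a sub-processor would place in ContentData."""
    return base64.b64encode(json.dumps(obj).encode("utf-8")).decode("ascii")

composite = ""
for count, name in enumerate(["Process1", "Process2"]):
    composite = append_response(composite, count, encode({"ResponseMsg": name}))

aggregated = json.loads("[" + composite + "]")  # assumption: brackets added when returning
```

The intermediate variable is only needed in the Logic App because a variable cannot reference itself in its own Set variable action; in ordinary code a single concatenation is enough.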

After the message from the queue has been added or appended to the CompositeMsg variable, it is completed to remove it from the queue and the ResponseCount variable is incremented.

image

Once all the response messages have been received from the sub-process logic apps or the “Until loop” times out, we need to close the service bus queue session using the BatchId and return the composite response message.

image
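Putting the loop together, the gather phase behaves roughly like the sketch below. It is an in-memory model and not the Service Bus SDK: SessionQueue is a hypothetical stand-in for the session-enabled queue, and the timeout and delay values are arbitrary.

```python
import time
from collections import defaultdict, deque

class SessionQueue:
    """Hypothetical stand-in for a session-enabled queue: one FIFO per session id."""
    def __init__(self):
        self._sessions = defaultdict(deque)

    def send(self, session_id, body):
        self._sessions[session_id].append(body)

    def receive(self, session_id):
        """Return the next message for this session only, or None if there is none."""
        messages = self._sessions[session_id]
        return messages.popleft() if messages else None

def gather(queue, batch_id, scatter_count, timeout_s=5.0, poll_delay_s=0.01):
    """Collect responses for one batch until all have arrived or the timeout expires."""
    responses = []
    deadline = time.monotonic() + timeout_s
    while len(responses) < scatter_count and time.monotonic() < deadline:
        message = queue.receive(batch_id)  # get one message using BatchId as the session id
        if message is None:
            time.sleep(poll_delay_s)       # "If false" branch: delay before iterating again
            continue
        responses.append(message)          # "If true" branch: aggregate and count it
    return responses
```

Because each session is read independently, responses belonging to another batch are never picked up by this instance, which is the reason sessions must be enabled on the queue.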

Here I am just sending the composite response message to a RequestBin endpoint. Ideally you would place this onto another service bus and perhaps with the initial HTTP request body to tie everything together.

Sub-Processor Logic Apps

The last part of this solution is the sub-processor logic apps, which subscribe to the service bus Topic and process the message in some form or another before returning a response message onto the service bus queue. Using a separate logic app to manage the post processing of the initial request message provides scalability and separation from the messaging orchestration components.

image

The properties of the trigger service bus connector are shown below, which has a typical setup.

image

Next, the delay task is there just to simulate processing of the message received from the topic. This is where you would normally call another API endpoint or process the message in some other fashion. Remember the maximum lock duration is 5 minutes and if your process will take longer than this, you will need to renew the lock.

image

After the message has been processed, you will need to compose a response message to put back onto the service bus queue. The schema of the response message should be generic across all the sub-process logic apps to make it easier to parse later on. For the demo a relatively simple schema will be used consisting of the received BatchId and the sub-processor logic app name.

image

Once the response message has been composed, it is placed onto the service bus queue ensuring the session Id is set to the BatchId that was sent with the message. Remember this queue has been provisioned with “Sessions Enabled”

image

The code view for the connector looks like this:

image

The last task of the workflow is to complete the topic subscription as shown here:

image

Testing the solution

We can use Postman to send the following request to the scatter-gather logic app. Also I am setting the scatter list header property to publish the message to all 3 sub-processor logic apps.

POST /workflows/…/triggers/manual/paths/invoke?api-version=2016-10-01&sp=%2Ftriggers%2Fmanual HTTP/1.1
Host: prod-11.australiasoutheast.logic.azure.com:443
Content-Type: application/json
X-ScatterList: 'Process1','Process2','Process3'
Cache-Control: no-cache
Postman-Token: 644c5cdb-fd60-4903-b3ec-2d3d6febfe7e

{
"OrderId" : "12",
"Message" : "Hello1"
}

Using RequestBin we can see the aggregated response messages from all 3 sub-processors.

[{"BatchId":"5e1df244-2153-4529-aaba-12a82aa788fd","ResponseMsg":"Process1"},{"BatchId":"5e1df244-2153-4529-aaba-12a82aa788fd","ResponseMsg":"Process2"},{"BatchId":"5e1df244-2153-4529-aaba-12a82aa788fd","ResponseMsg":"Process3"}]

What’s next

Further enhancements can be made by adding flexibility for the timeout of the Until loop, as some services may take hours or days to send a response back. When waiting for very long periods for a response to return, you may need to extend the delay period in the “Until loop” to avoid reaching the 5000 iteration limit.

Keep a watch for my next blog where I will show you how to implement the Itinerary based routing integration pattern using Azure Logic Apps and Service Bus.

Enjoy…

Using an Azure APIM Policy to call an OAuth endpoint and cache the token

Recently I have been involved as the Integration Architect and tech lead for a project to integrate between MS Dynamics 365 AX/CRM and 3rd party systems.

One of the challenges was to provide single endpoints for internal systems calling web services outside of the organisation that utilised OAuth2.0.  This is where a consumer normally has to call an OAuth token endpoint first and then append the token to the request before calling the actual web service.

By using APIM as a proxy and policies in APIM, I managed to achieve the goal of providing a single URL endpoint for the consumer. The policy initially gets the token from the authorisation endpoint, caches the token and then passes the token to the web service being called. This process is known as fragment caching, where parts of the responses are cached for subsequent requests. Caching the bearer part of the token also improved the performance significantly for subsequent calls.
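The get-or-fetch behaviour that the policy provides can be sketched outside of APIM as follows. This is an illustration of the caching idea only, not how APIM implements it: TokenCache, call_with_token and the fetch_token and send callables are hypothetical names, and the 3600 second lifetime simply mirrors the cache duration used in the policy further down.

```python
import time

class TokenCache:
    """Cache a bearer token per key and only call the token endpoint on a miss."""
    def __init__(self, fetch_token, ttl_seconds=3600, clock=time.monotonic):
        self._fetch_token = fetch_token  # hypothetical callable that calls the OAuth endpoint
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries = {}               # key -> (token, expiry time)

    def get(self, key):
        now = self._clock()
        entry = self._entries.get(key)
        if entry is not None and entry[1] > now:
            return entry[0]              # cache hit: no call to the token endpoint
        token = self._fetch_token()      # cache miss or expired entry
        self._entries[key] = (token, now + self._ttl)
        return token

def call_with_token(cache, client_id, send):
    """Attach the cached token to the outgoing request, as the proxy does for the caller."""
    token = cache.get(f"token-{client_id}")  # client id is part of the key to keep it unique
    return send({"Authorization": f"Bearer {token}"})
```

The consumer only ever calls the single proxy endpoint; whether the token came from the cache or from a fresh request to the token endpoint is invisible to it.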

Below were the steps I used to add a web API to create transfers orders in Dynamics AX and a policy using the Azure APIM management portal.

1. First create the properties for the OAuth clientId and client secret. A good tip is to prefix the property name and set the Tags with the name of the API you are calling. This helps later on when you have multiple properties to manage.

 

image

The Properties page should look something like the one below after adding your custom properties:

 

image

2. Next, go to the APIs page to either add or import the definition of the API you are calling. Here I am just going to add the API manually by first entering the name, API endpoint address and the public facing URL suffix. Remember to add it to an existing product.

image

3. After the API has been created we will then add the operations. For this demo I will just add one operation to create a transfer order.

image

4. Now that the API we wish to call has been added to the APIM service, we can finally turn our attention to creating a policy which does the background task of obtaining the OAuth token. Here I will set up a policy on the CreateTransferOrder operation of the Web API. Once selected, click the ADD POLICY button to begin creating the policy from the template.

image

5. The “<inbound>” policy section is applied to the incoming request before forwarding the request to the backend service. This is where we will check the cache for the authorisation token and if no hit, then we call the OAuth token endpoint to obtain a token.

   1: <inbound>

   2:     <cache-lookup-value key="token-{{Dev-Web1-ClientId}}" variable-name="bearerToken" />

   3:     <choose>

   4:         <when condition="@(!context.Variables.ContainsKey("bearerToken"))">

   5:             <send-request mode="new" response-variable-name="oauthResponse" timeout="20" ignore-error="false">

   6:                 <set-url>https://login.microsoftonline.com/xxxxxx.onmicrosoft.com/oauth2/token</set-url>

   7:                 <set-method>POST</set-method>

   8:                 <set-header name="Content-Type" exists-action="override">

   9:                     <value>application/x-www-form-urlencoded</value>

  10:                     <!-- for multiple headers with the same name add additional value elements -->

  11:                 </set-header>

  12:                 <set-body>@("grant_type=client_credentials&client_id={{Dev-Web1-ClientId}}&client_secret={{Dev-Web1-ClientSecret}}&resource=https%3A%2F%2Fxxxx.xxxxx.ax.dynamics.com")</set-body>

  13:             </send-request>

  14:             <set-variable name="accessToken" value="@((string)((IResponse)context.Variables["oauthResponse"]).Body.As<JObject>()["access_token"])" />

  15:             <!-- Store result in cache -->

  16:             <cache-store-value key="token-{{Dev-Web1-ClientId}}" value="@((string)context.Variables["accessToken"])" duration="3600" />

  17:         </when>

  18:     </choose>

  19: </inbound>

Let's go through the key points of this definition below.

  • Line 2: – Assigns the value in cache to the context variable called “bearerToken”. On first entry, the cache value will be null and the variable will not be created. Note that I am adding the ClientId to the cache key name to keep it unique. Property values are accessed by surrounding the key name with double braces, e.g. {{myPropertyName}}
  • Line 4: – Checks if the context variable collection contains a key called “bearerToken” and if not found executes the code between the opening and closing “<when>” XML elements.
  • Line 5: – Initiates the request to the OAuth endpoint with a response timeout of 20 seconds. This will put the response message into the variable called “oauthResponse”
  • Line 6: – Is where you set the URL to send the request to. In this scenario I am using the Azure AD OAuth token endpoint below as our STS service: https://login.microsoftonline.com/xxxxxx.onmicrosoft.com/oauth2/token
  • Line 12: – This is where you define the body payload for the request, which here is a typical client credentials grant type payload. I am getting the values for the client Id and secret from the user-definable properties set in the Properties page. The resource parameter is just hardcoded to the URL-encoded resource URL of the API but can also be parameterised.

<set-body>@("grant_type=client_credentials&client_id={{Dev-Web1-ClientId}}&client_secret={{Dev-Web1-ClientSecret}}&resource=https%3A%2F%2Fxxxx.xxxxx.ax.dynamics.com")</set-body>

  • Line 14: – Casts the response as a JSON object to allow the retrieval of the “access_token” value using an indexer and assigns it to the context variable “accessToken”.
  • Line 16: – Is where we add the contents of the variable “accessToken” into cache for a period of 3600 seconds.
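
For readers who find the control flow easier to follow outside policy XML, the lookup-then-fetch logic described above can be sketched in Python. This is only an illustrative model and not APIM code: TokenCache, build_token_request_body, get_bearer_token and the fetch_token callable are hypothetical names, and an in-memory dictionary stands in for the APIM cache.

```python
import time
from typing import Callable, Dict, Optional, Tuple
from urllib.parse import urlencode


class TokenCache:
    """In-memory stand-in for the APIM cache (hypothetical helper)."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._items: Dict[str, Tuple[str, float]] = {}

    def lookup(self, key: str) -> Optional[str]:
        item = self._items.get(key)
        if item is None:
            return None
        value, expires_at = item
        if self._clock() >= expires_at:
            # The entry has expired, so treat it as a cache miss.
            del self._items[key]
            return None
        return value

    def store(self, key: str, value: str, duration: float) -> None:
        self._items[key] = (value, self._clock() + duration)


def build_token_request_body(client_id: str, client_secret: str, resource: str) -> str:
    """Form-encode a client credentials grant payload."""
    return urlencode({
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
        "resource": resource,
    })


def get_bearer_token(
    cache: TokenCache,
    client_id: str,
    fetch_token: Callable[[], str],
    duration: float = 3600,
) -> str:
    """Return the cached token, or fetch and cache a new one on a miss."""
    key = f"token-{client_id}"
    token = cache.lookup(key)
    if token is None:
        # Cache miss: call the token endpoint (injected here as a callable).
        token = fetch_token()
        cache.store(key, token, duration)
    return token
```

Injecting fetch_token and the clock keeps the sketch free of network calls, so the caching behaviour can be checked in isolation.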

6. Now that the “<inbound>” section has been completed, we can look at the “<backend>” section of the policy. This is where the policy forwards your request to the backend web service as defined in the API configuration page.

   1: <backend>

   2:     <send-request mode="copy" response-variable-name="transferWSResponse" timeout="20" ignore-error="false">

   3:         <set-method>POST</set-method>

   4:         <set-header name="Authorization" exists-action="override">

   5:             <value>@("Bearer " + (string)context.Variables["bearerToken"])</value>

   6:         </set-header>

   7:         <set-header name="Ocp-Apim-Subscription-Key" exists-action="delete" />

   8:         <set-header name="Content-Type" exists-action="override">

   9:             <value>application/json</value>

  10:         </set-header>

  11:     </send-request>

  12: </backend>

Let's go through this section as before.

  • Line 2: – Creates the request to the backend web service. Here we are placing the response from the web service into the variable called “transferWSResponse”.
  • Line 4: – Creates the “Authorization” header to be sent with the request.
  • Line 5: – Adds the bearer token value from the context variable “bearerToken” to the authorisation header.
  • Line 7: – Removes the APIM subscription key header so that it is not forwarded to the backend web service.
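
The header handling described in these points can be pictured with a small Python sketch. It is a model of the idea only, assuming headers are held in a plain dictionary; prepare_backend_headers is a hypothetical name and not part of APIM.

```python
from typing import Dict

# Headers the sketch replaces or removes, compared case-insensitively.
_REPLACED = ("ocp-apim-subscription-key", "authorization", "content-type")


def prepare_backend_headers(incoming: Dict[str, str], bearer_token: str) -> Dict[str, str]:
    """Build the headers for the backend call from the caller's headers."""
    # Drop the subscription key and any existing values we are about to override.
    headers = {
        name: value
        for name, value in incoming.items()
        if name.lower() not in _REPLACED
    }
    # Attach the bearer token and force a JSON content type.
    headers["Authorization"] = "Bearer " + bearer_token
    headers["Content-Type"] = "application/json"
    return headers
```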

7. Now we need to return the response message from the backend web service to the caller. This is done in the “<outbound>” policy section. Here we just simply return the value of the variable “transferWSResponse” back to the caller.

   1: <outbound>

   2:     <return-response response-variable-name="transferWSResponse">

   3:     </return-response>

   4:     <base />

   5: </outbound>

That’s the whole policy defined, which calls the OAuth endpoint to get the token and caches it for subsequent calls.

Using the tracing feature in APIM, when the first request is made, the cache will be null and the variable will not be set as shown below.

image

The next trace shows any subsequent requests will hit the cache and set the context variable to the bearer token until it expires.

image

One important note about retrieving data from cache is that it is an out-of-process call and can add tens of milliseconds onto a request.

Working with the APIM policies can make a huge impact on your API development efforts, as logging and access management can be off-loaded using the policies available in APIM. A full list of expressions used in policies can be found here: https://docs.microsoft.com/en-us/azure/api-management/api-management-policy-expressions

Enjoy…

APIM backup & restore using Azure Automation Services

In this post I will describe the steps for using PowerShell scripts to back up APIM and using the Automation service to schedule the backup every month. The restore function also allows you to restore APIM into another resource group or APIM service. For the project I am working on now, this is what I am doing to move the configuration settings between each environment.

First you need to create a blob store which ideally should be Read-Access geo-redundant storage (RA-GRS). This is where the APIM backups will be stored. After the blob store has been provisioned, create a container for the backup file as shown below.

image

Once the container is created, take note of the Storage account name and Access key for the blob store. These values will be used in the PowerShell script later.

image

Next provision an Azure Automation service and ensure the Create Azure Run As account is set to “yes”.

image

Once it has been provisioned, ensure the modules have been updated by clicking on the “Modules” link on the left hand navigation panel and then “Update Azure Modules”. Note this does take a while to complete.

image

After the update has been completed, click the “Browse gallery” link and type “apim” in the search textbox. Once found, double click on the row to open the import blade.

image

Now click the Import icon to import the cmdlet. This can take several minutes to import.

image

After the PowerShell module has been imported, create a new Runbook and ensure the type has been set to “PowerShell”. Then click the Create button at the bottom of the page.

image

This will open up a new blade where we can add and test the PowerShell script to backup the APIM settings.

image

Now add the script below into the text editor and remember to update the variables with your environment settings. Once you have added the script, click the “Save” button and then the “Test pane” button to ensure the script runs successfully.

Disable-AzureDataCollection
Write-Output "Starting backup of APIM..."

# sign in non-interactively using the service principal
$connectionName = "AzureRunAsConnection";
$storageAccountName = "apimstorebackup";
$storageAccountKey = "<storage account key>";
$resourceGroupName = "APIMService";
$apimName = "apimmanager";
$targetContainerName = "backup";
$targetBlobName = "AzureAPIM.apimbackup";

try
{
    # Get the connection "AzureRunAsConnection"
    $servicePrincipalConnection = Get-AutomationConnection -Name $connectionName

    Write-Output "Logging in to Azure..."
    Add-AzureRmAccount `
        -ServicePrincipal `
        -TenantId $servicePrincipalConnection.TenantId `
        -ApplicationId $servicePrincipalConnection.ApplicationId `
        -CertificateThumbprint $servicePrincipalConnection.CertificateThumbprint
}
catch {
    if (!$servicePrincipalConnection)
    {
        # The Run As connection could not be found
        $ErrorMessage = "Connection $connectionName not found."
        throw $ErrorMessage
    } else {
        # The connection exists but the login failed
        Write-Error -Message $_.Exception
        throw $_.Exception
    }
}

# Storage context for the blob store that will hold the backup
$sourceContext = (New-AzureStorageContext -StorageAccountName $storageAccountName -StorageAccountKey $storageAccountKey);

Write-Output "Starting backup of APIM instance";
Backup-AzureRmApiManagement `
    -ResourceGroupName $resourceGroupName `
    -Name $apimName `
    -StorageContext $sourceContext `
    -TargetContainerName $targetContainerName `
    -TargetBlobName $targetBlobName;

Write-Output "Backup of APIM completed.";

Here is a description of the variables:

  • $connectionName = "AzureRunAsConnection" – this is the default connection account that was created when the Automation service was provisioned.
  • $storageAccountName = "apimstorebackup" – name of the blob storage account that was created in the first step.
  • $storageAccountKey = "<storage account key>" – the blob store access key obtained from the portal.
  • $resourceGroupName = "APIMService" – name of the Azure resource group.
  • $apimName = "apimmanager" – the name of the APIM service.
  • $targetContainerName = "backup" – name of the backup container in blob store.
  • $targetBlobName = "AzureAPIM.apimbackup" – file name of the backup file. This can be omitted, in which case a default filename of {apimName}-{yyyy-MM-dd-HH-mm}.apimbackup is created.
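
If you need to predict the blob name for a later restore, the default naming pattern mentioned above can be mirrored with a short Python sketch. The function name backup_blob_name is hypothetical, and the sketch only reproduces the {apimName}-{yyyy-MM-dd-HH-mm}.apimbackup pattern; which time zone the cmdlet uses for the timestamp is not stated here, so the caller supplies the time.

```python
from datetime import datetime
from typing import Optional


def backup_blob_name(apim_name: str, when: datetime, explicit_name: Optional[str] = None) -> str:
    """Return the explicit blob name if given, otherwise the default pattern."""
    if explicit_name:
        return explicit_name
    # yyyy-MM-dd-HH-mm corresponds to %Y-%m-%d-%H-%M in Python's strftime.
    return f"{apim_name}-{when.strftime('%Y-%m-%d-%H-%M')}.apimbackup"
```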

Once you have confirmed the script executes without any errors, you can now set up a recurring schedule by creating a new schedule in the Automation service blade under Shared Resources.

imageimage

Next you need to link your Runbook to this schedule by double clicking on your runbook name and then the schedule button on the top menu. This will open another blade where you can view all your schedules that you can select from.

image

That completes the automated backup process. Below is the PowerShell script required to restore the backup file.

# get the storage context
$sourceContext = (New-AzureStorageContext `
-StorageAccountName "<blob storage name>" `
-StorageAccountKey "<blob storage account key from Azure portal>")

# restore the backup
Restore-AzureRmApiManagement -ResourceGroupName "<name of resource group>" `
-Name "<name of the APIM service>" `
-StorageContext $sourceContext `
-SourceContainerName "<blob storage container name>" `
-SourceBlobName "<backup file name>"

More details on these scripts can be found here: https://docs.microsoft.com/en-us/powershell/module/azurerm.apimanagement/restore-azurermapimanagement?view=azurermps-4.3.1

Enjoy.

Enforcing Ordered Delivery using Azure Logic Apps and Service Bus

When consuming messages from an Azure service bus, the order may not be guaranteed because the brokered messaging scheme allows multiple consumers to consume messages from the bus. Sure, you can force the Logic App to execute as a single instance, but then you sacrifice performance and scalability. You can also use ReceiveAndDelete, but then you lose the transactional nature of the bus. Ultimately, to ensure messages are consumed in the correct order while keeping the transactional nature of the bus, you would add a sequence number to each message and use this to enforce the ordering.
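
To illustrate what enforcing order with a sequence number means, here is a minimal Python sketch of a resequencer. It is a generic illustration rather than anything provided by Service Bus or Logic Apps: the resequence function is a hypothetical name, and it assumes sequence numbers are contiguous integers starting at first_sequence.

```python
from typing import Dict, Iterable, Iterator, Tuple


def resequence(messages: Iterable[Tuple[int, str]], first_sequence: int = 1) -> Iterator[str]:
    """Yield payloads in sequence order, buffering any that arrive early."""
    pending: Dict[int, str] = {}
    expected = first_sequence
    for sequence, payload in messages:
        pending[sequence] = payload
        # Release every buffered message that is now next in line.
        while expected in pending:
            yield pending.pop(expected)
            expected += 1
```

A message that arrives ahead of its turn is held back until the gap before it has been filled, which is the cost of this approach compared with the session-based design described next.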

To achieve ordered delivery using Logic Apps, you need to ensure all related messages are consumed by the same Logic App instance, and for this we use the session Id property on the service bus. Below is the full workflow process to force ordered delivery using Logic Apps and session Ids on the service bus subscription.

image

This scenario is based on a financial institution which requires all monetary transfers to be processed in an ordered fashion.  The key is choosing a suitable session identifier and with this in mind, the account number was the most suitable candidate as we want a single consumer to process all the transactions for a particular account number.

Here we have created a subscription for a topic called AccountTransfers. Note that the Enable sessions option is checked.

image

Once the service bus has been configured, we can now dissect the workflow to see how we can achieve ordered delivery.

The workflow is initiated by a polling Service Bus Connector. The properties of this connector are shown below. The key point here is to set the Session id to “Next Available”. This forces the Logic App to create a new instance for each unique session id value found on the service bus.

image
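
The effect of using the account number as the session Id can be pictured with a small Python sketch. It is a simulation only, assuming messages are (session Id, payload) pairs; group_by_session is a hypothetical name, and each resulting group represents the work one Logic App instance would handle.

```python
from typing import Dict, Iterable, List, Tuple


def group_by_session(messages: Iterable[Tuple[str, str]]) -> Dict[str, List[str]]:
    """Group payloads by session Id, keeping arrival order within each session."""
    sessions: Dict[str, List[str]] = {}
    for session_id, payload in messages:
        # One list per session Id stands in for one Logic App instance.
        sessions.setdefault(session_id, []).append(payload)
    return sessions
```

Transfers for different accounts can then be processed in parallel, while transfers for the same account stay in the order they arrived.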

The next action “ProcessSBMessage” is used to call another Logic App which does the processing of the message found on the bus. Here I am just passing the raw base64 encoded message from the Service Bus Trigger action. Applying the “separation of concerns” pattern keeps the business logic away from the process of ensuring ordered delivery.

image

Once the message has been sent to the chained Logic App and a response has been returned, we can complete the message on the bus with the following action.

image

Next we go into a loop until the exit condition has been satisfied. I am going to use a counter that is incremented if no messages are found on the service bus. If no more messages are found on the service bus after 30 seconds, the loop will exit.

image

The loop inside starts with another service bus connector trigger which gets the messages from the topic subscription. Here we only want to retrieve one message at a time from the service bus using a peek-lock trigger and using the Session Id from the initial service bus trigger “When a message is received in a topic subscription”. We then check if a message is found in the output body using the expression "@not(equals(length(body('Get_messages_from_a_topic_subscription_(peek-lock)')), 0))".

image

If a message is found, the “If True” branch is executed which again calls the same Logic App as before to process the message. Note the indexer to get to the context data as the service bus connector trigger above returns a collection.

image

Once a successful response is received from the ProcessSBMessage Logic App, the message is completed and the LoopCounter variable is reset to zero. Note the lock token is from the service bus connector trigger within the loop and the Session Id is from the initial service bus connector which started the workflow.

image

Below is the code view for setting the lockToken and SessionId of the “Complete the message” action inside the loop.  Take note of the indexer “[0]” before the LockToken element.

image

If no messages are found on the service bus, the False branch is then executed. This simply has a delay action, so as not to poll too quickly, and increments the LoopCounter.

image

The last step is to close the session when the Until loop exits, using the Session Id from the initial service bus connector trigger which started the workflow.

image
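
Putting the loop together, the control flow of the workflow can be sketched in Python. This is a model of the logic only and does not call Service Bus: drain_session and the injected callables are hypothetical names, and the one second delay with a limit of 30 empty polls is an assumption chosen to match the 30 second idle period mentioned earlier.

```python
import time
from typing import Callable, Optional


def drain_session(
    receive_one: Callable[[], Optional[str]],
    process: Callable[[str], None],
    complete: Callable[[str], None],
    close_session: Callable[[], None],
    max_empty_polls: int = 30,
    delay_seconds: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Process messages for one session until it has been idle long enough."""
    empty_polls = 0
    handled = 0
    while empty_polls < max_empty_polls:
        message = receive_one()
        if message is not None:
            # "If True" branch: process, complete, then reset the counter.
            process(message)
            complete(message)
            empty_polls = 0
            handled += 1
        else:
            # "False" branch: wait so we do not poll too quickly.
            sleep(delay_seconds)
            empty_polls += 1
    # Release the session so another instance can pick it up later.
    close_session()
    return handled
```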

Now you are ready to send messages into the service bus. You should see a Logic App spin up for each unique session Id. Remember to set the session Id property on the service bus message to some value before sending the message.

Enjoy…

Error updating AX entities using the Dynamics 365 for Operations connector in Logic Apps

When trying to update an entity via the Dynamics 365 connector you may encounter the following error.

{ "status": 400, "message": "Only 1 of 2 keys provided for lookup, provide keys for SalesOrderNumber,dataAreaId.", "source": "127.0.0.1" }

One would think passing the ItemInternalId guid value, which is the primary key for the entity, as the Object Id property would be adequate to find the record to update. Judging by the error returned, it is not.

image

Apparently you need to supply both keys mentioned in the error response message, SalesOrderNumber and dataAreaId, as the Object Id as shown below. Note the comma between the sales order number (Sales Order) and the dataAreaId (Company).

image

So the item path for the entity to update looking from the code view would look like this:

image
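
As a small illustration of the composite key, the Object Id value is simply the two keys joined by a comma. The Python helper below is a hypothetical sketch of that rule (build_object_id is not part of the connector), and it does not attempt any encoding the connector may apply when building the item path.

```python
def build_object_id(sales_order_number: str, data_area_id: str) -> str:
    """Join the two lookup keys in the order named by the error message."""
    if not sales_order_number or not data_area_id:
        raise ValueError("both SalesOrderNumber and dataAreaId are required")
    return f"{sales_order_number},{data_area_id}"
```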

Enjoy…

Fixing syntax errors when porting Logic Apps from the web portal to Visual Studio

After initially designing your Logic App in the Azure Portal, you may wish to port it to Visual Studio 2017 to manage the template under a source control repository and to further develop it from Visual Studio.

After porting the code, you may encounter some of the following errors when trying to save or deploy your Logic App from Visual Studio.

Error: …the string character ‘@’ at position ‘0’ is not expected.

This is fairly easy to identify, as Visual Studio highlights the code it considers syntactically incorrect, as shown below. Remember this code was ported over from the Azure Portal where it parsed without any issues.

image

It is complaining about an unrecognised function. To get around this issue we need to use the “concat” string function to treat this as a string literal.

The original syntax is here: "ProductCodes": "[@{outputs('Compose_Product_Detail')}]"

Surrounding the whole value "[@{outputs('Compose_Product_Detail')}]" with concat, as shown below, resolves this error.

"@concat('[', outputs('Compose_Product_Detail'), ']')"

Breaking the designer when parameterising the subscription Id when calling a function

You may have a call-back function defined in your Logic App which has the subscription guid embedded in the code (blanked out for security reasons) similar to below:

image

If you try to parameterise the subscription Id by simply putting the function "subscription().subscriptionId" in place of the guid value, as shown below:

"function": {
"id": "/subscriptions/subscription().subscriptionId/resourceGroup…

you will get the following error when trying to save the changes.

image

To overcome this issue, wrap the value in a concat function as shown below. Note the URL has been shortened with “…” to make it readable.

"function": {
"id": "[concat('/subscriptions/',subscription().subscriptionId,'/resourceGroups/…/providers/Microsoft.Web/sites/…/functions/Cmn_GuidMapNullValue')]"

You can also apply this same technique to parameterise other information in the Id key such as the website location.
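
To make the shape of the concatenated Id clearer, here is a hedged Python sketch that assembles the same kind of path from its parts. The function name and the resource group and site values used when calling it are hypothetical placeholders, since the real names are elided in the example above.

```python
def function_resource_id(
    subscription_id: str,
    resource_group: str,
    site_name: str,
    function_name: str,
) -> str:
    """Concatenate the literal path segments with the parameterised values."""
    return "".join([
        "/subscriptions/", subscription_id,
        "/resourceGroups/", resource_group,
        "/providers/Microsoft.Web/sites/", site_name,
        "/functions/", function_name,
    ])
```

Each literal segment corresponds to a quoted string in the concat call, and each argument corresponds to a value that could be supplied by a template function or parameter.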

Enjoy…