Extracting Claims from an OAuth Token in an Azure Function

This is a follow-up to my previous blog on adding custom application claims to an app registered in AAD (https://connectedcircuits.blog/2020/08/13/setting-up-custom-claims-for-an-api-registered-in-azure-ad/). In this article I will be developing an Azure Function that accepts an OAuth token containing custom claims. The function will validate the token and return all the claims found in the bearer token as the response message.

Instead of using the built-in Azure AD authentication and authorization support in Azure Functions, I will be using the NuGet packages Microsoft.IdentityModel.Protocols and System.IdentityModel.Tokens.Jwt to validate the JWT token. This will allow decoding of bearer tokens from other authorisation providers.

These packages use the JSON Web Key Set (JWKS) endpoint from the authorisation server to obtain the public key. The key is then used to validate the token's signature to ensure it has not been tampered with. The main advantage of this approach is that you don't need to store the issuer's public key or remember to update the certificates before they expire.


Function Code

The full code for this solution can be found on my GitHub repository: https://github.com/connectedcircuits/azOAuthClaimsFunct.

Below is a function which returns the list of signing keys from the jwks_uri endpoint. Ideally the response should be cached, as downloading the keys from the endpoint can take some time; a caching sketch follows the code below.

// Get the public keys from the jwks endpoint
private static async Task<ICollection<SecurityKey>> GetSecurityKeysAsync(string idpEndpoint)
{
    var openIdConfigurationEndpoint = $"{idpEndpoint}.well-known/openid-configuration";
    var configurationManager = new ConfigurationManager<OpenIdConnectConfiguration>(openIdConfigurationEndpoint, new OpenIdConnectConfigurationRetriever());
    var openIdConfig = await configurationManager.GetConfigurationAsync(CancellationToken.None);
    return openIdConfig.SigningKeys;
}
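One way to achieve that caching, sketched below, is to hold a static ConfigurationManager instance per issuer; the class caches the metadata document internally and only re-downloads it when its refresh interval elapses. The ConcurrentDictionary keyed on the issuer endpoint is my own addition and assumes System.Collections.Concurrent is referenced.

// Cache one ConfigurationManager per issuer so the JWKS document is only
// downloaded when its internal refresh interval elapses, not on every call.
private static readonly ConcurrentDictionary<string, ConfigurationManager<OpenIdConnectConfiguration>> configManagers =
    new ConcurrentDictionary<string, ConfigurationManager<OpenIdConnectConfiguration>>();

private static async Task<ICollection<SecurityKey>> GetCachedSecurityKeysAsync(string idpEndpoint)
{
    var configurationManager = configManagers.GetOrAdd(idpEndpoint,
        ep => new ConfigurationManager<OpenIdConnectConfiguration>(
            $"{ep}.well-known/openid-configuration",
            new OpenIdConnectConfigurationRetriever()));
    var openIdConfig = await configurationManager.GetConfigurationAsync(CancellationToken.None);
    return openIdConfig.SigningKeys;
}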

The next part of the code configures the TokenValidationParameters properties with the authorisation server address (the issuer), the audiences and the signing keys obtained from the GetSecurityKeysAsync function mentioned above.

TokenValidationParameters validationParameters = new TokenValidationParameters
{
    ValidIssuer = issuer,
    ValidAudiences = new[] { audiences },
    IssuerSigningKeys = keys
};

Next, the token is validated and the claims found in it are assigned to a ClaimsPrincipal object.

//Grab the claims from the token.
JwtSecurityTokenHandler handler = new JwtSecurityTokenHandler();
SecurityToken validatedToken;
ClaimsPrincipal principal;
try
{
    principal = handler.ValidateToken(token, validationParameters, out validatedToken);
}
catch (SecurityTokenExpiredException ex)
{
    log.LogError(ex.Message);
    req.HttpContext.Response.Headers.Add("X-Error-Message", $"Token expired at {ex.Expires}");
    return new UnauthorizedResult();
}
catch (Exception ex)
{
    log.LogError(ex.Message);
    return new UnauthorizedResult();
}

Once you have the principal object instantiated, you can use the IsInRole("<role_name>") method to check whether the token contains a given role. This method returns true if the role is found.
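For example, a minimal sketch of a role check followed by projecting the claims into the response could look like this (the role name and the use of LINQ's Select are illustrative):

// Reject the request if the expected role claim is not present in the token.
if (!principal.IsInRole("crm.read"))
{
    return new UnauthorizedResult();
}

// Otherwise return a simple type/value list of all the claims found.
var claims = principal.Claims.Select(c => new { c.Type, c.Value });
return new OkObjectResult(claims);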


Runtime results

This is the token request for an app registered in Azure AD that has the crm.read and crm.write claims assigned.

image

This is the response from the Azure Function using the bearer token obtained from AAD. Here you can see the two custom application claims crm.read and crm.write listed amongst the default claims.

image

This is another example, using Auth0 (https://auth0.com/) as an alternative authorisation server. The API was registered with full CRUD permissions, whilst the client was only given access to the read and update roles. Below is the request sent to Auth0 for a token.

image

This is the response from calling the Azure Function using the bearer token from Auth0, with the two custom claims crm:read and crm:update returned alongside the other default claims.

image

Conclusion

As you can see, any authorisation server that supports a jwks_uri endpoint for acquiring the public signing keys can be used, provided the generated token is signed with the RS256 algorithm so that the token signature can be verified.

Enjoy…

Ensuring Ordered Delivery of Messages using Azure Functions

Typically, when using Azure Functions to consume messages from a Service Bus (SB), ordering is not guaranteed even though the SB is First-In-First-Out (FIFO). This is due to competing consumers, where multiple instances of the function compete for messages off the service bus.

An example of where out-of-order processing can happen is when one function instance takes longer to process a message than the other instances, thereby affecting the processing order. This is represented in the sequence diagram below, where function instance 1 took longer than instance 2 to update the same record in a database.

image

One option to enforce ordered delivery is to configure the Azure Function to spin up only one instance. The problem with this solution is that it won't scale very well. A more scalable option is to use sessions. This allows you to have multiple instances of a function executing, giving you a higher message throughput.

To enforce message ordering, several properties must be set. The Requires Session property must be enabled on the SB queues and topic subscriptions. Messages sent to the SB must have the SessionId context property set to a value that distinguishes them from other unrelated messages; examples of a session Id could be the account number, customer number, batch Id, etc. Finally, the Azure Function must have the IsSessionsEnabled property set to true on the SB input binding. A minimal sender-side example follows.
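Here is a sketch using the Microsoft.Azure.ServiceBus Message type; the order number and the MessageSender instance are assumptions for illustration:

// Use the order number as the session Id so all status updates for the
// same order are locked to a single consuming function instance.
var message = new Message(Encoding.UTF8.GetBytes("Status-1 - Ordered"))
{
    SessionId = "OrderId-12345" // hypothetical order number
};
await sender.SendAsync(message);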

Support for SB sessions in Azure Functions only became generally available in mid-2019. Enabling sessions on the Azure Function places a lock on all messages that share the same session Id, causing those locked messages to be consumed only by the one function instance that placed the lock.

Typical Scenario

A warehouse needs to track the progress of an order from when it's first received to when it gets dispatched. Throughout each stage (Ordered, Picked, Packaged, Dispatched) of the ordering process, the status of the order must be updated. This involves placing a new message onto the service bus every time the order status needs to be updated. An Azure Function will then pull the messages from the service bus and update the order status in a database where the customer can view the current state of their order.

To simulate the warehouse tracking system, a console app will be used to create messages for each status change (Ordered, Picked, Packaged, Dispatched), for several hundred orders. The session Id of each status message will be set to the order number. The app will then send the messages to a SB Topic where it will have two subscriptions, one with sessions enabled and the other disabled. This is so we can compare the ordering of messages being received with and without sessions enabled.

Order message generator
class Program
{
    private static string connectionString = ConfigurationManager.AppSettings["ServiceBusConnectionString"];
    private static string topicName = ConfigurationManager.AppSettings["TopicName"];
    private static int orders = 100;
    private static int messagePerSession = 4;

    static async Task Main(string[] args)
    {
        Console.WriteLine("Creating Service Bus sender....");
        var taskList = new List<Task>();
        var sender = new MessageSender(connectionString, topicName);
        //create an order
        for (int order = 0; order < orders; order++)
        {
            var orderNumber = $"OrderId-{order}";
            var messageList = new List<Message>();
            //simulate a status update in the correct order
            for (int m = 0; m < messagePerSession; m++)
            {
                var status = string.Empty;
                switch (m)
                {
                    case 0:
                        status = "1 - Ordered";
                        break;
                    case 1:
                        status = "2 - Picked";
                        break;
                    case 2:
                        status = "3 - Packaged";
                        break;
                    case 3:
                        status = "4 - Dispatched";
                        break;
                }
                var message = new Message(Encoding.UTF8.GetBytes($"Status-{status}"))
                {
                    //set the service bus SessionId property to the current order number
                    SessionId = orderNumber
                };
                messageList.Add(message);
            }
            //send the list of status update messages for the order to the service bus
            taskList.Add(sender.SendAsync(messageList));
        }
        Console.WriteLine("Sending all messages...");
        await Task.WhenAll(taskList);
        Console.WriteLine("All messages sent.");
    }
}

Two Azure Functions will be created, one with sessions enabled and the other with sessions disabled. The functions will have a random delay of 1 to 10 seconds to simulate some business logic which may be calling out to an external service before updating the order status. Instead of the function writing to a database, each status update message received will be written to Azure Table storage to create an audit log of when each status update message was processed.

Below is the source code for the function which will process the messages on the service bus using sessions. Note the IsSessionsEnabled property is set to true on the ServiceBusTrigger input binding. The randomiser is there to simulate some business logic that could vary in how long it takes to process a message.

Azure Function using sessions
public static class MsgOrderingSessions
{
    [FunctionName("MsgOrderingSessions")]
    [return: Table("OrdersSession", Connection = "StorageConnectionAppSetting")]
    public static OrderEntity Run([ServiceBusTrigger("orders", "OrdersSession", Connection = "SbConnStr", IsSessionsEnabled = true)]
          Message sbMesssage, ILogger log)
    {
        log.LogInformation($"C# ServiceBus topic trigger function processed message: {Encoding.UTF8.GetString(sbMesssage.Body)}");
        Random random = new Random();
        int randNumb = random.Next(1000, 10000);
        System.Threading.Thread.Sleep(randNumb);
        return new OrderEntity { PartitionKey = $"{sbMesssage.SessionId} - {DateTime.Now.Ticks}", RowKey = Guid.NewGuid().ToString(), Text = Encoding.UTF8.GetString(sbMesssage.Body) };
    }
}

Below is the source code for the function which does not use sessions. Here the IsSessionsEnabled property is set to false.

Azure Function no sessions
public static class MsgOrderingNoSession
{
    [FunctionName("MsgOrderingNoSessions")]
    [return: Table("OrdersNoSession", Connection = "StorageConnectionAppSetting")]
    public static OrderEntity Run([ServiceBusTrigger("orders", "OrdersNoSession", Connection = "SbConnStr", IsSessionsEnabled = false)]
          Message sbMesssage, ILogger log)
    {
        log.LogInformation($"C# ServiceBus topic trigger function processed message: {Encoding.UTF8.GetString(sbMesssage.Body)}");
        Random random = new Random();
        int randNumb = random.Next(1000, 10000);
        System.Threading.Thread.Sleep(randNumb);
        return new OrderEntity { PartitionKey = $"{sbMesssage.SessionId} - {DateTime.Now.Ticks}", RowKey = Guid.NewGuid().ToString(), Text = Encoding.UTF8.GetString(sbMesssage.Body) };
    }
}

Below are the settings for the service bus topic, which has two subscriptions, one of which has Requires Session checked.

image

Running the console app creates 400 messages on both subscriptions, 4 status update messages per order.

image

Conclusion

The Azure Function which had IsSessionsEnabled = false on the ServiceBusTrigger inserted the rows out of order, due to multiple function instances competing for the next message on the service bus.

image

For the Azure Function which had IsSessionsEnabled = true, reading messages from a service bus subscription which also had the Requires Session flag enabled, the messages were processed in the same sequence as they were placed onto the service bus.

image

When using sessions there is a slight performance hit, depending on the number of function instances executing. In this example both functions were running under the consumption plan, which spun up 6 instances. As you can see from the number of messages waiting on each of the subscriptions below, the subscription with sessions disabled was processing the messages a lot faster.

When sessions are used, each function instance places a lock on all messages having the same session Id, and those messages are processed one after another. As there were only 6 instances available, only a maximum of six orders could be processed at one time.

image

Enjoy…

Using an Azure APIM Scatter-Gather policy with a Mapping Function

This is a follow-up to a previous blog, "Azure APIM Scatter-Gather Pattern Policy", where I wrote about using the Wait policy to create a scatter-gather pattern. A few colleagues were asking about being able to map the inbound request to the different schemas required by each of the Microservices.

A high-level design is shown below using two "Wait" policies and an Azure Function for the mapping. The first policy, "Translation", sends the request to the mapping function; when it completes, the second policy, "Scatter", is executed to send the mapped request to each of the Microservices.

image

The internals of the Azure Function that maps the incoming request are shown below as an example. Here I am using a route on the supplier name and a simple if-then statement to select which static translator method to call.

public static class MappingService
{
    [FunctionName("DataMapping")]
    public static async Task<IActionResult> Run(
        [HttpTrigger(AuthorizationLevel.Function, "post", Route = "Pricing/{suppliername}")]
        HttpRequest req,
        string suppliername,
        ILogger log)
    {
        log.LogInformation("C# HTTP trigger function processed a request.");

        string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
        dynamic response = null;
        if (suppliername.Equals("Supplier1", StringComparison.OrdinalIgnoreCase))
            response = Translator.TranslateSupplier1(requestBody);
        else if (suppliername.Equals("Supplier2", StringComparison.OrdinalIgnoreCase))
            response = Translator.TranslateSupplier2(requestBody);

        return response != null
            ? (ActionResult)new OkObjectResult(response)
            : new BadRequestObjectResult("Invalid message.");
    }
}
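The Translator class itself is not shown in this post; below is a rough sketch of its shape, assuming Newtonsoft.Json for parsing. The supplier field names are purely hypothetical, as the real mapping depends on each supplier's schema.

public static class Translator
{
    // Map the inbound pricing request to supplier 1's schema (hypothetical fields).
    public static object TranslateSupplier1(string requestBody)
    {
        dynamic request = JsonConvert.DeserializeObject(requestBody);
        return new { ProductCode = (string)request.sku, Quantity = (int)request.quantity };
    }

    // Map the same request to supplier 2's schema (also hypothetical).
    public static object TranslateSupplier2(string requestBody)
    {
        dynamic request = JsonConvert.DeserializeObject(requestBody);
        return new { item = (string)request.sku, units = (int)request.quantity };
    }
}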

Below is the code for the Translation policy, which calls the mapping function in parallel (once per supplier route) with the inbound price request message. The function access code is stored as a named value in APIM called "funcmapkey".

<!-- Call the mapping service -->
<wait for="all">
    <send-request mode="copy" response-variable-name="res_SupplierMap1" timeout="120" ignore-error="false">
        <set-url>https://mapsvc.azurewebsites.net/api/Pricing/supplier1?code={{funcmapkey}}</set-url>
        <set-method>POST</set-method>
        <set-body>@(context.Variables.GetValueOrDefault<string>("requestPayload"))</set-body>
    </send-request>
    <send-request mode="copy" response-variable-name="res_SupplierMap2" timeout="120" ignore-error="false">
        <set-url>https://mapsvc.azurewebsites.net/api/Pricing/supplier2?code={{funcmapkey}}</set-url>
        <set-method>POST</set-method>
        <set-body>@(context.Variables.GetValueOrDefault<string>("requestPayload"))</set-body>
    </send-request>
</wait>

The code for the Scatter policy is shown below, which is similar to the original blog post. However, it uses the mapped outputs from the Translation policy that are stored in the res_SupplierMap1 and res_SupplierMap2 context variables instead.

<!-- Call the pricing services -->
<wait for="all">
    <send-request mode="copy" response-variable-name="response_1" timeout="120" ignore-error="false">
        <set-url>{{serviceendpoint_1}}</set-url>
        <set-method>POST</set-method>
        <set-body>@(((IResponse)context.Variables["res_SupplierMap1"]).Body.As<string>())</set-body>
    </send-request>
    <send-request mode="copy" response-variable-name="response_2" timeout="120" ignore-error="false">
        <set-url>{{serviceendpoint_2}}</set-url>
        <set-method>POST</set-method>
        <set-body>@(((IResponse)context.Variables["res_SupplierMap2"]).Body.As<string>())</set-body>
    </send-request>
</wait>

The last policy checks the status of each of the pricing services and returns the results as a composite message if no errors were encountered. This is similar to the original blog post, but instead of returning a JObject I am now returning a JArray collection.

<choose>
    <when condition="@((int)((IResponse)context.Variables["response_1"]).StatusCode != 200)">
        <return-response response-variable-name="response_1" />
    </when>
    <when condition="@((int)((IResponse)context.Variables["response_2"]).StatusCode != 200)">
        <return-response response-variable-name="response_2" />
    </when>
    <otherwise>
        <return-response>
            <set-status code="200" reason="OK" />
            <set-header name="Content-Type" exists-action="override">
                <value>application/json</value>
            </set-header>
            <set-body>@{
                JArray suppliers = new JArray();
                suppliers.Add(((IResponse)context.Variables["response_1"]).Body.As<JObject>());
                suppliers.Add(((IResponse)context.Variables["response_2"]).Body.As<JObject>());
                return suppliers.ToString();
            }</set-body>
        </return-response>
    </otherwise>
</choose>

The completed policy for the method is shown below. Note the request payload is stored in the variable named "requestPayload" at the start, to avoid locking the body context in the <wait> policies.

<policies>
    <inbound>
        <set-variable name="requestPayload" value="@(context.Request.Body.As&lt;string>(true))" />
        <!-- Call the mapping service -->
        <wait for="all">
            <send-request mode="copy" response-variable-name="res_SupplierMap1" timeout="120" ignore-error="false">
                <set-url>https://mapsvc.azurewebsites.net/api/Pricing/supplier1?code={{funcmapkey}}</set-url>
                <set-method>POST</set-method>
                <set-body>@(context.Variables.GetValueOrDefault<string>("requestPayload"))</set-body>
            </send-request>
            <send-request mode="copy" response-variable-name="res_SupplierMap2" timeout="120" ignore-error="false">
                <set-url>https://mapsvc.azurewebsites.net/api/Pricing/supplier2?code={{funcmapkey}}</set-url>
                <set-method>POST</set-method>
                <set-body>@(context.Variables.GetValueOrDefault<string>("requestPayload"))</set-body>
            </send-request>
        </wait>
        <!-- Call the pricing services -->
        <wait for="all">
            <send-request mode="copy" response-variable-name="response_1" timeout="120" ignore-error="false">
                <set-url>{{serviceendpoint_1}}</set-url>
                <set-method>POST</set-method>
                <set-body>@(((IResponse)context.Variables["res_SupplierMap1"]).Body.As<string>())</set-body>
            </send-request>
            <send-request mode="copy" response-variable-name="response_2" timeout="120" ignore-error="false">
                <set-url>{{serviceendpoint_2}}</set-url>
                <set-method>POST</set-method>
                <set-body>@(((IResponse)context.Variables["res_SupplierMap2"]).Body.As<string>())</set-body>
            </send-request>
        </wait>
        <choose>
            <when condition="@((int)((IResponse)context.Variables["response_1"]).StatusCode != 200)">
                <return-response response-variable-name="response_1" />
            </when>
            <when condition="@((int)((IResponse)context.Variables["response_2"]).StatusCode != 200)">
                <return-response response-variable-name="response_2" />
            </when>
            <otherwise>
                <return-response>
                    <set-status code="200" reason="OK" />
                    <set-header name="Content-Type" exists-action="override">
                        <value>application/json</value>
                    </set-header>
                    <set-body>@{
                        JArray suppliers = new JArray();
                        suppliers.Add(((IResponse)context.Variables["response_1"]).Body.As<JObject>());
                        suppliers.Add(((IResponse)context.Variables["response_2"]).Body.As<JObject>());
                        return suppliers.ToString();
                    }</set-body>
                </return-response>
            </otherwise>
        </choose>
    </inbound>
    <backend>
        <base />
    </backend>
    <outbound>
        <base />
    </outbound>
    <on-error>
        <base />
    </on-error>
</policies>

Using the tracing feature in APIM, you can see the initial price request message below. This will be sent to the mapping function Pricing method.

image

Below is the trace output from APIM showing the two different messages returned from the mapping function and assigned to the res_SupplierMap variables.

image

Below is the composite message returned from APIM as a JSON array containing each supplier.

image

In conclusion, using the two <wait> policies to send multiple requests in parallel yields a request/response latency of around 200 ms on average in this scenario. Also, instead of using an Azure Function to map the incoming request, you could replace it with a couple of "Transformation" policies.

Enjoy…

ARM Template for Provisioning an Azure Function App with a Key Vault

Late in 2018, Microsoft announced that you can now store sensitive App Service application setting values in an Azure Key Vault without any changes to the function code. The only requirements were to update the setting's value with @Microsoft.KeyVault(SecretUri=secret_uri_with_version) to reference the Key Vault, and to enable an identity account of the App Service to access the Key Vault.

This is a great leap forward, having this feature baked into an App Service. However, trying to create an ARM template to provision an App Service, Storage Account and a Key Vault by following these instructions (https://docs.microsoft.com/en-us/azure/app-service/app-service-key-vault-references#azure-resource-manager-deployment) proved to be rather challenging. After several hours of getting the versions correct and the dependencies in the correct order, I managed to create a working template to provision all 3 artefacts.

The template creates the App Service and uses the system-assigned managed identity account to access the Key Vault with Get only permissions. The primary key for the Storage Account and the Application Insights key are stored in Key Vault. These are then referenced by the AzureWebJobsStorage, WEBSITE_CONTENTAZUREFILECONNECTIONSTRING and APPINSIGHTS_INSTRUMENTATIONKEY names in the application settings of the Function App Service.

Updating the parameters file with the required artefact names and deploying the template provisions the following services in Azure. The only thing left is to deploy your Function into the newly provisioned App Service.

image

If you try to view the Secrets in Key Vault, you will encounter the authorisation error shown below. If you wish, you may update the ARM template to add your ID to the access policies collection of the Key Vault.

image

To add your account using the Azure Portal, navigate to Access Policies and then click Add new. Notice the App Service account has already been added by the ARM template.

image

Then click on Select principal and type your login address into the Principal blade to find your name, then click the Select button.

image

Once your login name has been added, you can then select the required permissions.

image

Now you can view the keys added by the ARM template.

image

Below are the Application settings under the App Service showing references to the Key Vault for the values.

image

The code for the ARM Template can be downloaded from here: https://github.com/connectedcircuits/azappsvc. I have added comments into the template so you should be able to modify it to suit your requirements.

Enjoy…

Always subscribe to Dead-lettered messages when using an Azure Service Bus

I typically come across solutions that incorporate a service bus where monitoring of the dead-letter queues is simply ignored. Upon questioning, the usual excuse for not monitoring the DLQ (dead-letter queue) is: "I don't require it because I have added exception handling, therefore no messages will end up on the DLQ." From experience, there will be scenarios where the exception logic did not capture an edge case and the message will end up on the DLQ without anyone knowing about it.

A simple solution is to have a single process monitor all the DLQ messages and raise an alert when one occurs. Below is one of my building blocks which I typically incorporate when there is a service bus involved in a solution.

image

To use the DLQ building block to centrally capture any DLQ message, simply set the "ForwardDeadLetteredMessagesTo" property on each of the messaging queues to a common DLQ handler queue, as shown below.

image
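If you would rather configure this in code than through the portal, a sketch using the ManagementClient from the Microsoft.Azure.ServiceBus NuGet package could look like the following. The connection string variable and the "orders" queue name are assumptions; "dlq-processor" is the common handler queue consumed by the function further below.

// Requires the Microsoft.Azure.ServiceBus.Management namespace.
// Forward anything dead-lettered on a business queue to the common DLQ handler queue.
var managementClient = new ManagementClient(serviceBusConnectionString);
QueueDescription queue = await managementClient.GetQueueAsync("orders");
queue.ForwardDeadLetteredMessagesTo = "dlq-processor";
await managementClient.UpdateQueueAsync(queue);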

Now when a message gets dead-lettered, it will end up in this common queue, which is monitored by an Azure Function. Note the current NuGet version (v3.0.4) of the Service Bus library exposes the DeadLetterSource property, which is not currently available in the Logic App Service Bus connector. The function writes the DLQ metadata, any custom properties and the message payload to a blob store file. By using an Azure Storage Account V2 for the blob store, a blob creation event will be fired to any interested subscribers, which in this case is a Logic App.

Azure Function Code

Below is the code for the Azure Function. Here I am using the IBinder interface to allow me to set the folder path and file name imperatively. The connection strings (ServiceBusConn, StorageAccountConn) are defined in the Application settings of the Azure App Service.

Code Snippet
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;

namespace ServiceBusDLQMonitoring
{
    public static class Function1
    {
        [FunctionName("Function1")]
        public static async Task RunAsync([ServiceBusTrigger("dlq-processor", Connection = "ServiceBusConn")] Message dlqQueue,
            Binder blobBinder,
            ILogger log)
        {
            log.LogInformation($"C# ServiceBus queue trigger function processed message: {dlqQueue.MessageId}");

            //set the filename and blob path
            var blobFile = Guid.NewGuid().ToString() + ".json";
            var path = string.Concat("dlq/", DateTime.UtcNow.ToString("yyyy/MM/dd"), "/", dlqQueue.SystemProperties.DeadLetterSource, "/", blobFile);

            var dlqMsq = new DLQMessage
            {
                DeadletterSource = dlqQueue.SystemProperties.DeadLetterSource,
                MessageId = dlqQueue.MessageId,
                SequenceNumber = dlqQueue.SystemProperties.SequenceNumber,
                SessionId = dlqQueue.SessionId,
                UserProperties = dlqQueue.UserProperties,
                DeadLetterReason = dlqQueue.UserProperties["DeadLetterReason"].ToString(),
                EnqueuedDttmUTC = dlqQueue.SystemProperties.EnqueuedTimeUtc,
                ContentType = dlqQueue.ContentType,
                DeliveryCount = dlqQueue.SystemProperties.DeliveryCount,
                Label = dlqQueue.Label,
                MsgBase64Encoded = Convert.ToBase64String(dlqQueue.Body, Base64FormattingOptions.None)
            };

            var blobAttributes = new Attribute[]
            {
                new BlobAttribute(path),
                new StorageAccountAttribute("StorageAccountConn")
            };

            using (var writer = await blobBinder.BindAsync<TextWriter>(blobAttributes))
            {
                writer.Write(JsonConvert.SerializeObject(dlqMsq, Formatting.Indented));
            }
        }
    }

    public class DLQMessage
    {
        public string DeadletterSource { get; set; }
        public long SequenceNumber { get; set; }
        public string MessageId { get; set; }
        public string SessionId { get; set; }
        public string Label { get; set; }
        public string DeadLetterReason { get; set; }
        public DateTime EnqueuedDttmUTC { get; set; }
        public int DeliveryCount { get; set; }
        public string ContentType { get; set; }
        public IDictionary<string, object> UserProperties { get; set; }
        public string MsgBase64Encoded { get; set; }
    }
}

Logic App Implementation

A Logic App is used to retrieve the message from the blob store when it is triggered by the Event Grid HTTP webhook. The basic workflow is shown below and can be expanded to suit your own requirements.

image

The expression for the 'Get blob content using path' action is @{replace(triggerBody()[0]['data']['url'],'https://dqlmessages.blob.core.windows.net/','')}. Here I am just replacing the domain name with an empty string, as I only want the resource location.

The Parse JSON action has the following schema. This makes it easier to reference the properties downstream.

{
    "properties": {
        "ContentType": {
            "type": [
                "string",
                "null"
            ]
        },
        "DeadLetterReason": {
            "type": "string"
        },
        "DeadletterSource": {
            "type": "string"
        },
        "DeliveryCount": {
            "type": "integer"
        },
        "EnqueuedDttmUTC": {
            "type": "string"
        },
        "Label": {
            "type": [
                "string",
                "null"
            ]
        },
        "MessageId": {
            "type": "string"
        },
        "MsgBase64Encoded": {
            "type": [
                "string",
                "null"
            ]
        },
        "SequenceNumber": {
            "type": "integer"
        },
        "SessionId": {
            "type": [
                "string",
                "null"
            ]
        },
        "UserProperties": {
            "type": "any"
        }
    },
    "type": "object"
}

The last action, 'Set variable MsgBody', has its value set to: @{base64ToString(body('Parse_JSON')?['MsgBase64Encoded'])}

Blob creation event configuration

Next is to set up a subscription to the blob creation event. Click on Events under the Storage account for the DLQ messages, as shown below.

image

Then click on the +Event Subscription to add a new subscription.

image

Set up the subscription's Basic details with a suitable subscription name and the blob storage account resource name. Uncheck 'Subscribe to all event types' and select the Blob Created event. Set the endpoint details to Web Hook and the URL of the Logic App HTTP trigger endpoint address.

image

Under Filters, enable subject filtering. Add the prefix '/blobServices/default/containers/' followed by the name of the DLQ container (in this example it's called 'dlq') to the 'Subject Begins With' textbox. Then set 'Subject Ends With' to the filename extension '.json'. Now click the Create button at the bottom of the page to create the subscription.

image

Sample Logic App output

Once everything is wired up, and if any messages have been placed onto the DLQ, you should see some Logic App runs. Below is an example of the outputs from the last two actions.

image

All you need to do now is extend the Logic App to deliver the DLQ alert somewhere appropriate.

Enjoy…

Using Logic App Webhooks to execute long running SQL queries

On one of my projects, I had a requirement to send several large datasets as csv files to an FTP server using a Logic App.

I thought about two options to implement this requirement. The first option was to use the Azure SQL Server API connector to extract the dataset from the database. However, the amount of data to be serialised as JSON would certainly cause the SQL API connector to time out or exceed the maximum payload size without some form of database paging, and using paging would not meet the requirement of a single file.

The second option was to utilise the HTTP Webhook API connector available with Logic Apps to call an Azure Function, passing the call-back URL and the database stored procedure to execute. The Azure Function would simply place the JSON message body onto an Azure Storage account queue and then return an HTTP 200 response, putting the Logic App into a dehydrated state.

Another Azure Function would poll the queue for new messages. When a new message is received, it would query the database using the stored procedure name sent in the queue message. A data reader is then used to read the returned recordset for speed and populate a DataTable. The rows in the DataTable are then converted into a csv file, which is written to a blob container in a streaming fashion. Once the function has completed creating the csv file, it calls back to the dehydrated Logic App using the URL sent in the queue message; the Logic App then continues with the workflow and sends the file from the blob store to the FTP server.

Using the Webhook option meant the Logic App would go into a dehydrated state after calling the Azure Function, and I would not be charged for any consumption time whilst in this state. This meant the Azure Function could take as long as required to create the csv file while executing under a consumption billing plan.

Below is a sequence diagram showing the activation of each process. The whole process is triggered by a scheduler inside a logic app.

image

Creating the Solution

I have broken the solution into 2 projects, one for the Azure Functions and the other for the helper classes. I typically abstract all the business logic out of the Azure Functions and place it in a separate class library project where I can create unit tests for the business logic.

image

Defining the Helper Libraries

The first helper class is the AzureStorage which has one method defined to return a blob reference object and to create the blob container if it does not exist.

using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using System;
using System.Configuration;
using System.Threading.Tasks;

namespace Helpers
{
    public static class AzureStorage
    {
        public static async Task<CloudBlockBlob> CreateBlobAsync(string fileName, string containerName)
        {
            try
            {
                // Retrieve storage account from connection string.
                CloudStorageAccount storageAccount = CloudStorageAccount.Parse(ConfigurationManager.AppSettings["BlobStore"]);

                // Create the blob client.
                CloudBlobClient blobClient = storageAccount.CreateCloudBlobClient();

                CloudBlobContainer container = blobClient.GetContainerReference(containerName.ToLower());

                // First check whether the container exists, and create it if not.
                await container.CreateIfNotExistsAsync();

                // Retrieve reference to a blob.
                return container.GetBlockBlobReference(fileName.ToLower());
            }
            catch (Exception ex)
            {
                throw ex;
            }
        }
    }
}

The next class is DataAccess, used for accessing the database with a data reader to load a DataTable, which is then returned. Remember the default command timeout is 30 seconds, so this needs to be increased on the SqlCommand object.

Note this is just a simple implementation of reading the data. Ideally you would add transient fault handling with retry logic built in for production releases; a minimal sketch of this follows the class below.

using System;
using System.Configuration;
using System.Data;
using System.Data.SqlClient;

namespace Helpers
{
    internal static class DataAccess
    {
        internal static DataTable CreateDataExtract(string sprocName)
        {
            var constr = ConfigurationManager.ConnectionStrings["SQLConnectionString"].ConnectionString;
            DataTable dataTable = null;
            try
            {
                dataTable = new DataTable();
                using (SqlConnection conn = new SqlConnection(constr))
                {
                    conn.Open();
                    SqlCommand cmd = new SqlCommand(sprocName, conn);
                    cmd.CommandType = CommandType.StoredProcedure;
                    cmd.CommandTimeout = 600;
                    dataTable.Load(cmd.ExecuteReader());
                }
            }
            catch (SqlException sqlEx)
            {
                //capture SQL errors
                throw sqlEx;
            }
            catch (Exception genEx)
            {
                //capture general errors
                throw genEx;
            }
            return dataTable;
        }
    }
}
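As a rough illustration of such retry logic, the helper below wraps CreateDataExtract in a simple retry loop. The attempt count and back-off are arbitrary choices of mine, and a library such as Polly would be a better fit for production code.

// A minimal retry sketch; the final attempt lets the exception propagate.
internal static DataTable CreateDataExtractWithRetry(string sprocName, int maxAttempts = 3)
{
    for (int attempt = 1; ; attempt++)
    {
        try
        {
            return CreateDataExtract(sprocName);
        }
        catch (SqlException) when (attempt < maxAttempts)
        {
            // Back off briefly before retrying what may be a transient failure.
            System.Threading.Thread.Sleep(TimeSpan.FromSeconds(attempt * 2));
        }
    }
}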

The last helper class, DataExtracts, is the entry point for the Azure Function. This calls the other helper methods to read the database and then converts the DataTable into a csv file, writing the output in a streaming manner to the blob container called "csv".

using System;
using System.Data;
using System.IO;
using System.Text;
using System.Threading.Tasks;

namespace Helpers
{
    public static class DataExtracts
    {
        public static async Task<string> GenerateCSVDataExtractAsync(string storedProc, string columnDelimiter = ",")
        {
            string filename;
            try
            {
                //call data access
                var dataTable = DataAccess.CreateDataExtract(storedProc);

                //save table as CSV to blobstore
                filename = Guid.NewGuid().ToString("D") + ".csv";
                await WriteDataTableToCSVAsync(dataTable, "csv", filename, columnDelimiter);
            }
            catch (Exception ex)
            {
                throw ex;
            }

            return filename;
        }

        private static async Task WriteDataTableToCSVAsync(DataTable dt, string containerName, string filename, string columnDelimiter)
        {
            if (dt != null)
            {
                using (var ms = new MemoryStream())
                using (var sw = new StreamWriter(ms, Encoding.UTF8))
                {
                    //create the header row
                    for (int i = 0; i < dt.Columns.Count; i++)
                    {
                        sw.Write(dt.Columns[i].ColumnName);
                        sw.Write(i == dt.Columns.Count - 1 ? Environment.NewLine : columnDelimiter);
                    }

                    //append the data rows
                    foreach (DataRow row in dt.Rows)
                    {
                        for (int i = 0; i < dt.Columns.Count; i++)
                        {
                            sw.Write(EscapeCSV(row[i].ToString()));
                            sw.Write(i == dt.Columns.Count - 1 ? Environment.NewLine : columnDelimiter);
                        }
                    }
                    sw.Flush();
                    ms.Position = 0;

                    //write to blobstore
                    var blob = await AzureStorage.CreateBlobAsync(filename, containerName);
                    await blob.UploadFromStreamAsync(ms);
                }
            }
        }

        private static string EscapeCSV(string colData)
        {
            string quote = "\"";
            string escapedQuote = "\"\"";
            char[] quotedCharacters = new char[] { ',', '"', '\r', '\n', '\t' };

            if (colData == null) return "";
            if (colData.Contains(quote)) colData = colData.Replace(quote, escapedQuote);
            //quote the column if it contains any special characters anywhere in the string
            if (colData.IndexOfAny(quotedCharacters) > -1)
                colData = quote + colData + quote;

            return colData;
        }
    }
}


Defining the Azure Functions

The next project called “LongRunningJobs” contains the 2 Azure Functions required by the solution.

Below is the code for the function SQLJobRequest, which is called by the Logic App. This simply puts the posted JSON request message onto an Azure Storage queue using an output parameter of type Queue, which reduces the plumbing to a few lines of code.

using System.Net;
using System.Net.Http;
using System.Threading.Tasks;
using Helpers;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Azure.WebJobs.Host;

namespace LongRunningJobs
{
    public static class SQLJobRequest
    {
        [FunctionName("SQLJobRequest")]
        public static async Task<HttpResponseMessage> Run(
                                    [HttpTrigger(AuthorizationLevel.Function, "post", Route = null)]HttpRequestMessage req,
                                    [Queue("sqljobs", Connection = "BlobStore")]IAsyncCollector<string> outputQueueItem,
                                    TraceWriter log)
        {
            log.Info("C# HTTP trigger function processed a request.");
            var isParamsValid = false;

            // Get request body
            dynamic data = await req.Content.ReadAsAsync<object>();
            string sprocName = data?.SprocName;
            string callBackUrl = data?.CallBackUrl;

            if (!string.IsNullOrEmpty(sprocName) && !string.IsNullOrEmpty(callBackUrl))
            {
                isParamsValid = true;
                await outputQueueItem.AddAsync(data.ToString());
            }

            return isParamsValid == false
                ? req.CreateResponse(HttpStatusCode.BadRequest, "Please pass the sproc name and callback url.")
                : req.CreateResponse(HttpStatusCode.OK);
        }
    }
}

The line [Queue("<name-of-queue>", Connection = "<storage-connection-string>")]IAsyncCollector<string> outputQueueItem in the function parameter list uses output binding attributes to define the name of the queue and the name of the storage connection string to use.

The other function, SQLJobExecute, polls the storage queue sqljobs for new messages. When a new message is received, it calls the helper class method DataExtracts.GenerateCSVDataExtractAsync(sprocName) to create the csv file. If the csv file was created successfully, the filename is returned in the HTTP call-back POST body.

using System;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Threading.Tasks;
using Helpers;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Newtonsoft.Json;

namespace LongRunningJobs
{
    public static class SQLJobExecute
    {
        [FunctionName("SQLJobExecute")]
        public static void Run([QueueTrigger("sqljobs", Connection = "BlobStore")]string queueItem, TraceWriter log)
        {
            log.Info($"C# Queue trigger function processed: {queueItem}");

            dynamic data = JsonConvert.DeserializeObject(queueItem);

            string sprocName = data.SprocName;
            string callBackUrl = data.CallBackUrl;
            string logicappRunId = data.RunId;

            //check if valid parameters were passed.
            if (string.IsNullOrEmpty(sprocName) || string.IsNullOrEmpty(callBackUrl) || string.IsNullOrEmpty(logicappRunId))
                log.Error("Null value parameters passed.");
            else
                Task.Run(async () => { await ExecuteQueryAsync(callBackUrl, sprocName, logicappRunId, log); });
        }

        private static async Task ExecuteQueryAsync(string callBackUrl, string sprocName, string logicAppRunId, TraceWriter log)
        {
            string blobFilename = string.Empty;
            try
            {
                //call the helper class to create the csv file.
                blobFilename = await DataExtracts.GenerateCSVDataExtractAsync(sprocName);
            }
            catch (Exception ex)
            {
                log.Error(ex.Message, ex);
            }

            try
            {
                //call-back to the Logic App Webhook using the URL passed in the message from the queue.
                using (var httpClient = new HttpClient())
                {
                    var postUrl = new Uri(callBackUrl);
                    httpClient.DefaultRequestHeaders.Accept.Clear();
                    httpClient.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));
                    var content = new StringContent("{\"SprocName\":\"" + sprocName + "\",\"Filename\":\"" + blobFilename + "\"}", Encoding.UTF8);
                    var response = await httpClient.PostAsync(postUrl, content);
                    response.EnsureSuccessStatusCode();
                }
            }
            catch (Exception ex)
            {
                log.Error(string.Format("Error occurred when executing function for logic app runId:{0}\n{1}", logicAppRunId, ex.Message), ex);
            }
        }
    }
}

Below is the local.settings.json file which has the following values defined for the Blob Store and SQL Database.

image

After the Azure Functions have been deployed to an Azure App Service, the Application Settings for the Blob Store and SQL Server settings highlighted below need to be added.

image

Logic App Definition

Below are the Azure resources used for this solution. The Azure Functions are deployed to the "dalfunctions" App Service, and the "dataextracts" Storage account contains the blob store for the csv files and the storage queue "sqljobs".

image

The Logic App is a very simple workflow which is triggered by a scheduler.

image

The HTTP Webhook shape is what calls the function endpoint “SQLJobRequest” and then puts the Logic App into a dehydrated state.

image

The Subscribe-URI can be obtained from the App Service after the Azure Functions have been deployed, using the "Get function URL" link below.

image

The Subscribe-Body property consists of a JSON payload. This includes the call-back URL (typically set with the Logic Apps listCallbackUrl() function), the Logic App run Id for debugging later if required, and the SQL stored procedure name to call in the database. Below is the code view for the HTTP Webhook action.

image

After the HTTP Webhook shape is the JSON parser and a condition to check whether a filename was returned in the webhook request body.

image

The schema for the JSON Parser checks for the FileName and the SprocName.

image

The expression for the condition is below:

image

The “True” condition gets the csv file from the Blob Store and sends it to the FTP server. Note we have to prefix the filename with the Blob container name. Once the file has been sent, it is deleted from the Blob Store.

image

If no filename was received when the Logic App comes out of its dehydrated state, the workflow is terminated.

image

Sample runs

As you can see from the sample runs below, some are taking over 6 minutes to complete, which would normally cause an HTTP Function action to time out.

image

Conclusion

The solution can easily be modified to pass a list of stored procedures to create several csv files in one call and then have the call-back function pass the list of filenames to send via FTP.

This pattern could also be used for other scenarios that require an Azure Function to execute some long running process which would cause the normal HTTP Function action to timeout while waiting for the response message.

Enjoy…

Message Re-sequencing using Azure Functions and Table storage

Message re-sequencing is another integration pattern, used to rearrange messages received out of order back into the original sequence again. You may ask yourself why this pattern is so important in today's world of messaging systems.

The integration world is moving towards services hosted as Microservices in the cloud, due to their scalability attributes and independently deployable components. This may require breaking messages up into smaller logical segments, or de-batching the messages, to allow the services to manage the data more efficiently. Using this pattern makes it possible to add the smaller segments back together again in the correct order after they have been broken up.

image

Scenario

A typical use case is a Purchase Order with multiple line items which require processing asynchronously by a Microservice at an individual line level. This service may calculate the taxes, check stock levels and add the total line price. Some line items may take longer to process than others before returning the updated line item.

The approach would be to de-batch the PO at the line level and send each line item to the Microservice. The response message from the Microservice would then go through a Message Re-Sequencer component to put the order lines back in the correct sequence again before returning the updated PO response.

Another use case would be to re-sequence messages after a scatter gather pattern (https://connectedcircuits.blog/2017/10/08/integration-scatter-gather-pattern-using-azure-logic-apps-and-service-bus/) before forwarding them on.

Approach

For this re-sequencer pattern to work, we require 3 attributes to be passed with each message to the message re-sequencer service: the sequence number of the message, the total number of messages to expect, and a batch number.

A storage repository is also required to temporarily hold the out-of-sequence messages. The repository will be based on Azure Table Storage, as it provides all the requirements: storage for NoSQL data structures, a partition key to group the related messages together, and a row key to identify each message by its sequence number. By using the combination of these two keys we can quickly find and retrieve a message from the store.

An Azure Function is used to manage the saving and retrieving of messages from Table storage and is designed to be agnostic of the message types. This function is actually the core component of this solution.

The Logic App is primarily there to receive the message, de-batch the line items and pass them on to the function, then forward on the message when all de-batched line items have been received.

There are two options for when to forward the re-ordered messages out:

  • On arrival – as a message arrives, we check whether any messages in the repository can be sent out before this one is sent.
  • Last message arrival – we send all the messages out in one hit after the final message has been received.

This design will focus on the last option, where we send all messages out in one hit. My next blog will cover how to send them out using the on-arrival method.

Design

Creating the Azure Table Storage

First we need to add an Azure Storage account to our resource group to store the incoming messages.

clip_image002

Take a copy of the connection string from Settings/Access keys of the Storage account, as we will require this later.

clip_image004

Building the Azure Function

Start by creating an Azure Functions project in Visual Studio 2017.

clip_image006

Now add a new C# class to the project called MsgEntity, with the properties and overloaded constructor shown below. We also need to inherit from the TableEntity base class in the Microsoft.WindowsAzure.Storage.Table assembly.

clip_image007
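As the class is only shown as an image above, here is a sketch of what MsgEntity likely contains, inferred from how it is used in the functions later in this post (the exact zero-padding of the RowKey is an assumption):

public class MsgEntity : TableEntity
{
    public MsgEntity() { }

    public MsgEntity(string batchNumber, int sequenceNumber, bool isLastMessage)
    {
        // Group all messages for a batch under one partition and key each message
        // by its zero-padded sequence number so the rows sort lexicographically.
        PartitionKey = batchNumber;
        RowKey = sequenceNumber.ToString("0000000000");
        IsLastMessage = isLastMessage;
    }

    public string Message { get; set; }
    public bool IsLastMessage { get; set; }
}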

This class will be used as a container to hold the message contents and sequence numbering attributes.

Now we need to add the three Azure Function methods described below to the project.

  • AzureTableStoreMsg – Stores the message in Azure Table Storage and returns the total number of messages received. If the final message in the sequence has been previously received, then the date/time it was received is returned also.
  • AzureTableRetrieveMsg – Retrieves all messages from the table storage using the batch number or an individual message using the message sequence number.
  • AzureTableRemoveMsg – deletes all the messages from table storage related to the batch number.

To add a function method to the project, right-click on the project and select New Item. From the Add New Item popup window, select Azure Function.

clip_image009

Then select the "Generic WebHook" type. This will allow a Logic App to access the interface from the workflow designer later, using the Azure Functions action shape.

clip_image010

Add the following code to this new method "AzureTableStoreMsg".

[FunctionName("AzureTableStoreMsg")]
public static async Task<object> Run([HttpTrigger(WebHookType = "genericJson")]HttpRequestMessage req, TraceWriter log)
{
    log.Info($"Webhook was triggered!");

    var lastMsgIndex = 0;
    var msgCount = 0;
    var eomDateTimeReceived = DateTime.MinValue;

    try
    {
        //read the request headers
        var sequencenumber = Convert.ToInt32(req.Headers.GetValues("x-MsgReseqNumber").FirstOrDefault());
        var totalMsgCount = Convert.ToInt32(req.Headers.GetValues("x-MsgReseqTotalCount").FirstOrDefault());
        var batchnumber = req.Headers.GetValues("x-MsgReseqBatchId").FirstOrDefault();
        var isEndMsg = totalMsgCount == sequencenumber ? true : false;

        string jsonContent = await req.Content.ReadAsStringAsync();

        var storageAccount = CloudStorageAccount.Parse(ConfigurationManager.AppSettings["StorageConnectionString"]);
        // Create the table client.
        CloudTableClient tableClient = storageAccount.CreateCloudTableClient();
        // Retrieve a reference to the table.
        CloudTable table = tableClient.GetTableReference("msgsequence");
        // Create the table if it doesn't exist.
        table.CreateIfNotExists();

        // Get request body and initialise a MsgEntity object.
        var requestBody = await req.Content.ReadAsAsync<object>();
        var msgEntity = new MsgEntity(batchnumber, sequencenumber, isEndMsg);
        msgEntity.Message = requestBody.ToString();

        // Create the TableOperation object that inserts or replaces the message entity.
        TableOperation insertOperation = TableOperation.InsertOrReplace(msgEntity);

        // Execute the insert operation.
        await table.ExecuteAsync(insertOperation);

        //iterate through all the messages for this partition to check if the last message has been received and count the messages
        var queryMsgType = new TableQuery<MsgEntity>().Where(TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, batchnumber))
                           .Select(new string[] { "RowKey", "Timestamp", "IsLastMessage" });

        foreach (MsgEntity entity in table.ExecuteQuery(queryMsgType))
        {
            if (entity.IsLastMessage)
            {
                lastMsgIndex = Convert.ToInt32(entity.RowKey);
                eomDateTimeReceived = entity.Timestamp.UtcDateTime;
            }
            msgCount++;
        }
    }
    catch (Exception ex)
    {
        return req.CreateResponse(HttpStatusCode.InternalServerError, ex.Message);
    }

    // Return the number of messages and when the completion message arrived.
    return req.CreateResponse(HttpStatusCode.OK, new { EOMReceived = eomDateTimeReceived, EOMIndex = lastMsgIndex, TotalMessagesReceived = msgCount });
}

The current message sequence number, the total number of messages and the batch number are passed in as custom header values, while the message is passed in the body payload. This function will create a table called "msgsequence" if it does not exist, populate the custom object with the message properties and the message body, and store the object in the Azure Table. The response from this function returns the total number of messages received so far and whether the last message in the sequence has been received. Below is an example of the response message, showing the number of messages received so far and when the last message in the sequence was received.

image

Add another Azure Function as before, called “AzureTableRetrieveMsg”, for retrieving messages from the Table store and add the following code.

[FunctionName("AzureTableRetrieveMsg")]
public static async Task<object> Run([HttpTrigger(WebHookType = "genericJson")]HttpRequestMessage req, TraceWriter log)
{
    log.Info($"Webhook was triggered!");

    // Read the batch number and optional sequence number from the request body.
    string jsonContent = await req.Content.ReadAsStringAsync();
    Dictionary<string, string> data = JsonConvert.DeserializeObject<Dictionary<string, string>>(jsonContent);

    var sequencenumber = data["MsgSequenceNumber"];
    var batchnumber = data["MsgBatchId"];

    var storageAccount = CloudStorageAccount.Parse(ConfigurationManager.AppSettings["StorageConnectionString"]);
    // Create the table client.
    CloudTableClient tableClient = storageAccount.CreateCloudTableClient();
    // Retrieve a reference to the table.
    CloudTable table = tableClient.GetTableReference("msgsequence");

    // Construct the query operation: all entities for the batch number, or a single
    // entity when a sequence number is supplied.
    TableQuery<MsgEntity> query = null;
    if (string.IsNullOrEmpty(sequencenumber))
    {
        query = new TableQuery<MsgEntity>().Where(TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, batchnumber));
    }
    else
    {
        // Zero-pad the sequence number to match the RowKey format.
        var sequenceFilter = TableQuery.GenerateFilterCondition("RowKey", QueryComparisons.Equal, Convert.ToInt32(sequencenumber).ToString("0000000000"));
        var msgTypeFilter = TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, batchnumber);
        query = new TableQuery<MsgEntity>().Where(TableQuery.CombineFilters(sequenceFilter, TableOperators.And, msgTypeFilter));
    }

    // Return the stored messages ordered by their sequence number.
    var list = new List<dynamic>();
    foreach (MsgEntity entity in table.ExecuteQuery(query).OrderBy(x => x.RowKey))
    {
        list.Add(JsonConvert.DeserializeObject(entity.Message));
    }

    return req.CreateResponse(HttpStatusCode.OK, list);
}

This function will return either all records for the batch number or a single message matching the passed-in sequence number. A sample request message is shown below; it will return all records matching the batch number when the MsgSequenceNumber is left blank.

image
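
Going by the property names read in the code above, the request body would be something along these lines (the batch number is a placeholder value):

{
  "MsgBatchId": "PO1000",
  "MsgSequenceNumber": ""
}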

The last Azure Function method to add is “AzureTableDeleteMsg”.

[FunctionName("AzureTableDeleteMsg")]
public static async Task<object> Run([HttpTrigger(WebHookType = "genericJson")]HttpRequestMessage req, TraceWriter log)
{
    log.Info($"Webhook was triggered!");

    // Read the batch number from the request body.
    string jsonContent = await req.Content.ReadAsStringAsync();
    Dictionary<string, string> data = JsonConvert.DeserializeObject<Dictionary<string, string>>(jsonContent);

    var batchnumber = data["MsgBatchId"];

    var storageAccount = CloudStorageAccount.Parse(ConfigurationManager.AppSettings["StorageConnectionString"]);
    // Create the table client.
    CloudTableClient tableClient = storageAccount.CreateCloudTableClient();
    // Retrieve a reference to the table.
    CloudTable table = tableClient.GetTableReference("msgsequence");

    // Construct the query operation for all entities where PartitionKey equals the batch number.
    if (!string.IsNullOrEmpty(batchnumber))
    {
        TableQuery<MsgEntity> query = new TableQuery<MsgEntity>()
                    .Where(TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, batchnumber))
                    .Select(new string[] { "PartitionKey", "RowKey" });

        // Execute the query once and keep the results to avoid hitting the table twice.
        var entities = table.ExecuteQuery(query).ToList();
        if (entities.Count == 0)
        {
            return req.CreateResponse(HttpStatusCode.NotFound);
        }
        foreach (MsgEntity entity in entities)
        {
            TableOperation deleteOperation = TableOperation.Delete(entity);
            await table.ExecuteAsync(deleteOperation);
        }
        return req.CreateResponse(HttpStatusCode.OK);
    }

    return req.CreateResponse(HttpStatusCode.BadRequest);
}

This function deletes all the messages from Table storage for a given batch number. A sample request is shown below.

image

Next we need to add the Azure Table storage connection string to the local.settings.json file in the project.

clip_image012
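
As a rough guide, the local.settings.json file might look something like this (the connection string value is a placeholder for your own storage account):

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "StorageConnectionString": "DefaultEndpointsProtocol=https;AccountName=<youraccount>;AccountKey=<yourkey>"
  }
}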

Before publishing the function to Azure, be sure to create a Unit Test project and check its functionality.

Publishing the Function to Azure

Right click on the project and click Publish. This will take you to the publishing wizard form. Choose “Azure Function App” and “Create New”, then click the publish button.

clip_image014

In the next screen add a function name, subscription and hosting plans etc. Then click the create button to provision any new resources and to publish the function.

clip_image016

Once the function has been published, we need to log into the Azure portal and add the Storage Account connection string to the Application settings. Double click on the Azure Function app in the Azure portal.

clip_image018

Then click on the Application settings link.

clip_image020

In the Application settings section, add a new key called “StorageConnectionString” and add the connection string that was used in the local.settings.json in the Visual Studio Function project. Then click the “Save” button at the top of the screen.

clip_image022

 

Logic App for message de-batching and re-sequencing

Next we will create the Logic App that de-batches the individual line items and sends them, in parallel, to another Logic App acting as a Microservice for processing. The response from each call is then passed to the function we created above. Once all the messages have been received from the Microservice, we call another function method to return all the stored responses in the correct order before returning the result to the consumer.

The message will be based on a PO (Purchase Order) shown below. We will be using the OrderNumber as the batch number and the LineNumber of each item for re-sequencing the line items later.

image
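
To give a rough idea of the shape of the PO message, a sample might look like this (the descriptions and values are made up for illustration; only the OrderNumber, Lines, LineNumber, Qty and UnitPrice properties are relied on later):

{
  "OrderNumber": "PO1234",
  "Lines": [
    { "LineNumber": 1, "Description": "Widget", "Qty": 2, "UnitPrice": 9.95 },
    { "LineNumber": 2, "Description": "Gadget", "Qty": 1, "UnitPrice": 24.50 }
  ]
}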

Below is the collapsed view of the Logic App. It is triggered by a PO (Purchase Order) being posted to the HTTP endpoint with the schema is based on the PO message above.

image

Two variables are required: one holding the total number of line items, and one for the PO lines returned from the Message Re-sequencer function.

image

The code view for the Initialize variable MaxLineItems is shown below, where we just take the length of the Lines collection.

image
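
The expression is presumably just a length() call over the Lines collection of the trigger body, something like:

@length(triggerBody()?['Lines'])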

After the variables are initialised, a For each loop shape iterates through the PO lines of the message.

image

The POValidator action calls another Logic App, passing in the line item. This simulates a Microservice that calculates the total line amount and could also check stock levels.

image

The response is then passed on to the AzureTableStoreMsg function method which was published to Azure in the previous steps.

Functions that have been created as generic webhooks can be selected from the Logic App designer by adding an action and choosing Azure Functions.

image

After selecting Azure functions, you should be able to see all your published functions. Click on the Azure function that was published above.

image

This will then display all the methods that we created above. Make sure you select the AzureTableStoreMsg method from the list.

image

After the function method has been added, setup the properties as shown.

image

The body and header properties are set to the following values (see the expression sketch after this list):

  • Request Body – the output body of the POValidator Logic App (Microservice)
  • x-MsgReseqBatchId – the OrderNumber of the trigger message
  • x-MsgReseqNumber – the line number of the item in the loop
  • x-MsgReseqTotalCount – the value of the MaxLineItems variable initialised at the start of the workflow
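
In the underlying workflow definition, these mappings would presumably be expressed along these lines (the For_each loop name here is an assumption):

"body": "@body('POValidator')",
"headers": {
  "x-MsgReseqBatchId": "@{triggerBody()?['OrderNumber']}",
  "x-MsgReseqNumber": "@{items('For_each')?['LineNumber']}",
  "x-MsgReseqTotalCount": "@{variables('MaxLineItems')}"
}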

And the code view for this action is shown here:

image

The next step in the workflow is to check whether we have received all the messages. This is done by checking the response message from the function method AzureTableStoreMsg, comparing the two properties EOMIndex and TotalMessagesReceived to see if they are equal.

image

The syntax for the condition action “Check msg count” is as follows:

@equals(body('AzureTableStoreMsg')['EOMIndex'], body('AzureTableStoreMsg')['TotalMessagesReceived'])

If the condition is true, then the next step is to retrieve all the messages in the correct order from the Table Storage using the function method “AzureTableRetrieveMsg”. Follow the same steps as before to add an action that calls this function method. The only property to send in the request body is the MsgBatchId, which is set to the PO OrderNumber.

image

The code view for this shape looks like this.

image

We then set the variable LineItems to the output of the function method “AzureTableRetrieveMsg”.

image

Once the variable is set, we can clear the Table storage of this data by calling the function method “AzureTableDeleteMsg” and passing the PO OrderNumber in the body.

image

When the “For each” shape completes, we compose the response message by replacing the existing line items with the items stored in the LineItems variable using the setProperty expression.

image

Here is the code view of the Compose shape.

image
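
The input to the Compose shape is presumably a setProperty() expression of roughly this form, assuming the trigger body and the LineItems variable used earlier:

@setProperty(triggerBody(), 'Lines', variables('LineItems'))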

The final step is to set the Response body to the output of the Compose shape above.

image

That’s the development done for this integration pattern. Next we will create the POValidator Logic App to simulate calling a Microservice. This Logic App just calculates the line total by multiplying the UnitPrice and Qty to create a new property TotalPrice. I have also added a random Delay shape to simulate some line items taking longer to process.
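
The calculation itself would just be a mul() expression over the two line item properties, for example (assuming the line item is the trigger body of the POValidator Logic App):

@mul(float(triggerBody()?['UnitPrice']), int(triggerBody()?['Qty']))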

Below is the workflow process for this Logic App. I won’t go into the detail of each action as this Logic App is just there to test the message re-sequencing.

image

Testing

To test this solution we are going to use one of my favourite tools, Postman. The following PO message will be sent to the MessageResequencer Logic App. Notice the absence of the line item total; the POValidator Logic App will calculate this for us.

image

As you can see below, each line item was sent to the POValidator Logic App and took a different amount of time to complete due to the Delay shape.

image

Here is the response message with the total price appended to each line item and in the correct sequence.

image

Now, to prove that the line items do get re-ordered into the correct sequence, I will swap the line numbers around in the PO request message and use a different PO number.

image

Here is the response with the lines re-sequenced in the correct order.

image

Conclusion

Using Azure Functions and Table Storage makes it fairly easy to develop this integration pattern. To provide a more resilient repository for storing the messages temporarily, you may want to consider using Azure Cosmos DB.

The HTTP connectors can also be swapped out for Service Bus connectors when communicating with services that are slow to respond.

You may have noticed that when you call the function method “AzureTableStoreMsg” it returns the following response message.

image

You can use the EOMReceived and TotalMessagesReceived properties to determine whether an error has occurred or whether the Microservice is taking too long to process the de-batched items. Under ideal conditions, the EOMIndex would be set on the last de-batched message and TotalMessagesReceived would equal the EOMIndex value.

Keep a watch out for my next blog, where I will show you how to re-sequence messages as they arrive.

Enjoy…