Ensuring Ordered Delivery of Messages using Azure Functions

Typically, when using Azure Functions to consume messages from a Service Bus (SB), ordering is not guaranteed even though the SB is First-In-First-Out (FIFO). This is due to competing consumers, where multiple instances of the function compete for messages from the service bus.

An example of where out-of-order processing can happen is when one function instance takes longer to process a message than the other instances, thereby affecting the processing order. This is represented in the sequence diagram below, where function instance 1 took longer than instance 2 to update the same record in a database.

image

One option to enforce ordered delivery is to configure the Azure Function to spin up only one instance. The problem with this solution is that it won't scale very well. A more scalable option is to use sessions. This allows you to have multiple instances of a function executing, giving you a higher message throughput.

To enforce message ordering several properties must be set. The Requires Session property must be enabled on the SB queues and topic subscriptions. Messages sent onto the SB must set the context property SessionId to a value that is unique from other unrelated messages. Some examples of a session Id could be the account number, customer number, batch Id, etc. The Azure Function needs to have the IsSessionsEnabled property set to true on the SB input binding.

Support for SB sessions in Azure Functions only became generally available (GA) in mid-2019. Enabling sessions on the Azure Function places a lock on all messages that have the same session Id, causing those locked messages to be consumed only by the one function instance that placed the lock.
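As an aside, the Requires Session flag can also be set when the subscription is provisioned from code. Below is a minimal sketch using the ManagementClient from the Microsoft.Azure.ServiceBus package; the topic and subscription names are just placeholders chosen to match the example later in this post.

using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus.Management;

public static class SessionSubscriptionSetup
{
    public static async Task CreateSessionSubscriptionAsync(string connectionString)
    {
        var managementClient = new ManagementClient(connectionString);

        //RequiresSession must be set when the subscription is created;
        //it cannot be toggled on an existing subscription.
        var subscriptionDescription = new SubscriptionDescription("orders", "OrdersSession")
        {
            RequiresSession = true
        };

        if (!await managementClient.SubscriptionExistsAsync("orders", "OrdersSession"))
        {
            await managementClient.CreateSubscriptionAsync(subscriptionDescription);
        }

        await managementClient.CloseAsync();
    }
}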

Typical Scenario

A warehouse needs to track the progress of an order from when it is first received to when it gets dispatched. Throughout each stage of the ordering process (Ordered, Picked, Packaged, Dispatched), the status of the order must be updated. This involves placing a new message onto the service bus every time the order status needs to be updated. An Azure Function then pulls the messages from the service bus and updates the order status in a database where the customer can view the current state of their order.

To simulate the warehouse tracking system, a console app is used to create messages for each status change (Ordered, Picked, Packaged, Dispatched) for several hundred orders. The session Id of each status message is set to the order number. The app then sends the messages to a SB topic which has two subscriptions, one with sessions enabled and the other disabled. This is so we can compare the ordering of messages being received with and without sessions enabled.

Order message generator
using System;
using System.Collections.Generic;
using System.Configuration;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.Core;

class Program
{
    private static string connectionString = ConfigurationManager.AppSettings["ServiceBusConnectionString"];
    private static string topicName = ConfigurationManager.AppSettings["TopicName"];
    private static int orders = 100;
    private static int messagePerSession = 4;

    static async Task Main(string[] args)
    {
        Console.WriteLine("Creating Service Bus sender....");
        var taskList = new List<Task>();
        var sender = new MessageSender(connectionString, topicName);
        //create an order
        for (int order = 0; order < orders; order++)
        {
            var orderNumber = $"OrderId-{order}";
            var messageList = new List<Message>();
            //simulate a status update in the correct order
            for (int m = 0; m < messagePerSession; m++)
            {
                var status = string.Empty;
                switch (m)
                {
                    case 0:
                        status = "1 - Ordered";
                        break;
                    case 1:
                        status = "2 - Picked";
                        break;
                    case 2:
                        status = "3 - Packaged";
                        break;
                    case 3:
                        status = "4 - Dispatched";
                        break;
                }
                var message = new Message(Encoding.UTF8.GetBytes($"Status-{status}"))
                {
                    //set the service bus SessionId property to the current order number
                    SessionId = orderNumber
                };
                messageList.Add(message);
            }
            //send the list of status update messages for the order to the service bus
            taskList.Add(sender.SendAsync(messageList));
        }
        Console.WriteLine("Sending all messages...");
        await Task.WhenAll(taskList);
        Console.WriteLine("All messages sent.");
    }
}

Two Azure Functions will be created, one with sessions enabled and the other with sessions disabled. Each function has a random delay of 1 to 10 seconds to simulate some business logic which may be calling out to an external service before updating the order status. Instead of the function writing to a database, each status update message received is written to Azure Table storage to create an audit log of when each status update message was processed.

Below is the source code for the function which processes the messages on the service bus using sessions. Note the IsSessionsEnabled property is set to true on the ServiceBusTrigger input binding. The randomiser simulates business logic that could vary in the time taken to process a message.

Azure Function using sessions
public static class MsgOrderingSessions
{
    [FunctionName("MsgOrderingSessions")]
    [return: Table("OrdersSession", Connection = "StorageConnectionAppSetting")]
    public static OrderEntity Run([ServiceBusTrigger("orders", "OrdersSession", Connection = "SbConnStr", IsSessionsEnabled = true)]
          Message sbMessage, ILogger log)
    {
        log.LogInformation($"C# ServiceBus topic trigger function processed message: {Encoding.UTF8.GetString(sbMessage.Body)}");
        Random random = new Random();
        int randNumb = random.Next(1000, 10000);
        System.Threading.Thread.Sleep(randNumb);
        //OrderEntity is a simple POCO with PartitionKey, RowKey and Text properties used by the Table output binding
        return new OrderEntity { PartitionKey = $"{sbMessage.SessionId} - {DateTime.Now.Ticks}", RowKey = Guid.NewGuid().ToString(), Text = Encoding.UTF8.GetString(sbMessage.Body) };
    }
}

Below is the source code for the function which does not use sessions. Here IsSessionsEnabled is set to false.

Azure Function no sessions
public static class MsgOrderingNoSession
{
    [FunctionName("MsgOrderingNoSessions")]
    [return: Table("OrdersNoSession", Connection = "StorageConnectionAppSetting")]
    public static OrderEntity Run([ServiceBusTrigger("orders", "OrdersNoSession", Connection = "SbConnStr", IsSessionsEnabled = false)]
          Message sbMessage, ILogger log)
    {
        log.LogInformation($"C# ServiceBus topic trigger function processed message: {Encoding.UTF8.GetString(sbMessage.Body)}");
        Random random = new Random();
        int randNumb = random.Next(1000, 10000);
        System.Threading.Thread.Sleep(randNumb);
        return new OrderEntity { PartitionKey = $"{sbMessage.SessionId} - {DateTime.Now.Ticks}", RowKey = Guid.NewGuid().ToString(), Text = Encoding.UTF8.GetString(sbMessage.Body) };
    }
}

Below are the settings for the service bus topic, which has two subscriptions, one of which has Requires Session checked.

image

Running the console app creates 400 messages on each subscription, four status update messages per order.

image

Conclusion

The Azure Function which had IsSessionsEnabled = false on the ServiceBusTrigger inserted the rows out of order due to multiple function instances competing for the next message on the service bus.

image

For the Azure Function which had IsSessionsEnabled = true and read messages from a service bus subscription with the Requires Session flag enabled, the messages were processed in the same sequence as they were placed onto the service bus.

image

When using sessions there is a slight performance hit, depending on the number of function instances executing. In this example both functions were running under the consumption plan, which spun up six instances. As you can see from the number of messages waiting on each of the subscriptions below, the subscription which had sessions disabled was processing the messages a lot faster.

When sessions are used, each function instance places a lock on all messages having the same session Id, which are then processed one after another. As there were only six instances available, a maximum of six orders could be processed at any one time.

image

Enjoy…

Using an Azure APIM Scatter-Gather policy with a Mapping Function

This is a follow up from a previous blog, "Azure APIM Scatter-Gather Pattern Policy", where I wrote about using the Wait policy to create a scatter-gather pattern. A few colleagues were asking about being able to map the inbound request to the different schemas required by each of the microservices.

A high level design is shown below using two "Wait" policies and an Azure Function for the mapping. The first policy, 'Translation', sends the request to the mapping function, and when that completes the second policy, 'Scatter', is executed to send the mapped request to each of the microservices.

image

The internals of the Azure Function that maps the incoming request are shown below as an example. Here I am using a route parameter for the supplier name and a simple if-then statement to select which static translator method to call.

using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;

public static class MappingService
{
    [FunctionName("DataMapping")]
    public static async Task<IActionResult> Run(
        [HttpTrigger(AuthorizationLevel.Function, "post", Route = "Pricing/{suppliername}")]
        HttpRequest req,
        string suppliername,
        ILogger log)
    {
        log.LogInformation("C# HTTP trigger function processed a request.");

        string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
        dynamic response = null;
        //Translator is a static helper class containing the supplier-specific mapping methods
        if (suppliername.Equals("Supplier1", StringComparison.OrdinalIgnoreCase))
            response = Translator.TranslateSupplier1(requestBody);
        else if (suppliername.Equals("Supplier2", StringComparison.OrdinalIgnoreCase))
            response = Translator.TranslateSupplier2(requestBody);

        return response != null
            ? (ActionResult)new OkObjectResult(response)
            : new BadRequestObjectResult("Invalid message.");
    }
}
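The translator methods themselves are not part of this post. Purely as an illustration, a hypothetical mapping method could look something like the sketch below, which reshapes a generic price request into a supplier-specific shape using Json.NET; the field names (ProductCode, Quantity, sku, qty, etc.) are made up for the example.

using Newtonsoft.Json.Linq;

public static class Translator
{
    //Hypothetical example only - the real mapping depends on each supplier's schema.
    public static JObject TranslateSupplier1(string requestBody)
    {
        var request = JObject.Parse(requestBody);

        //Reshape the generic price request into the schema Supplier1 expects.
        return new JObject(
            new JProperty("sku", (string)request["ProductCode"]),
            new JProperty("qty", (int?)request["Quantity"] ?? 1));
    }

    public static JObject TranslateSupplier2(string requestBody)
    {
        var request = JObject.Parse(requestBody);

        return new JObject(
            new JProperty("productId", (string)request["ProductCode"]),
            new JProperty("requestedQuantity", (int?)request["Quantity"] ?? 1));
    }
}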

Below is the code for the Translation policy, which calls the two Azure Function resources in parallel, passing the inbound price request message. The function access code is stored as a named value in APIM called "funcmapkey".

<!-- Call the mapping service -->
<wait for="all">
<send-request mode="copy" response-variable-name="res_SupplierMap1" timeout="120" ignore-error="false">
<set-url>https://mapsvc.azurewebsites.net/api/Pricing/supplier1?code={{funcmapkey}}</set-url>
<set-method>POST</set-method>
<set-body>@(context.Variables.GetValueOrDefault<string>("requestPayload"))</set-body>
</send-request>
<send-request mode="copy" response-variable-name="res_SupplierMap2" timeout="120" ignore-error="false">
<set-url>https://mapsvc.azurewebsites.net/api/Pricing/supplier2?code={{funcmapkey}}</set-url>
<set-method>POST</set-method>
<set-body>@(context.Variables.GetValueOrDefault<string>("requestPayload"))</set-body>
</send-request>
</wait>
 

The code for the Scatter policy is shown below, which is similar to the original blog post. However, it uses the mapped outputs from the Translation policy that are stored in the res_SupplierMap1 and res_SupplierMap2 context variables instead.

<!-- Call the pricing services -->
<wait for="all">
<send-request mode="copy" response-variable-name="response_1" timeout="120" ignore-error="false">
<set-url>{{serviceendpoint_1}}</set-url>
<set-method>POST</set-method>
<set-body>@(((IResponse)context.Variables["res_SupplierMap1"]).Body.As<string>())</set-body>
</send-request>
<send-request mode="copy" response-variable-name="response_2" timeout="120" ignore-error="false">
<set-url>{{serviceendpoint_2}}</set-url>
<set-method>POST</set-method>
<set-body>@(((IResponse)context.Variables["res_SupplierMap2"]).Body.As<string>())</set-body>
</send-request>
</wait>

The last policy checks the status of each of the pricing services and returns the results as a composite message if no errors were encountered. This is similar to the original blog post, but instead of returning a JObject I am now returning a JArray collection.

<choose>
<when condition="@((int)((IResponse)context.Variables["response_1"]).StatusCode != 200)">
<return-response response-variable-name="response_1" />
</when>
<when condition="@((int)((IResponse)context.Variables["response_2"]).StatusCode != 200)">
<return-response response-variable-name="response_2" />
</when>
<otherwise>
<return-response>
<set-status code="200" reason="OK" />
<set-header name="Content-Type" exists-action="override">
<value>application/json</value>
</set-header>
<set-body>@{
JArray suppliers = new JArray();
suppliers.Add(((IResponse)context.Variables["response_1"]).Body.As<JObject>());
suppliers.Add(((IResponse)context.Variables["response_2"]).Body.As<JObject>());
return suppliers.ToString();
}</set-body>
</return-response>
</otherwise>
</choose>

The completed policy for the method looks like the below. Take note that the request payload is initially stored in the variable named "requestPayload" to avoid locking the body context in the <wait> policies.

<policies>
<inbound>
<set-variable name="requestPayload" value="@(context.Request.Body.As&lt;string>(true))" />
<!-- Call the mapping service -->
<wait for="all">
<send-request mode="copy" response-variable-name="res_SupplierMap1" timeout="120" ignore-error="false">
<set-url>https://mapsvc.azurewebsites.net/api/Pricing/supplier1?code={{funcmapkey}}</set-url>
<set-method>POST</set-method>
<set-body>@(context.Variables.GetValueOrDefault<string>("requestPayload"))</set-body>
</send-request>
<send-request mode="copy" response-variable-name="res_SupplierMap2" timeout="120" ignore-error="false">
<set-url>https://mapsvc.azurewebsites.net/api/Pricing/supplier2?code={{funcmapkey}}</set-url>
<set-method>POST</set-method>
<set-body>@(context.Variables.GetValueOrDefault<string>("requestPayload"))</set-body>
</send-request>
</wait>
<!-- Call the pricing services -->
<wait for="all">
<send-request mode="copy" response-variable-name="response_1" timeout="120" ignore-error="false">
<set-url>{{serviceendpoint_1}}</set-url>
<set-method>POST</set-method>
<set-body>@(((IResponse)context.Variables["res_SupplierMap1"]).Body.As<string>())</set-body>
</send-request>
<send-request mode="copy" response-variable-name="response_2" timeout="120" ignore-error="false">
<set-url>{{serviceendpoint_2}}</set-url>
<set-method>POST</set-method>
<set-body>@(((IResponse)context.Variables["res_SupplierMap2"]).Body.As<string>())</set-body>
</send-request>
</wait>
<choose>
<when condition="@((int)((IResponse)context.Variables["response_1"]).StatusCode != 200)">
<return-response response-variable-name="response_1" />
</when>
<when condition="@((int)((IResponse)context.Variables["response_2"]).StatusCode != 200)">
<return-response response-variable-name="response_2" />
</when>
<otherwise>
<return-response>
<set-status code="200" reason="OK" />
<set-header name="Content-Type" exists-action="override">
<value>application/json</value>
</set-header>
<set-body>@{
JArray suppliers = new JArray();
suppliers.Add(((IResponse)context.Variables["response_1"]).Body.As<JObject>());
suppliers.Add(((IResponse)context.Variables["response_2"]).Body.As<JObject>());
return suppliers.ToString();
}</set-body>
</return-response>
</otherwise>
</choose>
</inbound>
<backend>
<base />
</backend>
<outbound>
<base />
</outbound>
<on-error>
<base />
</on-error>
</policies>

Using the tracing feature in APIM, you can see the initial price request message below. This is what will be sent to the mapping function's Pricing method.

 image

Below is the trace output from APIM showing the two different messages returned from the mapping function and assigned to the res_SupplierMap variables.

image

Below is the composite message returned from APIM as a JSON array containing each supplier.

image

In conclusion, using the two <wait> policies to send multiple requests in parallel yields a request/response latency of around 200 ms on average in this scenario. Also, instead of using an Azure Function to map the incoming request, you could replace it with a couple of "Transformation" policies.

Enjoy…

Azure APIM Scatter-Gather Pattern Policy

Recently I have been reading about some of the advanced policies available in APIM, and the one that caught my eye was the "Wait" policy found here: https://docs.microsoft.com/en-us/azure/api-management/api-management-advanced-policies. This policy will execute child policies in parallel and wait until either one or all of the child policies have completed before continuing.

I was pondering a good use case for this policy, and the first one that came to mind was the scatter-gather pattern shown below. I could easily create this pattern without having to provision any other Azure services.

 

image

The scatter-gather pattern is created by exposing a REST endpoint through APIM which accepts a JSON payload. When a request comes into APIM, a custom policy forwards this request onto the service endpoints defined in the policy and waits until they have all responded before continuing with the rest of the statements in the parent policy. Once all the destination endpoints have responded, the return status code of each service is checked for success or an error. If an error occurred, it is returned immediately; otherwise the messages from each service are combined into a composite JSON message which is returned to the caller as the response.

For the scenario, I will be sending a price request for a product to three suppliers simultaneously. In Azure APIM, I created a method which accepts a JSON price request message as shown below.

image

 

Then I added an Inbound processing policy to that method, which is broken down into two parts. Below is the first part of the policy, where it sends the inbound request to multiple endpoints in parallel using the <send-request> tag. Named values are used for the service endpoint addresses defined in the <set-url>{{serviceendpoint_xxx}}</set-url> tags. The response from each service call is inserted into the variable response-variable-name="response_xxx", which is used in the second part of this policy.

<set-variable name="requestPayload" value="@(context.Request.Body.As&lt;string>(true))" />
<wait for="all">
<send-request mode="copy" response-variable-name="response_1" timeout="120" ignore-error="false">
<set-url>{{serviceendpoint_1}}</set-url>
<set-method>POST</set-method>
<set-body>@(context.Variables.GetValueOrDefault<string>("requestPayload"))</set-body>
</send-request>
<send-request mode="copy" response-variable-name="response_2" timeout="120" ignore-error="false">
<set-url>{{serviceendpoint_2}}</set-url>
<set-method>POST</set-method>
<set-body>@(context.Variables.GetValueOrDefault<string>("requestPayload"))</set-body>
</send-request>
<send-request mode="copy" response-variable-name="response_3" timeout="120" ignore-error="false">
<set-url>{{serviceendpoint_3}}</set-url>
<set-method>POST</set-method>
<set-body>@(context.Variables.GetValueOrDefault<string>("requestPayload"))</set-body>
</send-request>
</wait>

After the closing </wait> tag, the response code from each of the services is checked inside a <choose> block. If it is not equal to 200 then the response is immediately returned, otherwise each of the responses is wrapped inside a JSON object and returned as the composite response message.
 
<choose>
<when condition="@((int)((IResponse)context.Variables["response_1"]).StatusCode != 200)">
<return-response response-variable-name="response_1" />
</when>
<when condition="@((int)((IResponse)context.Variables["response_2"]).StatusCode != 200)">
<return-response response-variable-name="response_2" />
</when>
<when condition="@((int)((IResponse)context.Variables["response_3"]).StatusCode != 200)">
<return-response response-variable-name="response_3" />
</when>
<otherwise>
<return-response>
<set-status code="200" reason="OK" />
<set-header name="Content-Type" exists-action="override">
<value>application/json</value>
</set-header>
<set-body>@(new JObject(
new JProperty("service_1",((IResponse)context.Variables["response_1"]).Body.As<JObject>()),
new JProperty("service_2",((IResponse)context.Variables["response_2"]).Body.As<JObject>()),
new JProperty("service_3",((IResponse)context.Variables["response_3"]).Body.As<JObject>())
).ToString())</set-body>
</return-response>
</otherwise>
</choose>
 
Below is the whole policy for the method. More service endpoints may be added or removed from this policy as desired.
<policies>
<inbound>
<set-variable name="requestPayload" value="@(context.Request.Body.As&lt;string>(true))" />
<wait for="all">
<send-request mode="copy" response-variable-name="response_1" timeout="120" ignore-error="false">
<set-url>{{serviceendpoint_1}}</set-url>
<set-method>POST</set-method>
<set-body>@(context.Variables.GetValueOrDefault<string>("requestPayload"))</set-body>
</send-request>
<send-request mode="copy" response-variable-name="response_2" timeout="120" ignore-error="false">
<set-url>{{serviceendpoint_2}}</set-url>
<set-method>POST</set-method>
<set-body>@(context.Variables.GetValueOrDefault<string>("requestPayload"))</set-body>
</send-request>
<send-request mode="copy" response-variable-name="response_3" timeout="120" ignore-error="false">
<set-url>{{serviceendpoint_3}}</set-url>
<set-method>POST</set-method>
<set-body>@(context.Variables.GetValueOrDefault<string>("requestPayload"))</set-body>
</send-request>
</wait>
<choose>
<when condition="@((int)((IResponse)context.Variables["response_1"]).StatusCode != 200)">
<return-response response-variable-name="response_1" />
</when>
<when condition="@((int)((IResponse)context.Variables["response_2"]).StatusCode != 200)">
<return-response response-variable-name="response_2" />
</when>
<when condition="@((int)((IResponse)context.Variables["response_3"]).StatusCode != 200)">
<return-response response-variable-name="response_3" />
</when>
<otherwise>
<return-response>
<set-status code="200" reason="OK" />
<set-header name="Content-Type" exists-action="override">
<value>application/json</value>
</set-header>
<set-body>@(new JObject(
new JProperty("service_1",((IResponse)context.Variables["response_1"]).Body.As<JObject>()),
new JProperty("service_2",((IResponse)context.Variables["response_2"]).Body.As<JObject>()),
new JProperty("service_3",((IResponse)context.Variables["response_3"]).Body.As<JObject>())
).ToString())</set-body>
</return-response>
</otherwise>
</choose>
</inbound>
<backend>
<base />
</backend>
<outbound>
<base />
</outbound>
<on-error>
<base />
</on-error>
</policies>

 
For testing purposes, I just created three simple Logic Apps triggered by an HTTP request which returned some static data. Using Postman, I sent the following message.
image
 
The response message from APIM is shown below.
 
image
 
Enjoy…

ARM Template for Provisioning an Azure Function App with a Key Vault

Late in 2018, Microsoft announced that you can now store sensitive application setting values of an App Service in an Azure Key Vault without any changes to the function code. The only requirement is to update the setting value with @Microsoft.KeyVault(SecretUri=secret_uri_with_version) to reference the Key Vault, and to enable an identity for the App Service to access the Key Vault.

This is a great leap forward, having this feature baked into an App Service. However, trying to create an ARM template to provision an App Service, Storage Account and a Key Vault by following these instructions https://docs.microsoft.com/en-us/azure/app-service/app-service-key-vault-references#azure-resource-manager-deployment proved to be rather challenging. After several hours of getting the versions correct and the dependencies in the correct order, I managed to create a working template to provision all three artefacts.

The template creates the App Service and uses the system-assigned managed identity account to access the Key Vault with Get only permissions. The primary key for the Storage Account and the Application Insights key are stored in Key Vault. These are then referenced by the AzureWebJobsStorage, WEBSITE_CONTENTAZUREFILECONNECTIONSTRING and APPINSIGHTS_INSTRUMENTATIONKEY names in the application settings of the Function App Service.

Updating the parameters file with the required artefact names and deploying the template provisions the following services in Azure. The only thing left is to deploy your Function into the newly provisioned App Service.

image

If you try to view the Secrets in the Key Vault, you will encounter the authorisation error shown below. If you wish, you may update the ARM template to add your own ID to the access policies collection of the Key Vault.

image

To add your account using the Azure Portal, navigate to Access Policies and then click Add new. Notice the App Svc account has already been added by the ARM template.

image

Then click on Select principal and type your login address into the Principal blade to find your name, then click the Select button.

image

Once your login name has been added, you can then select the required permissions.

image

Now you can view the keys added by the ARM template.

image

Below are the Application settings under the App Service showing references to the Key Vault for the values.

image

The code for the ARM Template can be downloaded from here: https://github.com/connectedcircuits/azappsvc. I have added comments into the template so you should be able to modify it to suit your requirements.

Enjoy…

Always subscribe to Dead-lettered messages when using an Azure Service Bus

I typically come across solutions that incorporate a service bus where monitoring of the dead-letter queues is simply ignored. Upon questioning, the usual excuse for not monitoring the DLQ (dead-letter queue) is "I don't require it because I have added exception handling, therefore no messages will end up on the DLQ". From experience, there will be scenarios where the exception logic did not capture an edge case and the message ends up on the DLQ without anyone knowing about it.

A simple solution is to have a single process monitor all the DLQ messages and raise an alert when one occurs. Below is one of my building blocks which I typically incorporate when there is a service bus involved in a solution.

image

To use the DLQ building block to centrally capture any dead-lettered message, simply set the "ForwardDeadLetteredMessagesTo" property on each of the messaging queues to a common DLQ handler queue, as shown below.

image
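The same property can also be set from code when the queues are provisioned or updated. Below is a minimal sketch using the ManagementClient from the Microsoft.Azure.ServiceBus package; the source queue name "orders" is a placeholder, while "dlq-processor" matches the handler queue used by the function further down.

using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus.Management;

public static class DlqForwardingSetup
{
    public static async Task ForwardDeadLettersAsync(string connectionString)
    {
        var managementClient = new ManagementClient(connectionString);

        //forward dead-lettered messages from an existing business queue
        //to the common DLQ handler queue monitored by the Azure Function
        QueueDescription queue = await managementClient.GetQueueAsync("orders");
        queue.ForwardDeadLetteredMessagesTo = "dlq-processor";
        await managementClient.UpdateQueueAsync(queue);

        await managementClient.CloseAsync();
    }
}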

Now when a message gets dead-lettered, it will end up in this common queue, which is monitored by an Azure Function. Note the current NuGet version v3.0.4 of the Service Bus library has the DeadLetterSource property, which is not currently available in the Logic App Service Bus connector. The function writes the DLQ metadata, any custom properties and the message payload to a blob store file. By using an Azure Storage Account V2 for the blob store, a blob creation event is fired to any interested subscribers, which in this case is a Logic App.

Azure Function Code

Below is the code for the Azure Function. Here I am using the IBinder interface to allow me to set the folder path and file name imperatively. The connection strings (ServiceBusConn, StorageAccountConn) are defined in the Application settings of the Azure App Service.

Code Snippet
using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;

namespace ServiceBusDLQMonitoring
{
    public static class Function1
    {
        [FunctionName("Function1")]
        public static async Task RunAsync([ServiceBusTrigger("dlq-processor", Connection = "ServiceBusConn")] Message dlqQueue,
            Binder blobBinder,
            ILogger log)
        {
            log.LogInformation($"C# ServiceBus queue trigger function processed message: {dlqQueue.MessageId}");

            //set the filename and blob path
            var blobFile = Guid.NewGuid().ToString() + ".json";
            var path = string.Concat("dlq/", DateTime.UtcNow.ToString("yyyy/MM/dd"), "/", dlqQueue.SystemProperties.DeadLetterSource, "/", blobFile);

            var dlqMsq = new DLQMessage
            {
                DeadletterSource = dlqQueue.SystemProperties.DeadLetterSource,
                MessageId = dlqQueue.MessageId,
                SequenceNumber = dlqQueue.SystemProperties.SequenceNumber,
                SessionId = dlqQueue.SessionId,
                UserProperties = dlqQueue.UserProperties,
                DeadLetterReason = dlqQueue.UserProperties["DeadLetterReason"].ToString(),
                EnqueuedDttmUTC = dlqQueue.SystemProperties.EnqueuedTimeUtc,
                ContentType = dlqQueue.ContentType,
                DeliveryCount = dlqQueue.SystemProperties.DeliveryCount,
                Label = dlqQueue.Label,
                MsgBase64Encoded = Convert.ToBase64String(dlqQueue.Body, Base64FormattingOptions.None)
            };

            var blobAttributes = new Attribute[]
            {
                new BlobAttribute(path),
                new StorageAccountAttribute("StorageAccountConn")
            };

            using (var writer = await blobBinder.BindAsync<TextWriter>(blobAttributes))
            {
                writer.Write(JsonConvert.SerializeObject(dlqMsq, Formatting.Indented));
            }
        }
    }

    public class DLQMessage
    {
        public string DeadletterSource { get; set; }
        public long SequenceNumber { get; set; }
        public string MessageId { get; set; }
        public string SessionId { get; set; }
        public string Label { get; set; }
        public string DeadLetterReason { get; set; }
        public DateTime EnqueuedDttmUTC { get; set; }
        public int DeliveryCount { get; set; }
        public string ContentType { get; set; }
        public IDictionary<string, object> UserProperties { get; set; }
        public string MsgBase64Encoded { get; set; }
    }
}

 

Logic App Implementation

A Logic App is used to retrieve the message from the blob store when it is triggered by the Event Grid HTTP webhook. The basic workflow is shown below and can be expanded to suit your own requirements.

image

The expression for the 'Get blob content using path' action is @{replace(triggerBody()[0]['data']['url'],'https://dqlmessages.blob.core.windows.net/','')}. Here I am just replacing the domain name with an empty string as I only want the resource location.

The Parse JSON action has the following schema. This makes it easier to reference the properties downstream.

{
    "properties": {
        "ContentType": {
            "type": [
                "string",
                "null"
            ]
        },
        "DeadLetterReason": {
            "type": "string"
        },
        "DeadletterSource": {
            "type": "string"
        },
        "DeliveryCount": {
            "type": "integer"
        },
        "EnqueuedDttmUTC": {
            "type": "string"
        },
        "Label": {
            "type": [
                "string",
                "null"
            ]
        },
        "MessageId": {
            "type": "string"
        },
        "MsgBase64Encoded": {
            "type": [
                "string",
                "null"
            ]
        },
        "SequenceNumber": {
            "type": "integer"
        },
        "SessionId": {
            "type": [
                "string",
                "null"
            ]
        },
        "UserProperties": {
            "type": "any"
        }
    },
    "type": "object"
}

 

The last action, 'Set variable MsgBody', has the value set to: @{base64ToString(body('Parse_JSON')?['MsgBase64Encoded'])}

Blob creation event configuration

Next is to set up a subscription to the blob creation event. Click on Events under the Storage account for the DLQ messages as shown below.

image

Then click on the +Event Subscription to add a new subscription.

image

Set up the subscription's basic details with a suitable subscription name and the blob storage account resource name. Uncheck 'Subscribe to all event types' and select the Blob Created event. Set the endpoint details to Web Hook and the URL of the Logic App HTTP trigger endpoint address.

image

Under Filters, enable subject filtering. Add the prefix '/blobServices/default/containers/' plus the name of the DLQ container (in this example it's called 'dlq') to the 'Subject Begins With' textbox. Then set 'Subject Ends With' to the filename extension '.json'. Now click the Create button at the bottom of the page to create the subscription.

image

Sample Logic App output

Once everything is wired up, and if any messages have been placed onto the DLQ, you should see some Logic App runs. Below is an example of the outputs from the last two actions.

image

All you need to do now is extend the Logic App to deliver the DLQ alert to somewhere.

Enjoy…

Providing DR By Using An Azure Service Bus In Two Regions

This is a Disaster Recovery (DR) option I used on one of my projects. A client required a DR solution which guaranteed not to lose any messages during the DR failover process. In essence, no messages were allowed to be lost mid-flight whilst the DR process was taking place.

Solution

The approach I took was to use an Azure Service Bus in a primary Azure region and another service bus in a secondary Azure DR region. The publisher would then send the same message to both service bus endpoints. For my solution I used Azure APIM to post the same message to both the primary and secondary regions, thereby simplifying the code for the publisher.

image

The primary region would simply process the messages as normal, using a Logic App to poll for new messages on the service bus. However, in the DR region the equivalent Logic App would be set to a disabled state. The messages in the secondary region would remain on the bus until the Time-To-Live (TTL) threshold is reached, before either being moved onto the dead-letter queue or dropped from the queue altogether by setting the EnableDeadLetteringOnMessageExpiration property to false. Using this technique provides automatic pruning of the older messages that would have already been processed by the primary region before failing over.

The value chosen for the TTL property is determined by how long it would take to fail over to the DR region, plus the time it takes to detect an issue in the primary region.
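As a rough illustration of those two settings, a queue in the secondary region could be provisioned from code along the lines of the sketch below, using the management API from the Microsoft.Azure.ServiceBus package; the queue name and the two-hour TTL are example values only.

using System;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus.Management;

public static class DrQueueSetup
{
    public static async Task CreateDrQueueAsync(string secondaryConnectionString)
    {
        var managementClient = new ManagementClient(secondaryConnectionString);

        var queueDescription = new QueueDescription("orders")
        {
            //messages older than the TTL are assumed to have been processed by the primary region
            DefaultMessageTimeToLive = TimeSpan.FromHours(2),

            //drop expired messages instead of moving them to the dead-letter queue
            EnableDeadLetteringOnMessageExpiration = false
        };

        await managementClient.CreateQueueAsync(queueDescription);
        await managementClient.CloseAsync();
    }
}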

Failing over would simply involve enabling the Logic App in the DR region, and failing back would involve disabling this Logic App again.

Pros

  • No messages are lost during the failover process.
  • Low monetary transaction cost by the DR environment as no Logic Apps are being triggered during the normal process flow through the primary region.
  • Simplistic DR design which involves just another queue.
  • Simple failover process.

Cons

  • There is a delay before any new messages are processed, while the older messages on the service bus are reprocessed first.
  • The backend system processing the messages must be idempotent, meaning the same message may be processed by the backend system multiple times and produce the same outcome.
  • Requires the publisher to send the same message to two different endpoints. This may be mitigated by using APIM to manage sending the messages to both service bus endpoints.

Enjoy…

Using Azure Logic App and an Automation Runbook to execute a long running SQL Stored Procedure

When trying to execute a long-running SQL query using the Logic App SQL connector, you will most likely hit the 2 minute execution timeout limit on the connector.

Even if you move the SQL query to an Azure Function and use ADO.NET to access the backend database, you will hit the 10 minute execution timeout limit of the function when using the consumption plan.

An Alternative Approach

The solution here is to use the Azure Automation service to execute a PowerShell script which calls the SQL query. There is now a connector available in Logic Apps to execute an Automation Runbook, which gives you the option to wait until it completes before moving onto the next action shape in the workflow.

Implementation

First find and add an Automation Account to your Azure resource group. Set the Create Azure Run As account to No.

image

Once the Automation Account has been created, click on Credentials in the left menu pane to store the SQL Server login credentials.

image

Then add the SQL username and password and click Create. We will be using this as the Credential parameter in our PowerShell script.

image

Now click on Runbooks to create a new runbook. This is where we add a PowerShell script to execute.

image

Add a name for the Runbook and ensure the Runbook type is set to PowerShell Workflow, then click the Create button at the bottom of the page.

image

After the Runbook has been created, click on the Edit icon to add the PowerShell script.

image

Inside the PowerShell editor, paste the following code. Basically we are using ADO.NET objects to create a SQL connection and execute the stored procedure. To ensure the command object does not time out, set CommandTimeout = 0.

workflow ExecuteSQLStoredProcedure
{
    param(
        [parameter(Mandatory=$True)]
        [string] $SqlServer,

        [parameter(Mandatory=$False)]
        [int] $SqlServerPort=1433,

        [parameter(Mandatory=$True)]
        [string] $Database,

        [parameter(Mandatory=$True)]
        [string] $Sproc,

        [parameter(Mandatory=$True)]
        [PSCredential] $SqlCredential
    )

    # Get the username and password from the SQL Credential
    $SqlUsername = $SqlCredential.UserName
    $SqlPass = $SqlCredential.GetNetworkCredential().Password

    inlinescript {
        # Define the connection to the SQL Database
        $SQLConn = New-Object System.Data.SqlClient.SqlConnection("Server=tcp:$using:SqlServer,$using:SqlServerPort;Database=$using:Database;User ID=$using:SqlUsername;Password=$using:SqlPass;Trusted_Connection=False;Encrypt=True;Connection Timeout=30;")

        # Open the SQL connection
        $SQLConn.Open()

        # Define the SQL command to run.
        $Cmd = New-Object System.Data.SQLClient.SqlCommand
        $Cmd.Connection = $SQLConn
        $Cmd.CommandTimeout = 0
        $Cmd.CommandType = [System.Data.CommandType]::StoredProcedure
        $Cmd.CommandText = $using:Sproc

        $Cmd.ExecuteNonQuery()

        # Close the SQL connection
        $SQLConn.Close()
    }
}

After the code has been pasted into the editor, save it and remember to publish it once saved.

 image

Now to add a Logic App to execute the Runbook. For this demo, I will simply use the scheduler connector to fire the Logic App. In the designer add a Recurrence trigger, and for the next action search for "automation" and select Create job (Azure Automation).

image

The parameters defined in the PowerShell script will be shown in the properties page of the connector as shown below. I will be executing a stored procedure called "dbo.LongProcess" which has a WAITFOR DELAY '00:15' T-SQL statement. This will simulate a long-running query of 15 minutes.

image

To get the results of the SQL query add the Azure Automation Get job output shape after the Create job shape.

image

Testing

To confirm the Logic App and SQL query do not time out, run the trigger manually. After a period of 15 minutes, which is the delay period defined in the stored procedure, the Logic App will complete as shown below.

image

The output of the stored procedure will be shown in the Get job output shape.

To return a recordset as JSON from the PowerShell script, replace the code $Cmd.ExecuteNonQuery() with the following:

$reader = $Cmd.ExecuteReader()
$table = new-object "System.Data.DataTable"
$table.Load($reader)

#-- Exporting data to the screen --#
$table | select $table.Columns.ColumnName | ConvertTo-Json

 
Note that if you return a very large recordset, you may exceed the allocated memory. In that scenario, you will need to batch the results.

Enjoy…

Exposing Azure Service Bus through APIM, generating a SAS Token and setting the Session Id

On one of my recent projects, a client application was required to place a message onto an Azure Service Bus using an HTTP endpoint rather than the Service Bus SDK, with the following constraints.

  • The client is unable to generate the Service Bus SAS token.
  • Service Bus Session Id needs to be set to the customer number found in the message to ensure ordered delivery by the consumer.
  • All messages are to be sent to a Service Bus topic and require a custom Service Bus property called 'MsgType' to be added.
  • Custom HTTP Request Headers may be used.

I decided upon a solution that uses Azure APIM to expose the Service Bus endpoint. A custom policy is used to generate the Service Bus SAS token and to parse the message for the customer Id. The customer Id is then used to set the SessionId on the Service Bus message. The client then only has to register for a subscription key in the Azure developer portal and pass this key in the header of each HTTP request.

image
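For context, a raw client call to the APIM endpoint could look something like the sketch below; the URL, subscription key and payload fields are hypothetical placeholders.

using System;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;

public static class ApimClientExample
{
    public static async Task SendDepositAsync()
    {
        using (var client = new HttpClient())
        {
            //hypothetical APIM endpoint and subscription key
            var requestUri = "https://myapim.azure-api.net/transactions/messages";
            client.DefaultRequestHeaders.Add("Ocp-Apim-Subscription-Key", "<your-subscription-key>");
            client.DefaultRequestHeaders.Add("MsgType", "Deposits");

            var payload = "{ \"CustomerNumber\": \"C1000\", \"Amount\": 100.50 }";
            var content = new StringContent(payload, Encoding.UTF8, "application/json");

            //the APIM policy adds the SAS token and BrokerProperties before forwarding to the topic
            var response = await client.PostAsync(requestUri, content);
            Console.WriteLine($"Response status: {response.StatusCode}");
        }
    }
}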

The first step of this solution is to create an Azure Service Bus with a topic called 'transactions' and a Shared access policy called 'Sender' which has only the Send claim.

image

Next, add a subscription to the topic with 'Require Session' enabled and the following rule: "MsgType = 'Deposits'".

image

Before we start developing the custom policies, we need to set up three Named values in the APIM blade as shown below. These values are used for generating the Service Bus SAS token and are obtained from the Service Bus properties.

image

Where,

  • SB_Key – the primary key of the 'Sender' shared access policy
  • SB_KeyName – the name of the shared access policy
  • SB_Uri – the topic URL, which can be found by clicking on the topic name under the 'Topics' blade shown below.

image

Next is to create an API using the 'Blank API' template, similar to what I have done below. Note the 'Web service URL' value is the base address of the Service Bus topic URL (i.e. without the topic name resource location).

image

Then add an operation to the service as shown below. Note the URL should be the name of your topic with the resource 'messages' appended to the end.

image

Once the operation has been created, we can now add the custom policy to the ‘Inbound processing’ stage.

image

The APIM policy consists of several code blocks inside the inbound processing stage, which are described in detail below. To improve performance, I will be caching the generated SAS token.

The first code block looks up the cache for a value. If nothing is found, the variable "cachedSasToken" is assigned null, as there is no default value specified.

<cache-lookup-value key="crmsbsas" variable-name="cachedSasToken" />

Next, a control flow is used to check the variable "cachedSasToken" for null. If it is null, a SAS token with a 120 second expiry is generated using the values stored in the APIM Named values. The token is then stored in cache, and the cache is read again to assign the variable "cachedSasToken" with the generated SAS token.

Both the signature and resource URL are required to be URL-encoded. My first choice was to use the System.Web.UrlEncode function to encode the values. Unfortunately this function is not available in APIM policy expressions, as they only expose a subset of the .NET Framework types. To work around this issue, I ended up using the System.Uri.EscapeDataString method instead.

<choose>
    <when condition="@(context.Variables.GetValueOrDefault&lt;string>("cachedSasToken") == null)">
        <cache-store-value key="crmsbsas" value="@{
                string resourceUri = "{{SB_Uri}}";
                string keyName = "{{SB_KeyName}}";
                string key = "{{SB_Key}}";
                TimeSpan sinceEpoch = DateTime.UtcNow - new DateTime(1970, 1, 1);
                var expiry = Convert.ToString((int)sinceEpoch.TotalSeconds + 120);
                string stringToSign = System.Uri.EscapeDataString(resourceUri) + "\n" + expiry;
                HMACSHA256 hmac = new HMACSHA256(Encoding.UTF8.GetBytes(key));
                var signature = Convert.ToBase64String(hmac.ComputeHash(Encoding.UTF8.GetBytes(stringToSign)));
                var sasToken = String.Format("SharedAccessSignature sr={0}&amp;sig={1}&amp;se={2}&amp;skn={3}",
                                System.Uri.EscapeDataString(resourceUri),
                                System.Uri.EscapeDataString(signature), expiry, keyName);
                return sasToken;
            }" duration="100" />
        <cache-lookup-value key="crmsbsas" variable-name="cachedSasToken" />
    </when>
</choose>

The SAS token is then added to the Authorization header using the value from the variable 'cachedSasToken'.

<set-header name="Authorization" exists-action="override">

<value>@(context.Variables.GetValueOrDefault<string>("cachedSasToken"))</value>

</set-header>

I then set the message content type to application/json and remove the APIM subscription key header so it is not sent to the Service Bus endpoint.

<set-header name="Content-type" exists-action="override">

          <value>application/json</value>

      </set-header>

      <set-header name="Ocp-Apim-Subscription-Key" exists-action="delete" />

The last part is to extract the customer number from the message and assign it to the SessionId property of the Service Bus message. The standard set of Service Bus properties must be added to a custom header called 'BrokerProperties'.

<set-header name="BrokerProperties" exists-action="override">

            <value>@{

               string reqData = context.Request.Body?.As<string>(true);

               dynamic data =  JsonConvert.DeserializeObject(reqData);

               string order =  data?.CustomerNumber;

               return string.Format("{{\"SessionId\":\"{0}\"}}", order);

            }</value>

        </set-header>

The full code for the custom policy is here:

<!--
    IMPORTANT:
    - Policy elements can appear only within the <inbound>, <outbound>, <backend> section elements.
    - Only the <forward-request> policy element can appear within the <backend> section element.
    - To apply a policy to the incoming request (before it is forwarded to the backend service), place a corresponding policy element within the <inbound> section element.
    - To apply a policy to the outgoing response (before it is sent back to the caller), place a corresponding policy element within the <outbound> section element.
    - To add a policy position the cursor at the desired insertion point and click on the round button associated with the policy.
    - To remove a policy, delete the corresponding policy statement from the policy document.
    - Position the <base> element within a section element to inherit all policies from the corresponding section element in the enclosing scope.
    - Remove the <base> element to prevent inheriting policies from the corresponding section element in the enclosing scope.
    - Policies are applied in the order of their appearance, from the top down.
-->
<policies>
    <inbound>
        <base />
        <cache-lookup-value key="crmsbsas" variable-name="cachedSasToken" />
        <choose>
            <when condition="@(context.Variables.GetValueOrDefault&lt;string>("cachedSasToken") == null)">
                <cache-store-value key="crmsbsas" value="@{
                        string resourceUri = "{{SB_Uri}}";
                        string keyName = "{{SB_KeyName}}";
                        string key = "{{SB_Key}}";
                        TimeSpan sinceEpoch = DateTime.UtcNow - new DateTime(1970, 1, 1);
                        var expiry = Convert.ToString((int)sinceEpoch.TotalSeconds + 120);
                        string stringToSign = System.Uri.EscapeDataString(resourceUri) + "\n" + expiry;
                        HMACSHA256 hmac = new HMACSHA256(Encoding.UTF8.GetBytes(key));
                        var signature = Convert.ToBase64String(hmac.ComputeHash(Encoding.UTF8.GetBytes(stringToSign)));
                        var sasToken = String.Format("SharedAccessSignature sr={0}&amp;sig={1}&amp;se={2}&amp;skn={3}",
                                        System.Uri.EscapeDataString(resourceUri),
                                        System.Uri.EscapeDataString(signature), expiry, keyName);
                        return sasToken;
                    }" duration="10" />
                <cache-lookup-value key="crmsbsas" variable-name="cachedSasToken" />
            </when>
        </choose>
        <set-header name="Authorization" exists-action="override">
            <value>@(context.Variables.GetValueOrDefault<string>("cachedSasToken"))</value>
        </set-header>
        <set-header name="Content-type" exists-action="override">
            <value>application/json</value>
        </set-header>
        <set-header name="Ocp-Apim-Subscription-Key" exists-action="delete" />
        <set-header name="BrokerProperties" exists-action="override">
            <value>@{
               string reqData = context.Request.Body?.As<string>(true);
               dynamic data = JsonConvert.DeserializeObject(reqData);
               string order = data?.CustomerNumber;
               return string.Format("{{\"SessionId\":\"{0}\"}}", order);
            }</value>
        </set-header>
    </inbound>
    <backend>
        <base />
    </backend>
    <outbound>
        <base />
    </outbound>
    <on-error>
        <base />
    </on-error>
</policies>

 

Now let's use Postman to send a message to the URL endpoint exposed by APIM to test the policy. The headers contain the APIM subscription key and a custom header for the 'MsgType', which will be used by the Service Bus subscription filter.

image

The message body simply contains the Customer number and the amount to deposit.

image

After posting the message to the APIM endpoint URL, we can see the message was successfully forwarded to the Service Bus by using Service Bus Explorer to view the message properties and content.

Notice the message custom properties include the MsgType, and the 'SessionId' is populated with the customer number.

image

Enjoy…

Using Logic App Webhooks to execute long running SQL queries

On one of my projects, I had a requirement to send several large datasets as CSV files to an FTP server using a Logic App.

I thought about two options to implement this requirement. The first option was to use the Azure SQL Server API connector to extract the dataset from the database. However, the amount of data to be serialised as JSON would certainly cause the SQL API connector to time out or exceed the maximum payload size without some form of database paging, and using paging would not meet the requirement of a single file.

The second option was to utilise the HTTP Webhook connector available in Logic Apps to call an Azure Function, passing the call-back URL and the database stored procedure to execute. The Azure Function would simply place the JSON message body onto an Azure Storage account queue and then return an HTTP 200 response, putting the Logic App into a dehydrated state.

Another Azure Function polls the queue for new messages. When a new message is received, it queries the database using the stored procedure name sent in the queue message. A data reader is used to read the returned recordset for speed and populate a DataTable. The rows in the DataTable are then converted into a CSV file, which is written to a blob container in a streaming fashion. Once the function has finished creating the CSV file, it calls back to the Logic App using the URL sent in the queue message; the Logic App, still in its dehydrated state, then continues with the workflow and sends the file from the blob store to the FTP server.

Using the webhook option meant the Logic App would go into a dehydrated state after calling the Azure Function, and I would not be charged for any consumption time whilst in this state. It also meant the Azure Function could take as long as required to create the CSV file while executing under a consumption billing plan.
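The webhook-receiver function is not included in the helper code further below, so here is a rough sketch of what it could look like, assuming a storage queue named "sqljobs", a connection setting reusing the "BlobStore" storage account, and a simple JSON body containing the call-back URL and stored procedure name (all of these names are placeholders).

using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;

public static class WebhookReceiver
{
    //Accepts the webhook call from the Logic App and queues the work item.
    //Returning 200 immediately puts the Logic App into a dehydrated state
    //until the call-back URL is invoked later by the queue-triggered function.
    [FunctionName("WebhookReceiver")]
    public static async Task<IActionResult> Run(
        [HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequest req,
        [Queue("sqljobs", Connection = "BlobStore")] IAsyncCollector<string> jobQueue,
        ILogger log)
    {
        //the body is expected to contain the call-back URL and stored procedure name,
        //e.g. { "callbackUrl": "...", "storedProcedure": "dbo.MyExtract" }
        string requestBody = await new StreamReader(req.Body).ReadToEndAsync();
        await jobQueue.AddAsync(requestBody);

        log.LogInformation("Queued data extract request.");
        return new OkResult();
    }
}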

Below is a sequence diagram showing the activation of each process. The whole process is triggered by a scheduler inside a logic app.

image

Creating the Solution

I have broken the solution into two projects, one for the Azure Functions and the other for the helper classes. I typically abstract all the business logic out of the Azure Functions and place it in a separate class library project where I can create unit tests for the business logic.

image

Defining the Helper Libraries

The first helper class is AzureStorage, which has one method defined to return a blob reference object and to create the blob container if it does not exist.

using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using System;
using System.Configuration;
using System.Threading.Tasks;

namespace Helpers
{
    public static class AzureStorage
    {
        public static async Task<CloudBlockBlob> CreateBlobAsync(string fileName, string containerName)
        {
            try
            {
                // Retrieve storage account from connection string.
                CloudStorageAccount storageAccount = CloudStorageAccount.Parse(ConfigurationManager.AppSettings["BlobStore"]);

                // Create the blob client.
                CloudBlobClient blobClient = storageAccount.CreateCloudBlobClient();

                CloudBlobContainer container = blobClient.GetContainerReference(containerName.ToLower());

                // Check whether the container exists and create it if not.
                await container.CreateIfNotExistsAsync();

                // Retrieve reference to a blob.
                return container.GetBlockBlobReference(fileName.ToLower());
            }
            catch (Exception ex)
            {
                throw ex;
            }
        }
    }
}

The next class is DataAccess, used for accessing the database with a data reader that loads a DataTable, which is then returned. Remember the default command timeout is 30 seconds, so this needs to be increased on the SqlCommand object.

Note this is just a simple implementation of reading the data. Ideally you would add transient fault handling with retry logic built in for production releases.

using System;
using System.Configuration;
using System.Data;
using System.Data.SqlClient;

namespace Helpers
{
    internal static class DataAccess
    {
        internal static DataTable CreateDataExtract(string sprocName)
        {
            var constr = ConfigurationManager.ConnectionStrings["SQLConnectionString"].ConnectionString;
            DataTable dataTable = null;
            try
            {
                dataTable = new DataTable();
                using (SqlConnection conn = new SqlConnection(constr))
                {
                    conn.Open();
                    SqlCommand cmd = new SqlCommand(sprocName, conn);
                    cmd.CommandType = CommandType.StoredProcedure;
                    cmd.CommandTimeout = 600;
                    dataTable.Load(cmd.ExecuteReader());
                }
            }
            catch (SqlException sqlEx)
            {
                //capture SQL errors
                throw sqlEx;
            }
            catch (Exception genEx)
            {
                //capture general errors
                throw genEx;
            }
            return dataTable;
        }
    }
}

The last helper class “DataExtracts” is the entry point for the Azure Function. It calls the other helper methods to read the database and then converts the DataTable into a csv file by writing the output in a streaming manner to the blob container called “csv”.

using System;
using System.Data;
using System.IO;
using System.Text;
using System.Threading.Tasks;

namespace Helpers
{
    public static class DataExtracts
    {
        public static async Task<string> GenerateCSVDataExtractAsync(string storedProc, string columnDelimiter = ",")
        {
            string filename;
            try
            {
                // Call the data access layer to load the recordset.
                var dataTable = DataAccess.CreateDataExtract(storedProc);

                // Save the table as a csv file to the blob store.
                filename = Guid.NewGuid().ToString("D") + ".csv";
                await WriteDataTableToCSVAsync(dataTable, "csv", filename, columnDelimiter);
            }
            catch (Exception)
            {
                // Rethrow so the caller can log the failure.
                throw;
            }

            return filename;
        }

        private static async Task WriteDataTableToCSVAsync(DataTable dt, string containerName, string filename, string columnDelimiter)
        {
            if (dt != null)
            {
                using (var ms = new MemoryStream())
                using (var sw = new StreamWriter(ms, Encoding.UTF8))
                {
                    // Create the header row.
                    for (int i = 0; i < dt.Columns.Count; i++)
                    {
                        sw.Write(dt.Columns[i].ColumnName);
                        sw.Write(i == dt.Columns.Count - 1 ? Environment.NewLine : columnDelimiter);
                    }

                    // Append the data rows.
                    foreach (DataRow row in dt.Rows)
                    {
                        for (int i = 0; i < dt.Columns.Count; i++)
                        {
                            sw.Write(EscapeCSV(row[i].ToString()));
                            sw.Write(i == dt.Columns.Count - 1 ? Environment.NewLine : columnDelimiter);
                        }
                    }

                    sw.Flush();
                    ms.Position = 0;

                    // Write the stream to the blob store.
                    var blob = await AzureStorage.CreateBlobAsync(filename, containerName);
                    await blob.UploadFromStreamAsync(ms);
                }
            }
        }

        private static string EscapeCSV(string colData)
        {
            string quote = "\"";
            string escapedQuote = "\"\"";
            char[] quotedCharacters = new char[] { ',', '"', '\r', '\n', '\t' };

            if (colData == null) return "";
            if (colData.Contains(quote)) colData = colData.Replace(quote, escapedQuote);
            // Quote the value if it contains a delimiter, quote or line break anywhere in the string.
            if (colData.IndexOfAny(quotedCharacters) >= 0)
                colData = quote + colData + quote;

            return colData;
        }
    }
}


Defining the Azure Functions

The next project called “LongRunningJobs” contains the 2 Azure Functions required by the solution.

Below is the code for the function “SQLJobRequest” which is called by the Logic App. It simply puts the posted JSON request message onto an Azure Storage queue using a queue output binding (an IAsyncCollector<string> parameter decorated with the Queue attribute), which keeps the code to a few lines.

using System.Net;
using System.Net.Http;
using System.Threading.Tasks;
using Helpers;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Azure.WebJobs.Host;

namespace LongRunningJobs
{
    public static class SQLJobRequest
    {
        [FunctionName("SQLJobRequest")]
        public static async Task<HttpResponseMessage> Run(
                                    [HttpTrigger(AuthorizationLevel.Function, "post", Route = null)]HttpRequestMessage req,
                                    [Queue("sqljobs", Connection = "BlobStore")]IAsyncCollector<string> outputQueueItem,
                                    TraceWriter log)
        {
            log.Info("C# HTTP trigger function processed a request.");
            var isParamsValid = false;

            // Get the request body.
            dynamic data = await req.Content.ReadAsAsync<object>();
            string sprocName = data?.SprocName;
            string callBackUrl = data?.CallBackUrl;

            // Only queue the message if both the sproc name and the call-back URL were supplied.
            if (!string.IsNullOrEmpty(sprocName) && !string.IsNullOrEmpty(callBackUrl))
            {
                isParamsValid = true;
                await outputQueueItem.AddAsync(data.ToString());
            }

            return isParamsValid == false
                ? req.CreateResponse(HttpStatusCode.BadRequest, "Please pass the sproc name and callback url.")
                : req.CreateResponse(HttpStatusCode.OK);
        }
    }
}

The parameter [Queue("<name-of-queue>", Connection = "<blob-connection-string>")]IAsyncCollector<string> outputQueueItem in the function parameter list uses the Queue output binding attribute to define the name of the queue and the name of the connection string setting to use (“BlobStore” in this case).

The other function “SQLJobExecute” polls the storage queue “sqljobs” for new messages. When a new message is received, it calls the helper method “DataExtracts.GenerateCSVDataExtractAsync(sprocName)” to create the csv file. If the csv file was created successfully, its filename is returned in the body of the HTTP call-back POST.

using System;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Threading.Tasks;
using Helpers;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Newtonsoft.Json;

namespace LongRunningJobs
{
    public static class SQLJobExecute
    {
        [FunctionName("SQLJobExecute")]
        public static async Task Run([QueueTrigger("sqljobs", Connection = "BlobStore")]string queueItem, TraceWriter log)
        {
            log.Info($"C# Queue trigger function processed: {queueItem}");

            dynamic data = JsonConvert.DeserializeObject(queueItem);

            string sprocName = data.SprocName;
            string callBackUrl = data.CallBackUrl;
            string logicappRunId = data.RunId;

            // Check that valid parameters were passed.
            if (string.IsNullOrEmpty(sprocName) || string.IsNullOrEmpty(callBackUrl) || string.IsNullOrEmpty(logicappRunId))
                log.Error("Null value parameters passed.");
            else
                // Await the work so the Functions host keeps the invocation alive until the call-back completes.
                await ExecuteQueryAsync(callBackUrl, sprocName, logicappRunId, log);
        }

        private static async Task ExecuteQueryAsync(string callBackUrl, string sprocName, string logicAppRunId, TraceWriter log)
        {
            string blobFilename = string.Empty;
            try
            {
                // Call the helper class to create the csv file.
                blobFilename = await DataExtracts.GenerateCSVDataExtractAsync(sprocName);
            }
            catch (Exception ex)
            {
                log.Error(ex.Message, ex);
            }

            try
            {
                // Call back to the Logic App webhook using the URL passed in the queue message.
                using (var httpClient = new HttpClient())
                {
                    var postUrl = new Uri(callBackUrl);
                    httpClient.DefaultRequestHeaders.Accept.Clear();
                    httpClient.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));
                    // Send the call-back body as JSON so the Logic App can parse it.
                    var content = new StringContent("{\"SprocName\":\"" + sprocName + "\",\"Filename\":\"" + blobFilename + "\"}", Encoding.UTF8, "application/json");
                    var response = await httpClient.PostAsync(postUrl, content);
                    response.EnsureSuccessStatusCode();
                }
            }
            catch (Exception ex)
            {
                log.Error(string.Format("Error occurred when executing function for logic app runId:{0}\n{1}", logicAppRunId, ex.Message), ex);
            }
        }
    }
}

Below is the local.settings.json file which has the following values defined for the Blob Store and SQL Database.

image
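For reference, a minimal local.settings.json matching the setting names used by the code above would look roughly like the following. The values are placeholders only; “BlobStore” is the storage connection used by the blob and queue bindings, and “SQLConnectionString” sits in the ConnectionStrings section so it can be read via ConfigurationManager.ConnectionStrings.

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "<storage-account-connection-string>",
    "AzureWebJobsDashboard": "<storage-account-connection-string>",
    "BlobStore": "<storage-account-connection-string-for-blobs-and-queues>"
  },
  "ConnectionStrings": {
    "SQLConnectionString": "<sql-database-connection-string>"
  }
}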

After the Azure Functions have been deployed to an Azure App Service, the Application Settings highlighted below for the Blob store and SQL Server need to be added.

image

Logic App Definition

Below are the Azure resources used for this solution. The Azure Functions are deployed to the “dalfunctions” App Service, and the “dataextracts” storage account contains the Blob store for the csv files and the storage queue “sqljobs”.

image

The Logic App is a very simple workflow which is triggered by a scheduler.

image

The HTTP Webhook shape is what calls the function endpoint “SQLJobRequest” and then puts the Logic App into a dehydrated state.

image

The Subscribe-URI can be obtained from the App Service after the Azure Functions have been deployed, using the “Get function URL” link shown below.

image

The Subscribe-Body property consists of a JSON payload. This includes the call-back URL, the Logic App run Id (for debugging later if required) and the name of the SQL stored procedure to call in the database. Below is the code view for the HTTP Webhook action.

image
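As a rough sketch, the Subscribe-Body posted to the “SQLJobRequest” function would be something like the following, using the property names the function code expects (SprocName, CallBackUrl, RunId). The stored procedure name is a placeholder and the exact expressions may differ from the original workflow; @{listCallbackUrl()} returns the webhook call-back URL and @{workflow().run.name} returns the current run Id.

{
  "SprocName": "<name-of-stored-procedure>",
  "CallBackUrl": "@{listCallbackUrl()}",
  "RunId": "@{workflow().run.name}"
}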

After the HTTP Webhook shape is the JSON Parser and a condition to check whether a filename was returned in the webhook call-back body.

image

The schema for the JSON Parser checks for the Filename and the SprocName.

image
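Based on the call-back body built in the “SQLJobExecute” function above, a minimal schema for the JSON Parser would be along these lines (not necessarily the exact schema used in the original Logic App):

{
  "type": "object",
  "properties": {
    "Filename": { "type": "string" },
    "SprocName": { "type": "string" }
  }
}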

The expression for the condition is below:

image
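The condition is essentially a check that the Filename property returned in the call-back body is not empty. A hedged example of such an expression, assuming the parse action is named “Parse_JSON”, would be:

@not(empty(body('Parse_JSON')?['Filename']))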

The “True” branch gets the csv file from the Blob store and sends it to the FTP server. Note the filename has to be prefixed with the blob container name. Once the file has been sent, it is deleted from the Blob store.

image
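Because the csv files are written to the “csv” container, the blob path passed to the blob action needs the container name prefixed to the filename, for example with an expression along these lines (again assuming the parse action is named “Parse_JSON”):

concat('/csv/', body('Parse_JSON')?['Filename'])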

If no filename was received when the Logic App comes out of its dehydrated state, the workflow is terminated.

image

Sample runs

As you can see from the sample runs below, some take over 6 minutes to complete, which would normally cause an HTTP Function action to time out.

image

Conclusion

The solution can easily be modified to pass a list of stored procedures to create several csv files in one call and then have the call-back function pass the list of filenames to send via FTP.

This pattern could also be used for other scenarios that require an Azure Function to execute some long-running process which would cause the normal HTTP Function action to time out while waiting for the response message.

Enjoy…

Using Azure APIM Policies to Route on HTTP Verbs & Resources

The policies available in APIM are indeed very powerful. They provide the ability to modify the backend flow of a request using rules based on the payload contents or the request context properties. I have yet to find a scenario where I could not meet a requirement using the built-in statements.

One example: I had to send a request to different endpoints depending on the HTTP verb (POST, PUT, DELETE) and the resource location.

image

A policy inspects the incoming request URL and the operation method to determine which Azure Logic App or Function to forward the request to.

The inbound policy section for this scenario is shown below. It uses the <choose> policy statement to match on the URL path and the operation method. If a match is found, it sets the variable “logicappEndpoint” to the corresponding Named value defined in the Azure APIM blade.

<inbound>
    <choose>
        <when condition="@(context.Request.Url.Path.Equals("inventory") && context.Operation.Method.Equals("PUT"))">
            <set-variable name="logicappEndpoint" value="{{UAT-AXProductUpdateUrl}}" />
        </when>
        <when condition="@(context.Request.Url.Path.Equals("inventory") && context.Operation.Method.Equals("POST"))">
            <set-variable name="logicappEndpoint" value="{{UAT-AXProductCreateUrl}}" />
        </when>
        <when condition="@(context.Request.Url.Path.Equals("inventory") && context.Operation.Method.Equals("DELETE"))">
            <set-variable name="logicappEndpoint" value="{{UAT-AXProductDeleteUrl}}" />
        </when>
        <when condition="@(context.Request.Url.Path.Equals("stockmovement") && context.Operation.Method.Equals("PUT"))">
            <set-variable name="logicappEndpoint" value="{{UAT-AXStockMovementUpdateUrl}}" />
        </when>
        <when condition="@(context.Request.Url.Path.Equals("stockmovement") && context.Operation.Method.Equals("POST"))">
            <set-variable name="logicappEndpoint" value="{{UAT-AXStockMovementCreateUrl}}" />
        </when>
    </choose>
    <base />
</inbound>

To reference the Named value properties defined in the portal from within a policy, simply wrap the property name in double braces, for example: {{UAT-AXStockMovementCreateUrl}}

These name-value pairs are then created in the APIM blade of the Azure portal, shown below.

image

Once the inbound policy section has been created, we then need to create the rule for the backend section shown below. Here I am also setting a retry policy that retries up to 3 times when the backend returns a status code of 500 before the error is passed back. Also, I am always forwarding the request to the Logic App as a POST.

<backend>
    <retry condition="@(context.Response.StatusCode == 500)" count="3" interval="5" max-interval="10" delta="2" first-fast-retry="false">
        <send-request mode="copy" response-variable-name="tokenstate" timeout="120" ignore-error="true">
            <set-url>@(context.Variables.GetValueOrDefault<string>("logicappEndpoint"))</set-url>
            <set-method>POST</set-method>
            <set-header name="Ocp-Apim-Subscription-Key" exists-action="delete" />
        </send-request>
    </retry>
</backend>

I hope this short blog gives you a basic understanding of how to redirect a request based on the HTTP verb and a resource location. Enjoy…