Extracting Claims from an OAuth Token in an Azure Function

This is a follow-up to my previous post on adding custom application claims to an app registered in AAD (https://connectedcircuits.blog/2020/08/13/setting-up-custom-claims-for-an-api-registered-in-azure-ad/). In this article I will develop an Azure Function that accepts an OAuth token containing custom claims. The function validates the token and returns all the claims found in the bearer token as the response message.

Instead of using the built-in Azure AD authentication and authorisation support in Azure Functions, I will be using the NuGet packages Microsoft.IdentityModel.Protocols.OpenIdConnect and System.IdentityModel.Tokens.Jwt to validate the JWT. This also allows bearer tokens from other authorisation providers to be decoded.

These packages use the JSON Web Key Set (JWKS) endpoint of the authorisation server to obtain the public key, which is then used to validate the token and ensure it has not been tampered with. The main advantage of this approach is that you don’t need to store the issuer’s public key or remember to update the certificates before they expire.

 

Function Code

The full code for this solution can be found on my GitHub repository https://github.com/connectedcircuits/azOAuthClaimsFunct.

Below is a function which returns the list of signing keys from the jwks_uri endpoint. Ideally the response should be cached, as downloading the keys from the endpoint can take some time.

// Get the public signing keys from the jwks_uri endpoint.
// Note: idpEndpoint is expected to end with a trailing "/".
private static async Task<ICollection<SecurityKey>> GetSecurityKeysAsync(string idpEndpoint)
{
    var openIdConfigurationEndpoint = $"{idpEndpoint}.well-known/openid-configuration";
    var configurationManager = new ConfigurationManager<OpenIdConnectConfiguration>(
        openIdConfigurationEndpoint, new OpenIdConnectConfigurationRetriever());
    var openIdConfig = await configurationManager.GetConfigurationAsync(CancellationToken.None);
    return openIdConfig.SigningKeys;
}
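One way to avoid re-downloading the keys on every invocation is to hold a single ConfigurationManager per issuer endpoint; the class already caches and refreshes the configuration internally, so it only needs to be created once. The caching pattern itself can be sketched generically as below (this is a standalone illustration with names of my own, not code from the repository):

```csharp
using System;
using System.Collections.Concurrent;

// Demo: the factory runs once per key; later lookups reuse the cached value.
int calls = 0;
Func<string> fetchKeys = () => { calls++; return "signing-keys"; };

PerKeyCache.GetOrCreate("https://issuer-a/", fetchKeys);
PerKeyCache.GetOrCreate("https://issuer-a/", fetchKeys);
Console.WriteLine(calls); // 1

// A generic per-key cache. In the function, the cached value would be the
// ConfigurationManager<OpenIdConnectConfiguration> keyed by idpEndpoint.
public static class PerKeyCache
{
    private static readonly ConcurrentDictionary<string, Lazy<object>> Cache =
        new ConcurrentDictionary<string, Lazy<object>>();

    public static T GetOrCreate<T>(string key, Func<T> factory) where T : class
    {
        // Lazy<T> guarantees the factory runs at most once per key,
        // even when first calls race on multiple threads.
        var lazy = Cache.GetOrAdd(key, _ => new Lazy<object>(() => factory()));
        return (T)lazy.Value;
    }
}
```

Holding the manager statically also lets the library refresh the keys on its own schedule, covering signing-key rollover without a function restart.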

The next part of the code configures the TokenValidationParameters with the authorisation server address (issuer), the audiences, and the signing keys obtained from the GetSecurityKeysAsync function above.

TokenValidationParameters validationParameters = new TokenValidationParameters
{
    ValidIssuer = issuer,
    ValidAudiences = new[] { audiences },
    IssuerSigningKeys = keys
};

Next, the token is validated and the claims contained in it are extracted into a ClaimsPrincipal object.

// Validate the token and grab the claims from it.
JwtSecurityTokenHandler handler = new JwtSecurityTokenHandler();
SecurityToken validatedToken;
ClaimsPrincipal principal;
try
{
    principal = handler.ValidateToken(token, validationParameters, out validatedToken);
}
catch (SecurityTokenExpiredException ex)
{
    log.LogError(ex.Message);
    req.HttpContext.Response.Headers.Add("X-Error-Message", $"Token expired at {ex.Expires}");
    return new UnauthorizedResult();
}
catch (Exception ex)
{
    log.LogError(ex.Message);
    return new UnauthorizedResult();
}

Once you have the principal object instantiated, you can use the IsInRole("<role_name>") method to check whether the token contains a given role. The method returns true if the role is found.
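A quick sketch of how IsInRole behaves once the principal is built. Here the principal is constructed manually for illustration; in the function it comes from handler.ValidateToken, and for a JWT the claim type used for roles is governed by the RoleClaimType on the validation parameters. The role names are the custom claims from this post:

```csharp
using System;
using System.Security.Claims;

// Build a principal with two role claims (manual stand-in for ValidateToken).
var identity = new ClaimsIdentity(
    new[]
    {
        new Claim(ClaimTypes.Role, "crm.read"),
        new Claim(ClaimTypes.Role, "crm.write"),
    },
    authenticationType: "Bearer");

var principal = new ClaimsPrincipal(identity);

Console.WriteLine(principal.IsInRole("crm.read"));   // True
Console.WriteLine(principal.IsInRole("crm.delete")); // False
```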

 

Runtime results

This is the token request for an app registered in Azure AD that has the crm.read and crm.write claims assigned.

image

This is the response from the Azure Function using the bearer token obtained from AAD. Here you can see the two custom application claims, crm.read and crm.write, listed amongst the default claims.

 

image 

 

 

This is another example, using Auth0 (https://auth0.com/) as an alternative authorisation server. The API was registered with full CRUD permissions, whilst the client was only given access to the read and update roles. Below is the request sent to Auth0 for a token.

image

 

This is the response from calling the Azure Function using the bearer token from Auth0, with the two custom claims crm:read and crm:update returned alongside the default claims.

 

image

 

Conclusion

As you can see, you can use any authorisation server that exposes a jwks_uri endpoint for acquiring the public signing keys, provided the generated token is signed with an asymmetric algorithm such as RS256.

Enjoy…

Using Logic App Webhooks to execute long running SQL queries

On one of my projects, I had a requirement to send several large datasets as csv files to an FTP server using a Logic App.

I considered two options for implementing this requirement. The first was to use the Azure SQL Server API connector to extract the dataset from the database. However, the amount of data to be serialised as JSON would certainly cause the SQL API connector to time out or exceed the maximum payload size without some form of database paging, and paging would not meet the requirement of producing a single file.

The second option was to use the HTTP Webhook connector available in Logic Apps to call an Azure Function, passing the call-back URL and the database stored procedure to execute. The Azure Function simply places the JSON message body onto an Azure Storage queue and returns an HTTP 200 response, which puts the Logic App into a dehydrated state.

Another Azure Function polls the queue for new messages. When a message is received, it queries the database using the stored procedure name carried in the queue message. A data reader is used to read the returned recordset (for speed) and populate a DataTable. The rows in the DataTable are then converted into a csv file, which is written to a blob container in a streaming fashion. Once the csv file has been created, the function calls back to the dehydrated Logic App using the URL sent in the queue message, allowing the workflow to continue and send the file from the blob store to the FTP server.

Using the webhook option means the Logic App goes into a dehydrated state after calling the Azure Function, and I am not charged for any consumption time whilst in this state. The Azure Function can therefore take as long as required to create the csv file while executing under a consumption billing plan.
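The message that flows through the webhook call and onto the queue is a small JSON payload. The property names below match the function code later in this post; the values are purely illustrative:

```json
{
  "SprocName": "usp_ExportSalesOrders",
  "CallBackUrl": "https://prod-00.australiaeast.logic.azure.com/workflows/...",
  "RunId": "08586719..."
}
```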

Below is a sequence diagram showing the activation of each process. The whole process is triggered by a scheduler inside a logic app.

image

Creating the Solution

I have broken the solution into two projects: one for the Azure Functions and the other for the helper classes. I typically abstract all the business logic out of the Azure Functions into a separate class library project where I can write unit tests against it.

image

Defining the Helper Libraries

The first helper class is AzureStorage, which has a single method that returns a blob reference and creates the blob container if it does not already exist.

using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using System.Configuration;
using System.Threading.Tasks;

namespace Helpers
{
    public static class AzureStorage
    {
        public static async Task<CloudBlockBlob> CreateBlobAsync(string fileName, string containerName)
        {
            // Retrieve the storage account from the connection string.
            CloudStorageAccount storageAccount = CloudStorageAccount.Parse(ConfigurationManager.AppSettings["BlobStore"]);

            // Create the blob client.
            CloudBlobClient blobClient = storageAccount.CreateCloudBlobClient();

            CloudBlobContainer container = blobClient.GetContainerReference(containerName.ToLower());

            // Create the container if it does not already exist.
            await container.CreateIfNotExistsAsync();

            // Return a reference to the blob.
            return container.GetBlockBlobReference(fileName.ToLower());
        }
    }
}

The next class is DataAccess, which queries the database using a data reader to load a DataTable, which is then returned. Remember the default command timeout is 30 seconds, so this needs to be increased on the SqlCommand object.

Note this is just a simple implementation of reading the data. Ideally you would add transient fault handling code with retry logic built-in for production releases.

using System.Configuration;
using System.Data;
using System.Data.SqlClient;

namespace Helpers
{
    internal static class DataAccess
    {
        internal static DataTable CreateDataExtract(string sprocName)
        {
            var constr = ConfigurationManager.ConnectionStrings["SQLConnectionString"].ConnectionString;
            var dataTable = new DataTable();

            using (SqlConnection conn = new SqlConnection(constr))
            using (SqlCommand cmd = new SqlCommand(sprocName, conn))
            {
                cmd.CommandType = CommandType.StoredProcedure;
                // The default command timeout is 30 seconds; increase it for long running queries.
                cmd.CommandTimeout = 600;
                conn.Open();
                dataTable.Load(cmd.ExecuteReader());
            }

            return dataTable;
        }
    }
}
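As noted above, production code should include transient-fault handling. Below is a minimal hand-rolled retry sketch, illustrative only; a dedicated library such as Polly is a better fit for production use:

```csharp
using System;
using System.Threading;

// Demo: an operation that fails twice with a transient error then succeeds.
int attempts = 0;
string result = Retry.Execute(() =>
{
    attempts++;
    if (attempts < 3) throw new TimeoutException("transient failure");
    return "dataset";
}, maxAttempts: 3, delayMs: 50);
Console.WriteLine($"{result} after {attempts} attempts"); // dataset after 3 attempts

// A minimal retry helper (my own sketch, not from the solution).
public static class Retry
{
    public static T Execute<T>(Func<T> action, int maxAttempts = 3, int delayMs = 500)
    {
        for (int attempt = 1; ; attempt++)
        {
            try
            {
                return action();
            }
            catch (Exception) when (attempt < maxAttempts)
            {
                // Linear back-off before the next attempt.
                Thread.Sleep(delayMs * attempt);
            }
        }
    }
}
```

The database call could then be wrapped as Retry.Execute(() => DataAccess.CreateDataExtract(sprocName)), bearing in mind the retried operation must be safe to run more than once.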

The last helper class, DataExtracts, is the entry point for the Azure Function. It calls the other helper methods to read the database and then converts the DataTable into a csv file, writing the output in a streaming manner to the blob container named “csv”.

using System;
using System.Data;
using System.IO;
using System.Text;
using System.Threading.Tasks;

namespace Helpers
{
    public static class DataExtracts
    {
        public static async Task<string> GenerateCSVDataExtractAsync(string storedProc, string columnDelimiter = ",")
        {
            // Call the data access layer.
            var dataTable = DataAccess.CreateDataExtract(storedProc);

            // Save the table as a csv file in the blob store.
            var filename = Guid.NewGuid().ToString("D") + ".csv";
            await WriteDataTableToCSVAsync(dataTable, "csv", filename, columnDelimiter);

            return filename;
        }

        private static async Task WriteDataTableToCSVAsync(DataTable dt, string containerName, string filename, string columnDelimiter)
        {
            if (dt == null) return;

            using (var ms = new MemoryStream())
            using (var sw = new StreamWriter(ms, Encoding.UTF8))
            {
                // Create the header row.
                for (int i = 0; i < dt.Columns.Count; i++)
                {
                    sw.Write(dt.Columns[i].ColumnName);
                    sw.Write(i == dt.Columns.Count - 1 ? Environment.NewLine : columnDelimiter);
                }

                // Append the data rows.
                foreach (DataRow row in dt.Rows)
                {
                    for (int i = 0; i < dt.Columns.Count; i++)
                    {
                        sw.Write(EscapeCSV(row[i].ToString()));
                        sw.Write(i == dt.Columns.Count - 1 ? Environment.NewLine : columnDelimiter);
                    }
                }
                sw.Flush();
                ms.Position = 0;

                // Write the stream to the blob store.
                var blob = await AzureStorage.CreateBlobAsync(filename, containerName);
                await blob.UploadFromStreamAsync(ms);
            }
        }

        private static string EscapeCSV(string colData)
        {
            string quote = "\"";
            string escapedQuote = "\"\"";
            char[] quotedCharacters = new char[] { ',', '"', '\r', '\n', '\t' };

            if (colData == null) return "";
            if (colData.Contains(quote)) colData = colData.Replace(quote, escapedQuote);
            // Quote the field if it contains a delimiter, quote or line break
            // (IndexOfAny returns -1 when no match is found).
            if (colData.IndexOfAny(quotedCharacters) > -1)
                colData = quote + colData + quote;

            return colData;
        }
    }
}
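To make the escaping rules concrete, here is a standalone version of the same logic (rewritten as a local function purely for illustration) with a few sample inputs:

```csharp
using System;

// Standalone sketch of the CSV escaping rules: double any embedded quotes,
// then wrap the field in quotes if it contains a delimiter, quote,
// line break or tab.
static string EscapeCsv(string colData)
{
    if (colData == null) return "";
    if (colData.Contains("\"")) colData = colData.Replace("\"", "\"\"");
    if (colData.IndexOfAny(new[] { ',', '"', '\r', '\n', '\t' }) > -1)
        colData = "\"" + colData + "\"";
    return colData;
}

Console.WriteLine(EscapeCsv("plain"));      // plain
Console.WriteLine(EscapeCsv("a,b"));        // "a,b"
Console.WriteLine(EscapeCsv("say \"hi\"")); // "say ""hi"""
```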


Defining the Azure Functions

The next project, called “LongRunningJobs”, contains the two Azure Functions required by the solution.

Below is the code for the “SQLJobRequest” function, which is called by the Logic App. It simply puts the posted JSON request message onto an Azure Storage queue via a Queue output binding, which reduces the queueing logic to a few lines of code.

using System.Net;
using System.Net.Http;
using System.Threading.Tasks;
using Helpers;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Azure.WebJobs.Host;

namespace LongRunningJobs
{
    public static class SQLJobRequest
    {
        [FunctionName("SQLJobRequest")]
        public static async Task<HttpResponseMessage> Run(
            [HttpTrigger(AuthorizationLevel.Function, "post", Route = null)]HttpRequestMessage req,
            [Queue("sqljobs", Connection = "BlobStore")]IAsyncCollector<string> outputQueueItem,
            TraceWriter log)
        {
            log.Info("C# HTTP trigger function processed a request.");
            var isParamsValid = false;

            // Get the request body.
            dynamic data = await req.Content.ReadAsAsync<object>();
            string sprocName = data?.SprocName;
            string callBackUrl = data?.CallBackUrl;

            if (!string.IsNullOrEmpty(sprocName) && !string.IsNullOrEmpty(callBackUrl))
            {
                isParamsValid = true;
                await outputQueueItem.AddAsync(data.ToString());
            }

            return isParamsValid == false
                ? req.CreateResponse(HttpStatusCode.BadRequest, "Please pass the sproc name and callback url.")
                : req.CreateResponse(HttpStatusCode.OK);
        }
    }
}

The [Queue("<name-of-queue>", Connection = "<connection-setting-name>")] attribute on the IAsyncCollector<string> function parameter is an output binding that defines the name of the queue and the app setting containing the storage connection string.

The other function, “SQLJobExecute”, polls the storage queue “sqljobs” for new messages. When a message is received, it calls the helper method DataExtracts.GenerateCSVDataExtractAsync(sprocName) to create the csv file. If the file is created successfully, its filename is returned in the body of the HTTP call-back POST.

using System;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Threading.Tasks;
using Helpers;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Newtonsoft.Json;

namespace LongRunningJobs
{
    public static class SQLJobExecute
    {
        [FunctionName("SQLJobExecute")]
        public static async Task Run([QueueTrigger("sqljobs", Connection = "BlobStore")]string queueItem, TraceWriter log)
        {
            log.Info($"C# Queue trigger function processed: {queueItem}");

            dynamic data = JsonConvert.DeserializeObject(queueItem);

            string sprocName = data.SprocName;
            string callBackUrl = data.CallBackUrl;
            string logicappRunId = data.RunId;

            // Check that valid parameters were passed.
            if (string.IsNullOrEmpty(sprocName) || string.IsNullOrEmpty(callBackUrl) || string.IsNullOrEmpty(logicappRunId))
                log.Error("Null value parameters passed.");
            else
                // Await the work rather than firing and forgetting it, otherwise
                // the host may tear the function down before the extract and
                // call-back have completed.
                await ExecuteQueryAsync(callBackUrl, sprocName, logicappRunId, log);
        }

        private static async Task ExecuteQueryAsync(string callBackUrl, string sprocName, string logicAppRunId, TraceWriter log)
        {
            string blobFilename = string.Empty;
            try
            {
                // Call the helper class to create the csv file.
                blobFilename = await DataExtracts.GenerateCSVDataExtractAsync(sprocName);
            }
            catch (Exception ex)
            {
                log.Error(ex.Message, ex);
            }

            try
            {
                // Call back to the Logic App webhook using the URL passed in the queue message.
                using (var httpClient = new HttpClient())
                {
                    var postUrl = new Uri(callBackUrl);
                    httpClient.DefaultRequestHeaders.Accept.Clear();
                    httpClient.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));
                    var content = new StringContent("{\"SprocName\":\"" + sprocName + "\",\"Filename\":\"" + blobFilename + "\"}", Encoding.UTF8, "application/json");
                    var response = await httpClient.PostAsync(postUrl, content);
                    response.EnsureSuccessStatusCode();
                }
            }
            catch (Exception ex)
            {
                log.Error($"Error occurred when executing function for logic app runId:{logicAppRunId}\n{ex.Message}", ex);
            }
        }
    }
}

Below is the local.settings.json file which has the following values defined for the Blob Store and SQL Database.

image
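Since the settings file is shown as a screenshot, a sketch of its shape is below. The setting names BlobStore and SQLConnectionString match the names used in the code above; the values are placeholders:

```json
{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "BlobStore": "DefaultEndpointsProtocol=https;AccountName=<account>;AccountKey=<key>"
  },
  "ConnectionStrings": {
    "SQLConnectionString": "Server=tcp:<server>.database.windows.net;Database=<db>;User ID=<user>;Password=<password>"
  }
}
```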

After the Azure Functions have been deployed to an Azure App Service, the Application Settings for the Blob Store and SQL Server settings highlighted below need to be added.

image

Logic App Definition

Below are the Azure resources used for this solution. The Azure Functions are deployed to the “dalfunctions” App Service, and the “dataextracts” storage account contains the blob store for the csv files and the storage queue “sqljobs”.

image

The Logic App is a very simple workflow which is triggered by a scheduler.

image

The HTTP Webhook shape is what calls the function endpoint “SQLJobRequest” and then puts the Logic App into a dehydrated state.

image

The Subscribe-URI can be obtained from the App Service after the Azure Functions have been deployed, using the “Get function URL” link shown below.

image

The Subscribe-Body property consists of a JSON payload. This includes the call-back URL, the Logic App run Id (for debugging later if required) and the name of the SQL stored procedure to execute. Below is the code view for the HTTP Webhook action.

image
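The code view above is a screenshot, so here is a rough sketch of the subscribe section of the webhook action. The function URL and stored procedure name are placeholders, while listCallbackUrl() and workflow().run.name are the standard Logic Apps expressions for the call-back URL and the run Id:

```json
"subscribe": {
  "method": "POST",
  "uri": "https://<function-app>.azurewebsites.net/api/SQLJobRequest?code=<function-key>",
  "body": {
    "SprocName": "<stored-procedure-name>",
    "CallBackUrl": "@{listCallbackUrl()}",
    "RunId": "@{workflow().run.name}"
  }
}
```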

After the HTTP Webhook shape comes the Parse JSON action and a condition that checks whether a filename was returned in the webhook request body.

image

The schema for the Parse JSON action checks for the Filename and the SprocName.

image

The expression for the condition is below:

image

The “True” branch gets the csv file from the blob store and sends it to the FTP server. Note that the filename must be prefixed with the blob container name. Once the file has been sent, it is deleted from the blob store.

image

If no filename was received when the Logic App comes out of its dehydrated state, the workflow is terminated.

image

Sample runs

As you can see from the sample runs below, some take over 6 minutes to complete, which would normally cause an HTTP Function action to time out.

image

Conclusion

The solution can easily be modified to pass a list of stored procedures, create several csv files in one call, and have the call-back function return the list of filenames to send via FTP.

This pattern could also be used for other scenarios that require an Azure Function to execute some long running process which would cause the normal HTTP Function action to timeout while waiting for the response message.

Enjoy…

Content based message routing using Azure Logic Apps, Function and Service Bus

Content Based Routing (CBR) is another pattern used in the integration world: the contents of the message determine the endpoint the message is delivered to.

This article will describe one option to develop this pattern using an Azure Service Bus, an Azure Function and a Logic App.

The service bus routes the message to the correct endpoint using topics and subscriptions. The Azure Function hosts and executes the business rules that inspect the message contents and set the routing properties. A Logic App accepts the message and calls the function, passing the received message as an argument. Once the function has executed the business rules, it returns the routing properties in the response body. This routing information is then used to set the properties on the Service Bus API connector in the Logic App before publishing the message onto the bus.

image

Scenario

To demonstrate a typical use-case of this pattern, we have two message types: Sales Orders (SO) and Purchase Orders (PO). For an SO, I want to send the order to a priority queue if the total sales amount is over a particular value. For a PO, it should be sent to a pre-approval queue if the order value is under a specified amount.

Here is an example of a SO message to be routed:

image

And an example of a PO being sent:

image
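The two screenshots above show the sample payloads. For reference, messages of the following shape exercise both rules; the field names are taken from the function code, while the values are illustrative:

```json
{
  "SalesOrderNumber": "SO-1001",
  "Lines": [
    { "UnitPrice": 350.00, "Qty": 2 },
    { "UnitPrice": 500.00, "Qty": 1 }
  ]
}
```

```json
{
  "PurchaseOrderNumber": "PO-2001",
  "Lines": [
    { "UnitPrice": 120.00, "Qty": 3 }
  ]
}
```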

Solution

The real smarts of this solution lie in the function, which returns a common JSON response message used to set the topic name and the custom properties on the Service Bus connector. The fields of the response message are described below.

  • TopicName – the name of the service bus topic to send the message to.
  • CBRFilter_1 – used by the subscription rule to filter on. Depending on your requirements, you may need more fields to filter at a finer grain.
  • RuleSetVersion – used by the subscription rule to filter on. It’s a good idea to have this field, as you may have several versions of a rule in play at any one time.
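For example, a high-value sales order would produce a response along these lines (values taken from the rules described later in this post):

```json
{
  "TopicName": "SalesOrder",
  "CBRFilter_1": "PriorityHigh",
  "RuleSetVersion": "1.00"
}
```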

Let’s start by provisioning the service bus topics and subscriptions for the two message types. First create two topics called purchaseorder and salesorder.

 

image

Now add the following subscriptions and rules for each of the topics.

Topic name      Subscription name     Rule
purchaseorder   Approved_V1.00        CBRFilter_1 = 'Approved' and RuleSetVersion = '1.00'
purchaseorder   NotApproved_V1.00     CBRFilter_1 = 'ApprovedNot' and RuleSetVersion = '1.00'
salesorder      HighPriority_V1.00    CBRFilter_1 = 'PriorityHigh' and RuleSetVersion = '1.00'
salesorder      LowPriority_V1.00     CBRFilter_1 = 'PriorityLow' and RuleSetVersion = '1.00'

Next is the development of the Azure Function. This is best done in Visual Studio, where you can include a unit test project for each of the rules. Add a new Azure Function project to your solution.

image

After the project has been created, right click on the project and select Add -> New Item. Choose Azure Function, give it a name and select the Http trigger option.

image

Below is the code for the HTTP trigger function, which includes the class definition for the RoutingProperties object. I check for the specific elements SalesOrderNumber and PurchaseOrderNumber in the JSON message to determine the message type, which in turn determines which rule code block executes. Each rule block first sets the TopicName and RuleSetVersion properties.

using System.Linq;
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Azure.WebJobs.Host;
using Newtonsoft.Json.Linq;

public static class CBRRule
{
    [FunctionName("CBRRule")]
    public static async Task<HttpResponseMessage> Run(
        [HttpTrigger(AuthorizationLevel.Function, "post", Route = null)]HttpRequestMessage req, TraceWriter log)
    {
        log.Info("C# HTTP trigger function processed a request.");
        var routingProperties = new RoutingProperties();

        // Get the request body.
        JObject data = await req.Content.ReadAsAsync<JObject>();

        // Is this a sales order message type?
        if (data != null && data["SalesOrderNumber"] != null)
        {
            routingProperties.CBRFilter_1 = "PriorityLow";
            routingProperties.RuleSetVersion = "1.00";
            routingProperties.TopicName = "SalesOrder";

            var lineItems = data["Lines"];
            var totalSaleAmount = lineItems.Sum(x => (decimal)x["UnitPrice"] * (decimal)x["Qty"]);

            // If the total sales amount is greater than $1000, send the message to the high priority subscription.
            if (totalSaleAmount > 1000)
                routingProperties.CBRFilter_1 = "PriorityHigh";
        }

        // Is this a purchase order message type?
        if (data != null && data["PurchaseOrderNumber"] != null)
        {
            routingProperties.CBRFilter_1 = "ApprovedNot";
            routingProperties.RuleSetVersion = "1.00";
            routingProperties.TopicName = "PurchaseOrder";

            var lineItems = data["Lines"];
            var totalOrderAmount = lineItems.Sum(x => (decimal)x["UnitPrice"] * (decimal)x["Qty"]);

            // Approve the PO if the total order price is less than $500.
            if (totalOrderAmount < 500)
                routingProperties.CBRFilter_1 = "Approved";
        }

        return req.CreateResponse(HttpStatusCode.OK, routingProperties);
    }

    /// <summary>
    /// Response message used to set the custom routing properties on the service bus.
    /// </summary>
    public class RoutingProperties
    {
        public string TopicName { get; set; }
        public string CBRFilter_1 { get; set; }
        public string RuleSetVersion { get; set; }

        public RoutingProperties()
        {
            CBRFilter_1 = "Unknown";
            RuleSetVersion = "Unknown";
            TopicName = "Unknown";
        }
    }
}

The business rule for an SO aggregates all the line items and checks whether the total amount is greater than 1000; if it is, the property CBRFilter_1 is set to “PriorityHigh”.

The business rule for a PO also aggregates all the line items and checks whether the total amount is less than 500; if it is, CBRFilter_1 is set to “Approved”.
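The two rules reduce to simple threshold checks on the aggregated line totals. Here is a distilled, standalone sketch (thresholds and filter values are taken from the function code above; everything else is illustrative):

```csharp
using System;
using System.Linq;

// Sales orders over $1000 go to the high priority subscription.
static string ClassifySalesOrder((decimal UnitPrice, int Qty)[] lines) =>
    lines.Sum(l => l.UnitPrice * l.Qty) > 1000m ? "PriorityHigh" : "PriorityLow";

// Purchase orders under $500 are auto-approved.
static string ClassifyPurchaseOrder((decimal UnitPrice, int Qty)[] lines) =>
    lines.Sum(l => l.UnitPrice * l.Qty) < 500m ? "Approved" : "ApprovedNot";

Console.WriteLine(ClassifySalesOrder(new[] { (350m, 2), (500m, 1) })); // PriorityHigh (total 1200)
Console.WriteLine(ClassifyPurchaseOrder(new[] { (120m, 3) }));         // Approved (total 360)
```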

With the following input message sent to the function:

clip_image001

The output of the function should look similar to this:

clip_image001

Now we need to publish the function from Visual Studio to your Azure resource group using the publishing wizard.

clip_image002

The last component of this solution is the Logic App which is triggered by an HTTP Request API and then calls the Azure function created above. The basic flow looks like this below.

clip_image004

The HTTP Request trigger has no request body JSON schema defined, so the trigger accepts any type of message.

clip_image006

Add an Azure Function action after the trigger and select the method called “CBRRule”.

clip_image008

Set the Request Body to the trigger body content and the Method to “POST”.

clip_image010

Next add a Service Bus action and set the properties as shown. Both the Queue/Topic name and the Properties are set from the function response message.

clip_image012

Here is the code view of the Send Message action showing how the properties are set.

clip_image014

Testing

Using PostMan we can send sample messages to the Logic App and then see them sitting on the service bus waiting to be processed by some other method.

At the moment the service bus should have no messages waiting to be processed, as shown below.

clip_image015

Using PostMan to send the following PO, we should see the message end up in the purchaseorder/NotApproved_V1.00 subscription.

clip_image016

All going well, the message will arrive at the correct subscription waiting to be processed as shown below using Service Bus Explorer.

clip_image018

Sending the following SO will also route the message to the correct subscriber.

clip_image019

clip_image021

Conclusion

CBR is easily achievable using an Azure Function to execute business rules that set the routing properties for a message. Taking this a step further, I would abstract the business rules for each message type into its own class for manageability.

It is also advisable to set up a unit test project for each of the classes holding the business rules, to ensure full code coverage.

Enjoy…