Using Logic App Webhooks to execute long-running SQL queries

On one of my projects, I had a requirement to send several large datasets as csv files to an FTP server using a Logic App.

I considered two options for implementing this requirement. The first was to use the Azure SQL Server API connector to extract the dataset from the database. However, the amount of data to be serialised as JSON would almost certainly cause the SQL API connector to time out or exceed the maximum payload size. Working around this would require database paging, and using paging would not meet the requirement of producing a single file.

The second option was to utilise the HTTP Webhook action available with Logic Apps to call an Azure Function, passing the call-back URL and the name of the database stored procedure to execute. The Azure Function would simply place the JSON message body onto an Azure Storage queue and then return an HTTP 200 response, which puts the Logic App into a dehydrated state.

Another Azure Function would poll the queue for new messages. When a message is received, it queries the database using the stored procedure name sent in the queue message. A data reader is used to read the returned recordset for speed and populate a DataTable. The rows in the DataTable are then converted into a csv file, which is written to a blob container in a streaming fashion. Once the function has finished creating the csv file, it calls back to the dehydrated Logic App using the URL sent in the queue message, which resumes the workflow and sends the file from the blob store to the FTP server.

Using the Webhook option meant the Logic App would go into a dehydrated state after calling the Azure Function, and I would not be charged for any consumption time whilst in this state. This meant the Azure Function could take as long as required to create the csv file while executing under a consumption billing plan.

Below is a sequence diagram showing the activation of each process. The whole process is triggered by a scheduler inside a Logic App.

[Image: sequence diagram of the Logic App, Azure Functions, storage queue and blob store interactions]

Creating the Solution

I have broken the solution into two projects: one for the Azure Functions and the other for the helper classes. I typically abstract all the business logic out of the Azure Functions and place it in a separate class library project where I can create unit tests for the business logic.

[Image: solution structure showing the two projects]

Defining the Helper Libraries

The first helper class is “AzureStorage”, which has a single method that returns a blob reference and creates the blob container if it does not already exist.

using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using System.Configuration;
using System.Threading.Tasks;

namespace Helpers
{
    public static class AzureStorage
    {
        public static async Task<CloudBlockBlob> CreateBlobAsync(string fileName, string containerName)
        {
            // Retrieve the storage account from the connection string.
            CloudStorageAccount storageAccount = CloudStorageAccount.Parse(ConfigurationManager.AppSettings["BlobStore"]);

            // Create the blob client.
            CloudBlobClient blobClient = storageAccount.CreateCloudBlobClient();

            CloudBlobContainer container = blobClient.GetContainerReference(containerName.ToLower());

            // Check whether the container exists and create it if not.
            await container.CreateIfNotExistsAsync();

            // Return a reference to the blob.
            return container.GetBlockBlobReference(fileName.ToLower());
        }
    }
}

The next class is “DataAccess”, which reads the database using a data reader to load a DataTable that is then returned. Remember the default command timeout is 30 seconds, so this needs to be increased on the SqlCommand object.

Note this is just a simple implementation of reading the data. Ideally you would add transient fault handling code with retry logic built-in for production releases.

using System;
using System.Configuration;
using System.Data;
using System.Data.SqlClient;

namespace Helpers
{
    internal static class DataAccess
    {
        internal static DataTable CreateDataExtract(string sprocName)
        {
            var constr = ConfigurationManager.ConnectionStrings["SQLConnectionString"].ConnectionString;
            var dataTable = new DataTable();
            try
            {
                using (SqlConnection conn = new SqlConnection(constr))
                using (SqlCommand cmd = new SqlCommand(sprocName, conn))
                {
                    conn.Open();
                    cmd.CommandType = CommandType.StoredProcedure;
                    // Increase the default 30 second timeout for long-running queries.
                    cmd.CommandTimeout = 600;
                    dataTable.Load(cmd.ExecuteReader());
                }
            }
            catch (SqlException)
            {
                // Capture SQL errors.
                throw;
            }
            catch (Exception)
            {
                // Capture general errors.
                throw;
            }
            return dataTable;
        }
    }
}
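To illustrate the transient fault handling mentioned in the note above, below is a minimal sketch of a retry wrapper that could be added to the DataAccess class. The method name, retry count and back-off delay are all arbitrary choices for illustration; for production, a dedicated resilience library such as Polly would be a better option.

```csharp
// Hypothetical retry wrapper for transient SQL errors - illustrative only.
internal static DataTable CreateDataExtractWithRetry(string sprocName, int maxAttempts = 3)
{
    for (int attempt = 1; ; attempt++)
    {
        try
        {
            return CreateDataExtract(sprocName);
        }
        catch (SqlException) when (attempt < maxAttempts)
        {
            // Back off before retrying, doubling the delay on each attempt.
            System.Threading.Thread.Sleep(TimeSpan.FromSeconds(Math.Pow(2, attempt)));
        }
    }
}
```

A more complete implementation would also inspect the SqlException error number to retry only on known transient error codes.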

The last helper class “DataExtracts” is the entry point for the Azure Function. It calls the other helper methods to read the database and then converts the DataTable into a csv file, writing the output in a streaming manner to the blob container named “csv”.

using System;
using System.Data;
using System.IO;
using System.Text;
using System.Threading.Tasks;

namespace Helpers
{
    public static class DataExtracts
    {
        public static async Task<string> GenerateCSVDataExtractAsync(string storedProc, string columnDelimiter = ",")
        {
            // Call the data access layer to load the dataset.
            var dataTable = DataAccess.CreateDataExtract(storedProc);

            // Save the table as a csv file to the blob store.
            var filename = Guid.NewGuid().ToString("D") + ".csv";
            await WriteDataTableToCSVAsync(dataTable, "csv", filename, columnDelimiter);

            return filename;
        }

        private static async Task WriteDataTableToCSVAsync(DataTable dt, string containerName, string filename, string columnDelimiter)
        {
            if (dt == null) return;

            using (var ms = new MemoryStream())
            using (var sw = new StreamWriter(ms, Encoding.UTF8))
            {
                // Create the header row.
                for (int i = 0; i < dt.Columns.Count; i++)
                {
                    sw.Write(dt.Columns[i].ColumnName);
                    sw.Write(i == dt.Columns.Count - 1 ? Environment.NewLine : columnDelimiter);
                }

                // Append the data rows.
                foreach (DataRow row in dt.Rows)
                {
                    for (int i = 0; i < dt.Columns.Count; i++)
                    {
                        sw.Write(EscapeCSV(row[i].ToString()));
                        sw.Write(i == dt.Columns.Count - 1 ? Environment.NewLine : columnDelimiter);
                    }
                }
                sw.Flush();
                ms.Position = 0;

                // Write the stream to the blob store.
                var blob = await AzureStorage.CreateBlobAsync(filename, containerName);
                await blob.UploadFromStreamAsync(ms);
            }
        }

        private static string EscapeCSV(string colData)
        {
            const string quote = "\"";
            const string escapedQuote = "\"\"";
            char[] quotedCharacters = new char[] { ',', '"', '\r', '\n', '\t' };

            if (colData == null) return "";
            if (colData.Contains(quote)) colData = colData.Replace(quote, escapedQuote);
            if (colData.IndexOfAny(quotedCharacters) >= 0)
                colData = quote + colData + quote;

            return colData;
        }
    }
}


Defining the Azure Functions

The next project, called “LongRunningJobs”, contains the two Azure Functions required by the solution.

Below is the code for the “SQLJobRequest” function, which is called by the Logic App. It simply puts the posted JSON request message onto an Azure Storage queue using a queue output binding, which reduces the work to a few lines of code.

using System.Net;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Azure.WebJobs.Host;

namespace LongRunningJobs
{
    public static class SQLJobRequest
    {
        [FunctionName("SQLJobRequest")]
        public static async Task<HttpResponseMessage> Run(
                                    [HttpTrigger(AuthorizationLevel.Function, "post", Route = null)]HttpRequestMessage req,
                                    [Queue("sqljobs", Connection = "BlobStore")]IAsyncCollector<string> outputQueueItem,
                                    TraceWriter log)
        {
            log.Info("C# HTTP trigger function processed a request.");
            var isParamsValid = false;

            // Get the request body.
            dynamic data = await req.Content.ReadAsAsync<object>();
            string sprocName = data?.SprocName;
            string callBackUrl = data?.CallBackUrl;

            if (!string.IsNullOrEmpty(sprocName) && !string.IsNullOrEmpty(callBackUrl))
            {
                isParamsValid = true;
                // Place the raw JSON message onto the storage queue.
                await outputQueueItem.AddAsync(data.ToString());
            }

            return isParamsValid == false
                ? req.CreateResponse(HttpStatusCode.BadRequest, "Please pass the sproc name and callback url.")
                : req.CreateResponse(HttpStatusCode.OK);
        }
    }
}

The line [Queue("<name-of-queue>", Connection = "<storage-connection-string>")]IAsyncCollector<string> outputQueueItem in the function parameter list uses an output binding attribute to define the name of the queue and the name of the storage connection string to use.
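Based on the properties the functions read from the message (SprocName, CallBackUrl and RunId), the JSON body posted by the Logic App, and forwarded as-is to the queue, would look something like the following. The values shown are placeholders:

```json
{
  "SprocName": "<stored-procedure-name>",
  "CallBackUrl": "<call-back URL supplied by the Logic App webhook>",
  "RunId": "<Logic App run Id>"
}
```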

The other function, “SQLJobExecute”, polls the storage queue “sqljobs” for new messages. When a message is received, it calls the helper method “DataExtracts.GenerateCSVDataExtractAsync(sprocName)” to create the csv file. If the csv file was created successfully, its filename is returned in the body of the HTTP call-back POST.

using System;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Threading.Tasks;
using Helpers;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Newtonsoft.Json;

namespace LongRunningJobs
{
    public static class SQLJobExecute
    {
        [FunctionName("SQLJobExecute")]
        public static async Task Run([QueueTrigger("sqljobs", Connection = "BlobStore")]string queueItem, TraceWriter log)
        {
            log.Info($"C# Queue trigger function processed: {queueItem}");

            dynamic data = JsonConvert.DeserializeObject(queueItem);

            string sprocName = data.SprocName;
            string callBackUrl = data.CallBackUrl;
            string logicappRunId = data.RunId;

            // Check if valid parameters were passed.
            if (string.IsNullOrEmpty(sprocName) || string.IsNullOrEmpty(callBackUrl) || string.IsNullOrEmpty(logicappRunId))
                log.Error("Null value parameters passed.");
            else
                // Await the work so the functions host does not tear the process
                // down before the query and the call-back have completed.
                await ExecuteQueryAsync(callBackUrl, sprocName, logicappRunId, log);
        }

        private static async Task ExecuteQueryAsync(string callBackUrl, string sprocName, string logicAppRunId, TraceWriter log)
        {
            string blobFilename = string.Empty;
            try
            {
                // Call the helper class to create the csv file.
                blobFilename = await DataExtracts.GenerateCSVDataExtractAsync(sprocName);
            }
            catch (Exception ex)
            {
                log.Error(ex.Message, ex);
            }

            try
            {
                // Call back to the Logic App webhook using the URL passed in the queue message.
                using (var httpClient = new HttpClient())
                {
                    var postUrl = new Uri(callBackUrl);
                    httpClient.DefaultRequestHeaders.Accept.Clear();
                    httpClient.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));
                    var content = new StringContent("{\"SprocName\":\"" + sprocName + "\",\"Filename\":\"" + blobFilename + "\"}", Encoding.UTF8, "application/json");
                    var response = await httpClient.PostAsync(postUrl, content);
                    response.EnsureSuccessStatusCode();
                }
            }
            catch (Exception ex)
            {
                log.Error(string.Format("Error occurred when executing function for logic app runId:{0}\n{1}", logicAppRunId, ex.Message), ex);
            }
        }
    }
}

Below is the local.settings.json file which has the following values defined for the Blob Store and SQL Database.

[Image: local.settings.json values]
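In case the screenshot is hard to read, below is a sketch of what the settings file would contain, based on the setting names referenced in the code (the “BlobStore” app setting and the “SQLConnectionString” connection string). All values are placeholders:

```json
{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "AzureWebJobsDashboard": "UseDevelopmentStorage=true",
    "BlobStore": "DefaultEndpointsProtocol=https;AccountName=<storage-account>;AccountKey=<key>"
  },
  "ConnectionStrings": {
    "SQLConnectionString": "Server=tcp:<server>.database.windows.net,1433;Database=<database>;User ID=<user>;Password=<password>;Encrypt=True;"
  }
}
```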

After the Azure Functions have been deployed to an Azure App Service, the application settings highlighted below for the Blob Store and SQL Server need to be added.

[Image: App Service application settings]

Logic App Definition

Below are the Azure resources used for this solution. The Azure Functions are deployed to the “dalfunctions” App Service, and the “dataextracts” storage account contains the Blob Store for the csv files and the storage queue “sqljobs”.

[Image: Azure resources for the solution]

The Logic App is a very simple workflow which is triggered by a scheduler.

[Image: Logic App workflow in the designer]

The HTTP Webhook shape is what calls the function endpoint “SQLJobRequest” and then puts the Logic App into a dehydrated state.

[Image: HTTP Webhook action]

The Subscribe-URI can be obtained from the App Service after the Azure Functions have been deployed, using the “Get function URL” link shown below.

[Image: “Get function URL” link in the portal]

The Subscribe-Body property consists of a JSON payload. This includes the call-back URL, the Logic App run Id for debugging later if required, and the name of the SQL stored procedure to call in the database. Below is the code view for the HTTP Webhook action.

[Image: code view of the HTTP Webhook action]
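The action definition would look roughly like the following. The action name, function URL and stored procedure name are placeholders; listCallbackUrl() and workflow().run.name are the standard Logic Apps expressions for the call-back URL and the run Id:

```json
"HTTP_Webhook": {
  "type": "HttpWebhook",
  "inputs": {
    "subscribe": {
      "method": "POST",
      "uri": "https://<function-app>.azurewebsites.net/api/SQLJobRequest?code=<function-key>",
      "body": {
        "SprocName": "<stored-procedure-name>",
        "CallBackUrl": "@{listCallbackUrl()}",
        "RunId": "@{workflow().run.name}"
      }
    },
    "unsubscribe": {}
  }
}
```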

After the HTTP Webhook shape comes the JSON Parser and a condition to check whether a filename was returned in the webhook request body.

[Image: JSON Parser and condition actions]

The schema for the JSON Parser checks for the Filename and the SprocName.

[Image: JSON Parser schema]
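Based on the properties posted by the call-back function (SprocName and Filename), the schema would be along these lines:

```json
{
  "type": "object",
  "properties": {
    "SprocName": { "type": "string" },
    "Filename": { "type": "string" }
  }
}
```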

The expression for the condition is below:

[Image: condition expression]
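One way to express the check, assuming the parse action is named “Parse_JSON” (a hypothetical name), would be:

```
@not(empty(body('Parse_JSON')?['Filename']))
```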

The “True” branch gets the csv file from the Blob Store and sends it to the FTP server. Note we have to prefix the filename with the blob container name. Once the file has been sent, it is deleted from the Blob Store.

[Image: actions in the “True” branch]

If no filename was received when the Logic App comes out of its dehydrated state, the workflow is terminated.

[Image: Terminate action]

Sample runs

As you can see from the sample runs below, some take over 6 minutes to complete, which would normally cause an HTTP Function action to time out.

[Image: Logic App run history]

Conclusion

The solution can easily be modified to pass a list of stored procedures to create several csv files in one call, and then have the call-back function pass the list of filenames to send via FTP.

This pattern could also be used for other scenarios that require an Azure Function to execute some long-running process which would cause the normal HTTP Function action to time out while waiting for the response message.

Enjoy…