Typically, when Azure Functions consume messages from a Service Bus (SB), processing order is not guaranteed even though the SB is First-In-First-Out (FIFO). This is due to competing consumers: multiple instances of the function compete for messages on the service bus.
An example of out-of-order processing is when one function instance takes longer to process a message than the other instances, which changes the order in which updates are applied. This is represented in the sequence diagram below, where function instance 1 took longer to update the same record in a database than instance 2.
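The effect can be reproduced without a service bus at all. The following is a minimal sketch that uses a simulated clock instead of real function instances; the class name, method name, and processing durations are all hypothetical and chosen only to illustrate the competing consumers problem.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class CompetingConsumersDemo
{
    // Returns the messages in the order their processing completes when
    // consumerCount consumers pull from a FIFO queue. durations[i] is the
    // simulated processing time of messages[i].
    public static List<string> CompletionOrder(string[] messages, int[] durations, int consumerCount)
    {
        var freeAt = new int[consumerCount];   // time at which each consumer becomes idle
        var completions = new List<(int FinishedAt, int Index)>();

        for (int i = 0; i < messages.Length; i++)
        {
            // The next message in the queue goes to whichever consumer is free first.
            int consumer = Array.IndexOf(freeAt, freeAt.Min());
            freeAt[consumer] += durations[i];
            completions.Add((freeAt[consumer], i));
        }

        return completions
            .OrderBy(c => c.FinishedAt)
            .ThenBy(c => c.Index)
            .Select(c => messages[c.Index])
            .ToList();
    }

    public static void Main()
    {
        var messages = new[] { "Ordered", "Picked" };
        var durations = new[] { 8, 2 };   // the first message is slow to process

        // Two competing consumers: the second message finishes first.
        Console.WriteLine(string.Join(", ", CompletionOrder(messages, durations, 2)));   // Picked, Ordered

        // A single consumer: queue order is preserved.
        Console.WriteLine(string.Join(", ", CompletionOrder(messages, durations, 1)));   // Ordered, Picked
    }
}
```

With two consumers the "Picked" update is applied before "Ordered", which is exactly the situation the diagram describes.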
One option to enforce ordered delivery is to configure the Azure Function to spin up only one instance. The problem with this solution is that it won't scale very well. A more scalable option is to use sessions. This allows multiple instances of a function to execute at the same time, giving you a higher message throughput.
To enforce message ordering, several properties must be set. The Requires Session property must be enabled on the SB queues and topic subscriptions. Messages sent to the SB must set the SessionId property to a value that is shared by related messages and distinct from unrelated ones. Some examples of a session Id are an account number, customer number, or batch Id. The Azure Function must have the IsSessionsEnabled property set to true on the SB trigger binding.
Support for SB sessions in Azure Functions only became generally available (GA) in mid-2019. Enabling sessions on the Azure Function places a lock on all messages that have the same session Id, so the locked messages are consumed only by the function instance that placed the lock.
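To make the locking behaviour concrete, here is a simplified in-memory model of session-based dispatch. It is a sketch under stated assumptions, not how the Service Bus is implemented: each session is handed to the consumer that becomes free first, and that consumer works through all of the session's messages in queue order. The class name, tuple fields, and durations are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class SessionDispatchDemo
{
    // Returns the messages in the order their processing completes when each
    // session is owned by exactly one consumer at a time.
    public static List<(string SessionId, string Body)> CompletionOrder(
        IReadOnlyList<(string SessionId, string Body, int Duration)> queue, int consumerCount)
    {
        var freeAt = new int[consumerCount];   // time at which each consumer becomes idle
        var completions = new List<(int FinishedAt, int Seq, string SessionId, string Body)>();
        int seq = 0;

        // GroupBy keeps sessions in order of first appearance and keeps the
        // messages inside each session in their original queue order.
        foreach (var session in queue.GroupBy(m => m.SessionId))
        {
            // The consumer that is free first takes the lock on the whole session.
            int consumer = Array.IndexOf(freeAt, freeAt.Min());
            foreach (var msg in session)
            {
                freeAt[consumer] += msg.Duration;
                completions.Add((freeAt[consumer], seq++, msg.SessionId, msg.Body));
            }
        }

        return completions
            .OrderBy(c => c.FinishedAt)
            .ThenBy(c => c.Seq)
            .Select(c => (c.SessionId, c.Body))
            .ToList();
    }

    public static void Main()
    {
        var queue = new List<(string SessionId, string Body, int Duration)>
        {
            ("Order-1", "Ordered", 8),
            ("Order-1", "Picked", 2),
            ("Order-2", "Ordered", 1),
            ("Order-2", "Picked", 1),
        };

        foreach (var (sessionId, body) in CompletionOrder(queue, 2))
        {
            Console.WriteLine($"{sessionId}: {body}");
        }
    }
}
```

In this model Order-2 finishes before Order-1, but within each order "Ordered" always completes before "Picked". Ordering is guaranteed per session, not across sessions.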
Typical Scenario
A warehouse needs to track the progress of an order from when it is first received to when it is dispatched. At each stage of the ordering process (Ordered, Picked, Packaged, Dispatched), the status of the order must be updated. This involves placing a new message onto the service bus every time the order status changes. An Azure Function then pulls the messages from the service bus and updates the order status in a database, where the customer can view the current state of their order.
To simulate the warehouse tracking system, a console app creates messages for each status change (Ordered, Picked, Packaged, Dispatched) for 100 orders. The session Id of each status message is set to the order number. The app then sends the messages to an SB topic that has two subscriptions, one with sessions enabled and the other with sessions disabled. This lets us compare the order in which messages are received with and without sessions.
    using System;
    using System.Collections.Generic;
    using System.Configuration;
    using System.Text;
    using System.Threading.Tasks;
    using Microsoft.Azure.ServiceBus;
    using Microsoft.Azure.ServiceBus.Core;

    class Program
    {
        private static string connectionString = ConfigurationManager.AppSettings["ServiceBusConnectionString"];
        private static string topicName = ConfigurationManager.AppSettings["TopicName"];
        private static int orders = 100;
        private static int messagePerSession = 4;

        static async Task Main(string[] args)
        {
            Console.WriteLine("Creating Service Bus sender...");
            var taskList = new List<Task>();
            var sender = new MessageSender(connectionString, topicName);

            //create an order
            for (int order = 0; order < orders; order++)
            {
                var orderNumber = $"OrderId-{order}";
                var messageList = new List<Message>();

                //simulate a status update in the correct order
                for (int m = 0; m < messagePerSession; m++)
                {
                    var status = string.Empty;
                    switch (m)
                    {
                        case 0:
                            status = "1 - Ordered";
                            break;
                        case 1:
                            status = "2 - Picked";
                            break;
                        case 2:
                            status = "3 - Packaged";
                            break;
                        case 3:
                            status = "4 - Dispatched";
                            break;
                    }

                    var message = new Message(Encoding.UTF8.GetBytes($"Status-{status}"))
                    {
                        //set the service bus SessionId property to the current order number
                        SessionId = orderNumber
                    };
                    messageList.Add(message);
                }

                //send the list of status update messages for the order to the service bus
                taskList.Add(sender.SendAsync(messageList));
            }

            Console.WriteLine("Sending all messages...");
            await Task.WhenAll(taskList);
            Console.WriteLine("All messages sent.");
        }
    }
Two Azure Functions will be created, one with sessions enabled and the other with sessions disabled. Each function has a random delay of 1 to 10 seconds to simulate business logic that may call out to an external service before updating the order status. Instead of writing to a database, the functions write each status update message received to Azure Table storage, creating an audit log of when each status update message was processed.
Below is the source code for the function which processes the messages on the service bus using sessions. Note that the IsSessionsEnabled property is set to true on the ServiceBusTrigger binding. The randomiser simulates business logic whose processing time varies from message to message.
    using System;
    using System.Text;
    using System.Threading;
    using Microsoft.Azure.ServiceBus;
    using Microsoft.Azure.WebJobs;
    using Microsoft.Extensions.Logging;

    public static class MsgOrderingSessions
    {
        [FunctionName("MsgOrderingSessions")]
        [return: Table("OrdersSession", Connection = "StorageConnectionAppSetting")]
        public static OrderEntity Run(
            [ServiceBusTrigger("orders", "OrdersSession", Connection = "SbConnStr", IsSessionsEnabled = true)]
            Message sbMessage, ILogger log)
        {
            log.LogInformation($"C# ServiceBus topic trigger function processed message: {Encoding.UTF8.GetString(sbMessage.Body)}");

            //simulate business logic that takes between 1 and 10 seconds
            Random random = new Random();
            int randNumb = random.Next(1000, 10000);
            Thread.Sleep(randNumb);

            //OrderEntity is the table entity type; its definition is not shown here
            return new OrderEntity { PartitionKey = $"{sbMessage.SessionId} - {DateTime.Now.Ticks}", RowKey = Guid.NewGuid().ToString(), Text = Encoding.UTF8.GetString(sbMessage.Body) };
        }
    }
Below is the source code for the function which does not use sessions. Here IsSessionsEnabled is set to false.
    using System;
    using System.Text;
    using System.Threading;
    using Microsoft.Azure.ServiceBus;
    using Microsoft.Azure.WebJobs;
    using Microsoft.Extensions.Logging;

    public static class MsgOrderingNoSession
    {
        [FunctionName("MsgOrderingNoSessions")]
        [return: Table("OrdersNoSession", Connection = "StorageConnectionAppSetting")]
        public static OrderEntity Run(
            [ServiceBusTrigger("orders", "OrdersNoSession", Connection = "SbConnStr", IsSessionsEnabled = false)]
            Message sbMessage, ILogger log)
        {
            log.LogInformation($"C# ServiceBus topic trigger function processed message: {Encoding.UTF8.GetString(sbMessage.Body)}");

            //simulate business logic that takes between 1 and 10 seconds
            Random random = new Random();
            int randNumb = random.Next(1000, 10000);
            Thread.Sleep(randNumb);

            //OrderEntity is the table entity type; its definition is not shown here
            return new OrderEntity { PartitionKey = $"{sbMessage.SessionId} - {DateTime.Now.Ticks}", RowKey = Guid.NewGuid().ToString(), Text = Encoding.UTF8.GetString(sbMessage.Body) };
        }
    }
Below are the settings for the service bus topic, which has two subscriptions; one of them has Requires Session checked.
Running the console app creates 400 messages on each subscription: 4 status update messages for each of the 100 orders.
Conclusion
The Azure Function whose ServiceBusTrigger had IsSessionsEnabled = false inserted the rows out of order, because multiple function instances were competing for the next message on the service bus.
The Azure Function which had IsSessionsEnabled = true, and which read messages from a service bus subscription that also had the Requires Session flag enabled, processed the messages in the same sequence in which they were placed onto the service bus.
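A comparison like this can also be checked programmatically rather than by eye. The sketch below assumes the audit rows have already been read from Table storage into memory as tuples of order Id, status step (1 to 4), and processed-time ticks. That row shape and the class and method names are hypothetical, not part of the functions above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class AuditLogCheck
{
    // Returns the Ids of orders whose status steps, sorted by the time they
    // were processed, are not in ascending step order.
    public static List<string> OrdersOutOfSequence(
        IEnumerable<(string OrderId, int Step, long ProcessedTicks)> rows)
    {
        return rows
            .GroupBy(r => r.OrderId)
            .Where(g =>
            {
                var steps = g.OrderBy(r => r.ProcessedTicks).Select(r => r.Step).ToList();
                return !steps.SequenceEqual(steps.OrderBy(s => s));
            })
            .Select(g => g.Key)
            .ToList();
    }

    public static void Main()
    {
        var rows = new List<(string OrderId, int Step, long ProcessedTicks)>
        {
            ("OrderId-0", 1, 100), ("OrderId-0", 2, 200), ("OrderId-0", 3, 300), ("OrderId-0", 4, 400),
            // "Picked" (step 2) was processed before "Ordered" (step 1)
            ("OrderId-1", 2, 100), ("OrderId-1", 1, 150), ("OrderId-1", 3, 300), ("OrderId-1", 4, 400),
        };

        Console.WriteLine(string.Join(", ", OrdersOutOfSequence(rows)));   // OrderId-1
    }
}
```

Run against the table written by the session-enabled function, a check of this kind should return an empty list.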
When using sessions, there is a performance hit that depends on the number of function instances executing. In this example both functions were running under the consumption plan, which spun up 6 instances. As the number of messages waiting on each of the subscriptions below shows, the subscription with sessions disabled is processing the messages a lot faster.
When sessions are used, each function instance places a lock on all messages having the same session Id, and those messages are processed one after another. As there were only 6 instances available, a maximum of six orders could be processed at one time.
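As a rough back-of-the-envelope figure, the time to drain the session-enabled subscription can be estimated from the numbers in this test. The sketch below assumes each instance works through exactly one order at a time, as described above, and an average delay of about 5.5 seconds per message (the midpoint of the 1 to 10 second random delay). The class and method names are hypothetical, and the result is an estimate, not a measurement.

```csharp
using System;

public static class SessionThroughputEstimate
{
    // Rough estimate, in seconds, of the time to process every order when
    // each instance handles one session (order) at a time.
    public static double EstimateSeconds(int orders, int messagesPerOrder, int instances, double avgSecondsPerMessage)
    {
        int rounds = (orders + instances - 1) / instances;   // ceiling division
        return rounds * messagesPerOrder * avgSecondsPerMessage;
    }

    public static void Main()
    {
        // 100 orders, 4 messages each, 6 instances, ~5.5 s per message:
        // 17 rounds x 4 messages x 5.5 s
        Console.WriteLine(EstimateSeconds(100, 4, 6, 5.5));   // 374
    }
}
```

Under these assumptions, adding instances is what shortens the total time, since the messages within a single order are always processed one after another.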
Enjoy…