Here’s everything you need to securely pull messages from an Azure Storage queue as part of creating a reliable, scalable and extendable asynchronous application.
In my last few posts, I’ve walked through creating an Azure Storage Queue and showed how to add messages to that queue from either an ASP.NET Core or TypeScript/JavaScript frontend. This post is about how to create the backend processor that will read messages from that queue and process them.
One option is to use an Azure Function App or a Logic App with a trigger tied to the Storage queue to run a function in the Function App when a message is added to the queue. However, the best part of using Function Apps (how easy it is to configure and manage them) comes at a price: you give up a lot of control, both in how you'll process a message and in how you'll manage the service. You can get some of that control back by deploying your Function App to an App Service.
Having said that, for this post, I’m going to create my backend processor as a Worker Service in an App Service. That choice gives me access to more functionality in processing messages than I get with a Function App (for example, peeking at a message rather than reading it) and all the features of an App Service in managing my backend.
Fundamentally, you create your App Service exactly as you would create an App Service for a backend Web Service. For your development App Service, you may want to enable Basic Authentication to support configuring your App Service.
After creating the service, go to the Settings | Configuration menu choice. From the settings page that displays on the right, find the Always On option and set it to On (that will increase your costs but will also make sure that your application doesn’t shut down because you went a long time without seeing a message). You should also, for your development App Service, enable logging from the Monitoring | App Service logs menu down the left side of your App Service.
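If you prefer scripting your setup, the Always On option can also be enabled from the Azure CLI. A sketch, with placeholder names for the App Service and resource group:

```shell
# Enable Always On so the Worker isn't shut down when idle
# (MyAppService and MyResourceGroup are placeholder names)
az webapp config set \
  --name MyAppService \
  --resource-group MyResourceGroup \
  --always-on true
```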
You will also want to make sure only your approved backend processors can read your queue. Because I’m creating my backend processor as a server-side application, my preferred tool (and Microsoft’s recommendation) is to use an Azure Managed Identity. When you create your Managed Identity, make a note of its client id—you’ll need it later.
Still in the portal, surf to your Storage queue, click its Access Control (IAM) menu choice from the menu down the left side, and use the Add role assignment button on the resulting page to give your Managed Identity the Storage Queue Data Contributor role for your queue.
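The same role assignment can be made from the Azure CLI. This is a sketch: the client id, subscription id, and resource names are all placeholders you'll need to swap in:

```shell
# Grant the Managed Identity data access to the queue
# (every id and name here is a placeholder)
az role assignment create \
  --assignee "<managed-identity-client-id>" \
  --role "Storage Queue Data Contributor" \
  --scope "/subscriptions/<sub-id>/resourceGroups/<rg>/providers/Microsoft.Storage/storageAccounts/<account>/queueServices/default/queues/<queue>"
```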
Finally, surf to the App Service that you will be deploying your backend processor to. From the App Service’s Settings | Identity choice in the menu down the left side, use the User Assigned tab to assign your Managed Identity to the service.
You’re now ready to create your backend processor.
First, of course, you need to create your project in Visual Studio or Visual Studio Code. In Visual Studio, you want to use the Worker Service template; in Visual Studio Code, you want to use this command to create your project.
I named my project WarehouseMgmtProductsProcessor. You’ll want to swap in your own project name:
dotnet new worker -n WarehouseMgmtProductsProcessor
Once your project is created, you'll need to add the NuGet package Azure.Storage.Queues to your project (if you're using Visual Studio's Manage NuGet packages tab, search for "azure storage queues"). You'll also need to add the Azure.Identity package, which provides the ManagedIdentityCredential class your backend processor will use to get authorization to access the queue.
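From the command line, the packages can be added with the dotnet CLI (Azure.Identity is the package that contains the ManagedIdentityCredential class used in the code below):

```shell
dotnet add package Azure.Storage.Queues
dotnet add package Azure.Identity
```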
With the packages installed, open the Worker.cs file that's automatically added to your project. The loop inside the ExecuteAsync method starts executing automatically when you start the App Service you deploy your backend to and terminates when you stop it. You just need to add the code to check for a message and process it.
Before you can read from the queue, you need to create a QueueClient object, passing the URL for your queue (wrapped inside a Uri object) and a credential object that authorizes access to the queue.
If you’re using a Managed Identity to provide authorization (and you should), you can use the ManagedIdentityCredential object to authorize your client, as I do in the following code. You’ll need to pass your ManagedIdentityCredential the client id of the Managed Identity you created.
Typical code in the Worker class’s constructor to create the QueueClient and put it in a local field would look like this:
private readonly ILogger<Worker> _logger;
private QueueClient qc;

public Worker(ILogger<Worker> logger)
{
    _logger = logger;
    qc = new QueueClient(
        new Uri("https://warehousemgmtphv.queue.core.windows.net/updateproductinventory"),
        new ManagedIdentityCredential("ee8…-…-…-…-…be7"));
}
With the QueueClient object created, you can then use its ReceiveMessageAsync method to read the next message from the queue and return it wrapped inside a Response object (assuming a message exists on the queue). If there is no message on the queue, then ReceiveMessageAsync returns a Response object that has its Value property set to null.
If there is a message on the queue, you can retrieve the value of the message (almost certainly the JSON representation of some object) from the Value property’s Body property. The message will be invisible to any other processor for 30 seconds (assuming there is another processor).
It’s your responsibility to delete the message from the queue either when you’ve completed processing or have an error processing the message (if you have an error, you should probably write the message out to some “dead letter” space to be reviewed later). If you don’t find a message, your code should wait some reasonable period of time before trying again.
This code assumes that the queue is holding the JSON representation of a class I called QueueDTO:
while (!stoppingToken.IsCancellationRequested)
{
    Response<QueueMessage> msg = await qc.ReceiveMessageAsync();
    if (msg.Value != null)
    {
        try
        {
            QueueDTO? qDto = JsonSerializer.Deserialize<QueueDTO>(msg.Value.Body);
            //...processing
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error processing queue message");
            //…write message to some location for review
        }
        finally
        {
            await qc.DeleteMessageAsync(msg.Value.MessageId, msg.Value.PopReceipt);
        }
    }
    else
    {
        await Task.Delay(5000, stoppingToken);
    }
}
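The QueueDTO class itself isn't shown in this post. A hypothetical shape, assuming the inventory-update scenario from the earlier posts in this series (your actual property names will differ), might look like this:

```csharp
// Hypothetical DTO: match its properties to whatever JSON your frontend enqueues
public class QueueDTO
{
    public int ProductId { get; set; }
    public int InventoryChange { get; set; }
}
```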
You can pass a TimeSpan object as the ReceiveMessageAsync method's visibilityTimeout parameter to change how long the message remains invisible to other processors after being received.
If your queue is going to empty for significant periods of time, you might want to implement a more sophisticated wait pattern than the “Always 5 seconds” that I’ve used in my sample code (e.g., if you go three reads without a message, extend the wait period to 30 seconds, at 20 reads, extend the wait to 60 seconds and so on). If you know when your application won’t be running, you could create a recurring Logic App to start up and shut down your service.
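That stepped backoff could be sketched like this inside ExecuteAsync. The thresholds and delays here are my own assumptions, not values from the post:

```csharp
// Sketch: widen the polling interval as consecutive empty reads pile up
int emptyReads = 0;
while (!stoppingToken.IsCancellationRequested)
{
    Response<QueueMessage> msg = await qc.ReceiveMessageAsync();
    if (msg.Value != null)
    {
        emptyReads = 0;
        // ...process and delete the message as shown earlier
    }
    else
    {
        emptyReads++;
        int delayMs = emptyReads switch
        {
            < 3 => 5_000,    // still busy: poll every 5 seconds
            < 20 => 30_000,  // quiet: back off to 30 seconds
            _ => 60_000      // idle: check once a minute
        };
        await Task.Delay(delayMs, stoppingToken);
    }
}
```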
You can also retrieve more than one message at a time by calling the ReceiveMessagesAsync method and passing it a maximum messages value (up to a maximum of 32 messages). ReceiveMessagesAsync returns its messages as an array of QueueMessage objects.
This code, for example, uses ReceiveMessagesAsync to retrieve up to 10 messages at a time and then processes each message before reading the next batch:
Response<QueueMessage[]> msgs = await qc.ReceiveMessagesAsync(10);
if (msgs.Value != null)
{
    foreach (QueueMessage qmsg in msgs.Value)
    {
        try
        {
            QueueDTO? qDto = JsonSerializer.Deserialize<QueueDTO>(qmsg.Body);
            …
        }
        catch { … }
        finally { … }
    }
}
Since each receive is actually a separate HTTP request to the queue, retrieving multiple messages in a single call is probably a good idea.
After you deploy your backend to an App Service, you’ll be able to stop it and start it from the Azure Portal. Under the hood, the portal passes a cancellation token to your Worker, causing the loop you’ve put your code inside of to terminate.
You can also pass that token on to your ReceiveMessageAsync method to terminate it and stop your Worker a little faster. You should not, however, pass the token to your DeleteMessageAsync method. Unless your messages are idempotent (i.e., processing the same message twice won't cause a problem), you want read and processed messages to be deleted.
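Applied to the earlier loop, that looks something like this:

```csharp
// Pass the token to the receive so a stop request interrupts the wait...
Response<QueueMessage> msg =
    await qc.ReceiveMessageAsync(cancellationToken: stoppingToken);
if (msg.Value != null)
{
    // ...process the message...

    // ...but let the delete run to completion (no token) so a
    // processed message can't be picked up and processed again
    await qc.DeleteMessageAsync(msg.Value.MessageId, msg.Value.PopReceipt);
}
```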
You can deploy your backend Worker to your App Service as either a triggered or continuous service (these two options show up as different slots in your App Service—you can deploy to both).
A continuous Worker will be started automatically after deployment while a triggered Worker must be started manually. For testing and debugging purposes, you should deploy as a triggered service—that will allow you to, for example, load test data into your queue before starting your backend. In production, you may want to switch to continuous.
To deploy from Visual Studio, from the Build menu, select Publish Selection to open a Publish tab for your project. Click the Add a publish profile link to start a wizard for creating your profile.
You’ll find that, after you select Azure as your target, you’re given a choice of picking the appropriate WebJobs (either Azure WebJobs (Windows) or Azure WebJobs (Linux)) rather than the App Services choices you may be used to. Just pick the category that matches the platform you used for your App Service (e.g., Azure WebJobs (Windows) if you created a Windows-based App Service) and you’ll be taken to a list of App Services. Select the App Service you want to deploy your backend to and finish creating your profile.
When your completed publish profile is displayed, click on its “Show all settings” link to display the Profile settings dialog. On that dialog, you’ll see a WebJob Type dropdown list that will let you choose between deploying your application into the service’s Continuous or Triggered slot. For testing and development, your best bet is to select Triggered.
With your profile configured, close the Profile Setting dialog by clicking the Save button and, back in your profile, click the Publish button to deploy your backend to its App Service.
If you do want to publish the production version of your backend as continuous, rather than triggered, your best choice is to create a second publish profile (call it something clever, like “Publish to Prod”) and set its WebJob Type to Continuous.
To deploy your app from Visual Studio Code, you’ll first need to create a publish package using dotnet publish. Once that publish package is created (and assuming that you’ve added the Azure Extensions to your copy of Visual Studio Code), you can open Visual Studio Code’s Azure Extensions panel, right-click on the App Service you intend to deploy to and select Deploy Now. That will open a file browser dialog in Visual Studio Code—drill down through the project’s bin folder until you find the publish folder and select that folder.
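The publish package is created by running this from the project folder:

```shell
# Creates the deployable output under bin/Release/<framework>/publish
dotnet publish -c Release
```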
You’ll then be asked to reselect your App Service and, after that, your backend will be deployed (you’ll also be asked if you always want to deploy your project to that App Service. Take the option—it will save you time later).
By default, your backend will be deployed to your App Service’s Triggered slot which is what you want for testing and development. To switch your deployment to Continuous, open your project’s Properties/PublishProfiles/<project name>-WebDeploy.pubxml file and, in that file, set the <WebJobType> element to Continuous.
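The WebJobType element lives inside the profile's PropertyGroup. A minimal sketch of the relevant part of the .pubxml file (other elements in your profile are omitted here):

```xml
<Project>
  <PropertyGroup>
    <!-- Triggered (the default) or Continuous -->
    <WebJobType>Continuous</WebJobType>
  </PropertyGroup>
</Project>
```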
To try out your backend, in the Azure Portal, surf to your App Service and, in the menu down the left side, select the Settings | WebJobs menu choice to display the Triggered/Continuous slots where you’ve deployed applications. To run a backend in the Triggered slot, click the run icon in the Run column near the right end of the slot.
Assuming that you’ve enabled logging for your App Service, you can view any log messages generated by your application by clicking the clipboard icon in the Logs column of the slot.
You can stop your backend by clicking the Refresh button on the menu across the top of the list of slots and then clicking on the stop icon in the Run column.
In addition to receiving and deleting messages, you can also update a message, changing either its body or the time that the message will be invisible after being received, and then leave the message on the queue. The PeekMessageAsync method is an alternative to the ReceiveMessageAsync method. PeekMessageAsync lets you retrieve a message without making it invisible to any other processor.
These options can be useful if you have multiple processors, each of which performs different processing for the messages on the queue. Since each of the processors needs to see every message, you could use PeekMessageAsync to read a message and, after processing the message, update the message instead of deleting it. In this pattern, messages would stay on the queue until every processor had processed it (you'll need some processor to regularly sweep through the queue, find all the messages that have had all their processing done and delete those messages).
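A sketch of those two operations. One caveat worth knowing: a peeked message doesn't come with a PopReceipt, so updating or deleting a message always requires receiving it first (the "updated message body" content is, of course, a placeholder):

```csharp
// PeekMessageAsync returns the next message without hiding it
// (and without a PopReceipt, so you can't delete or update from a peek)
Response<PeekedMessage> peeked = await qc.PeekMessageAsync();

// To change a message, receive it (which supplies a PopReceipt), then
// put it back with UpdateMessageAsync instead of deleting it
Response<QueueMessage> msg = await qc.ReceiveMessageAsync();
if (msg.Value != null)
{
    // ...process, then record this processor's work in the body
    await qc.UpdateMessageAsync(
        msg.Value.MessageId,
        msg.Value.PopReceipt,
        "updated message body",
        TimeSpan.Zero); // visible to other processors immediately
}
```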
If that sounds messy/complicated, it might be easier just to have your frontend write the message to multiple queues with each queue having its own, dedicated processor. Alternatively, you could start thinking about moving to an Azure Service Bus which supports having multiple processors natively. I’ll be covering Service Buses in my next post.
Peter Vogel is both the author of the Coding Azure series and the instructor for Coding Azure in the Classroom. Peter’s company provides full-stack development from UX design through object modeling to database design. Peter holds multiple certifications in Azure administration, architecture, development and security and is a Microsoft Certified Trainer.