.NET: Unit Testing Azure Service Bus
Unit testing an Azure Service Bus consumer is fairly easy. In this tutorial, I will use .NET 6, Moq (v4), and Fluent Assertions. The difficult part is constructing the ServiceBusReceivedMessage because it has only internal constructors and no interface. However, there is a factory class, ServiceBusModelFactory, that can create these objects.
GitHub
I’ve included unit testing in this project.
https://github.com/mrjamiebowman-blog/microservices-application-insights
What should be unit tested?
There are two areas that I focus on when testing a consumer service: message processing and error handling.
Message Handler
This is the main method for processing incoming data from Azure Service Bus. It can be tested to make sure data is properly deserialized and passed on to the correct services.
Error Handler
The Error Handler method can handle specific cases where the service bus client fails. It may be useful to unit test this code if there are certain actions that need to happen during a failure.
ConsumerAzureServiceBus Service Class
This is the code that we will be testing to put things in context.
/// <summary>
/// Consumes messages from Azure Service Bus through a <see cref="ServiceBusProcessor"/>,
/// wraps each message in an Application Insights request operation, and forwards the
/// message body to the subscribers of <see cref="ProcessMessageAsync"/>.
/// </summary>
public class ConsumerAzureServiceBus : IConsumerService, IConsumerAzureServiceBus
{
    private readonly ILogger<ConsumerAzureServiceBus> _logger;
    private readonly TelemetryClient _telemetryClient;
    private readonly AzureServiceBusConsumerConfiguration _azureServiceBusConfiguration;

    // guards subscription changes made through the explicit IConsumerService event
    private readonly object _objectLock = new object();

    // kept so that StopConsumingAsync can release what StartReceivingMessagesAsync created
    private ServiceBusClient _client;
    private ServiceBusProcessor _processor;
    private CancellationToken _cancellationToken;

    #region Event Delegate

    /// <summary>
    /// Note: I typically don't do this since microservices are atomic in design.
    /// However, for this demo, I will share a common service bus consumer between
    /// projects. In theory this could be done and would work fine but it creates a
    /// dependency between the microservices through shared code. This technique
    /// is something to carefully consider.
    /// </summary>
    // delegates and events for message processing
    public delegate Task MessageReceivedAsync(string message, string operationId, string parentId, CancellationToken cancellationToken);

    /// <summary>Raised for every received message, before the message is completed.</summary>
    public event IConsumerService.MessageReceivedAsync ProcessMessageAsync;

    // explicit interface implementation; forwards to the public event under a lock
    event IConsumerService.MessageReceivedAsync IConsumerService.ProcessMessageAsync
    {
        add
        {
            lock (_objectLock)
            {
                ProcessMessageAsync += value;
            }
        }
        remove
        {
            lock (_objectLock)
            {
                ProcessMessageAsync -= value;
            }
        }
    }

    #endregion

    /// <summary>Creates the consumer with its logger, telemetry client and configuration.</summary>
    public ConsumerAzureServiceBus(ILogger<ConsumerAzureServiceBus> logger, TelemetryClient telemetryClient, AzureServiceBusConsumerConfiguration azureServiceBusConsumerConfiguration)
    {
        _logger = logger;
        _telemetryClient = telemetryClient;
        _azureServiceBusConfiguration = azureServiceBusConsumerConfiguration;
    }

    /// <summary>Writes the startup banner to the log.</summary>
    public void LogStartupInformation()
    {
        _logger.LogInformation("Azure Service Bus Consumer Starting.");
    }

    /// <summary>Logs startup information and starts receiving from the configured queue/topic and subscription.</summary>
    public Task StartConsumingAsync(CancellationToken cancellationToken)
    {
        LogStartupInformation();
        return StartReceivingMessagesAsync(_azureServiceBusConfiguration.QueueOrTopic, _azureServiceBusConfiguration.SubscriptionName, cancellationToken);
    }

    /// <summary>
    /// Creates the Service Bus client from the configured connection string.
    /// Public and virtual so that a test can override it with a mocked client.
    /// </summary>
    [ExcludeFromCodeCoverage]
    public virtual ServiceBusClient GetServiceBusClient()
    {
        return new ServiceBusClient(_azureServiceBusConfiguration.ConnectionString);
    }

    /// <summary>
    /// Creates a processor for the given entity, wires up the message and error handlers,
    /// and starts processing. Failures are logged (with the exception) rather than rethrown.
    /// </summary>
    public async Task StartReceivingMessagesAsync(string queueOrTopic, string subscriptionName, CancellationToken cancellationToken)
    {
        try
        {
            // get service bus client
            _client = GetServiceBusClient();

            // create a processor
            _processor = _client.CreateProcessor(queueOrTopic, subscriptionName, new ServiceBusProcessorOptions());

            // cancellation token handed to the message subscribers
            _cancellationToken = cancellationToken;

            // process message handler
            _processor.ProcessMessageAsync += MessageHandler;

            // error handler
            _processor.ProcessErrorAsync += ErrorHandler;

            // start processing
            await _processor.StartProcessingAsync(_cancellationToken);
        }
        catch (Exception ex)
        {
            // the exception must be the FIRST argument; passed after the message it is
            // treated as a format argument and never reaches the log
            _logger.LogError(ex, $"ConsumerAzureServiceBus->StartReceivingMessagesAsync(queueOrTopic: ({queueOrTopic}), subscriptionName: ({subscriptionName})).");
        }
    }

    /// <summary>
    /// Stops message processing and releases the processor and client created by
    /// <see cref="StartReceivingMessagesAsync"/>. Does nothing if processing was never started.
    /// </summary>
    public async Task StopConsumingAsync(CancellationToken token)
    {
        var processor = _processor;
        if (processor is null)
        {
            return;
        }

        try
        {
            await processor.StopProcessingAsync(token);
        }
        finally
        {
            // release the processor and client even when stopping was cancelled or failed
            _processor = null;
            await processor.DisposeAsync();

            var client = _client;
            _client = null;
            if (client is not null)
            {
                await client.DisposeAsync();
            }
        }
    }

    /// <summary>
    /// Handles one received message: starts a request telemetry operation using the
    /// correlation ids from the message, logs the message, forwards the body to every
    /// subscriber, and then completes the message.
    /// </summary>
    public async Task MessageHandler(ProcessMessageEventArgs args)
    {
        // message body
        var body = args.Message.Body.ToString();

        // extract root operation id and parent id
        (var rootOperationId, var parentId) = args.Message.GetCorrelationIds();

        using (var operation = _telemetryClient.StartOperation<RequestTelemetry>("ServiceBusProcessor.ProcessMessage", rootOperationId, parentId))
        {
            // log information
            _logger.LogInformation($"Received Message (queueOrTopic: ({_azureServiceBusConfiguration.QueueOrTopic}), subscriptionName: ({_azureServiceBusConfiguration.SubscriptionName}), Operation ID: ({rootOperationId}), Parent ID: ({parentId}).");
            _logger.LogInformation($"{body}");

            // update parent id: subscribers continue the trace from this operation
            parentId = operation.Telemetry.Id;

            // process message; snapshot the event so an unsubscribe on another thread
            // cannot null it between the check and the call
            var subscribers = ProcessMessageAsync;
            if (subscribers is not null)
            {
                // invoking a multicast delegate returns only the LAST subscriber's Task,
                // so await each subscriber individually before settling the message
                foreach (IConsumerService.MessageReceivedAsync subscriber in subscribers.GetInvocationList())
                {
                    await subscriber(body, rootOperationId, parentId, _cancellationToken);
                }
            }

            // we can evaluate application logic and use that to determine how to settle the message.
            await args.CompleteMessageAsync(args.Message);
        }
    }

    /// <summary>
    /// Logs the details of a processor error. Specific failure reasons can be handled
    /// in the <see cref="ServiceBusException"/> branch.
    /// </summary>
    public Task ErrorHandler(ProcessErrorEventArgs args)
    {
        if (args is null)
        {
            // nothing to inspect; report it instead of failing inside the error handler
            _logger.LogError("ErrorHandler: invoked without event arguments.");
            return Task.CompletedTask;
        }

        var ex = args.Exception;

        // log error
        _logger.LogError(ex, $"ErrorHandler: {ex?.Message}");

        // the error source tells me at what point in the processing an error occurred
        _logger.LogInformation(args.ErrorSource.ToString());
        _logger.LogInformation(args.FullyQualifiedNamespace);
        _logger.LogInformation(args.EntityPath);

        if (ex is not null)
        {
            _logger.LogInformation(ex.ToString());
        }

        if (ex is ServiceBusException { Reason: ServiceBusFailureReason.MessagingEntityNotFound })
        {
            // queue or topic does not exist
            return Task.CompletedTask;
        }

        return Task.CompletedTask;
    }
}
Mocking the Service Bus Client
I’ve tried this several ways and I think this is the simplest. Moq can only mock members that are public and virtual. An easy and effective way to substitute a mocked Service Bus client is to create a public virtual method that returns the service bus client, which a test can then override. I’m also excluding this method from code coverage since it exists specifically to support unit testing.
/// <summary>
/// Builds a <see cref="ServiceBusClient"/> from the configured connection string.
/// Declared public and virtual so a mocking framework can override it.
/// </summary>
[ExcludeFromCodeCoverage]
public virtual ServiceBusClient GetServiceBusClient()
    => new ServiceBusClient(_azureServiceBusConfiguration.ConnectionString);
In the StartReceivingMessagesAsync() method I can get the client like this.
public async Task StartReceivingMessagesAsync(string queueOrTopic, string subscriptionName, CancellationToken cancellationToken)
{
try
{
// get service bus client
var client = GetServiceBusClient();
// create a processor
_processor = client.CreateProcessor(queueOrTopic, subscriptionName, new ServiceBusProcessorOptions());
Why injecting the ServiceBusClient didn’t work out.
Someone recommended injecting the ServiceBusClient and I thought that was worth trying. In the end that doesn’t work, for several reasons. There isn’t a method to connect or reconnect that I know of, and the only way a connection is created is by instantiating a new ServiceBusClient. When the child processor object is disposed, it causes the service bus client to close; the client no longer maintains a connection, and the service fails.
ServiceBusModelFactory Factory Class
The Service Bus Model Factory can be used to mock the ServiceBusReceivedMessage class.
// application properties that carry the operation id and parent id
var applicationProperties = new Dictionary<string, object>
{
    { "OperationId", rootOperationId },
    { "ParentId", parentId },
};

// ServiceBusReceivedMessage has no public constructor, so build it with the model factory
var serviceBusReceivedMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(
    body: BinaryData.FromString(json),
    properties: applicationProperties,
    deliveryCount: 1);
Message Handler Tests
The general idea here is that we want to test the flow of the message handler to make sure it’s passing and accepting the correct arguments. Typically after consumption either a message is saved in a database or passed to another service for further processing. We want to make sure that everything happens here as expected.
/// <summary>
/// Feeds a factory-built message through <c>MessageHandler</c> and verifies that the
/// subscriber receives the body and correlation ids and that the message is logged.
/// </summary>
[Fact]
public async Task MessageHandlerTests()
{
    // arrange
    using var cts = new CancellationTokenSource();

    // json
    var json = "{\"OrderId\":92,\"BillingAddress\":{\"CustomerAddressId\":null,\"FirstName\":\"Jamie\",\"LastName\":\"Bowman\",\"StreetAddress1\":\"123 Street\",\"StreetAddress2\":\"Apt #2\",\"City\":\"Saint Louis\",\"State\":\"MO\",\"PostalCode\":\"12345\",\"Country\":\"USA\"},\"ShippingAddress\":{\"CustomerAddressId\":null,\"FirstName\":\"Jamie\",\"LastName\":\"Bowman\",\"StreetAddress1\":\"123 Street\",\"StreetAddress2\":\"Apt #2\",\"City\":\"Saint Louis\",\"State\":\"MO\",\"PostalCode\":\"12345\",\"Country\":\"USA\"},\"Subtotal\":404,\"Tax\":28.28,\"Total\":432.28}";

    // mock
    var mockServiceBusReceiver = new Mock<ServiceBusReceiver>();

    // vars
    var rootOperationId = "operation-id-123";
    var parentId = "parent-id-123";

    // service bus message received
    var applicationProperties = new Dictionary<string, object>
    {
        { "OperationId", rootOperationId },
        { "ParentId", parentId },
    };

    // service bus model factory
    var serviceBusReceivedMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(
        body: BinaryData.FromString(json),
        properties: applicationProperties,
        deliveryCount: 1);

    // process event args
    var processMessageEventArgs = new ProcessMessageEventArgs(serviceBusReceivedMessage, mockServiceBusReceiver.Object, cts.Token);

    // logging
    var logger = new FakeLogger<ConsumerAzureServiceBus>();
    var telemetryClient = TelemetryHelper.GetFakeTelemetryClient();

    // configuration
    var azureServiceBusConsumerConfiguration = new AzureServiceBusConsumerConfiguration
    {
        QueueOrTopic = "queue-or-topic",
        SubscriptionName = "subscription-name",
    };

    // consumer
    var consumer = new ConsumerAzureServiceBus(logger, telemetryClient, azureServiceBusConsumerConfiguration);

    // capture what the subscriber receives and assert after the handler has completed;
    // the lambda parameters are named differently from the locals so nothing is shadowed
    var subscriberInvoked = false;
    string receivedMessage = null;
    string receivedOperationId = null;
    string receivedParentId = null;

    consumer.ProcessMessageAsync += (message, operationId, handlerParentId, cancellationToken) =>
    {
        subscriberInvoked = true;
        receivedMessage = message;
        receivedOperationId = operationId;
        receivedParentId = handlerParentId;
        return Task.CompletedTask;
    };

    // act: call the public method directly and await it so failures surface in the test
    await consumer.MessageHandler(processMessageEventArgs);

    // assert subscriber arguments
    subscriberInvoked.Should().BeTrue();
    receivedMessage.Should().Be(json);
    receivedOperationId.Should().Be(rootOperationId);

    // MessageHandler replaces the parent id with the id of the telemetry operation it
    // starts, so the subscriber is expected to receive a populated id rather than the
    // incoming one. NOTE(review): assumes the fake telemetry client assigns an id - confirm.
    receivedParentId.Should().NotBeNullOrWhiteSpace();

    // assert logging messages.
    var message1 = $"Received Message (queueOrTopic: ({azureServiceBusConsumerConfiguration.QueueOrTopic}), subscriptionName: ({azureServiceBusConsumerConfiguration.SubscriptionName}), Operation ID: ({rootOperationId}), Parent ID: ({parentId}).";
    var message2 = $"{json}";

    var messages = logger.messages.ToString();
    messages.Should().Contain(message1);
    messages.Should().Contain(message2);
}
Error Handler Tests
This is just a sample of what could be tested.
/// <summary>
/// Verifies that <c>ErrorHandler</c> logs the message of the exception it receives.
/// </summary>
[Fact]
public async Task ErrorHandlerTests()
{
    // arrange
    using var cts = new CancellationTokenSource();
    var exception = new Exception("Error message.");
    var errorSource = ServiceBusErrorSource.Receive;

    // process error event args
    var processErrorEventArgs = new ProcessErrorEventArgs(exception, errorSource, "", "", cts.Token);

    // logging
    var logger = new FakeLogger<ConsumerAzureServiceBus>();
    var telemetryClient = TelemetryHelper.GetFakeTelemetryClient();

    // configuration
    var azureServiceBusConsumerConfiguration = new AzureServiceBusConsumerConfiguration
    {
        QueueOrTopic = "queue-or-topic",
        SubscriptionName = "subscription-name",
    };

    // consumer
    var consumer = new ConsumerAzureServiceBus(logger, telemetryClient, azureServiceBusConsumerConfiguration);

    // act: the method is public, so call it directly and await the returned Task
    await consumer.ErrorHandler(processErrorEventArgs);

    // assert
    var messages = logger.messages.ToString();
    messages.Should().Contain($"ErrorHandler: {exception.Message}");
}
/// <summary>
/// Verifies that <c>ErrorHandler</c> completes and logs the error when the failure is a
/// <see cref="ServiceBusException"/> reporting that the queue or topic does not exist.
/// </summary>
[Fact]
public async Task ErrorHandlerTests_ServiceBusException_EntityNotFound()
{
    // arrange
    using var cts = new CancellationTokenSource();
    var exception = new ServiceBusException("Entity not found.", ServiceBusFailureReason.MessagingEntityNotFound);
    var errorSource = ServiceBusErrorSource.Receive;

    // process error event args
    var processErrorEventArgs = new ProcessErrorEventArgs(exception, errorSource, "", "", cts.Token);

    // logging
    var logger = new FakeLogger<ConsumerAzureServiceBus>();
    var telemetryClient = TelemetryHelper.GetFakeTelemetryClient();

    // configuration
    var azureServiceBusConsumerConfiguration = new AzureServiceBusConsumerConfiguration
    {
        QueueOrTopic = "queue-or-topic",
        SubscriptionName = "subscription-name",
    };

    // consumer
    var consumer = new ConsumerAzureServiceBus(logger, telemetryClient, azureServiceBusConsumerConfiguration);

    // act: an exception thrown by the handler fails the test
    await consumer.ErrorHandler(processErrorEventArgs);

    // assert
    var messages = logger.messages.ToString();
    messages.Should().Contain($"ErrorHandler: {exception.Message}");
}

/// <summary>
/// Verifies that <c>ErrorHandler</c> logs the error source, namespace and entity path
/// carried by the event arguments.
/// </summary>
[Fact]
public async Task ErrorHandlerTests_LogsErrorContext()
{
    // arrange
    using var cts = new CancellationTokenSource();
    var exception = new Exception("Error message.");
    var errorSource = ServiceBusErrorSource.Receive;
    var fullyQualifiedNamespace = "unit-test.servicebus.windows.net";
    var entityPath = "queue-or-topic";

    // process error event args
    var processErrorEventArgs = new ProcessErrorEventArgs(exception, errorSource, fullyQualifiedNamespace, entityPath, cts.Token);

    // logging
    var logger = new FakeLogger<ConsumerAzureServiceBus>();
    var telemetryClient = TelemetryHelper.GetFakeTelemetryClient();

    // configuration
    var azureServiceBusConsumerConfiguration = new AzureServiceBusConsumerConfiguration
    {
        QueueOrTopic = "queue-or-topic",
        SubscriptionName = "subscription-name",
    };

    // consumer
    var consumer = new ConsumerAzureServiceBus(logger, telemetryClient, azureServiceBusConsumerConfiguration);

    // act
    await consumer.ErrorHandler(processErrorEventArgs);

    // assert
    var messages = logger.messages.ToString();
    messages.Should().Contain($"ErrorHandler: {exception.Message}");
    messages.Should().Contain(errorSource.ToString());
    messages.Should().Contain(fullyQualifiedNamespace);
    messages.Should().Contain(entityPath);
}
Full Unit Tests
Again, all of this is on my GitHub under the Microservice: Application Insights repository.