.NET: Unit Testing Azure Service Bus

Unit testing an Azure Service Bus consumer is fairly easy. In this tutorial, I will use .NET6, Moq (v4), and Fluent Assertions. The difficult part is constructing the ServiceBusMessageReceived because it has only internal constructors and no interface. However, there is a factory service that can create these objects.

GitHub

I’ve included unit testing in this project.

https://github.com/mrjamiebowman-blog/microservices-application-insights

https://github.com/mrjamiebowman-blog/microservices-application-insights/blob/main/test/MrJB.MS.Common.Tests/Services/ConsumerAzureServiceBusTests.cs

What should be unit tested?

There are two areas that I focus on when testing consumer service. That is message processing and error handling.

Message Handler

This is the main method for processing incoming data from Azure Service Bus. This can be tested to make sure data is properly deserialized, and distributed to the correct services appropriately.

Error Handler

The Error Handler method can handle specific cases where the service bus client fails. It may be useful to unit test this code if there are certain actions that need to happen during a failure.

ConsumerAzureServiceBus Service Class

This is the code that we will be testing to put things in context.

public class ConsumerAzureServiceBus : IConsumerService, IConsumerAzureServiceBus
{
    private readonly ILogger<ConsumerAzureServiceBus> _logger;
    private readonly TelemetryClient _telemetryClient;

    private readonly AzureServiceBusConsumerConfiguration _azureServiceBusConfiguration;

    private ServiceBusProcessor _processor;
    private CancellationToken _cancellationToken;

    #region Event Delegate

    /// <summary>
    /// Note: I typically don't do this since microservices are atomic in design.
    /// However, for this demo, I will share a common service bus consumer between
    /// projects. In theory this could be done and would work fine but it creates a
    /// dependency between the microservices through shared code. This technique
    /// is something to carefuly consider.
    /// </summary>

    // delegates and events for message processing
    public delegate Task MessageReceivedAsync(string message, string operationId, string parentId, CancellationToken cancellationToken);
    public event IConsumerService.MessageReceivedAsync ProcessMessageAsync;

    // object lock
    object objectLock = new object();

    event IConsumerService.MessageReceivedAsync IConsumerService.ProcessMessageAsync
    {
        add
        {
            lock (objectLock)
            {
                ProcessMessageAsync += value;
            }
        }

        remove
        {
            lock (objectLock)
            {
                ProcessMessageAsync -= value;
            }
        }
    }

    #endregion

    public ConsumerAzureServiceBus(ILogger<ConsumerAzureServiceBus> logger, TelemetryClient telemetryClient, AzureServiceBusConsumerConfiguration azureServiceBusConsumerConfiguration)
    {
        _logger = logger;
        _telemetryClient = telemetryClient;
        _azureServiceBusConfiguration = azureServiceBusConsumerConfiguration;
    }

    public void LogStartupInformation()
    {
        _logger.LogInformation("Azure Service Bus Consumer Starting.");
    }

    public Task StartConsumingAsync(CancellationToken cancellationToken)
    {
        LogStartupInformation();
        return StartReceivingMessagesAsync(_azureServiceBusConfiguration.QueueOrTopic, _azureServiceBusConfiguration.SubscriptionName, cancellationToken);
    }

    [ExcludeFromCodeCoverage]
    public virtual ServiceBusClient GetServiceBusClient()
    {
        return new ServiceBusClient(_azureServiceBusConfiguration.ConnectionString);
    }

    public async Task StartReceivingMessagesAsync(string queueOrTopic, string subscriptionName, CancellationToken cancellationToken)
    {
        try
        {
            // get service bus client
            var client = GetServiceBusClient();

            // create a processor
            _processor = client.CreateProcessor(queueOrTopic, subscriptionName, new ServiceBusProcessorOptions());

            // cancellation token
            _cancellationToken = cancellationToken;

            // process message handler
            _processor.ProcessMessageAsync += MessageHandler;

            // error handler
            _processor.ProcessErrorAsync += ErrorHandler;

            // start processing
            await _processor.StartProcessingAsync(_cancellationToken);
        }
        catch (Exception ex)
        {
            _logger.LogError($"ConsumerAzureServiceBus->ReceiveMessageAsync(queueOrTopic: ({queueOrTopic}), subscriptionName: ({subscriptionName}).", ex);
        }
    }

    public Task StopConsumingAsync(CancellationToken token)
    {
        throw new NotImplementedException();
    }

    public async Task MessageHandler(ProcessMessageEventArgs args)
    {
        // message body
        var body = args.Message.Body.ToString();

        // extract root operation id and parent id
        (var rootOperationId, var parentId) = args.Message.GetCorrelationIds();

        using (var operation = _telemetryClient.StartOperation<RequestTelemetry>("ServiceBusProcessor.ProcessMessage", rootOperationId, parentId))
        {
            // log information
            _logger.LogInformation($"Received Message (queueOrTopic: ({_azureServiceBusConfiguration.QueueOrTopic}), subscriptionName: ({_azureServiceBusConfiguration.SubscriptionName}), Operation ID: ({rootOperationId}), Parent ID: ({parentId}).");
            _logger.LogInformation($"{body}");

            // update parent id
            parentId = operation.Telemetry.Id;

            // process message
            if (ProcessMessageAsync != null) {
                await ProcessMessageAsync?.Invoke(body, rootOperationId, parentId, _cancellationToken);
            }

            // we can evaluate application logic and use that to determine how to settle the message.
            await args.CompleteMessageAsync(args.Message);
        }
    }

    public Task ErrorHandler(ProcessErrorEventArgs args)
    {
        var ex = args?.Exception;

        // log error
        _logger.LogError($"ErrorHandler: {args.Exception.Message}");

        // the error source tells me at what point in the processing an error occurred
        _logger.LogInformation(args.ErrorSource.ToString());
        _logger.LogInformation(args.FullyQualifiedNamespace);
        _logger.LogInformation(args.EntityPath);
        _logger.LogInformation(args.Exception.ToString());

        if (ex is ServiceBusException)
        {
            var asbEx = (ServiceBusException)ex;

            if (asbEx.Reason == ServiceBusFailureReason.MessagingEntityNotFound)
            {
                // queue or topic does not exist
                return Task.CompletedTask;
            }
        }

        return Task.CompletedTask;
    }
}

Mocking the Service Bus Client

I’ve tried these several ways and I think this is the simplest. Moq can only mock public and virtual methods. An easy and effective way to implement a mocked Service Bus Client is to create a public virtual method that returns the service bus client. I’m also excluding this from code coverage since this is specifically here for unit testing.

    [ExcludeFromCodeCoverage]
    public virtual ServiceBusClient GetServiceBusClient()
    {
        return new ServiceBusClient(_azureServiceBusConfiguration.ConnectionString);
    }

In the StartReceivingMessageAsync() method I can get the client doing this.

    public async Task StartReceivingMessagesAsync(string queueOrTopic, string subscriptionName, CancellationToken cancellationToken)
    {
        try
        {
            // get service bus client
            var client = GetServiceBusClient();

            // create a processor
            _processor = client.CreateProcessor(queueOrTopic, subscriptionName, new ServiceBusProcessorOptions());

Why injecting the ServiceBusClient didn’t work out.

Someone recommended injecting the ServiceBusClient and I thought that was worth trying. That doesn’t work in the end for several reasons. There isn’t a method to connect or reconnect that I know of and the only way a connection is created is through instantiating the ServiceBusClient service. When the subobject object processor is disposed it causes the service bus client to close which will no longer maintain a connection and will cause the service to fail.

ServiceBusModelFactory Factory Class

The Service Bus Model Factory can be used to mock the ServiceBusReceivedMessage class.

            // service bus message received
            var applicationProperties = new Dictionary<string, object>();
            applicationProperties.Add("OperationId", rootOperationId);
            applicationProperties.Add("ParentId", parentId);

            // service bus model factory
            var serviceBusReceivedMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(
                    body: BinaryData.FromString(json),
                    properties: applicationProperties,
                    deliveryCount: 1);

Message Handler Tests

The general idea here is that we want to test the flow of the message handler to make sure it’s passing and accepting the correct arguments. Typically after consumption either a message is saved in a database or passed to another service for further processing. We want to make sure that everything happens here as expected.

        [Fact]
        public void MessageHandlerTests()
        {
            // arrange
            CancellationTokenSource cts = new CancellationTokenSource();

            // json
            var json = "{\"OrderId\":92,\"BillingAddress\":{\"CustomerAddressId\":null,\"FirstName\":\"Jamie\",\"LastName\":\"Bowman\",\"StreetAddress1\":\"123 Street\",\"StreetAddress2\":\"Apt #2\",\"City\":\"Saint Louis\",\"State\":\"MO\",\"PostalCode\":\"12345\",\"Country\":\"USA\"},\"ShippingAddress\":{\"CustomerAddressId\":null,\"FirstName\":\"Jamie\",\"LastName\":\"Bowman\",\"StreetAddress1\":\"123 Street\",\"StreetAddress2\":\"Apt #2\",\"City\":\"Saint Louis\",\"State\":\"MO\",\"PostalCode\":\"12345\",\"Country\":\"USA\"},\"Subtotal\":404,\"Tax\":28.28,\"Total\":432.28}";

            // mock
            var mockServiceBusReceiver = new Mock<ServiceBusReceiver>();

            // vars
            var rootOperationId = "operation-id-123";
            var parentId = "parent-id-123";

            // service bus message received
            var applicationProperties = new Dictionary<string, object>();
            applicationProperties.Add("OperationId", rootOperationId);
            applicationProperties.Add("ParentId", parentId);

            // service bus model factory
            var serviceBusReceivedMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(
                    body: BinaryData.FromString(json),
                    properties: applicationProperties,
                    deliveryCount: 1);

            // proccess event args
            var processMessageEventArgs = new ProcessMessageEventArgs(serviceBusReceivedMessage, mockServiceBusReceiver.Object, cts.Token);

            // logging
            var logger = new FakeLogger<ConsumerAzureServiceBus>();
            var telemetryClient = TelemetryHelper.GetFakeTelemetryClient();

            // configuration
            var azureServiceBusConsumerConfiguration = new AzureServiceBusConsumerConfiguration();
            azureServiceBusConsumerConfiguration.QueueOrTopic = "queue-or-topic";
            azureServiceBusConsumerConfiguration.SubscriptionName = "subscription-name";

            // consumer
            var consumer = new ConsumerAzureServiceBus(logger, telemetryClient, azureServiceBusConsumerConfiguration);

            consumer.ProcessMessageAsync += (string message, string operationId, string parentId, CancellationToken cancellationToken) =>
            {
                // assertions
                operationId.Should().Be(rootOperationId);
                parentId.Should().Be(parentId);

                return Task.CompletedTask;
            };

            // act
            typeof(ConsumerAzureServiceBus).GetMethod("MessageHandler", BindingFlags.Public | BindingFlags.Instance).Invoke(consumer, new Object[] { processMessageEventArgs });

            // assert logging messages.

            var message1 = $"Received Message (queueOrTopic: ({azureServiceBusConsumerConfiguration.QueueOrTopic}), subscriptionName: ({azureServiceBusConsumerConfiguration.SubscriptionName}), Operation ID: ({rootOperationId}), Parent ID: ({parentId}).";
            var message2 = $"{json}";

            logger.messages.ToString().Should().Contain(message1);
            logger.messages.ToString().Should().Contain(message2);
        }

Error Handler Tests

This is just a sample of what could be tested.

    [Fact]
    public void ErrorHandlerTests()
    {
        // arrange
        CancellationTokenSource cts = new CancellationTokenSource();

        var exception = new Exception("Error message.");
        ServiceBusErrorSource errorSource = new ServiceBusErrorSource();

        // process error event args
        var processErrorEventArgs = new ProcessErrorEventArgs(exception, errorSource, "", "", cts.Token);

        // logging
        var logger = new FakeLogger<ConsumerAzureServiceBus>();
        var telemetryClient = TelemetryHelper.GetFakeTelemetryClient();

        // configuration
        var azureServiceBusConsumerConfiguration = new AzureServiceBusConsumerConfiguration();
        azureServiceBusConsumerConfiguration.QueueOrTopic = "queue-or-topic";
        azureServiceBusConsumerConfiguration.SubscriptionName = "subscription-name";

        // consumer
        var consumer = new ConsumerAzureServiceBus(logger, telemetryClient, azureServiceBusConsumerConfiguration);

        typeof(ConsumerAzureServiceBus).GetMethod("ErrorHandler", BindingFlags.Public | BindingFlags.Instance).Invoke(consumer, new Object[] { processErrorEventArgs });

        // assert
        var messages = logger.messages.ToString();

        messages.Should().Contain($"ErrorHandler: {exception.Message}");

    }
        [Fact]
        public void ErrorHandlerTests()
        {
            // arrange
            CancellationTokenSource cts = new CancellationTokenSource();

            var exception = new Exception("Error message.");
            ServiceBusErrorSource errorSource = new ServiceBusErrorSource();

            // process error event args
            var processErrorEventArgs = new ProcessErrorEventArgs(exception, errorSource, "", "", cts.Token);

            // logging
            var logger = new FakeLogger<ConsumerAzureServiceBus>();
            var telemetryClient = TelemetryHelper.GetFakeTelemetryClient();

            // configuration
            var azureServiceBusConsumerConfiguration = new AzureServiceBusConsumerConfiguration();
            azureServiceBusConsumerConfiguration.QueueOrTopic = "queue-or-topic";
            azureServiceBusConsumerConfiguration.SubscriptionName = "subscription-name";

            // consumer
            var consumer = new ConsumerAzureServiceBus(logger, telemetryClient, azureServiceBusConsumerConfiguration);

            typeof(ConsumerAzureServiceBus).GetMethod("ErrorHandler", BindingFlags.Public | BindingFlags.Instance).Invoke(consumer, new Object[] { processErrorEventArgs });

            // assert
            var messages = logger.messages.ToString();

            messages.Should().Contain($"ErrorHandler: {exception.Message}");
        }    [Fact]
    public void ErrorHandlerTests()
    {
        // arrange
        CancellationTokenSource cts = new CancellationTokenSource();

        var exception = new Exception("Error message.");
        ServiceBusErrorSource errorSource = new ServiceBusErrorSource();

        // process error event args
        var processErrorEventArgs = new ProcessErrorEventArgs(exception, errorSource, "", "", cts.Token);

        // logging
        var logger = new FakeLogger<ConsumerAzureServiceBus>();
        var telemetryClient = TelemetryHelper.GetFakeTelemetryClient();

        // configuration
        var azureServiceBusConsumerConfiguration = new AzureServiceBusConsumerConfiguration();
        azureServiceBusConsumerConfiguration.QueueOrTopic = "queue-or-topic";
        azureServiceBusConsumerConfiguration.SubscriptionName = "subscription-name";

        // consumer
        var consumer = new ConsumerAzureServiceBus(logger, telemetryClient, azureServiceBusConsumerConfiguration);

        typeof(ConsumerAzureServiceBus).GetMethod("ErrorHandler", BindingFlags.Public | BindingFlags.Instance).Invoke(consumer, new Object[] { processErrorEventArgs });

        // assert
        var messages = logger.messages.ToString();

        messages.Should().Contain($"ErrorHandler: {exception.Message}");

    }

Full Unit Tests

Again all of this is on my GitHub under the Microservice: Application Insights repository.

https://github.com/mrjamiebowman-blog/microservices-application-insights/blob/main/test/MrJB.MS.Common.Tests/Services/ConsumerAzureServiceBusTests.cs