
.NET: Unit Testing Azure Service Bus
Unit testing an Azure Service Bus consumer is fairly easy. In this tutorial, I will use .NET 6, Moq (v4), and Fluent Assertions. The difficult part is constructing a ServiceBusReceivedMessage because it has only internal constructors and no interface. However, the ServiceBusModelFactory class provides a factory method that can create these objects.
GitHub
I’ve included unit testing in this project.
https://github.com/mrjamiebowman-blog/microservices-application-insights
What should be unit tested?
There are two areas that I focus on when testing a consumer service: message processing and error handling.
Message Handler
This is the main method for processing incoming data from Azure Service Bus. This can be tested to make sure data is properly deserialized, and distributed to the correct services appropriately.
Error Handler
The Error Handler method can handle specific cases where the service bus client fails. It may be useful to unit test this code if there are certain actions that need to happen during a failure.
ConsumerAzureServiceBus Service Class
This is the code that we will be testing to put things in context.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 |
/// <summary>
/// Consumes messages from an Azure Service Bus queue or topic subscription and forwards
/// each message body, together with its correlation ids, to the subscribers of
/// <see cref="ProcessMessageAsync"/>.
/// </summary>
public class ConsumerAzureServiceBus : IConsumerService, IConsumerAzureServiceBus
{
    private readonly ILogger<ConsumerAzureServiceBus> _logger;
    private readonly TelemetryClient _telemetryClient;
    private readonly AzureServiceBusConsumerConfiguration _azureServiceBusConfiguration;

    // NOTE(review): the processor (and the client that created it) are never disposed by this
    // class; StopConsumingAsync is still unimplemented. Confirm who owns their lifetime.
    private ServiceBusProcessor _processor;
    private CancellationToken _cancellationToken;

    #region Event Delegate

    // Note: I typically don't do this since microservices are atomic in design.
    // However, for this demo, I will share a common service bus consumer between
    // projects. In theory this could be done and would work fine but it creates a
    // dependency between the microservices through shared code. This technique
    // is something to carefully consider.

    /// <summary>
    /// Delegate shape for message callbacks: message body, root operation id, parent id,
    /// and the cancellation token the consumer was started with.
    /// </summary>
    public delegate Task MessageReceivedAsync(string message, string operationId, string parentId, CancellationToken cancellationToken);

    /// <summary>
    /// Raised by <see cref="MessageHandler"/> for every received message.
    /// </summary>
    public event IConsumerService.MessageReceivedAsync ProcessMessageAsync;

    // Guards subscription changes made through the explicit interface implementation below.
    private readonly object _eventLock = new object();

    event IConsumerService.MessageReceivedAsync IConsumerService.ProcessMessageAsync
    {
        add
        {
            lock (_eventLock)
            {
                ProcessMessageAsync += value;
            }
        }

        remove
        {
            lock (_eventLock)
            {
                ProcessMessageAsync -= value;
            }
        }
    }

    #endregion

    /// <summary>
    /// Creates the consumer with its logger, telemetry client and queue/topic configuration.
    /// </summary>
    public ConsumerAzureServiceBus(ILogger<ConsumerAzureServiceBus> logger, TelemetryClient telemetryClient, AzureServiceBusConsumerConfiguration azureServiceBusConsumerConfiguration)
    {
        _logger = logger;
        _telemetryClient = telemetryClient;
        _azureServiceBusConfiguration = azureServiceBusConsumerConfiguration;
    }

    /// <summary>
    /// Writes the startup banner to the log.
    /// </summary>
    public void LogStartupInformation()
    {
        _logger.LogInformation("Azure Service Bus Consumer Starting.");
    }

    /// <summary>
    /// Logs startup information and starts receiving from the configured queue/topic and subscription.
    /// </summary>
    public Task StartConsumingAsync(CancellationToken cancellationToken)
    {
        LogStartupInformation();
        return StartReceivingMessagesAsync(_azureServiceBusConfiguration.QueueOrTopic, _azureServiceBusConfiguration.SubscriptionName, cancellationToken);
    }

    /// <summary>
    /// Creates the <see cref="ServiceBusClient"/> from the configured connection string.
    /// Public and virtual so a test can override it and return a mocked client.
    /// </summary>
    [ExcludeFromCodeCoverage]
    public virtual ServiceBusClient GetServiceBusClient()
    {
        return new ServiceBusClient(_azureServiceBusConfiguration.ConnectionString);
    }

    /// <summary>
    /// Creates a processor for the given queue/topic and subscription, wires up
    /// <see cref="MessageHandler"/> and <see cref="ErrorHandler"/>, and starts processing.
    /// Any exception raised while starting is logged and not rethrown.
    /// </summary>
    public async Task StartReceivingMessagesAsync(string queueOrTopic, string subscriptionName, CancellationToken cancellationToken)
    {
        try
        {
            // get service bus client
            var client = GetServiceBusClient();

            // create a processor
            _processor = client.CreateProcessor(queueOrTopic, subscriptionName, new ServiceBusProcessorOptions());

            // cancellation token (also handed to message subscribers in MessageHandler)
            _cancellationToken = cancellationToken;

            // process message handler
            _processor.ProcessMessageAsync += MessageHandler;

            // error handler
            _processor.ProcessErrorAsync += ErrorHandler;

            // start processing
            await _processor.StartProcessingAsync(_cancellationToken);
        }
        catch (Exception ex)
        {
            // Pass the exception as the first argument so it is recorded as the log entry's
            // exception (with stack trace) instead of being consumed as an unused format argument.
            _logger.LogError(
                ex,
                "ConsumerAzureServiceBus->StartReceivingMessagesAsync(queueOrTopic: ({QueueOrTopic}), subscriptionName: ({SubscriptionName})).",
                queueOrTopic,
                subscriptionName);
        }
    }

    /// <summary>
    /// Not implemented yet; always throws <see cref="NotImplementedException"/>.
    /// </summary>
    public Task StopConsumingAsync(CancellationToken token)
    {
        throw new NotImplementedException();
    }

    /// <summary>
    /// Handles one received message: reads the body and correlation ids, starts a request
    /// telemetry operation, logs the message, raises <see cref="ProcessMessageAsync"/>,
    /// and then completes the message.
    /// </summary>
    public async Task MessageHandler(ProcessMessageEventArgs args)
    {
        // message body
        var body = args.Message.Body.ToString();

        // extract root operation id and parent id
        var (rootOperationId, parentId) = args.Message.GetCorrelationIds();

        using (var operation = _telemetryClient.StartOperation<RequestTelemetry>("ServiceBusProcessor.ProcessMessage", rootOperationId, parentId))
        {
            // log information
            _logger.LogInformation($"Received Message (queueOrTopic: ({_azureServiceBusConfiguration.QueueOrTopic}), subscriptionName: ({_azureServiceBusConfiguration.SubscriptionName}), Operation ID: ({rootOperationId}), Parent ID: ({parentId}).");
            _logger.LogInformation($"{body}");

            // update parent id: subscribers receive the id of the operation started above
            parentId = operation.Telemetry.Id;

            // process message; read the event once so an unsubscribe between the null
            // check and the invocation cannot cause a NullReferenceException
            var handler = ProcessMessageAsync;
            if (handler is not null)
            {
                await handler.Invoke(body, rootOperationId, parentId, _cancellationToken);
            }

            // we can evaluate application logic and use that to determine how to settle the message.
            await args.CompleteMessageAsync(args.Message);
        }
    }

    /// <summary>
    /// Logs the details of a processor error. Always returns a completed task.
    /// </summary>
    public Task ErrorHandler(ProcessErrorEventArgs args)
    {
        if (args is null)
        {
            // nothing to report; previously this path dereferenced args and threw
            _logger.LogError("ErrorHandler: invoked without event arguments.");
            return Task.CompletedTask;
        }

        var ex = args.Exception;

        // log error
        _logger.LogError($"ErrorHandler: {ex?.Message}");

        // the error source tells me at what point in the processing an error occurred
        _logger.LogInformation(args.ErrorSource.ToString());
        _logger.LogInformation(args.FullyQualifiedNamespace);
        _logger.LogInformation(args.EntityPath);

        if (ex is not null)
        {
            _logger.LogInformation(ex.ToString());
        }

        if (ex is ServiceBusException { Reason: ServiceBusFailureReason.MessagingEntityNotFound })
        {
            // queue or topic does not exist
            return Task.CompletedTask;
        }

        return Task.CompletedTask;
    }
}
Mocking the Service Bus Client
I’ve tried this several ways and I think this is the simplest. Moq can only mock public and virtual methods. An easy and effective way to implement a mocked Service Bus Client is to create a public virtual method that returns the service bus client. I’m also excluding this from code coverage since this is specifically here for unit testing.
1 2 3 4 5 |
/// <summary>
/// Builds the Service Bus client from the configured connection string.
/// Declared public virtual so that a mocking framework can override it in tests.
/// </summary>
[ExcludeFromCodeCoverage]
public virtual ServiceBusClient GetServiceBusClient()
    => new ServiceBusClient(_azureServiceBusConfiguration.ConnectionString);
In the StartReceivingMessagesAsync() method, I can get the client like this.
1 2 3 4 5 6 7 8 9 |
public async Task StartReceivingMessagesAsync(string queueOrTopic, string subscriptionName, CancellationToken cancellationToken) { try { // get service bus client var client = GetServiceBusClient(); // create a processor _processor = client.CreateProcessor(queueOrTopic, subscriptionName, new ServiceBusProcessorOptions()); |
Why injecting the ServiceBusClient didn’t work out.
Someone recommended injecting the ServiceBusClient and I thought that was worth trying. In the end, that doesn’t work for several reasons. There isn’t a method to connect or reconnect that I know of, and the only way a connection is created is by instantiating a new ServiceBusClient. When the child processor object is disposed, it causes the service bus client to close; the client then no longer maintains a connection, which causes the service to fail.
ServiceBusModelFactory Factory Class
The Service Bus Model Factory can be used to mock the ServiceBusReceivedMessage class.
1 2 3 4 5 6 7 8 9 10 |
// application properties carried by the received message
var applicationProperties = new Dictionary<string, object>
{
    { "OperationId", rootOperationId },
    { "ParentId", parentId },
};

// build the received message through the model factory
var messageBody = BinaryData.FromString(json);
var serviceBusReceivedMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(
    body: messageBody,
    properties: applicationProperties,
    deliveryCount: 1);
Message Handler Tests
The general idea here is that we want to test the flow of the message handler to make sure it’s passing and accepting the correct arguments. Typically after consumption either a message is saved in a database or passed to another service for further processing. We want to make sure that everything happens here as expected.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
/// <summary>
/// Verifies that <c>MessageHandler</c> forwards the message body and root operation id to
/// subscribers, completes the message on the receiver, and logs the received message.
/// </summary>
[Fact]
public async Task MessageHandlerTests()
{
    // arrange
    using var cts = new CancellationTokenSource();

    // json
    var json = "{\"OrderId\":92,\"BillingAddress\":{\"CustomerAddressId\":null,\"FirstName\":\"Jamie\",\"LastName\":\"Bowman\",\"StreetAddress1\":\"123 Street\",\"StreetAddress2\":\"Apt #2\",\"City\":\"Saint Louis\",\"State\":\"MO\",\"PostalCode\":\"12345\",\"Country\":\"USA\"},\"ShippingAddress\":{\"CustomerAddressId\":null,\"FirstName\":\"Jamie\",\"LastName\":\"Bowman\",\"StreetAddress1\":\"123 Street\",\"StreetAddress2\":\"Apt #2\",\"City\":\"Saint Louis\",\"State\":\"MO\",\"PostalCode\":\"12345\",\"Country\":\"USA\"},\"Subtotal\":404,\"Tax\":28.28,\"Total\":432.28}";

    // mock
    var mockServiceBusReceiver = new Mock<ServiceBusReceiver>();

    // vars
    var rootOperationId = "operation-id-123";
    var parentId = "parent-id-123";

    // service bus message received
    var applicationProperties = new Dictionary<string, object>
    {
        { "OperationId", rootOperationId },
        { "ParentId", parentId },
    };

    // service bus model factory
    var serviceBusReceivedMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(
        body: BinaryData.FromString(json),
        properties: applicationProperties,
        deliveryCount: 1);

    // process event args
    var processMessageEventArgs = new ProcessMessageEventArgs(serviceBusReceivedMessage, mockServiceBusReceiver.Object, cts.Token);

    // logging
    var logger = new FakeLogger<ConsumerAzureServiceBus>();
    var telemetryClient = TelemetryHelper.GetFakeTelemetryClient();

    // configuration
    var azureServiceBusConsumerConfiguration = new AzureServiceBusConsumerConfiguration
    {
        QueueOrTopic = "queue-or-topic",
        SubscriptionName = "subscription-name",
    };

    // consumer
    var consumer = new ConsumerAzureServiceBus(logger, telemetryClient, azureServiceBusConsumerConfiguration);

    // Capture what the subscriber receives and assert after the handler has been awaited.
    // Asserting inside the callback of an un-awaited handler hides failures in the returned Task.
    var handlerInvoked = false;
    string receivedMessage = null;
    string receivedOperationId = null;
    string receivedParentId = null;

    consumer.ProcessMessageAsync += (message, operationId, handlerParentId, cancellationToken) =>
    {
        handlerInvoked = true;
        receivedMessage = message;
        receivedOperationId = operationId;
        receivedParentId = handlerParentId;
        return Task.CompletedTask;
    };

    // act: MessageHandler is public, so call it directly and await it
    await consumer.MessageHandler(processMessageEventArgs);

    // assert subscriber arguments
    handlerInvoked.Should().BeTrue();
    receivedMessage.Should().Be(json);
    receivedOperationId.Should().Be(rootOperationId);

    // the handler replaces the parent id with the id of the telemetry operation it starts,
    // so the subscriber sees a new, non-empty id rather than the incoming parent id
    receivedParentId.Should().NotBeNullOrEmpty();

    // assert the message was settled
    mockServiceBusReceiver.Verify(
        r => r.CompleteMessageAsync(serviceBusReceivedMessage, It.IsAny<CancellationToken>()),
        Times.Once);

    // assert logging messages.
    var message1 = $"Received Message (queueOrTopic: ({azureServiceBusConsumerConfiguration.QueueOrTopic}), subscriptionName: ({azureServiceBusConsumerConfiguration.SubscriptionName}), Operation ID: ({rootOperationId}), Parent ID: ({parentId}).";
    var message2 = $"{json}";

    var loggedMessages = logger.messages.ToString();
    loggedMessages.Should().Contain(message1);
    loggedMessages.Should().Contain(message2);
}
Error Handler Tests
This is just a sample of what could be tested.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
/// <summary>
/// Builds a consumer wired to the given fake logger, a fake telemetry client,
/// and a fixed queue/subscription configuration.
/// </summary>
private static ConsumerAzureServiceBus CreateConsumerForErrorHandlerTests(FakeLogger<ConsumerAzureServiceBus> logger)
{
    var telemetryClient = TelemetryHelper.GetFakeTelemetryClient();

    var azureServiceBusConsumerConfiguration = new AzureServiceBusConsumerConfiguration
    {
        QueueOrTopic = "queue-or-topic",
        SubscriptionName = "subscription-name",
    };

    return new ConsumerAzureServiceBus(logger, telemetryClient, azureServiceBusConsumerConfiguration);
}

/// <summary>
/// A plain exception is logged with the "ErrorHandler:" prefix and its message.
/// </summary>
[Fact]
public async Task ErrorHandler_GenericException_LogsErrorMessage()
{
    // arrange
    using var cts = new CancellationTokenSource();
    var exception = new Exception("Error message.");
    var errorSource = new ServiceBusErrorSource();

    // process error event args
    var processErrorEventArgs = new ProcessErrorEventArgs(exception, errorSource, "", "", cts.Token);

    // logging + consumer
    var logger = new FakeLogger<ConsumerAzureServiceBus>();
    var consumer = CreateConsumerForErrorHandlerTests(logger);

    // act: ErrorHandler is public, so no reflection is needed
    await consumer.ErrorHandler(processErrorEventArgs);

    // assert
    var messages = logger.messages.ToString();
    messages.Should().Contain($"ErrorHandler: {exception.Message}");
}

/// <summary>
/// A <see cref="ServiceBusException"/> with reason MessagingEntityNotFound goes through the
/// dedicated branch of the handler and is still logged.
/// </summary>
[Fact]
public async Task ErrorHandler_MessagingEntityNotFound_LogsErrorMessage()
{
    // arrange
    using var cts = new CancellationTokenSource();
    var exception = new ServiceBusException("Entity not found.", ServiceBusFailureReason.MessagingEntityNotFound);
    var errorSource = new ServiceBusErrorSource();

    // process error event args
    var processErrorEventArgs = new ProcessErrorEventArgs(exception, errorSource, "", "", cts.Token);

    // logging + consumer
    var logger = new FakeLogger<ConsumerAzureServiceBus>();
    var consumer = CreateConsumerForErrorHandlerTests(logger);

    // act
    await consumer.ErrorHandler(processErrorEventArgs);

    // assert
    var messages = logger.messages.ToString();
    messages.Should().Contain($"ErrorHandler: {exception.Message}");
}

/// <summary>
/// The error source, fully qualified namespace, and entity path are written to the log.
/// </summary>
[Fact]
public async Task ErrorHandler_LogsErrorSourceNamespaceAndEntityPath()
{
    // arrange
    using var cts = new CancellationTokenSource();
    var exception = new Exception("Error message.");
    var errorSource = ServiceBusErrorSource.Receive;
    var fullyQualifiedNamespace = "unit-test.servicebus.windows.net";
    var entityPath = "queue-or-topic";

    // process error event args
    var processErrorEventArgs = new ProcessErrorEventArgs(exception, errorSource, fullyQualifiedNamespace, entityPath, cts.Token);

    // logging + consumer
    var logger = new FakeLogger<ConsumerAzureServiceBus>();
    var consumer = CreateConsumerForErrorHandlerTests(logger);

    // act
    await consumer.ErrorHandler(processErrorEventArgs);

    // assert
    var messages = logger.messages.ToString();
    messages.Should().Contain(errorSource.ToString());
    messages.Should().Contain(fullyQualifiedNamespace);
    messages.Should().Contain(entityPath);
}
Full Unit Tests
Again all of this is on my GitHub under the Microservice: Application Insights repository.