'Azure.Messaging.ServiceBus Error when completing or abandoning message
Som I'm currently looking into updating our very simple service bus service to the latest version (Asure.Messaging.Servicebus) and I'm running into a smaller problem here.
The thing is I want to complete or abandon received or peaked messages manually by delegating the message back to methods in my service class to handle the job.
Here is my simple class so far, exposed by an interface.
using myProject.Interfaces;
using myProject.Utilities;
using Azure.Messaging.ServiceBus;
using System;
namespace myProject.Services
{
/// <summary>
/// Service bus service controller
/// </summary>
public class ServiceBusService: IServiceBusService
{
private IConfigurationUtility _configurationUtility;
static string connectionString;
static ServiceBusClient client;
static ServiceBusSender sender;
static ServiceBusReceiver receiver;
public ServiceBusService(IConfigurationUtility configurationUtility)
{
_configurationUtility = configurationUtility;
connectionString = _configurationUtility.GetSetting("ServiceBusConnectionString");
client = new ServiceBusClient(connectionString);
}
/// <summary>
/// Sending message.
/// </summary>
/// <param name="messageContent"></param>
/// <param name="queueName"></param>
public void SendMessage(string messageContent, string queueName)
{
sender = client.CreateSender(queueName);
ServiceBusMessage message = new ServiceBusMessage(messageContent);
sender.SendMessageAsync(message).Wait();
}
/// <summary>
/// Receive message.
/// </summary>
/// <returns></returns>
/// <param name="queueName"></param>
public ServiceBusReceivedMessage ReceiveMessage(string queueName)
{
receiver = client.CreateReceiver(queueName);
ServiceBusReceivedMessage receivedMessage = receiver.ReceiveMessageAsync().Result;
return receivedMessage;
}
/// <summary>
/// Peek message.
/// </summary>
/// <returns></returns>
public ServiceBusReceivedMessage PeekMessage(string queueName)
{
receiver = client.CreateReceiver(queueName);
ServiceBusReceivedMessage peekedMessage = receiver.PeekMessageAsync().Result;
return peekedMessage;
}
/// <summary>
/// Complete message.
/// </summary>
/// <param name="message"></param>
public void CompleteMessage(ServiceBusReceivedMessage message)
{
receiver.CompleteMessageAsync(message).Wait();
}
/// <summary>
/// Abandon message.
/// </summary>
/// <param name="message"></param>
public void AbandonMessage(ServiceBusReceivedMessage message)
{
receiver.AbandonMessageAsync(message).Wait();
}
}
}
everything works well when I'm handling one message at a time, but if I would process two messages at a time for example I get this error:
"The lock supplied is invalid. Either the lock expired, or the message has already been removed from the queue, or was received by a different receiver instance"
And I think the "or was received by a different receiver instance" part is my problem.
Am I thinking about this the wrong way? shouldn't I be able to handle for example: abandoning multiple single received messages at a time?
Thanks in advance.
UPDATE:
So i think i managed to get it working in regards to what Jesse Squire have been suggested to do in reflection to the new Servicebus ducumentation provided by microsoft.
I Made a new class called ServicebusFactory, added the code provided by Jesse below, to test it out and changed the "= new();" part for the initialized "ConcurrentDictionary's" at the top of the example, as initializing them like this resulted in the error below:
CS8400 Feature 'target-typed object creation' is not available in C# 8.0. Please use language version 9.0 or greater.
Result:
public class ServiceBusFactory : IAsyncDisposable, IServiceBusFactory
{
private readonly ServiceBusClient _client;
private readonly ConcurrentDictionary<string, ServiceBusSender> _senders = new ConcurrentDictionary<string, ServiceBusSender>();
private readonly ConcurrentDictionary<string, ServiceBusReceiver> _receivers = new ConcurrentDictionary<string, ServiceBusReceiver>();
public ServiceBusFactory(string fullyQualifiedNamespace, TokenCredential credential) => _client = new ServiceBusClient(fullyQualifiedNamespace, credential);
public ServiceBusFactory(string connectionString) => _client = new ServiceBusClient(connectionString);
public ServiceBusSender GetSender(string entity) =>
_senders.GetOrAdd(entity, entity => _client.CreateSender(entity));
public ServiceBusReceiver GetReceiver(string entity) =>
_receivers.GetOrAdd(entity, entity => _client.CreateReceiver(entity));
public async ValueTask DisposeAsync()
{
await _client.DisposeAsync().ConfigureAwait(false);
GC.SuppressFinalize(this);
}
}
}
I then created an interface for my service bus factory and added this as a singelton to the bottom of my "ConfigurationService" method in my "Startup.cs" class, initializing it with my service bus connection string.
Interface:
public interface IServiceBusFactory
{
public ServiceBusSender GetSender(string entity);
public ServiceBusReceiver GetReceiver(string entity);
public ValueTask DisposeAsync();
}
Startup:
var serviceBusConnectionString = "[CONNECTION STRING]";
services.AddSingleton<IServiceBusFactory>(new ServiceBusFactory(serviceBusConnectionString));
Finally it was a matter of dependency inject the servicebusFactory interface to my controller constructor and then use it to get a "sender" and using it to send a message to my queue called "Add".
Constructor:
private readonly IServiceBusFactory _serviceBusFactory;
public ServicebusController(IServiceBusFactory serviceBusFactory)
{
_serviceBusFactory = serviceBusFactory;
}
Controller Method/Action implementation:
var test = new List<ServiceBusMessage>();
test.Add(new ServiceBusMessage("This is a test message."));
var sender = _serviceBusFactory.GetSender("Add");
sender.SendMessagesAsync(test);
Solution 1:[1]
Service Bus associates a message lock with the AMQP link from which the message was received. For the SDK, this means that you must settle the message with the same ServiceBusReceiver
instance that you used to receive it.
In your code, you're creating a new receiver for each ReceiveMessage
call - so when you attempt to complete or abandon the message, you're using a link for which the message is not valid if any other call to ReceiveMessage
has taken place.
Generally, you want to avoid the pattern of creating short-lived Service Bus client objects. They're intended to be long-lived and reused over the lifetime of the application. In your code, you're also implicitly abandoning the senders/receivers without closing them. This is going to orphan the AMQP link until the service force-closes it for being idle after 20 minutes.
I'd recommend pooling your senders/receivers and keeping each as a singleton for the associated queue. Each call to SendMessage
or ReceiveMessage
should for a given queue should use the same sender/receiver instance.
When your application closes, be sure to close or dispose the ServiceBusClient
, which will ensure that all of its child senders/receivers are also cleaned up appropriately.
I'd also very strongly recommend refactoring your class to be async. The sync-over-async pattern that you're using is going to put additional pressure on the thread pool and is likely to result in thread starvation and/or deadlocks under load.
UPDATE
To add some additional context, I'd advise not wrapping Service Bus operations but, instead, have a factory that focuses on managing clients and letting callers interact directly with them.
This ensures that clients are pooled and their lifetimes are managed correctly, while also giving flexibility to callers to hold onto the sender/receiver reference and use for multiple operations rather than paying the cost to retrieve it.
As an example, the following is a simple factory class that you'd create and manage as a singleton in your application. Callers are able to request a sender/receiver for a specific queue/topic/subscription and they'll be created as needed and then pooled for reuse.
// This class is intended to be treated as a singleton for most
// scenarios. Each instance created will open an independent connection
// to the Service Bus namespace, shared by senders and receivers spawned from it.
public class ServiceBusFactory : IAsyncDisposable
{
// If throughput needs scale beyond a single connection, the factory can
// manage multiple clients and ensure that child entities are evenly distributed
// among them.
private readonly ServiceBusClient _client;
private readonly ConcurrentDictionary<string, ServiceBusSender> _senders = new();
private readonly ConcurrentDictionary<string, ServiceBusReceiver> _receivers = new();
public ServiceBusFactory(string fullyQualifiedNamespace, TokenCredential credential) => _client = new ServiceBusClient(fullyQualifiedNamespace, credential);
public ServiceBusFactory(string connectionString) => _client = new ServiceBusClient(connectionString);
public ServiceBusSender GetSender(string entity) =>
_senders.GetOrAdd(entity, entity => _client.CreateSender(entity));
public ServiceBusReceiver GetReceiver(string entity) =>
_receivers.GetOrAdd(entity, entity => _client.CreateReceiver(entity));
public async ValueTask DisposeAsync()
{
await _client.DisposeAsync().ConfigureAwait(false);
GC.SuppressFinalize(this);
}
}
Solution 2:[2]
- Please upvote Jesse Squire's answer before or instead of mine.
I have created a slightly different version.
I have the need for multiple service-bus (fully qualified namespaces) instead of just one. (Aka, I need multiple service bus connection strings).
And I needed dead-letter support under the "new-style" of using SubQueues.
Here is my version of the altered code.
at the time of writing, I was using
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.7.0" />
..
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using Azure.Core;
using Azure.Messaging.ServiceBus;
namespace Me.MyServiceBusFactories
{
// This class is intended to be treated as a singleton for most
// scenarios. Each instance created will open an independent connection
// to the Service Bus namespace, shared by senders and receivers spawned from it.
public class ServiceBusMultipleConnectionsFactory : IAsyncDisposable
{
/* create a (hopefully) thread safe ConcurrentDictionary of Lazy<ServiceBusClient>> */
private static readonly ConcurrentDictionary<string, Lazy<ServiceBusClient>> _clients =
new ConcurrentDictionary<string, Lazy<ServiceBusClient>>();
private static readonly ConcurrentDictionary<string, ServiceBusSender> _senders =
new ConcurrentDictionary<string, ServiceBusSender>();
private static readonly ConcurrentDictionary<string, ServiceBusReceiver> _receivers =
new ConcurrentDictionary<string, ServiceBusReceiver>();
public ServiceBusSender GetSender(string connectionString, string entity) =>
_senders.GetOrAdd(entity,
vf => this.LazyInitAndFindServiceBusClient(connectionString).CreateSender(entity));
public ServiceBusReceiver GetReceiver(string fullyQualifiedNamespace, TokenCredential credential, string entity) =>
_receivers.GetOrAdd(this.ComputeReceiverKeyName(entity, SubQueue.None, ServiceBusReceiveMode.PeekLock),
vf => this.LazyInitAndFindServiceBusClient(fullyQualifiedNamespace, credential).CreateReceiver(entity));
public ServiceBusReceiver GetReceiver(string connectionString, string entity) =>
_receivers.GetOrAdd(this.ComputeReceiverKeyName(entity, SubQueue.None, ServiceBusReceiveMode.PeekLock),
vf => this.LazyInitAndFindServiceBusClient(connectionString).CreateReceiver(entity));
public ServiceBusReceiver GetReceiver(string connectionString, string entity, SubQueue sq,
ServiceBusReceiveMode receiveMode) =>
_receivers.GetOrAdd(this.ComputeReceiverKeyName(entity, sq, receiveMode),
vf => this.LazyInitAndFindServiceBusClient(connectionString).CreateReceiver(entity,
new ServiceBusReceiverOptions() {SubQueue = sq, ReceiveMode = receiveMode}));
private ServiceBusClient LazyInitAndFindServiceBusClient(string fullyQualifiedNamespace,
TokenCredential credential)
{
Lazy<ServiceBusClient> valueFound = _clients.GetOrAdd(
this.ComputeServiceBusClientDictionaryKeyName(null, fullyQualifiedNamespace, credential),
x => new Lazy<ServiceBusClient>(
() => new ServiceBusClient(fullyQualifiedNamespace, credential)));
return valueFound.Value;
}
private ServiceBusClient LazyInitAndFindServiceBusClient(string connectionString)
{
Lazy<ServiceBusClient> valueFound = _clients.GetOrAdd(
this.ComputeServiceBusClientDictionaryKeyName(connectionString, null, null),
x => new Lazy<ServiceBusClient>(
() => new ServiceBusClient(connectionString)));
return valueFound.Value;
}
public async ValueTask DisposeAsync()
{
foreach (KeyValuePair<string, Lazy<ServiceBusClient>> currentClient in _clients)
{
await currentClient.Value.Value.DisposeAsync().ConfigureAwait(false);
}
GC.SuppressFinalize(this);
}
/* Create a Key-Name ConcurrentDictionary based on the multiple pieces of information of uniqueness, some will be null at times based on connection-string VS (namespace and token-credential) */
private string ComputeServiceBusClientDictionaryKeyName(string connectionString, string fullyQualifiedNamespace,
TokenCredential tc)
{
string returnValue =
$"{connectionString?.GetHashCode()}{fullyQualifiedNamespace?.GetHashCode()}{tc?.GetHashCode()}";
return returnValue;
}
/* Create a Key-Name ConcurrentDictionary based on the multiple pieces of information of uniqueness */
private string ComputeReceiverKeyName(string entity, SubQueue sq, ServiceBusReceiveMode receiveMode)
{
string returnValue = $"{entity}{sq.GetHashCode()}{receiveMode.GetHashCode()}";
return returnValue;
}
}
}
...
"usage"
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
public class MyThing : IMyThing
{
private readonly ILogger<MyThing> _logger;
private readonly ServiceBusMultipleConnectionsFactory _busFactory;
public DeadLetterProcessor(ILoggerFactory loggerFactory,
ServiceBusMultipleConnectionsFactory busFactory)
{
this._busFactory = busFactory;
/* not shown, create ILogger from ILoggerFactory */
}
public void DoSomething(string myQueueName)
{
/* I actually inject the POCO that has "MyConnectionString" , but beyond the scope of this answer */
ServiceBusReceiver myReceiver = this._busFactory.GetReceiver("MyConnectionString", myQueueName, SubQueue.DeadLetter,
ServiceBusReceiveMode.ReceiveAndDelete);
}
}
and I DI inject the Factory.
private static IServiceProvider BuildDi(IConfiguration configuration)
{
////setup our DI
IServiceCollection servColl = new ServiceCollection()
.AddLogging()
.AddSingleton<IMyThing, MyThing>()
.AddSingleton<ServiceBusMultipleConnectionsFactory>()
;
Other articles of interest:
https://andrewlock.net/making-getoradd-on-concurrentdictionary-thread-safe-using-lazy/
What happened to EntityNameFormatter (and FormatDeadLetterPath ) ???
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | |
Solution 2 | granadaCoder |