Problem
How to use Azure Service Bus in .NET Core.
Solution
Create a class library and add NuGet package: Microsoft.Azure.ServiceBus
.
Add a class
to encapsulate settings:
public class AzureQueueSettings
{
public AzureQueueSettings(string connectionString, string queueName)
{
if (string.IsNullOrEmpty(connectionString))
throw new ArgumentNullException("connectionString");
if (string.IsNullOrEmpty(queueName))
throw new ArgumentNullException("queueName");
this.ConnectionString = connectionString;
this.QueueName = queueName;
}
public string ConnectionString { get; }
public string QueueName { get; }
}
Add a class
to wrap functionality of sending messages to queue:
public class AzureQueueSender<T> : IAzureQueueSender<T> where T : class
{
public AzureQueueSender(AzureQueueSettings settings)
{
this.settings = settings;
Init();
}
public async Task SendAsync(T item, Dictionary<string, object> properties)
{
var json = JsonConvert.SerializeObject(item);
var message = new Message(Encoding.UTF8.GetBytes(json));
if (properties != null)
{
foreach (var prop in properties)
{
message.UserProperties.Add(prop.Key, prop.Value);
}
}
await client.SendAsync(message);
}
private AzureQueueSettings settings;
private QueueClient client;
private void Init()
{
client = new QueueClient(
this.settings.ConnectionString, this.settings.QueueName);
}
}
Add a class
to wrap functionality of receiving messages from the queue:
public void Receive(
Func<T, MessageProcessResponse> onProcess,
Action<Exception> onError,
Action onWait)
{
var options = new MessageHandlerOptions(e =>
{
onError(e.Exception);
return Task.CompletedTask;
})
{
AutoComplete = false,
MaxAutoRenewDuration = TimeSpan.FromMinutes(1)
};
client.RegisterMessageHandler(
async (message, token) =>
{
try
{
var data = Encoding.UTF8.GetString(message.Body);
T item = JsonConvert.DeserializeObject<T>(data);
var result = onProcess(item);
if (result == MessageProcessResponse.Complete)
await client.CompleteAsync(message.SystemProperties.LockToken);
else if (result == MessageProcessResponse.Abandon)
await client.AbandonAsync(message.SystemProperties.LockToken);
else if (result == MessageProcessResponse.Dead)
await client.DeadLetterAsync(message.SystemProperties.LockToken);
onWait();
}
catch (Exception ex)
{
await client.DeadLetterAsync(message.SystemProperties.LockToken);
onError(ex);
}
}, options);
}
Now you can use these wrapper class
es to send message:
var settings = new AzureQueueSettings(
connectionString: config["ServiceBus_ConnectionString"],
queueName: config["ServiceBus_QueueName"]);
var message = new Message { Text = "Hello Queue" };
IAzureQueueSender<Message> sender = new AzureQueueSender<Message>(settings);
await sender.SendAsync(message);
And receive messages:
IAzureQueueReceiver<Message> receiver =
new AzureQueueReceiver<Message>(settings);
receiver.Receive(
message =>
{
Console.WriteLine(message.Text);
return MessageProcessResponse.Complete;
},
ex => Console.WriteLine(ex.Message),
() => Console.WriteLine("Waiting..."));
NOTE: The sample code also includes wrappers for topics and subscriptions.
Discussion
The sample code will require you to setup Azure account and Service Bus. Instructions for these could be found here.