2021-10-17 by José Novais
Projects, RabbitMq

JN.RabbitMQClient – RabbitMQ consumer and sender


This is a simple implementation of a RabbitMQ consumer and sender. It was initially developed to learn how to use the official RabbitMQ C# library in simple scenarios. It is now used in several projects in production environments, where it has proved to be stable. These projects include REST APIs, Windows services, .NET Core services (Windows and Linux) and others.

  

Features

  • Sender implementation
  • Multiple consumer instances supported
  • Multiple processing options for received messages
  • Random expiration for messages sent to a holding queue (depending on the processing option)
  • TLS connection support
  • Limiter for message processing
  • Message properties for more advanced scenarios such as queues with support for priority messages, message headers, etc.

Current version

Current version is 2.4.3

Release notes for version 2.4.3

  • Added service extensions AddConsumersService() and AddSenderService() (in namespace JN.RabbitMQClient.Extensions)
  • Added ConnectionDetails readonly property
  • Renamed property GetTotalConsumers to TotalConsumers
  • Renamed property GetTotalRunningConsumers to TotalRunningConsumers
  • Updated RabbitMQ.Client to the latest version

Release notes for version 2.4.2

  • Update RabbitMQ.Client to latest version.
  • Added ConsumersPrefetch property (in consumer service)
  • Bug fixes

Release notes for version 2.4.1

  • Update RabbitMQ.Client to latest version.
  • Bug fixes

Release notes for version 2.4.0

  • Added support for message properties (in sender and consumer classes); messages can now be sent for more advanced scenarios, such as queues with support for priority messages, message headers, etc.
  • Merged sender classes; the keep-connection-open feature was moved into the main sender class
  • Changed type for MaxChannelsPerConnection property (in consumer service)
  • Bug fixes

Release notes for version 2.3.4

  • Added support for additional information to be passed to the processing delegate; the processing instruction is now a MessageProcessInstruction object that can carry that additional information. This is useful when a message is requeued with a delay and information must be passed to the next processing attempt.

Release notes for version 2.3.3

  • Update target frameworks; added .NETFramework4.6.1
  • Update RabbitMQ.Client to latest version
  • Update consumer to expose MaxChannelsPerConnection property

Release notes for version 2.3.2

  • Added sender service that keeps connection open (RabbitMqSenderService2 class)

Release notes for version 2.3.0

  • Update RabbitMQ.Client Library to 6.2.1
  • Changed namespace for IRabbitMqConsumerService and IRabbitMqSenderService
  • Changed behavior for StopConsumers(consumerTag) – now stops all consumers with tag starting with ‘consumerTag’
  • Added limiter feature

Release notes for version 2.2.1

  • Update RabbitMQ.Client Library to 6.0.0
  • Upgrade to .NET Standard 2.1
  • Solved bug in connect port
  • TLS connection support

Install

Download the package from NuGet:

Install-Package JN.RabbitMQClient -version [version number]

The package is available here and source code is available here.

About sending messages

There are two services available to send messages:

  • RabbitMqSenderService, which implements the IRabbitMqSenderService interface. It creates one connection per message sent, so it is not suitable for sending large numbers of messages.
  • RabbitMqSenderService2, which implements the IRabbitMqSenderServiceKeepConnection interface. It keeps the connection open until the object is disposed, so it is better suited to sending large numbers of messages.

Note that, according to the release notes for version 2.4.0, the sender classes were merged and the keep-connection-open feature became part of the main sender class. The example at the end of this article sets KeepConnectionOpen = true in the sender configuration.

Usage – consuming messages

First, you must create the RabbitMqConsumerService and then define delegates for ReceiveMessage, ShutdownConsumer and ReceiveMessageError. The service will start the required number of consumers when StartConsumers is called. To use a retry queue, the method StartConsumers should be called with a RetryQueueDetails object.

Message processing instructions

The ReceiveMessage delegate receives and processes the message. After the message is processed it returns a message processing instruction (a MessageProcessInstruction object).

Instructions

  • OK – message is considered successfully processed
  • RequeueMessageWithDelay – message is removed from the queue, but sent to a retry queue for later processing (typically with a dead letter configuration)
  • IgnoreMessage – message is removed from the queue and ignored
  • IgnoreMessageWithRequeue – message is rejected and sent back to the queue

Requeue message with delay

The RequeueMessageWithDelay processing instruction allows a message to be processed later. It is meant to be used with a secondary (holding) queue that receives the message to be retried. When the message is sent to that queue, its timestamp and expiration properties are set. Later, when the message expires on the secondary queue, it is sent back to the main queue. When that happens, the timestamp can be checked and, if the elapsed time is longer than allowed, the message can be dropped (with the IgnoreMessage instruction).
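
The elapsed-time check described above could look roughly like the sketch below. It is a self-contained illustration, not the library's code: the Instruction enum stands in for the library's instruction values, MaxRetryAge is a hypothetical limit, and the sketch assumes that the first-error timestamp is a Unix time in seconds (0 when the message has never failed), which this article does not state.

```csharp
using System;

// Hypothetical stand-in for the library's processing instruction values.
public enum Instruction { OK, RequeueMessageWithDelay, IgnoreMessage }

public static class RetryPolicy
{
    // Hypothetical limit: stop retrying messages that have been failing longer than this.
    public static readonly TimeSpan MaxRetryAge = TimeSpan.FromMinutes(10);

    // Assumption: firstErrorTimestamp is a Unix time in seconds, 0 if the message never failed.
    public static Instruction Decide(bool processed, long firstErrorTimestamp, DateTimeOffset now)
    {
        if (processed)
            return Instruction.OK;

        if (firstErrorTimestamp > 0)
        {
            var firstError = DateTimeOffset.FromUnixTimeSeconds(firstErrorTimestamp);
            if (now - firstError > MaxRetryAge)
                return Instruction.IgnoreMessage; // retried for too long: drop the message
        }

        // Otherwise send the message to the holding queue and try again later.
        return Instruction.RequeueMessageWithDelay;
    }
}
```

Passing the current time as a parameter keeps the decision easy to test; in a real ReceiveMessage delegate it would typically be DateTimeOffset.UtcNow.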

For this to work, a configuration like the following could be used.

Example

  • MainQueue – main queue where consumers are connected
  • HoldingQueue – secondary queue to hold retry messages; when a message needs to be processed later it will be sent to this queue.
  • TestExchangeHolding – a dead letter exchange to redirect messages from HoldingQueue to MainQueue when they expire

Configuration

  • HoldingQueue should be configured with the “x-dead-letter-exchange” parameter set to “TestExchangeHolding”.
  • The TestExchangeHolding exchange should have a binding to MainQueue
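
The same topology can also be declared from code. The sketch below uses the official RabbitMQ.Client library directly and assumes its 6.x synchronous API (IModel); the HoldingQueueSetup class, the durable settings and the choice of a fanout exchange are assumptions made for illustration, and the main queue is spelled MainQueue here. It is not part of JN.RabbitMQClient.

```csharp
using System.Collections.Generic;
using RabbitMQ.Client;

public static class HoldingQueueSetup
{
    // Declares the main queue, the dead letter exchange and the holding queue.
    public static void Declare(string hostName)
    {
        var factory = new ConnectionFactory { HostName = hostName };

        using var connection = factory.CreateConnection();
        using var channel = connection.CreateModel();

        // Main queue where the consumers are connected.
        channel.QueueDeclare("MainQueue", durable: true, exclusive: false,
            autoDelete: false, arguments: null);

        // Dead letter exchange that receives messages expiring in the holding queue.
        channel.ExchangeDeclare("TestExchangeHolding", ExchangeType.Fanout, durable: true);

        // A fanout exchange ignores the routing key, so every expired message
        // is delivered to the main queue.
        channel.QueueBind("MainQueue", "TestExchangeHolding", routingKey: "");

        // Holding queue: expired messages are dead-lettered to TestExchangeHolding.
        var holdingArgs = new Dictionary<string, object>
        {
            ["x-dead-letter-exchange"] = "TestExchangeHolding"
        };
        channel.QueueDeclare("HoldingQueue", durable: true, exclusive: false,
            autoDelete: false, arguments: holdingArgs);
    }
}
```

Declaring a queue that already exists with different arguments fails on the broker, so this is best run once against a clean virtual host or kept in sync with any configuration done in the management UI.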

Consumer configuration

To use a retry queue, consumers must be configured. When consumers are started a RetryQueueDetails object must be provided.

Example:

var details = new RetryQueueDetails
{
    RetryQueue="HoldingQueue",
    RetentionPeriodInRetryQueueMilliseconds = 1000,
    RetentionPeriodInRetryQueueMillisecondsMax = 5000
};

This defines the retry queue as “HoldingQueue”, and the retention period for each message will be a random value between 1 and 5 seconds. To disable the random value, set RetentionPeriodInRetryQueueMillisecondsMax to 0 or to the same value as RetentionPeriodInRetryQueueMilliseconds.
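
To make the rule concrete, the sketch below shows one way such a retention value could be chosen. It only illustrates the behaviour described above and is not the library's implementation; RetentionPicker and NextRetention are hypothetical names.

```csharp
using System;

public static class RetentionPicker
{
    private static readonly Random Rng = new Random();

    // Returns the retention period (in milliseconds) for one message sent to the retry queue.
    // A maximum of 0, or a maximum not above the minimum, disables the random value.
    public static int NextRetention(int minMs, int maxMs)
    {
        if (maxMs <= minMs)
            return minMs;

        lock (Rng) // Random instances are not thread-safe
        {
            return Rng.Next(minMs, maxMs + 1); // the upper bound of Next is exclusive
        }
    }
}
```

With the values from the example, NextRetention(1000, 5000) returns a value between 1000 and 5000, while NextRetention(1000, 0) always returns 1000. Spreading the expiration times this way avoids all delayed messages returning to the main queue at the same moment.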

About TLS connection support

It is possible to connect to RabbitMQ using TLS. To do so, set UseTLS to true in the configuration object (the broker configuration objects are shown in the example at the end).

Client certificates are not supported.
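
As an illustration, a consumer configuration with TLS enabled might look like the fragment below. It reuses the property names from the full example at the end of this article plus the UseTLS property mentioned above; the host name is a placeholder, and port 5671 is only the conventional AMQP-over-TLS port, so both are assumptions to adapt to your broker.

```csharp
IBrokerConfigConsumers configConsumers = new BrokerConfigConsumers
{
    Username = "test",
    Password = "123",
    Host = "rabbit.example.com", // placeholder host name
    VirtualHost = "MyVirtualHost",
    RoutingKeyOrQueueName = "MyTestQueue",
    Port = 5671,    // conventional TLS port (assumption); plain AMQP normally uses 5672
    UseTLS = true   // connect to the broker using TLS
};
```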

Processing limiter

We can limit the processing of messages. This can be useful if consumers are unable to process all messages or simply need to slow down the processing of messages.

To do this, provide an implementation of the ILimiter interface to the consumer service, as in the following example.

public class MyApp
{
    private readonly IRabbitMqConsumerService _consumerService;
    private readonly IRabbitMqSenderService _senderService;
    private readonly AppConfig _config;
	
    public MyApp(IRabbitMqConsumerService consumerService, IRabbitMqSenderService senderService, ILimiter limiter)
    {
        _consumerService = consumerService;
        _senderService = senderService;

        _consumerService.ServiceDescription = "Consumer Service";
        _consumerService.ReceiveMessage += ProcessMessage;
        _consumerService.ShutdownConsumer += ProcessShutdown;
        _consumerService.ReceiveMessageError += ProcessError;

        _consumerService.Limiter = limiter; // setup the limiter

        _senderService.ServiceDescription = "Sender Service";

    }

    //... ...
}

It’s important to note that messages are always removed from the queue. The ILimiter provided to consumer service will decide if the received message can be processed or not – method IsAllowed(). If the message can’t be processed then the processing delegate will not be executed and the processing instruction defined by DeniedProcessInstruction property is returned.

This feature can be useful when combined with a holding queue. In this case, messages that can’t be processed are sent to the holding queue for later processing.

A default ILimiter implementation is provided: the WindowLimiter class, which limits processing to N messages in a defined time window. The next example returns a limiter that allows 3 messages per second. If a message can’t be processed, it is requeued with a delay (sent to the holding queue).

private static WindowLimiter GetLimiter()
{
    const int maxAllowed = 3; // number of items to process in the time window
    const int windowSeconds = 1;
    const Constants.MessageProcessInstruction deniedInstruction = Constants.MessageProcessInstruction.RequeueMessageWithDelay;

    return new WindowLimiter(maxAllowed, windowSeconds, deniedInstruction);
}
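
For readers who want to write their own limiter, the idea of "N messages per time window" can be sketched as a simple fixed-window counter. The class below is a stand-alone illustration with hypothetical names; it does not implement the library's ILimiter interface (whose exact members are not listed in this article) and is not necessarily how WindowLimiter works internally.

```csharp
using System;

// Stand-alone sketch of a fixed-window counter.
public class FixedWindowCounter
{
    private readonly int _maxAllowed;
    private readonly TimeSpan _window;
    private readonly object _sync = new object();
    private DateTime _windowStart = DateTime.MinValue;
    private int _count;

    public FixedWindowCounter(int maxAllowed, int windowSeconds)
    {
        _maxAllowed = maxAllowed;
        _window = TimeSpan.FromSeconds(windowSeconds);
    }

    // Returns true if another message may be processed at time 'now'.
    public bool IsAllowed(DateTime now)
    {
        lock (_sync)
        {
            if (now - _windowStart >= _window)
            {
                _windowStart = now; // start a new window
                _count = 0;
            }

            if (_count >= _maxAllowed)
                return false; // window is full: the caller should deny processing

            _count++;
            return true;
        }
    }
}
```

A real limiter would wrap this kind of check in its IsAllowed() method and expose the instruction to return for denied messages through DeniedProcessInstruction.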

Utilities service

A small utilities service class, RabbitMqUtilitiesService, is provided with methods to create a queue, delete a queue and get the total number of items in a queue.

Example

Example for consumer and sender services:

using System;
using System.Threading.Tasks;
// The using directives for the JN.RabbitMQClient namespaces are also required.

class Program
{
    // RabbitMQ host name (assumed value; change it to match your broker)
    private const string hostName = "localhost";

    static void Main(string[] args)
    {
        Console.WriteLine("Hello World!");

        // consumer

        var consumerService = new RabbitMqConsumerService(GetBrokerConfigConsumers());

        consumerService.ReceiveMessage += ReceiveMessage;
        consumerService.ShutdownConsumer += ShutdownConsumer;
        consumerService.ReceiveMessageError += ReceiveMessageError;
        consumerService.MaxChannelsPerConnection = 5; // default is 3
        consumerService.ConsumersPrefetch = 2;  // default is 1
        consumerService.ServiceDescription = "test consumer service";

        consumerService.StartConsumers("my consumer");

        // sender

        var senderService = new RabbitMqSenderService(GetBrokerConfigSender());

        IMessageProperties properties = new MessageProperties { Priority = 3 };

        senderService.Send("my message", properties);

        Console.WriteLine("Press any key to exit...");
        Console.ReadKey();

        consumerService.Dispose();
    }

    private static IBrokerConfigSender GetBrokerConfigSender()
    {
        IBrokerConfigSender configSender = new BrokerConfigSender
        {
            Username = "test",
            Password = "123",
            Host = hostName,
            VirtualHost = "MyVirtualHost",
            RoutingKeyOrQueueName = "MyTestQueue",
            KeepConnectionOpen = true
        };
        return configSender;
    }

    private static IBrokerConfigConsumers GetBrokerConfigConsumers()
    {
        IBrokerConfigConsumers configConsumers = new BrokerConfigConsumers
        {
            Username = "test",
            Password = "123",
            Host = hostName,
            VirtualHost = "MyVirtualHost",
            RoutingKeyOrQueueName = "MyTestQueue",
            ShuffleHostList = false,
            Port = 0,
            TotalInstances = 4
        };
        return configConsumers;
    }

    // Called when an error occurs while a message is being received or processed.
    private static async Task ReceiveMessageError(string routingKeyOrQueueName, string consumerTag, string exchange, string message, string errorMessage)
    {
        await Console.Out.WriteLineAsync($"Error: '{consumerTag}' | Queued message: {message} | Error message: {errorMessage}").ConfigureAwait(false);
    }

    // Called when a consumer is shut down.
    private static async Task ShutdownConsumer(string consumerTag, ushort errorCode, string shutdownInitiator, string errorMessage)
    {
        await Console.Out.WriteLineAsync($"Shutdown '{consumerTag}' | {errorCode} | {shutdownInitiator} | {errorMessage}").ConfigureAwait(false);
    }

    // Processes a received message and returns the processing instruction.
    private static async Task<MessageProcessInstruction> ReceiveMessage(string routingKeyOrQueueName, string consumerTag, long firstErrorTimestamp, string exchange, string message, string additionalInfo, IMessageProperties properties)
    {
        var priorityReceived = properties.Priority;

        // Raise low priorities (3 or less) to 5; keep higher priorities unchanged.
        var newPriority = (byte)(priorityReceived <= 3 ? 5 : priorityReceived);

        await Console.Out.WriteLineAsync($"Message received by '{consumerTag}' from queue '{routingKeyOrQueueName}': {message}; Priority received: {properties.Priority} ").ConfigureAwait(false);

        return new MessageProcessInstruction
        {
            Value = Constants.MessageProcessInstruction.OK,
            Priority = newPriority,
            AdditionalInfo = "id: 123"
        };
    }
}