Stream Queues: Super Stream. Part two

In the first part, we saw how to create a super stream and send messages to it.

Now let's see how to consume messages from a super stream with the "Single Active Consumer" (SAC) feature.

To define a consumer for the super stream, it is enough to write:

var consumers_group = await Consumer.Create(new ConsumerConfig(system, "invoices")
{
    IsSuperStream = true,
    IsSingleActiveConsumer = true,
    Reference = "Consumers Group",
    // other options, such as MessageHandler, go here
});

The client automatically creates one consumer for each partition stream of the super stream.

In the management UI you can see one active consumer per stream, that is, one consumer for each partition (invoices-0, invoices-1, invoices-2).

Inside the message handler, you receive the messages from all three partitions:

MessageHandler = async (stream, consumerSource, context, message) =>

The source partition stream is always available in the handler as the stream argument.

Okay, what happens if you add another consumer with the same name (Reference) to the super stream in single active consumer mode?

The consumers are divided into two groups: some are active and the others are inactive. The management UI shows the same split.

Active consumers continue to consume the messages.

Inactive consumers are in "waiting" mode. They can become active when:

  • One of the active consumers stops working for some reason (passive failover).
  • Another group of consumers with the same name (Reference) is added (rebalance).

So there are always three active consumers, one per partition stream.
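The failover and rebalance behaviour described above can be pictured with a small toy model. This is only a sketch: it does not use the RabbitMQ client, the class SacGroupModel and its methods are hypothetical names, and the assignment rule (partition i goes to member i mod n) is an assumption chosen for illustration, not necessarily what the broker does.

```csharp
using System;
using System.Collections.Generic;

// Toy model of one consumer group (one Reference) on a super stream.
// Each "member" stands for an application instance that opened one
// consumer per partition with the same Reference.
public sealed class SacGroupModel
{
    private readonly string[] _partitions;
    private readonly List<string> _members = new List<string>();

    public SacGroupModel(params string[] partitions)
    {
        _partitions = partitions;
    }

    // A new instance joins the group: triggers a rebalance.
    public void Join(string member)
    {
        _members.Add(member);
    }

    // An instance stops: its partitions fail over to the remaining members.
    public void Leave(string member)
    {
        _members.Remove(member);
    }

    // Maps each partition to its single active member.
    // Assumed rule (illustrative only): partition i -> member (i mod n).
    // Returns an empty map when the group has no members.
    public Dictionary<string, string> ActiveByPartition()
    {
        var result = new Dictionary<string, string>();
        if (_members.Count == 0)
        {
            return result;
        }

        for (var i = 0; i < _partitions.Length; i++)
        {
            result[_partitions[i]] = _members[i % _members.Count];
        }

        return result;
    }
}
```

With a single instance, every partition is served by that instance. When a second instance joins, some partitions move to it while every partition still has exactly one active consumer. If the first instance leaves, the second one takes over all three partitions.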

The client is notified by the server when a change occurs. When a consumer moves from inactive to active or vice versa, the ConsumerUpdateListener callback is invoked. It is up to the user to decide which offset to use: the callback must return the offset from which the consumer starts consuming.

ConsumerUpdateListener = async (reference, stream, isActive) =>
{
    return my_offset....;
}
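One common way to choose that offset is to remember the last message that was fully processed for each (reference, stream) pair and to resume right after it. The sketch below shows only that bookkeeping. OffsetBook, MarkProcessed and ResumeFrom are hypothetical names, the store is in memory, and nothing here calls the RabbitMQ client. In a real application the value would come from wherever you persist offsets.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory bookkeeping for the "which offset do I resume from?"
// decision that the update callback has to make.
public sealed class OffsetBook
{
    private readonly Dictionary<(string Reference, string Stream), ulong> _lastProcessed =
        new Dictionary<(string Reference, string Stream), ulong>();

    // Record the offset of the last message that was fully processed
    // by the group `reference` on the partition `stream`.
    public void MarkProcessed(string reference, string stream, ulong offset)
    {
        _lastProcessed[(reference, stream)] = offset;
    }

    // Offset to return when a consumer becomes active:
    // one past the last processed message, or 0 (the start of the
    // stream in this model) when nothing was recorded yet.
    public ulong ResumeFrom(string reference, string stream)
    {
        return _lastProcessed.TryGetValue((reference, stream), out var offset)
            ? offset + 1
            : 0UL;
    }
}
```

Because the key includes both the reference and the partition stream, each group keeps its own position on each partition, so a consumer that is promoted to active continues where the previous active consumer of the same group stopped.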

It is possible to add several groups of consumers, each with a different name (Reference):

var consumers_group = await Consumer.Create(new ConsumerConfig(system, "invoices")
{
    IsSuperStream = true,
    IsSingleActiveConsumer = true,
    Reference = "Another Group",
    // other options, such as MessageHandler, go here
});

The groups work independently of each other. Reference is the value that groups the consumers together, and it is mandatory for SAC.

Note:

  • A super stream can be used without the single active consumer feature.
  • The single active consumer feature can be used on a single stream, without a super stream.

Conclusions

Super streams are an easy way to scale your stream application, and SAC is one of the most interesting features of streams and super streams. In this post, I used the .NET client, but you can pick your preferred language 😀

Need help? Check https://www.rabbitmq.com/contact#consulting for community and commercial support.

Speak Italian? Here is a Telegram channel.

Happy RabbitMQ messaging! 🎉


  

