Consumer Groups
Consumer groups enable independent consumption of the same messages by multiple systems. Without consumer groups, a single dequeue operation removes a message from a topic for all consumers. With consumer groups, each group independently tracks the processing state of every message, allowing the same message to be delivered and processed by multiple consumer systems in parallel.
Key Behaviors
- Group independence — Each group maintains its own delivery state for every message. Acking a message in one group does not affect other groups.
- Parallel processing — Multiple groups can process the same message concurrently without blocking each other.
- Message lifecycle per group — A message is deleted from the queue only when all registered groups have acknowledged it. If any group has not yet processed (or nacked) a message, it remains available.
- Legacy mode — When using the default consumer group (or no group specified in older client versions), queue-ti behaves as a single-consumer queue, maintaining backward compatibility.
Registering a Consumer Group
Register a group via the HTTP admin API:
# Register a new group for a topic
curl -X POST http://localhost:8080/api/topics/orders/consumer-groups \
-H "Content-Type: application/json" \
-d '{"consumer_group": "warehouse"}'
# List all groups for a topic
curl http://localhost:8080/api/topics/orders/consumer-groups
# Unregister a group
curl -X DELETE http://localhost:8080/api/topics/orders/consumer-groups/warehouseOnce registered, a group automatically receives all pending messages that were enqueued before registration (backfill). Future messages are delivered to all registered groups.
Using Consumer Groups in Client Libraries
Go Client
consumer := client.NewConsumer("orders",
queueti.WithConsumerGroup("warehouse"),
queueti.WithConcurrency(4),
)
err := consumer.Consume(ctx, func(ctx context.Context, msg *queueti.Message) error {
// Process message...
return nil // Ack; return error to Nack
})For batch consumption:
consumer := client.NewConsumer("orders",
queueti.WithConsumerGroup("warehouse"),
)
err := consumer.ConsumeBatch(ctx, "orders", 50,
func(ctx context.Context, messages []*queueti.Message) error {
for _, msg := range messages {
// Process...
msg.Ack(ctx)
}
return nil
},
)Node.js Client
const consumer = client.consumer('orders', {
consumerGroup: 'warehouse',
concurrency: 4,
})
await consumer.consume(async (msg) => {
// Process message...
// Return normally to Ack; throw to Nack
})For batch consumption:
await consumer.consumeBatch(
{ batchSize: 50, consumerGroup: 'warehouse' },
async (messages) => {
for (const msg of messages) {
// Process...
await msg.ack()
}
},
)Python Client
import asyncio
from queueti import connect, ConnectOptions, ConsumerOptions
async def main():
client = await connect("localhost:50051", options=ConnectOptions(insecure=True))
consumer = client.consumer(
"orders",
options=ConsumerOptions(consumer_group="warehouse", concurrency=4),
)
async def handler(msg):
# Process message...
pass
await consumer.consume(handler)
asyncio.run(main())For batch consumption:
from queueti import BatchOptions
async def handle_batch(messages):
for msg in messages:
# Process...
await msg.ack()
await consumer.consume_batch(
options=BatchOptions(batch_size=50, consumer_group="warehouse"),
handler=handle_batch,
)Sync variant:
from queueti import connect_sync, ConnectOptions, ConsumerOptions
client = connect_sync("localhost:50051", options=ConnectOptions(insecure=True))
consumer = client.consumer(
"orders",
options=ConsumerOptions(consumer_group="warehouse"),
)
consumer.consume(handler) # Blocks until interruptedConsumer Group Grants
Consumer group grants let you restrict a user to a specific named consumer group on a topic. This is useful when multiple teams consume from the same topic and you want to enforce that each team only processes its own group.
See the Authentication guide for detailed information on consumer group grants and how to create them.