Hey guys,
Message Queue (MQ) has been around for decades, hasn't it? MQ helps us a lot in terms of:
Communication between services
Delegating heavy/expensive tasks to background workers
...
In the tech world, there are both pros and cons. Let's discuss some of them and the solutions to overcome these challenges.
A bit of background: I've worked with a system that serves 18k+ businesses (B2B) and handles 1 to 1.5 million messages per day.
Low throughput
Low throughput in Message Queues (MQ) can be caused by inefficient message processing, a limited number of consumers or worker processes, a lack of batching, and suboptimal configuration settings.
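To make the batching point a bit more concrete, here is a minimal sketch of a worker loop that handles messages in concurrent batches instead of strictly one by one. `processInBatches`, `batchSize` and `handle` are hypothetical names for illustration, not part of any MQ SDK.

```typescript
// Hypothetical helper: handles messages in concurrent batches of `batchSize`.
// Returns the number of batches that were processed.
async function processInBatches<T>(
  messages: T[],
  batchSize: number,
  handle: (message: T) => Promise<void>,
): Promise<number> {
  if (!Number.isInteger(batchSize) || batchSize < 1) {
    throw new Error('batchSize must be a positive integer');
  }
  let batches = 0;
  for (let i = 0; i < messages.length; i += batchSize) {
    const batch = messages.slice(i, i + batchSize);
    // All messages of one batch run concurrently; the next batch starts
    // only after the current one has finished.
    await Promise.all(batch.map((message) => handle(message)));
    batches += 1;
  }
  return batches;
}
```

A good `batchSize` depends on what the handler does (DB calls, external APIs, ...), so treat it as something to measure and tune rather than a fixed number.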
Cloud
Nowadays, we live and breathe the Cloud, so the low throughput is not really coming from the Queue services (SQS or others).
So, monitor your worker processes and check their performance. Slow processing decreases the throughput.
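One cheap way to start monitoring is to wrap each handler and record how long it takes. The sketch below assumes you already have some metrics or logging sink; `withTiming` and `DurationRecorder` are made-up names for illustration.

```typescript
// Hypothetical callback that receives one measurement per handled message.
type DurationRecorder = (label: string, durationMs: number, failed: boolean) => void;

// Wraps a message handler so every call reports how long it took.
function withTiming<T>(
  label: string,
  handle: (message: T) => Promise<void>,
  record: DurationRecorder,
): (message: T) => Promise<void> {
  return async (message: T) => {
    const startedAt = Date.now();
    let failed = false;
    try {
      await handle(message);
    } catch (e) {
      failed = true;
      throw e; // keep the original failure behaviour
    } finally {
      // Runs for both successful and failed calls.
      record(label, Date.now() - startedAt, failed);
    }
  };
}
```

Feeding these numbers into a dashboard usually shows quickly which handlers are dragging the throughput down.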
Self-hosted
For self-hosted MQ, follow the best practices & recommendations for tuning up your MQ service to the moon.
Always monitor your workers too.
Failure to send out Queue Message
Have you ever thought that when doing something like this, it could end up failing?
queue.dispatch(new MyTask(...));
Yes, there is no guarantee that your messages will always be sent out. The cause could be:
Our network problems
MQ network problems
Intermittent issues
...
To overcome this, you need an Outbox pattern layer.
The Outbox pattern is a technique used to ensure reliable message delivery in distributed systems. It involves storing messages in a local database, called the outbox, before they are sent to the message queue. This approach helps to overcome message delivery failures by providing durability and retry mechanisms, ensuring messages are eventually sent to their intended recipients.
So basically, you won't interact with the MQ directly; you would do this instead:
outbox.dispatch(new MyTask(...)); // this is just a simple DB write
Then, you'd add a Processor, to pull the unsent messages and dispatch them. If there is an error, stop and retry again, until the message is sent.
// processor
async function sendMessage(messages: Messages[]) {
  for (const message of messages) {
    try {
      // send to the real MQ first, then mark the outbox row as sent
      await queue.dispatch(message.toQueueMessage());
      await message.markAsSent();
    } catch (e) {
      logger.error('Failed to dispatch message', e);
      // stop here; the next run retries from this message
      throw e;
    }
  }
}
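To see the whole flow end to end, here is a self-contained sketch with an in-memory outbox standing in for the DB table. Everything here (`InMemoryOutbox`, `QueueClient`, `flush`) is a hypothetical name for illustration; in a real system the `dispatch` step would typically be an INSERT in the same DB transaction as the business change.

```typescript
interface OutboxRecord {
  id: number;
  payload: string;
  sentAt: Date | null;
}

// Stand-in for the real MQ client (hypothetical interface).
interface QueueClient {
  dispatch(payload: string): Promise<void>;
}

class InMemoryOutbox {
  private records: OutboxRecord[] = [];
  private nextId = 1;

  // Only stores the message; nothing is sent to the MQ yet.
  dispatch(payload: string): OutboxRecord {
    const record: OutboxRecord = { id: this.nextId++, payload, sentAt: null };
    this.records.push(record);
    return record;
  }

  unsent(): OutboxRecord[] {
    return this.records.filter((r) => r.sentAt === null);
  }

  // Sends unsent records in order and returns how many were sent.
  // Stops at the first failure so the next run retries from there.
  async flush(queue: QueueClient): Promise<number> {
    let sent = 0;
    for (const record of this.unsent()) {
      try {
        await queue.dispatch(record.payload);
      } catch (e) {
        return sent; // leave the rest unsent; the next flush retries them
      }
      record.sentAt = new Date();
      sent += 1;
    }
    return sent;
  }
}
```

Note that a message can still be sent twice if marking it as sent fails right after a successful dispatch, so this gives you at-least-once delivery, not exactly-once.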
Exactly-once
Even though some message queues claim to guarantee no duplication, we can't entirely rely on that. After all, we're dealing with distributed systems, which inherently present challenges.
To address this issue, simply add an Idempotency Layer before processing any message.
- Especially for mission-critical/sensitive messages, e.g.: transferring money, receiving money, and sending out packages, etc.
It doesn't have to be ugly, you can have a simple table:
CREATE TABLE idempotency (
  key TEXT PRIMARY KEY
);
Using PRIMARY KEY here means the key must be unique and is indexed at the same time: two birds with one stone.
With fewer elements in the table and only one index, appending data will be extremely fast (not to mention the in-memory database).
Before processing occurs, simply append a unique key based on the payload of your message.
// transfer money
async function handle(message: Message) {
  try {
    // record the key; this fails if the same key was already recorded
    await idempotency.acquireLock(`send-money-out-${message.payment.id}`);
  } catch (e) {
    if (e instanceof QueryException) {
      // acquire lock failed, stop
      return;
    }
    // this could be another error, e.g. something related to the DB
    // here we can stop, log or retry depending on the priority
    return;
  }
  await transferMoney(...);
}
Simple yet effective. We require database assistance to accomplish this (and I believe databases are present in every application nowadays).
A disadvantage is that you would likely need to clear the table monthly/bi-annually/annually in order to free up disk space.
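If you want to play with the idea without a database, here is a small in-memory sketch. It is a variant of the handler above that returns a boolean instead of throwing on a duplicate key; `InMemoryIdempotencyStore`, `tryAcquire` and `handleOnce` are hypothetical names, and the `Set` only stands in for the unique column.

```typescript
// Stand-in for the `idempotency` table: a Set behaves like a unique column.
class InMemoryIdempotencyStore {
  private readonly keys = new Set<string>();

  // Returns true if the key was recorded now, false if it already existed.
  async tryAcquire(key: string): Promise<boolean> {
    if (this.keys.has(key)) {
      return false;
    }
    this.keys.add(key);
    return true;
  }
}

// Runs `work` only when the key has not been seen before.
// Returns false when the message is a duplicate and was skipped.
async function handleOnce(
  store: InMemoryIdempotencyStore,
  key: string,
  work: () => Promise<void>,
): Promise<boolean> {
  const acquired = await store.tryAcquire(key);
  if (!acquired) {
    return false; // duplicate delivery: skip
  }
  await work();
  return true;
}
```

One trade-off to keep in mind: the key is recorded before the work runs, so if `work` fails, a redelivered message is skipped. Whether you release the key on failure or handle it manually depends on how sensitive the operation is.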
(For DB-backed MQ) The DB gets hammered
It is normal to see high CPU usage or a high number of connections on the DB when using the DB as an MQ.
Workers hit the DB very frequently to fetch messages: each one opens a transaction, runs a SKIP LOCKED query, and then locks the rows. Those are expensive operations. The more workers you have, the sooner you will see the problem.
There are 2 solutions/approaches for this:
(Preferable) Use another MQ. Yes, don't put the hard tasks on the DB.
(Optional) Lower the number of worker processes and tune the DB.
This is also part of the "Low throughput" problem above.
Prioritization
Here it is, the priority queue, one of the most popular topics frequently discussed in large companies.
A priority queue is a data structure that stores elements with associated priorities, allowing for efficient retrieval of the highest-priority element. It is commonly used in scenarios where certain tasks or messages need to be processed before others based on their importance or urgency.
In short: the same MQ topic, but with different priorities depending on the business logic (Business A pays us more, so their messages must be processed first; Business B is our VIP customer; ...)
We have several solutions to tackle this:
Use different topics
Example: transfer, transfer-low, transfer-high
Each topic would have a different set of workers, e.g.:
transfer: normal one, 10 worker processes
transfer-low: lowest priority, 2~5 worker processes
transfer-high: highest priority, 5~10 worker processes (based on how many messages we could resolve)
This is the simplest way. In the code, we just need to create a smart-routing-queue-dispatcher, and that's all.
PROs: Easiest solution, no big work.
CONs: there will be a lot of topics to manage, set up and monitor.
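The routing part of such a dispatcher can be tiny. Here is a minimal sketch that follows the transfer / transfer-low / transfer-high naming above; `Priority`, `resolvePriority`, `topicFor` and the customer ID sets are assumptions for illustration, and the actual rule would come from your business logic.

```typescript
type Priority = 'low' | 'normal' | 'high';

interface RoutableMessage {
  businessId: string;
}

// Hypothetical business rule: VIP customers go first, some go last.
function resolvePriority(
  message: RoutableMessage,
  vipBusinessIds: ReadonlySet<string>,
  lowPriorityBusinessIds: ReadonlySet<string>,
): Priority {
  if (vipBusinessIds.has(message.businessId)) {
    return 'high';
  }
  if (lowPriorityBusinessIds.has(message.businessId)) {
    return 'low';
  }
  return 'normal';
}

// Maps a base topic and a priority to the concrete topic name:
// normal keeps the base name, the others get a suffix.
function topicFor(baseTopic: string, priority: Priority): string {
  return priority === 'normal' ? baseTopic : `${baseTopic}-${priority}`;
}
```

The dispatcher would then send to `topicFor('transfer', resolvePriority(message, vips, lows))` instead of a hard-coded topic name.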
Implement Priority Queue
This one is for those who love to build stuff haha. But let's always try to research some popular MQs out there before building your own.
Building a Priority Queue is simple. You can build it easily using the DB with this table:
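CREATE TABLE priority_queue_jobs (
  id SERIAL PRIMARY KEY,
  payload JSONB,
  priority INT DEFAULT 5,
  created_at TIMESTAMP DEFAULT NOW()
);
-- add index for priority DESC, created_at ASC (the index name is just an example)
CREATE INDEX priority_queue_jobs_pull_idx ON priority_queue_jobs (priority DESC, created_at ASC);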
Things that need to be covered:
Use SKIP LOCKED to ensure no duplication when pulling messages for processing
Build your desired pull message query
Build your queue dispatcher & worker
PROs: the DB helps us retrieve the records, lock them,...
CONs: it takes time, and it is a bit of reinventing the wheel,...
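For the pull message query, one possible shape (assuming PostgreSQL and the priority_queue_jobs table above) is a DELETE that claims the single highest-priority, oldest job while skipping rows locked by other workers. `SqlClient` below is a hypothetical minimal interface, not a real driver API, so adapt it to whatever client you use.

```typescript
interface Job {
  id: number;
  payload: unknown;
  priority: number;
}

// Minimal shape of a SQL client (hypothetical, for illustration only).
interface SqlClient {
  query(sql: string): Promise<{ rows: Job[] }>;
}

// Claims one job: highest priority first, then oldest first.
// SKIP LOCKED makes concurrent workers skip rows another worker already holds.
const CLAIM_NEXT_JOB_SQL = `
  DELETE FROM priority_queue_jobs
  WHERE id = (
    SELECT id
    FROM priority_queue_jobs
    ORDER BY priority DESC, created_at ASC
    LIMIT 1
    FOR UPDATE SKIP LOCKED
  )
  RETURNING id, payload, priority
`;

// Returns the claimed job, or null when no unlocked job is available.
async function claimNextJob(client: SqlClient): Promise<Job | null> {
  const result = await client.query(CLAIM_NEXT_JOB_SQL);
  return result.rows[0] ?? null;
}
```

Since the row is deleted when it is claimed, you would probably run this inside a transaction that commits only after the job is processed; otherwise a crashing worker loses the job. Marking rows as "processing" instead of deleting them is another option.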
Conclusion
I have experienced all of the problems & solutions above, and I'd love to note them down here and share them with the whole world.
So together, we'll continue to build awesome & reliable software.
Cheers guys!