Overcoming Message Queue (MQ) ✉️ obstacles: Tips and Solutions


Hey guys,

Message Queue (MQ) has been around for decades, hasn't it? MQ helps us a lot in terms of:

  • Communication between services ☎️

  • Delegating heavy/expensive tasks to background workers 🏃‍♂️

  • ...

Like everything in the tech world, MQ comes with both pros and cons. Let's discuss some of the challenges and the solutions to overcome them. 😎

A bit of background: I've worked with a system that serves 18k+ businesses (B2B) and handles 1 to 1.5 million messages per day.

Low throughput

Low throughput in Message Queues (MQ) can be caused by inefficient message processing, a limited number of consumers or worker processes, a lack of batching, and suboptimal configuration settings.
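To make the batching point concrete, here is a minimal sketch of a worker that handles messages in batches, with the messages inside one batch running concurrently. `QueueMessage`, `chunk` and `processInBatches` are names I made up for illustration; they are not part of any MQ SDK.

```typescript
// Hypothetical message shape; not taken from any specific MQ client.
interface QueueMessage {
    id: string;
    payload: unknown;
}

// Split a list into chunks of at most `size` items.
function chunk<T>(items: T[], size: number): T[][] {
    if (size < 1) {
        throw new Error('size must be at least 1');
    }
    const chunks: T[][] = [];
    for (let i = 0; i < items.length; i += size) {
        chunks.push(items.slice(i, i + size));
    }
    return chunks;
}

// Process messages batch by batch; messages inside a batch run concurrently.
// Returns the ids of the messages whose handler failed, so the caller can retry them.
async function processInBatches(
    messages: QueueMessage[],
    batchSize: number,
    handler: (message: QueueMessage) => Promise<void>,
): Promise<string[]> {
    const failedIds: string[] = [];
    for (const batch of chunk(messages, batchSize)) {
        // Map every message to `null` on success or its id on failure.
        const outcomes = await Promise.all(
            batch.map((m) =>
                handler(m).then(
                    (): string | null => null,
                    (): string | null => m.id,
                ),
            ),
        );
        for (const failedId of outcomes) {
            if (failedId !== null) {
                failedIds.push(failedId);
            }
        }
    }
    return failedIds;
}
```

The batch size acts as a simple concurrency limit: a bigger batch means more messages in flight at once, at the cost of more pressure on whatever the handler talks to.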

Cloud

Nowadays, we live and breathe the Cloud, so low throughput rarely comes from the managed queue service itself (SQS or others).

So, monitor your worker processes and check their performance. Slow processing decreases throughput.

Self-hosted

For self-hosted MQ, follow the best practices & recommendations to tune your MQ service to the moon.

Always monitor your workers too.

Failure to send out Queue Message

Have you ever thought that when doing something like this, it could end up failing?

queue.dispatch(new MyTask(...));

Yes, there is no guarantee that your messages will be sent out 100% of the time. The cause could be:

  • Our network problems

  • MQ network problems

  • Intermittent issues

  • ...

To overcome this, you need an Outbox pattern layer.

The Outbox pattern is a technique used to ensure reliable message delivery in distributed systems. It involves storing messages in a local database, called the outbox, before they are sent to the message queue. This approach helps to overcome message delivery failures by providing durability and retry mechanisms, ensuring messages are eventually sent to their intended recipients.

So basically, you won't interact with the MQ directly. You would do this instead:

outbox.dispatch(new MyTask(...)); // this is just a simple DB write

Then, you'd add a Processor that pulls the unsent messages and dispatches them. If there is an error, it stops and retries until the message is sent.

// processor: dispatch unsent outbox messages in order
async function sendMessage(messages: Message[]) {
    for (const message of messages) {
        try {
            await queue.dispatch(message.toQueueMessage());
            // mark as sent only after the dispatch succeeded
            await message.markAsSent();
        } catch (e) {
            logger.error('Failed to dispatch message', e);
            // stop here; the unsent messages are picked up again on the next run
            throw e;
        }
    }
}
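To see the whole flow in one place, here is a small self-contained sketch. It is not a production implementation: `InMemoryOutbox` stands in for the outbox table, `QueueClient` stands in for your MQ SDK, and `runOutboxOnce` is a hypothetical name for one processor run.

```typescript
// Hypothetical types: `OutboxRow` stands in for a DB row, `QueueClient` for your MQ SDK.
interface OutboxRow {
    id: number;
    payload: string;
    sentAt: Date | null;
}

interface QueueClient {
    dispatch(payload: string): Promise<void>;
}

// In-memory stand-in for the outbox table, for illustration only.
class InMemoryOutbox {
    private rows: OutboxRow[] = [];
    private nextId = 1;

    // With a real DB this is a plain INSERT, ideally in the same
    // transaction as the business change that produced the message.
    dispatch(payload: string): OutboxRow {
        const row: OutboxRow = { id: this.nextId++, payload, sentAt: null };
        this.rows.push(row);
        return row;
    }

    // Unsent rows in insertion order, so messages go out oldest first.
    unsent(): OutboxRow[] {
        return this.rows.filter((row) => row.sentAt === null);
    }

    markAsSent(id: number): void {
        const row = this.rows.find((r) => r.id === id);
        if (row) {
            row.sentAt = new Date();
        }
    }
}

// One processor run: send unsent rows in order and stop at the first failure.
// Returns how many rows were sent; the failed row stays unsent for the next run.
async function runOutboxOnce(outbox: InMemoryOutbox, queue: QueueClient): Promise<number> {
    let sent = 0;
    for (const row of outbox.unsent()) {
        try {
            await queue.dispatch(row.payload);
        } catch {
            break; // leave this row and the remaining ones for the next run
        }
        outbox.markAsSent(row.id);
        sent++;
    }
    return sent;
}
```

Note that a row is marked as sent only after the dispatch succeeds. If the process crashes between those two steps, the message goes out again on the next run, so the outbox gives you at-least-once delivery, not exactly-once.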

Exactly-once

Even though some message queues claim to guarantee no duplication, we can't entirely rely on that. After all, we're dealing with distributed systems, which inherently present challenges.

To address this issue, simply add an Idempotency Layer before processing any message.

  • Especially for mission-critical/sensitive messages, e.g.: transferring money, receiving money, and sending out packages, etc.

It doesn't have to be ugly. You can have a simple table:

CREATE TABLE idempotency (
    key TEXT PRIMARY KEY
);

Declaring key as the PRIMARY KEY enforces uniqueness and gives us the index for lookups, two birds with one stone.

With a single column and only one index, inserting data will be extremely fast (not to mention in-memory databases 🥹)

Before processing occurs, simply append a unique key based on the payload of your message.

// transfer money
async function handle(message: Message) {
    try {
        await idempotency.acquireLock(`send-money-out-${message.payment.id}`);
    } catch (e) {
        if (e instanceof QueryException) {
            // acquire lock failed (the key already exists), stop
            return;
        }

        // any other error, e.g. something related to the DB connection:
        // here we can stop, log or retry depending on the priority
        return;
    }

    await transferMoney(...);
}
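If you want to play with the idea without a database, here is a minimal sketch. It is a variation on the snippet above: `acquire` returns a boolean where the original throws, and `InMemoryIdempotencyStore`, `PaymentMessage` and `handleOnce` are hypothetical names. The Set plays the role of the PRIMARY KEY constraint.

```typescript
// In-memory stand-in for the `idempotency` table; a Set rejects duplicates like a PRIMARY KEY does.
class InMemoryIdempotencyStore {
    private keys = new Set<string>();

    // Returns true if the key was inserted, false if it already existed.
    // With a real DB this would be an INSERT that fails on a unique violation.
    async acquire(key: string): Promise<boolean> {
        if (this.keys.has(key)) {
            return false;
        }
        this.keys.add(key);
        return true;
    }
}

// Hypothetical message shape, mirroring `message.payment.id` from the handler above.
interface PaymentMessage {
    payment: { id: string; amount: number };
}

// Runs `transfer` only the first time a given payment id is seen.
async function handleOnce(
    store: InMemoryIdempotencyStore,
    message: PaymentMessage,
    transfer: (message: PaymentMessage) => Promise<void>,
): Promise<'processed' | 'duplicate'> {
    const acquired = await store.acquire(`send-money-out-${message.payment.id}`);
    if (!acquired) {
        return 'duplicate'; // already handled (or being handled) by another delivery
    }
    await transfer(message);
    return 'processed';
}
```

One design choice to be aware of: the key is written before the transfer runs, so a transfer that fails halfway will not be retried automatically. Whether you release the key on failure or handle it manually depends on how critical the message is.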

Simple yet effective. We require database assistance to accomplish this (and I believe databases are present in every application nowadays 😉)

A disadvantage is that you would likely need to clear the table monthly/bi-annually/annually in order to free up disk space.

(For DB as MQ) DB gets hammered

It is normal to see high CPU usage or a high number of connections on the DB when using the DB as an MQ.

Workers hit the DB very frequently to fetch messages: they open a transaction, run a SKIP LOCKED query, and lock rows. Those are expensive operations. The more workers you have, the sooner you will see the problem.

There are 2 solutions/approaches for this:

  • (Preferable) Use a dedicated MQ. Yes, don't push this heavy work onto the DB.

  • (Optional) Reduce the number of worker processes and tune the DB.

This is also part of the "Low throughput" problem above.

Prioritization

Here it is, the priority queue, one of the most popular topics frequently discussed in large companies.

A priority queue is a data structure that stores elements with associated priorities, allowing for efficient retrieval of the highest-priority element. It is commonly used in scenarios where certain tasks or messages need to be processed before others based on their importance or urgency.

In short, messages share the same MQ topic but get different priorities depending on the business logic (Business A pays us more, so their messages must be processed first; Business B is our VIP customer; ...)

We have several solutions to tackle this:

Use different topics

Example: transfer, transfer-low, transfer-high

Each topic would have a different set of workers, e.g.:

  • transfer: normal one, 10 worker processes

  • transfer-low: lowest priority, 2~5 worker processes

  • transfer-high: highest priority, 5~10 worker processes (based on how many messages we could resolve)

This is the simplest way. On the code side, we just need to create a smart-routing-queue-dispatcher, and that's all we need.
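A rough sketch of what the routing part of such a dispatcher could look like. The topic names come from the example above; the `Business` fields and the rules inside `priorityFor` are assumptions for illustration, so plug in whatever drives priority in your own domain.

```typescript
type Priority = 'low' | 'normal' | 'high';

// Topic names follow the example above.
const TOPIC_BY_PRIORITY: Record<Priority, string> = {
    low: 'transfer-low',
    normal: 'transfer',
    high: 'transfer-high',
};

// Hypothetical business attributes; replace with your own.
interface Business {
    id: string;
    isVip: boolean;
    plan: 'free' | 'paid';
}

// Hypothetical business rules: VIPs first, paying customers next, everyone else last.
function priorityFor(business: Business): Priority {
    if (business.isVip) {
        return 'high';
    }
    if (business.plan === 'paid') {
        return 'normal';
    }
    return 'low';
}

// The dispatcher asks this function which topic a message should be sent to.
function topicFor(business: Business): string {
    return TOPIC_BY_PRIORITY[priorityFor(business)];
}
```

Keeping the rules in one small function means the rest of the code never needs to know how many topics exist.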

PROs: Easiest solution, no big work.

CONs: there will be a lot of topics to manage, set up and monitor.

Implement Priority Queue

This one is for those who love to build stuff haha. But always research the popular MQs out there before building your own.

Building a Priority Queue is simple. You can build it on top of a DB with this table:

CREATE TABLE priority_queue_jobs (
    id SERIAL PRIMARY KEY,
    payload JSONB,
    priority INT DEFAULT 5,
    created_at TIMESTAMP DEFAULT NOW()
);

-- add index for priority DESC, created_at ASC
CREATE INDEX idx_priority_queue_jobs_pull ON priority_queue_jobs (priority DESC, created_at ASC);

Things that need to be covered:

  • SKIP LOCKED to ensure no duplication when pulling messages for processing

  • Build your desired pull message query

  • Build your queue dispatcher & worker
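Here is one possible shape for the pull query, assuming PostgreSQL (the table above uses SERIAL and JSONB). `PULL_NEXT_JOB_SQL`, `Job`, `compareJobs` and `pickNext` are illustrative names. The in-memory functions only mirror the ordering and the "skip locked rows" behaviour so you can try the logic without a DB.

```typescript
// Run this inside a transaction: the selected row stays locked until COMMIT or ROLLBACK,
// and other workers running the same query skip it and take the next one.
const PULL_NEXT_JOB_SQL = `
    SELECT id, payload
    FROM priority_queue_jobs
    ORDER BY priority DESC, created_at ASC
    LIMIT 1
    FOR UPDATE SKIP LOCKED
`;

// In-memory mirror of a row in priority_queue_jobs.
interface Job {
    id: number;
    payload: unknown;
    priority: number;
    createdAt: Date;
}

// Same ordering as the query: highest priority first, then oldest first.
function compareJobs(a: Job, b: Job): number {
    if (a.priority !== b.priority) {
        return b.priority - a.priority;
    }
    return a.createdAt.getTime() - b.createdAt.getTime();
}

// Pick the next job while skipping the ones another worker already holds.
// `filter` returns a new array, so sorting does not mutate the caller's list.
function pickNext(jobs: Job[], lockedIds: Set<number>): Job | undefined {
    return jobs.filter((job) => !lockedIds.has(job.id)).sort(compareJobs)[0];
}
```

After the worker finishes a job, it would typically delete the row (or mark it as done) in the same transaction before committing.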

PROs: the DB handles retrieving the records, locking, ...

CONs: takes time, a bit of reinventing the wheel, ...

Conclusion

I've experienced all of the problems & solutions above, and I'd love to note them down here and share them with the whole world.

So together, we'll continue to build awesome & reliable software 😎

Cheers guys!
