Fora Soft
Originally published at forasoft.com

How to Implement Delayed Messages with RabbitMQ? Code Examples

Sometimes you need to add scheduled or repeating actions to your app: for example, sending a push notification in 10 minutes or clearing a temporary folder every day.
To do this, you can use cron jobs, which run scripts on your server automatically, or the node-schedule package (a task-scheduling library for Node.js).
But both of these solutions run into problems when you scale:

  • With several servers, it is unclear which one should run the task

  • The selected server might crash

  • The node might be removed to free up resources

One possible solution here is RabbitMQ, a message broker. Check out the overall delayed-message implementation scheme in this example on GitHub. Here is how it works in detail, step by step:

1. Create two exchanges: a regular one and a delayed one.

export const HELLO_EXCHANGE = Object.freeze({
    name: 'hello',
    type: 'direct',
    options: {
        durable: true,
    },
    queues: {},
});
export const HELLO_DELAYED_EXCHANGE = Object.freeze({
    name: 'helloDelayed',
    type: 'direct',
    options: {
        durable: true,
    },
    queues: {},
});

2. In each exchange, create a queue with the same binding key but a different name.
For HELLO_EXCHANGE:

queues: {
    WORLD: {
        name: 'hello.world', // subscribe to this queue
        binding: 'hello.world',
        options: {
            durable: true,
        },
    },
},

For HELLO_DELAYED_EXCHANGE:

queues: {
    WORLD: {
        name: 'helloDelayed.world',
        binding: 'hello.world',
        options: {
            durable: true,
            queueMode: 'lazy', // keep messages on disk rather than in RAM
        },
    },
},

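For the delayed exchange's queue, set the x-dead-letter-exchange argument to the regular exchange's name. The argument tells the RabbitMQ broker to republish a message to that exchange when it expires without being consumed. A dead-lettered message keeps its original routing key, which is why both queues use the same binding.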

arguments: {
    'x-dead-letter-exchange': HELLO_EXCHANGE.name, // exchange that receives the message once it expires
},
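If you are not using the helper layer from the GitHub example, the topology from steps 1 and 2 could be declared roughly like this. It is only a sketch under a few assumptions: `DelayChannelLike` is a hypothetical interface that mirrors the `assertExchange`, `assertQueue`, and `bindQueue` methods of an amqplib channel, and translating `queueMode: 'lazy'` into the `x-queue-mode` queue argument is a guess at what the article's wrapper does.

```typescript
// Hypothetical minimal view of an amqplib channel (only the methods used here).
interface DelayChannelLike {
  assertExchange(name: string, type: string, options?: object): Promise<unknown>;
  assertQueue(name: string, options?: object): Promise<unknown>;
  bindQueue(queue: string, exchange: string, routingKey: string): Promise<unknown>;
}

const ROUTING_KEY = 'hello.world';

// Declares both exchanges and both queues so that messages expiring in
// 'helloDelayed.world' are dead-lettered to the 'hello' exchange.
async function declareDelayTopology(ch: DelayChannelLike): Promise<void> {
  await ch.assertExchange('hello', 'direct', { durable: true });
  await ch.assertExchange('helloDelayed', 'direct', { durable: true });

  // Regular queue: consumers subscribe here.
  await ch.assertQueue('hello.world', { durable: true });
  await ch.bindQueue('hello.world', 'hello', ROUTING_KEY);

  // Delay queue: nobody consumes from it, messages only wait here until they expire.
  await ch.assertQueue('helloDelayed.world', {
    durable: true,
    arguments: {
      'x-dead-letter-exchange': 'hello',
      'x-queue-mode': 'lazy', // assumption: what queueMode: 'lazy' maps to
    },
  });
  await ch.bindQueue('helloDelayed.world', 'helloDelayed', ROUTING_KEY);
}
```

Both queues are bound with the same key, so a message that expires in the delay queue lands in `hello.world` without any extra routing configuration.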

3. Publish the message to the delayed exchange's queue with an expiration period.

// services/base-service/src/broker/hello/publisher.ts
export const publishHelloDelayedWorld = createPublisher({
    exchangeName: exchangeNameDelayed,
    queue: WORLD_DELAYED,
    expirationInMs: 30000, // the message expires 30 s after it is published
});
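The article does not show what `createPublisher` does internally, so here is a hedged guess at the core of it: turning `expirationInMs` into the per-message `expiration` property, which amqplib expects as a string of milliseconds. `buildExpiringPublish` and `ExpiringPublish` are hypothetical names made up for this sketch.

```typescript
// Everything needed for one publish call, kept as plain data so it is easy to inspect.
interface ExpiringPublish {
  exchange: string;
  routingKey: string;
  body: string;
  options: { expiration: string; persistent: boolean };
}

function buildExpiringPublish(
  exchange: string,
  routingKey: string,
  payload: unknown,
  expirationInMs: number,
): ExpiringPublish {
  if (!Number.isInteger(expirationInMs) || expirationInMs < 0) {
    throw new RangeError(`expirationInMs must be a non-negative integer, got ${expirationInMs}`);
  }
  return {
    exchange,
    routingKey,
    body: JSON.stringify(payload),
    options: {
      expiration: String(expirationInMs), // per-message TTL, in milliseconds
      persistent: true, // survive a broker restart while waiting in the durable queue
    },
  };
}

const delayed = buildExpiringPublish('helloDelayed', 'hello.world', { name: 'world' }, 30000);
// With an amqplib channel this would be sent as:
//   channel.publish(delayed.exchange, delayed.routingKey, Buffer.from(delayed.body), delayed.options);
```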

Once the delayed message expires, it goes to the regular exchange's queue.
Now you only have to set up a consumer for the regular exchange's queue:

// services/base-service/src/broker/hello/consumer.ts
export const initHelloExchange = () => Promise.all([
    createConsumer(
        {
            queueName: HELLO_EXCHANGE.queues.WORLD.name,
            prefetch: 50,
            log: true,
        },
        controller.consumeHelloWorld,
    ),
]);
// services/base-service/src/broker/hello/controller.ts
export const consumeHelloWorld: IBrokerHandler = async ({ payload }) => {
    const result = await world({ name: payload.name });
    logger.info(result.message);
    // await publishHelloDelayedWorld({ name: payload.name }); // if you need to process the message again
};
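`createConsumer` is also part of the example's helper layer and is not shown here. A minimal sketch of what such a helper might boil down to is below. `ConsumerChannelLike`, `IncomingMessage`, and `startConsumer` are hypothetical names; the interface only mirrors the `prefetch`, `consume`, `ack`, and `nack` methods of an amqplib channel, and dropping failed messages instead of requeueing them is a choice made for this example.

```typescript
// Hypothetical minimal shapes, modelled on an amqplib channel and message.
interface IncomingMessage {
  content: { toString(): string };
}

interface ConsumerChannelLike {
  prefetch(count: number): unknown;
  consume(queue: string, onMessage: (msg: IncomingMessage | null) => void): unknown;
  ack(msg: IncomingMessage): void;
  nack(msg: IncomingMessage, allUpTo?: boolean, requeue?: boolean): void;
}

async function startConsumer<T>(
  ch: ConsumerChannelLike,
  queueName: string,
  prefetch: number,
  handler: (payload: T) => Promise<void>,
): Promise<void> {
  // Limit how many unacknowledged messages this consumer may hold at once.
  await ch.prefetch(prefetch);
  await ch.consume(queueName, (msg) => {
    if (msg === null) return; // the broker cancelled the consumer
    const received = msg;
    Promise.resolve()
      .then(() => handler(JSON.parse(received.content.toString()) as T))
      .then(
        () => ch.ack(received), // handled successfully
        () => ch.nack(received, false, false), // failed: reject without requeueing
      );
  });
}
```

Usage would look like `startConsumer(channel, 'hello.world', 50, async (payload) => { /* ... */ })`, with the handler playing the role of `consumeHelloWorld`.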

Profit!
If you need to run the action periodically, publish the message to the delayed exchange again at the end of the consumer handler.

// await publishHelloDelayedWorld({ name: payload.name });
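One way to keep that re-publishing out of every handler is a small wrapper. This is only a sketch: `makeRecurring` and `BrokerHandler` are hypothetical names, and `republish` stands in for a publisher such as `publishHelloDelayedWorld`.

```typescript
type BrokerHandler<T> = (payload: T) => Promise<void>;

// Wraps a handler so that every successful run schedules the next one.
function makeRecurring<T>(
  handler: BrokerHandler<T>,
  republish: (payload: T) => Promise<void>,
): BrokerHandler<T> {
  return async (payload) => {
    await handler(payload); // do the actual work first
    await republish(payload); // then put the message back into the delay queue
  };
}
```

Note that if the handler throws, nothing is re-published and the cycle stops. Whether that is what you want depends on the job; for a task that must keep running, you might re-publish in a `finally` block instead.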

NOTE: RabbitMQ queues are FIFO (first in, first out), and a message is only expired and dead-lettered once it reaches the head of the queue. So if you publish a delayed message with a 1-day expiration and then a message with a 1-minute expiration to the same queue, the second message waits behind the first: it is transferred right after the first one expires, roughly a day later instead of after a minute.

Putting it all together, this is what you get:
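To make the ordering issue concrete, here is a toy model of a single delay queue with no consumers. It is not how the broker is implemented; `deadLetterTimes` and `DelayedMessage` are hypothetical names, and the only rule modelled is that a message leaves the queue once it has expired and everything ahead of it has already left.

```typescript
interface DelayedMessage {
  id: string;
  publishedAtMs: number;
  ttlMs: number;
}

// Returns, for each message (in queue order), the time at which it is dead-lettered.
function deadLetterTimes(queue: DelayedMessage[]): Map<string, number> {
  const result = new Map<string, number>();
  let headFreeAtMs = Number.NEGATIVE_INFINITY; // when the previous message left the queue
  for (const msg of queue) {
    const expiresAtMs = msg.publishedAtMs + msg.ttlMs;
    // A message cannot leave before the one in front of it has left.
    const leavesAtMs = Math.max(expiresAtMs, headFreeAtMs);
    result.set(msg.id, leavesAtMs);
    headFreeAtMs = leavesAtMs;
  }
  return result;
}

const DAY_MS = 24 * 60 * 60 * 1000;
const MINUTE_MS = 60 * 1000;

const times = deadLetterTimes([
  { id: 'daily', publishedAtMs: 0, ttlMs: DAY_MS },
  { id: 'quick', publishedAtMs: 0, ttlMs: MINUTE_MS },
]);
console.log(times.get('quick') === DAY_MS); // true: 'quick' is stuck behind 'daily'
```

A common way around this is to use a separate delay queue for each distinct delay, so that every queue only ever holds messages with the same expiration.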
1. Create the exchanges and queues

// services/base-service/src/broker/const/exchanges.ts
export const HELLO_EXCHANGE = Object.freeze({
    name: 'hello',
    type: 'direct',
    options: {
        durable: true,
    },
    queues: {
        WORLD: {
            name: 'hello.world', // subscribe to this queue
            binding: 'hello.world',
            options: {
                durable: true,
            },
        },
    },
});
export const HELLO_DELAYED_EXCHANGE = Object.freeze({
    name: 'helloDelayed',
    type: 'direct',
    options: {
        durable: true,
    },
    queues: {
        WORLD: {
            name: 'helloDelayed.world',
            binding: 'hello.world',
            options: {
                durable: true,
                queueMode: 'lazy', // keep messages on disk rather than in RAM
                arguments: {
                    'x-dead-letter-exchange': HELLO_EXCHANGE.name, // exchange that receives the message once it expires
                },
            },
        },
    },
});

2. Add the publisher that will send the message to the delayed queue

// services/base-service/src/broker/hello/publisher.ts
export const publishHelloDelayedWorld = createPublisher({
    exchangeName: exchangeNameDelayed,
    queue: WORLD_DELAYED,
    expirationInMs: 30000, // the message expires 30 s after it is published
});

3. Add the consumer for the regular exchange's queue

// services/base-service/src/broker/hello/consumer.ts
export const initHelloExchange = () => Promise.all([
    createConsumer(
        {
            queueName: HELLO_EXCHANGE.queues.WORLD.name,
            prefetch: 50,
            log: true,
        },
        controller.consumeHelloWorld,
    ),
]);
// services/base-service/src/broker/hello/controller.ts
export const consumeHelloWorld: IBrokerHandler = async ({ payload }) => {
    const result = await world({ name: payload.name });
    logger.info(result.message);
    // await publishHelloDelayedWorld({ name: payload.name }); // if you need to process the message again
};

4. Profit!

There's also a plugin, rabbitmq_delayed_message_exchange, that does this work for you and makes the implementation easier. You only create one exchange, one queue, one publisher, and one consumer.
When you publish a delayed message, the plugin holds it and, once the delay has elapsed, routes it to the right queue. All on its own.
With this plugin, scheduled messages are delivered in order of their expiration time. That means that if you publish a message with a 1-day delay and then a message with a 1-minute delay, the second one will be processed before the first.
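As a quick illustration of that ordering, here is a toy model that simply sorts messages by the moment their delay runs out. It does not reflect the plugin's internals; `deliveryOrder` and `ScheduledMessage` are hypothetical names.

```typescript
interface ScheduledMessage {
  id: string;
  publishedAtMs: number;
  delayMs: number;
}

// Orders messages by the time their delay elapses, regardless of publish order.
function deliveryOrder(messages: ScheduledMessage[]): string[] {
  return messages
    .slice() // copy so the caller's array is not reordered
    .sort((a, b) => (a.publishedAtMs + a.delayMs) - (b.publishedAtMs + b.delayMs))
    .map((m) => m.id);
}

const order = deliveryOrder([
  { id: 'daily', publishedAtMs: 0, delayMs: 24 * 60 * 60 * 1000 },
  { id: 'quick', publishedAtMs: 1000, delayMs: 60 * 1000 },
]);
console.log(order); // [ 'quick', 'daily' ]
```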

// services/base-service/src/broker/const/exchanges.ts
export const HELLO_PLUGIN_DELAYED_EXCHANGE = Object.freeze({
    name: 'helloPluginDelayed',
    type: 'x-delayed-message', // exchange type provided by the plugin
    options: {
        durable: true,
        arguments: {
            'x-delayed-type': 'direct', // how the exchange routes messages once the delay is over
        },
    },
    queues: {
        WORLD_PLUGIN_DELAYED: {
            name: 'helloPluginDelayed.world', // subscribe to the queue
            binding: 'helloPluginDelayed.world',
            options: {
                durable: true,
            },
        },
    },
});
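Without the helper layer, declaring this exchange could look roughly like the sketch below. `PluginChannelLike` is a hypothetical interface mirroring three amqplib channel methods. The `x-delayed-message` type only exists once the rabbitmq_delayed_message_exchange plugin is installed and enabled on the broker (`rabbitmq-plugins enable rabbitmq_delayed_message_exchange`); without it, the declaration is rejected.

```typescript
// Hypothetical minimal view of an amqplib channel (only the methods used here).
interface PluginChannelLike {
  assertExchange(name: string, type: string, options?: object): Promise<unknown>;
  assertQueue(name: string, options?: object): Promise<unknown>;
  bindQueue(queue: string, exchange: string, routingKey: string): Promise<unknown>;
}

async function declarePluginDelayedExchange(ch: PluginChannelLike): Promise<void> {
  // The exchange holds each message for its delay, then routes it like a direct exchange.
  await ch.assertExchange('helloPluginDelayed', 'x-delayed-message', {
    durable: true,
    arguments: { 'x-delayed-type': 'direct' },
  });
  // A single ordinary queue is enough: no dead-letter setup is needed.
  await ch.assertQueue('helloPluginDelayed.world', { durable: true });
  await ch.bindQueue('helloPluginDelayed.world', 'helloPluginDelayed', 'helloPluginDelayed.world');
}
```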

Add a publisher that sends messages to the delayed exchange:

export const publishHelloPluginDelayedWorld = createPublisher({
    exchangeName: exchangeNamePluginDelayed,
    queue: WORLD_PLUGIN_DELAYED,
    delayInMs: 60000, // deliver the message after a 60 s delay
});
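With the plugin, the delay travels in the `x-delay` message header (in milliseconds) rather than in the `expiration` property. How `createPublisher` handles `delayInMs` is not shown in the article, so treat the following as an assumption about what it boils down to; `buildPluginDelayedPublish` and `PluginDelayedPublish` are hypothetical names.

```typescript
// Everything needed for one publish call, kept as plain data so it is easy to inspect.
interface PluginDelayedPublish {
  exchange: string;
  routingKey: string;
  body: string;
  options: { headers: { 'x-delay': number }; persistent: boolean };
}

function buildPluginDelayedPublish(
  exchange: string,
  routingKey: string,
  payload: unknown,
  delayInMs: number,
): PluginDelayedPublish {
  if (!Number.isInteger(delayInMs) || delayInMs < 0) {
    throw new RangeError(`delayInMs must be a non-negative integer, got ${delayInMs}`);
  }
  return {
    exchange,
    routingKey,
    body: JSON.stringify(payload),
    options: {
      headers: { 'x-delay': delayInMs }, // read by the delayed-message exchange
      persistent: true,
    },
  };
}

const pluginDelayed = buildPluginDelayedPublish(
  'helloPluginDelayed',
  'helloPluginDelayed.world',
  { name: 'world' },
  60000,
);
// With an amqplib channel this would be sent as:
//   channel.publish(pluginDelayed.exchange, pluginDelayed.routingKey,
//                   Buffer.from(pluginDelayed.body), pluginDelayed.options);
```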

Add a consumer for the queue:

// services/base-service/src/broker/hello/consumer.ts
export const initHelloExchange = () => Promise.all([
    createConsumer(
        {
            queueName: HELLO_PLUGIN_DELAYED_EXCHANGE.queues.WORLD_PLUGIN_DELAYED.name,
            prefetch: 50,
            log: true,
        },
        controller.consumeHelloWorld,
    ),
]);
// services/base-service/src/broker/hello/controller.ts
export const consumeHelloWorld: IBrokerHandler = async ({ payload }) => {
    const result = await world({ name: payload.name });
    logger.info(result.message);
};

Aaand - you're done!

We regularly use RabbitMQ in our projects. For instance, check out how we used it in Janson Media internet TV, a project from our portfolio. It's a movie rental service, but digital.

Here we used RabbitMQ delayed messages for three of the app's essential features: sending emails and SMS messages to notify users that, for example, their rental period is almost over; sending messages about completed payments to the socket and notifying the user; and sending uploaded videos for further processing.

Hopefully, implementing delayed messages won't be like falling down the rabbit hole for you anymore (if it ever was) 🙂
