NodeJS simple background task handler 😎

Hi guys,

Coming from the world of Laravel, where I can dispatch a simple job to the queue, run a worker, and have the job resolved in the background, which is cool and awesome, IKR?

But in NodeJS, we either install a library or build our own simple task handler.

So let's build a simple one today 😆 The fewer dependencies, the better, right?

I am using

  • Node 18

  • TypeScript

  • PostgreSQL

  • Functional approach

The Strategy

So we have:

  • 1 dedicated process for HTTP

  • 1 dedicated process to pull & process the tasks (the Background Task process)
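One way to start both processes from a single codebase is to pick the role from a command-line argument, e.g. `node dist/main.js worker`. This is a minimal sketch under assumptions: `parseRole`, `main` and the `Starters` map are hypothetical names, and the starters are passed in so the snippet stands alone (in a real project they would be your HTTP server bootstrap and the background task loop).

```typescript
const ROLES = ['http', 'worker'] as const;
type Role = (typeof ROLES)[number];

// Hypothetical starters: one function per process role.
type Starters = Record<Role, () => Promise<void>>;

function isRole(value: string): value is Role {
    return (ROLES as readonly string[]).includes(value);
}

// Reads the role from argv[2], defaulting to the HTTP process.
function parseRole(argv: string[]): Role {
    const role = argv[2] ?? 'http';
    if (!isRole(role)) {
        throw new Error(`Unknown role: ${role}`);
    }
    return role;
}

// Starts exactly one role per OS process.
async function main(argv: string[], starters: Starters): Promise<void> {
    await starters[parseRole(argv)]();
}
```

Running each role as its own OS process means a slow batch of tasks never competes with HTTP requests for the same event loop.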

The Database

The first thing is always the database structure, right? 😎

CREATE TABLE IF NOT EXISTS public.task (
    id BIGSERIAL PRIMARY KEY,
    type text NOT NULL,
    payload jsonb NOT NULL,
    created_at TIMESTAMPTZ NOT NULL DEFAULT CLOCK_TIMESTAMP(),
    processing_at TIMESTAMPTZ NULL,
    processed_at TIMESTAMPTZ NULL
);

-- 2 indexes to speed up the retrieval query
CREATE INDEX idx_task_processed_processing_at ON public.task(processed_at, processing_at);
CREATE INDEX idx_task_created_at ON public.task(created_at);

You'd know how it goes after checking out this table, right? 😉
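To spell out how the two nullable timestamps encode a task's lifecycle, here is a small sketch. The `TaskRow` type and the `taskStatus` helper are illustrative only, not part of the original code; the worker's retrieval query only ever picks up rows in the `pending` state.

```typescript
// Mirrors the columns of public.task.
type TaskRow = {
    id: number;
    type: string;
    payload: Record<string, unknown>;
    created_at: Date;
    processing_at: Date | null;
    processed_at: Date | null;
};

type TaskStatus = 'pending' | 'processing' | 'processed';

// pending:    both timestamps NULL, nobody has picked the task up yet
// processing: a worker has started it but not finished
// processed:  finished (including tasks skipped because the payload was invalid)
function taskStatus(row: Pick<TaskRow, 'processing_at' | 'processed_at'>): TaskStatus {
    if (row.processed_at !== null) return 'processed';
    if (row.processing_at !== null) return 'processing';
    return 'pending';
}
```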

The Code - Repository

First, we'll need a simple repository to add and retrieve the tasks:

export type Task = {
    id: number;
    type: string;
    payload: Record<string, unknown>;
};

type TaskRepo = {
    add(task: Omit<Task, 'id'>): Promise<Task>;
    get(perPage?: number): Promise<Task[]>;
    markAsProcessing(taskId: number): Promise<void>;
    markAsProcessed(taskId: number): Promise<void>;
};

export const taskRepo: TaskRepo = {
    async add(task) {
        // INSERT INTO public.task (type, payload)
        // VALUES ($1, $2)
        // RETURNING id, type, payload
        // [task.type, JSON.stringify(task.payload)]
        throw new Error('Not implemented yet'); // placeholder until the query is wired up
    },

    async get(perPage = 10) {
        // SELECT id, type, payload
        // FROM public.task
        // WHERE processed_at IS NULL AND processing_at IS NULL
        // ORDER BY created_at ASC
        // LIMIT $1
        // [perPage] (a query parameter instead of string interpolation)
        throw new Error('Not implemented yet'); // placeholder until the query is wired up
    },

    async markAsProcessing(taskId: number) {
        // UPDATE public.task
        // SET processing_at = NOW()
        // WHERE id = $1
        // [taskId]
    },

    async markAsProcessed(taskId: number) {
        // UPDATE public.task
        // SET processed_at = NOW()
        // WHERE id = $1
        // [taskId]
    }
};

Plz help me complete the implementation 😆
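As a starting point, here is one possible way to fill in the repository. It is a sketch that assumes the `pg` (node-postgres) driver: `Queryable` and `createTaskRepo` are hypothetical names, and `Queryable` is a minimal interface that a `pg` `Pool` should satisfy, so the SQL can also be exercised against a fake in tests. The types are repeated so the block stands alone. node-postgres returns `BIGINT`/`BIGSERIAL` values as strings by default, hence the `Number(...)` conversion.

```typescript
type Task = {
    id: number;
    type: string;
    payload: Record<string, unknown>;
};

type TaskRepo = {
    add(task: Omit<Task, 'id'>): Promise<Task>;
    get(perPage?: number): Promise<Task[]>;
    markAsProcessing(taskId: number): Promise<void>;
    markAsProcessed(taskId: number): Promise<void>;
};

// Minimal query-runner shape; a pg Pool or Client is assumed to fit it.
type Queryable = {
    query(text: string, values?: unknown[]): Promise<{ rows: any[] }>;
};

// BIGSERIAL ids arrive as strings from node-postgres by default.
const toTask = (row: { id: string | number; type: string; payload: Record<string, unknown> }): Task => ({
    id: Number(row.id),
    type: row.type,
    payload: row.payload,
});

function createTaskRepo(db: Queryable): TaskRepo {
    return {
        async add(task) {
            const { rows } = await db.query(
                'INSERT INTO public.task (type, payload) VALUES ($1, $2) RETURNING id, type, payload',
                [task.type, JSON.stringify(task.payload)],
            );
            return toTask(rows[0]);
        },

        async get(perPage = 10) {
            // LIMIT is passed as a parameter rather than interpolated into the SQL.
            const { rows } = await db.query(
                `SELECT id, type, payload
                 FROM public.task
                 WHERE processed_at IS NULL AND processing_at IS NULL
                 ORDER BY created_at ASC
                 LIMIT $1`,
                [perPage],
            );
            return rows.map(toTask);
        },

        async markAsProcessing(taskId) {
            await db.query('UPDATE public.task SET processing_at = NOW() WHERE id = $1', [taskId]);
        },

        async markAsProcessed(taskId) {
            await db.query('UPDATE public.task SET processed_at = NOW() WHERE id = $1', [taskId]);
        },
    };
}
```

With `pg`, usage would look like `const taskRepo = createTaskRepo(new Pool())`; injecting the runner also keeps the repository easy to test without a database.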

The Code - The Tasks Definition

We need to define a list of tasks that can be added and resolved in the background.

For this, we will use a discriminated union from TS. Btw, I'm using zod to achieve this: with zod, I can parse the payload into a structured object and then use the data safely.

For example, I have 2 tasks:

import { z } from 'zod';

const sendVerificationEmailTask = z.object({
    type: z.literal('SEND_VERIFICATION_EMAIL'),
    payload: z.object({
        user: z.object({
            id: z.string().uuid(),
            email: z.string().email(),
        }),
        verificationKey: z.string(),
    }),
});

// named *Task so it doesn't clash with the email handler function
const sendForgotPasswordEmailTask = z.object({
    type: z.literal('SEND_FORGOT_PASSWORD_EMAIL'),
    payload: z.object({
        user: z.object({
            id: z.string().uuid(),
            email: z.string().email(),
        }),
        forgotPasswordKey: z.string(),
    }),
});

// every member must have a literal "type"; the payload can vary
export const backgroundTask = z.discriminatedUnion('type', [
    sendVerificationEmailTask,
    sendForgotPasswordEmailTask,
]);

export type BackgroundTask = z.infer<typeof backgroundTask>;

So if I need more tasks later, I simply create another zod object and add it to the backgroundTask union.
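When a new member joins the union, TypeScript can also force the `switch` in the task handler to cover it. A minimal sketch using a `never` check; `BackgroundTaskLike` is a hand-written stand-in for `z.infer<typeof backgroundTask>` so the snippet runs without zod, and `describeTask` / `assertNever` are hypothetical helpers.

```typescript
// Stand-in for z.infer<typeof backgroundTask>.
type User = { id: string; email: string };
type BackgroundTaskLike =
    | { type: 'SEND_VERIFICATION_EMAIL'; payload: { user: User; verificationKey: string } }
    | { type: 'SEND_FORGOT_PASSWORD_EMAIL'; payload: { user: User; forgotPasswordKey: string } };

function assertNever(task: never): never {
    throw new Error(`Unhandled task type: ${JSON.stringify(task)}`);
}

function describeTask(task: BackgroundTaskLike): string {
    switch (task.type) {
        case 'SEND_VERIFICATION_EMAIL':
            return `verify ${task.payload.user.email}`;
        case 'SEND_FORGOT_PASSWORD_EMAIL':
            return `reset ${task.payload.user.email}`;
        default:
            // If a union member has no case above, `task` is not narrowed to
            // `never` here and this call fails to compile.
            return assertNever(task);
    }
}
```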

The Code - Add Task to DB

Now we need a reusable function to receive the BackgroundTask and do the insert using the repo.

Ideally, we don't need to return anything:

export async function createTask(task: BackgroundTask): Promise<void> {
    await taskRepo.add(task);
}

Haha super easy 😆

Inside TaskRepo's add, we just pick the type and payload and do a simple INSERT.

The Final Boss - Task Management

This is where we create a dedicated process that pulls tasks and processes them concurrently on the event loop.

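import { setTimeout as wait } from 'node:timers/promises';

export async function initBackgroundTaskManagement() {
    while (true) {
        const tasks = await taskRepo.get(10);

        if (!tasks.length) {
            await wait(5_000); // nothing to do, wait 5s before polling again
            continue;
        }

        // start all handlers at once; they run concurrently on the event loop
        const handlers = tasks.map(handleTask);

        // allSettled: one failing task doesn't reject the whole batch
        await Promise.allSettled(handlers);
    }
}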
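Why Promise.allSettled rather than Promise.all: a rejected handler should not abort the rest of the batch. This self-contained sketch, with a hypothetical `processBatch` helper and fake 100 ms handlers, shows that the handlers start together on the event loop and that one failure only shows up as a `rejected` result.

```typescript
// Runs every handler concurrently and reports per-item outcomes.
async function processBatch<T>(
    items: T[],
    handle: (item: T) => Promise<void>,
): Promise<{ fulfilled: number; rejected: number }> {
    const results = await Promise.allSettled(items.map(handle));
    return {
        fulfilled: results.filter((r) => r.status === 'fulfilled').length,
        rejected: results.filter((r) => r.status === 'rejected').length,
    };
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Three fake tasks of ~100 ms each; the one with id 2 fails.
async function demo() {
    const started = Date.now();
    const summary = await processBatch([1, 2, 3], async (id) => {
        await sleep(100);
        if (id === 2) throw new Error('task 2 failed');
    });
    return { summary, elapsed: Date.now() - started };
}
```

The three handlers overlap, so the batch takes roughly 100 ms rather than 300 ms. This only helps for I/O-bound work such as sending emails; CPU-heavy handlers still share a single thread.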

handleTask returns a Promise and handles the given task:

// Task comes from the repository; sendVerificationEmailHandler and
// sendForgotPasswordEmailHandler are your own handler functions
export async function handleTask(task: Task): Promise<void> {
    const hydratedTask = backgroundTask.safeParse({
        type: task.type,
        payload: task.payload,
    });

    await taskRepo.markAsProcessing(task.id);

    // unable to parse/hydrate the task, mark as processed
    if (!hydratedTask.success) {
        // if you have a logger, better to log the error here
        console.warn('Parse task failed', hydratedTask.error);
        await taskRepo.markAsProcessed(task.id);
        return;
    }

    // parsed, let's handle based on the "type" of the parsed data
    const { data } = hydratedTask;
    switch (data.type) {
        case 'SEND_VERIFICATION_EMAIL': {
            // I can use the valid structured payload here
            await sendVerificationEmailHandler(data.payload);
            break;
        }

        case 'SEND_FORGOT_PASSWORD_EMAIL': {
            // I can use the valid structured payload here
            await sendForgotPasswordEmailHandler(data.payload);
            break;
        }
    }

    // we don't care what was going on in the specific handler
    // if we reach here => success
    // (if a handler throws, we never get here: the task keeps processing_at
    // set and processed_at NULL, so get() won't pick it up again)
    await taskRepo.markAsProcessed(task.id);
}

The Future Follow-Ups / Scaling

Reliability

  • Add integration tests (yes I did in my project hehehe)

  • Add simple lifecycle management

    • So when we get SIGINT (or other signals), we stop the task management gracefully.
  • Store more information about successes, failures, and so on.
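For the lifecycle item, one possible shape: the loop checks an AbortSignal between batches, and a SIGINT/SIGTERM listener aborts it, so an in-flight batch finishes before the loop exits. This is a sketch under assumptions: `runLoop`, `abortableSleep` and the `pollOnce` callback (which would fetch and handle one batch, returning how many tasks it handled) are hypothetical names.

```typescript
// Resolves after `ms`, or as soon as the signal is aborted.
function abortableSleep(ms: number, signal: AbortSignal): Promise<void> {
    return new Promise((resolve) => {
        if (signal.aborted) return resolve();
        const timer = setTimeout(done, ms);
        function done() {
            clearTimeout(timer);
            signal.removeEventListener('abort', done);
            resolve();
        }
        signal.addEventListener('abort', done, { once: true });
    });
}

// Polls until aborted; a running batch is always awaited before checking again.
async function runLoop(
    pollOnce: () => Promise<number>,
    signal: AbortSignal,
    idleMs = 5_000,
): Promise<void> {
    while (!signal.aborted) {
        const handled = await pollOnce();
        if (handled === 0) await abortableSleep(idleMs, signal);
    }
}

// Wiring in the worker process (sketch):
// const controller = new AbortController();
// process.once('SIGINT', () => controller.abort());
// process.once('SIGTERM', () => controller.abort());
// await runLoop(pollOnce, controller.signal);
```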

Scaling

  • Increase the batch size from 10 tasks to a higher number.

  • Increase the number of workers by:

    • Using SELECT ... FOR UPDATE SKIP LOCKED to fetch only the tasks that no other worker has locked

      • Combined with marking those rows as processing in the same transaction, this ensures that no task is picked up by two workers.
    • Pulling and handling tasks like we do today

    • Running more processes
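For the multi-worker setup, a common PostgreSQL pattern is to claim a batch in a single statement, so selecting and marking as processing happen atomically. A sketch under assumptions: `claimTasks` and the `Queryable` type are hypothetical, and a `pg` `Pool` is assumed to satisfy `Queryable`. With this in place, the separate markAsProcessing call would no longer be needed.

```typescript
type Task = { id: number; type: string; payload: Record<string, unknown> };

// Minimal query-runner shape; a pg Pool is assumed to satisfy it.
type Queryable = {
    query(text: string, values?: unknown[]): Promise<{ rows: any[] }>;
};

// Rows locked by another worker's in-flight claim are skipped instead of
// waited on, and the UPDATE marks the claimed rows in the same statement.
const CLAIM_TASKS_SQL = `
    UPDATE public.task
    SET processing_at = NOW()
    WHERE id IN (
        SELECT id
        FROM public.task
        WHERE processed_at IS NULL AND processing_at IS NULL
        ORDER BY created_at ASC
        LIMIT $1
        FOR UPDATE SKIP LOCKED
    )
    RETURNING id, type, payload`;

async function claimTasks(db: Queryable, limit = 10): Promise<Task[]> {
    const { rows } = await db.query(CLAIM_TASKS_SQL, [limit]);
    // BIGSERIAL ids come back as strings from node-postgres by default.
    return rows.map((row) => ({ id: Number(row.id), type: row.type, payload: row.payload }));
}
```

Note that RETURNING does not guarantee created_at order, which is fine here because the batch is handled concurrently anyway.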

Let's add optimizations only when the time is right; there's no need to strap on a rocket while nobody is using our app yet 😆.

Conclusion

So this is the simple task management implementation I use in my recent projects. I've been using NodeJS more lately and totally love it, from the performance to the ecosystem and the community.

Cheers!
