Hi guys,
Coming from the world of Laravel, I can dispatch a simple job to the queue, run the worker, and the job gets resolved in the background, which is cool and awesome, IKR?
But in NodeJS, we either install a library or build our own simple task handler.
So let's build a simple one today! The fewer dependencies, the better, right?
I am using:
- Node 18
- TypeScript
- PostgreSQL
- A functional approach
The Strategy
So we have:
- 1 dedicated process for HTTP
- 1 dedicated process to pull and process the tasks (the Background Task process)
The Database
The database structure always comes first, right?
CREATE TABLE IF NOT EXISTS public.task (
  id BIGSERIAL PRIMARY KEY,
  type text NOT NULL,
  payload jsonb NOT NULL,
  created_at TIMESTAMPTZ NOT NULL DEFAULT CLOCK_TIMESTAMP(),
  processing_at TIMESTAMPTZ NULL,
  processed_at TIMESTAMPTZ NULL
);
-- 2 indexes to speed up the retrieval query
CREATE INDEX IF NOT EXISTS idx_task_processed_processing_at ON public.task(processed_at, processing_at);
CREATE INDEX IF NOT EXISTS idx_task_created_at ON public.task(created_at);
You'd know how it goes after checking out this table, right?
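In other words, the two nullable timestamps encode the task's lifecycle. A row is pending while both processing_at and processed_at are NULL (which is what the retrieval query looks for), processing once the worker stamps processing_at, and processed once processed_at is set. Here is a tiny sketch of that mapping. TaskRow and taskStatus are hypothetical names used only for illustration:

```typescript
// Mirrors the two lifecycle columns of one public.task row (hypothetical shape).
type TaskRow = {
  processing_at: Date | null;
  processed_at: Date | null;
};

type TaskStatus = 'pending' | 'processing' | 'processed';

// Derive a task's state from the nullable timestamps; processed_at wins
// because a finished task keeps its processing_at value too.
function taskStatus(row: TaskRow): TaskStatus {
  if (row.processed_at !== null) return 'processed';
  if (row.processing_at !== null) return 'processing';
  return 'pending';
}
```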
The Code - Repository
First, we'll need a simple repository to add & retrieve the tasks:
export type Task = {
  id: number;
  type: string;
  payload: Record<string, unknown>;
};
type TaskRepo = {
  add(task: Omit<Task, 'id'>): Promise<Task>;
  get(perPage: number): Promise<Task[]>;
  markAsProcessing(taskId: number): Promise<void>;
  markAsProcessed(taskId: number): Promise<void>;
};
export const taskRepo: TaskRepo = {
  async add(task) {
    // simple insert query here
    // INSERT INTO public.task (type, payload)
    // VALUES ($1, $2)
    // RETURNING id, type, payload
    // [task.type, task.payload]
  },
  async get(perPage = 10) {
    // SELECT id, type, payload
    // FROM public.task
    // WHERE processed_at IS NULL AND processing_at IS NULL
    // ORDER BY created_at ASC
    // LIMIT $1
    // [perPage]
  },
  async markAsProcessing(taskId: number) {
    // UPDATE public.task
    // SET processing_at = NOW()
    // WHERE id = $1
    // [taskId]
  },
  async markAsProcessed(taskId: number) {
    // UPDATE public.task
    // SET processed_at = NOW()
    // WHERE id = $1
    // [taskId]
  },
};
Plz help me complete the implementation!
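Here is one way the skeleton could be completed. This is a sketch that assumes node-postgres (pg). createTaskRepo, Queryable and toTask are hypothetical names, and Queryable is just the slice of the query method this code relies on. pg's Pool and Client should fit that shape, and it also lets the repo be exercised with a fake in tests:

```typescript
type Task = {
  id: number;
  type: string;
  payload: Record<string, unknown>;
};

type TaskRepo = {
  add(task: Omit<Task, 'id'>): Promise<Task>;
  get(perPage: number): Promise<Task[]>;
  markAsProcessing(taskId: number): Promise<void>;
  markAsProcessed(taskId: number): Promise<void>;
};

// Assumed minimal client shape; node-postgres' Pool/Client expose a compatible query().
type Queryable = {
  query(text: string, values?: unknown[]): Promise<{ rows: any[] }>;
};

// node-postgres returns BIGSERIAL (int8) columns as strings by default and
// parses jsonb automatically, so only the id needs converting.
function toTask(row: { id: string | number; type: string; payload: Record<string, unknown> }): Task {
  return { id: Number(row.id), type: row.type, payload: row.payload };
}

function createTaskRepo(db: Queryable): TaskRepo {
  return {
    async add(task) {
      const { rows } = await db.query(
        `INSERT INTO public.task (type, payload) VALUES ($1, $2)
         RETURNING id, type, payload`,
        // send the jsonb payload as a JSON string
        [task.type, JSON.stringify(task.payload)],
      );
      return toTask(rows[0]);
    },
    async get(perPage = 10) {
      // parameterized LIMIT instead of string interpolation
      const { rows } = await db.query(
        `SELECT id, type, payload
         FROM public.task
         WHERE processed_at IS NULL AND processing_at IS NULL
         ORDER BY created_at ASC
         LIMIT $1`,
        [perPage],
      );
      return rows.map(toTask);
    },
    async markAsProcessing(taskId) {
      await db.query('UPDATE public.task SET processing_at = NOW() WHERE id = $1', [taskId]);
    },
    async markAsProcessed(taskId) {
      await db.query('UPDATE public.task SET processed_at = NOW() WHERE id = $1', [taskId]);
    },
  };
}
```

In the app, something like createTaskRepo(new Pool()), with Pool imported from pg, would replace the skeleton. The Number(row.id) conversion is safe as long as ids stay below 2^53.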
The Code - The Tasks Definition
We need to define a list of tasks that can be added and resolved in the background.
For this, we will use a TypeScript discriminated union. Btw, I'm using zod to build it: with zod, I can safely parse the payload into a structured object and then work with typed data.
For example, I have 2 tasks:
import { z } from 'zod';
const sendVerificationEmailTask = z.object({
  type: z.literal('SEND_VERIFICATION_EMAIL'),
  payload: z.object({
    user: z.object({
      id: z.string().uuid(),
      email: z.string().email(),
    }),
    verificationKey: z.string(),
  }),
});
const sendForgotPasswordEmailTask = z.object({
  type: z.literal('SEND_FORGOT_PASSWORD_EMAIL'),
  payload: z.object({
    user: z.object({
      id: z.string().uuid(),
      email: z.string().email(),
    }),
    forgotPasswordKey: z.string(),
  }),
});
// both must have "type", while the payload can vary
export const backgroundTask = z.discriminatedUnion('type', [
  sendVerificationEmailTask,
  sendForgotPasswordEmailTask,
]);
export type BackgroundTask = z.infer<typeof backgroundTask>;
So if I need more tasks later, I just create another zod object and add it to the backgroundTask union.
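Because adding a task means touching the union, it's handy to let the compiler remind us to handle the new type in the worker as well. Below is a minimal sketch of the standard never-based exhaustiveness check. The union is a hand-written stand-in for what z.infer produces (payloads trimmed), and describeTask/assertNever are hypothetical names:

```typescript
// Hand-written stand-in for z.infer<typeof backgroundTask>, payloads trimmed.
type BackgroundTask =
  | { type: 'SEND_VERIFICATION_EMAIL'; payload: { verificationKey: string } }
  | { type: 'SEND_FORGOT_PASSWORD_EMAIL'; payload: { forgotPasswordKey: string } };

// Only reachable if a union member was not handled; at compile time the
// argument must be `never`, so a missing case becomes a type error.
function assertNever(value: never): never {
  throw new Error(`Unhandled task: ${JSON.stringify(value)}`);
}

function describeTask(task: BackgroundTask): string {
  switch (task.type) {
    case 'SEND_VERIFICATION_EMAIL':
      // task.payload is narrowed to { verificationKey: string } here
      return `verify:${task.payload.verificationKey}`;
    case 'SEND_FORGOT_PASSWORD_EMAIL':
      return `forgot:${task.payload.forgotPasswordKey}`;
    default:
      return assertNever(task);
  }
}
```

If a third member is added to BackgroundTask without a matching case, the assertNever(task) line stops compiling, because task is no longer narrowed to never.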
The Code - Add Task to DB
Now we need a reusable function that receives a BackgroundTask and inserts it using the repo.
Ideally, it doesn't need to return anything:
export async function createTask(task: BackgroundTask): Promise<void> {
  await taskRepo.add(task);
}
Haha, super easy!
In taskRepo.add, just pick the type and payload and do a simple INSERT.
The Final Boss - Task Management
This is where we create a dedicated process that pulls tasks and processes each batch concurrently on the event loop.
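import { setTimeout as wait } from 'node:timers/promises';
export async function initBackgroundTaskManagement() {
  while (true) {
    const tasks = await taskRepo.get(10);
    if (!tasks.length) {
      await wait(5_000); // nothing to do, wait 5s before polling again
      continue;
    }
    // start all handlers at once; allSettled waits for every one, even failed ones
    const handlers = tasks.map(handleTask);
    await Promise.allSettled(handlers);
  }
}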
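To see why the loop uses Promise.allSettled rather than Promise.all, here is a small self-contained sketch that runs one batch the same way and counts the outcomes. runBatch and BatchResult are hypothetical names, not part of the code above:

```typescript
type BatchResult = { fulfilled: number; rejected: number };

// Start every handler immediately; while one awaits I/O, the event loop
// runs the others. allSettled never rejects, so one failing task cannot
// abort the rest of the batch.
async function runBatch<T>(
  tasks: T[],
  handle: (task: T) => Promise<void>,
): Promise<BatchResult> {
  const results = await Promise.allSettled(tasks.map(handle));
  return {
    fulfilled: results.filter((r) => r.status === 'fulfilled').length,
    rejected: results.filter((r) => r.status === 'rejected').length,
  };
}
```

Promise.all would reject as soon as one handler fails. allSettled waits for the entire batch instead, so the loop only polls again once every task from the current batch has settled.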
The handleTask function returns a Promise and handles the given task:
export async function handleTask(task: Task): Promise<void> {
  const hydratedTask = backgroundTask.safeParse({
    type: task.type,
    payload: task.payload,
  });
  await taskRepo.markAsProcessing(task.id);
  // unable to parse/hydrate the task, mark as processed
  if (!hydratedTask.success) {
    // if you have a logger, better to log the error here
    logger.warn('Parse task failed', { error: hydratedTask.error });
    await taskRepo.markAsProcessed(task.id);
    return;
  }
  // parsed, let's handle based on the "type"
  const { data } = hydratedTask;
  switch (data.type) {
    case 'SEND_VERIFICATION_EMAIL': {
      // data.payload is narrowed to the verification email payload here
      await sendVerificationEmailHandler(data.payload);
      break;
    }
    case 'SEND_FORGOT_PASSWORD_EMAIL': {
      // data.payload is narrowed to the forgot password payload here
      await sendForgotPasswordEmailHandler(data.payload);
      break;
    }
  }
  // we don't care what was going on in the specific handler
  // if we reach here => success
  // (if a handler throws, we never get here and the task stays "processing")
  await taskRepo.markAsProcessed(task.id);
}
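One gap worth noting: if a specific handler throws, markAsProcessed is never reached. The row keeps processing_at set with processed_at NULL, so the retrieval query never picks it up again. Below is a sketch of one way to always leave the row in a final state. It assumes a hypothetical markAsFailed repo method, which would need an extra column such as failed_at or last_error that the schema above doesn't have. LifecycleRepo and runWithLifecycle are also hypothetical names:

```typescript
type LifecycleRepo = {
  markAsProcessing(taskId: number): Promise<void>;
  markAsProcessed(taskId: number): Promise<void>;
  // Hypothetical: needs a column like failed_at / last_error in public.task.
  markAsFailed(taskId: number, reason: string): Promise<void>;
};

// Run one task's work and record exactly one final state for it.
async function runWithLifecycle(
  repo: LifecycleRepo,
  taskId: number,
  work: () => Promise<void>,
): Promise<void> {
  await repo.markAsProcessing(taskId);
  let failure: string | null = null;
  try {
    await work();
  } catch (error) {
    failure = error instanceof Error ? error.message : String(error);
  }
  // Kept outside the try so a repo error is not mistaken for a task failure.
  if (failure === null) {
    await repo.markAsProcessed(taskId);
  } else {
    await repo.markAsFailed(taskId, failure);
  }
}
```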
The Future Follow-Ups / Scaling
Reliability
Add integration tests (yes I did in my project hehehe)
Add simple lifecycle management
- So when we receive SIGINT (or other signals), we stop the task management gracefully.
Store more information about successes, failures, ...
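For the lifecycle item, here is a sketch of a stoppable polling loop. It assumes Node 18's AbortController and the promise-based setTimeout from node:timers/promises, which rejects when its signal is aborted. runWorker and abortOnShutdownSignals are hypothetical names. The loop finishes the in-flight batch before exiting instead of dropping it:

```typescript
import { setTimeout as sleep } from 'node:timers/promises';

// Poll until `signal` is aborted. A running batch always completes;
// only the idle wait is cut short by the abort.
async function runWorker<T>(
  fetchTasks: () => Promise<T[]>,
  handle: (task: T) => Promise<void>,
  signal: AbortSignal,
  idleMs = 5_000,
): Promise<void> {
  while (!signal.aborted) {
    const tasks = await fetchTasks();
    if (tasks.length === 0) {
      try {
        // rejects with an AbortError as soon as the signal is aborted
        await sleep(idleMs, undefined, { signal });
      } catch {
        break;
      }
      continue;
    }
    await Promise.allSettled(tasks.map(handle));
  }
}

// Hypothetical wiring: stop taking new batches on Ctrl+C or a container stop.
function abortOnShutdownSignals(controller: AbortController): void {
  process.once('SIGINT', () => controller.abort());
  process.once('SIGTERM', () => controller.abort());
}
```

In the worker entry point, it could be wired as: create an AbortController, call abortOnShutdownSignals(controller), then await runWorker(() => taskRepo.get(10), handleTask, controller.signal). Once SIGINT has a listener, Node no longer exits on Ctrl+C by itself, so remember to close the database pool after runWorker resolves; open connections can otherwise keep the process alive.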
Scaling
Increase the batch size from 10 to a higher number.
Increase the number of workers by:
- Using FOR UPDATE SKIP LOCKED to fetch only the available (unlocked) tasks, so two workers never pick up the same task at the same time
- Pulling and handling the tasks like today
- Running more processes
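Here is a sketch of what the claim step could look like when "get" and "markAsProcessing" are merged into one statement. It assumes a node-postgres style client, and Queryable, claimTasks and CLAIM_TASKS_SQL are hypothetical names. Rows that another worker is claiming at that moment are skipped instead of waited on. Because processing_at is set in the same statement, later polls no longer see those rows as pending:

```typescript
// One atomic claim: lock up to $1 pending rows (skipping rows locked by
// other workers), mark them as processing, and return them.
const CLAIM_TASKS_SQL = `
  UPDATE public.task
  SET processing_at = NOW()
  WHERE id IN (
    SELECT id
    FROM public.task
    WHERE processed_at IS NULL AND processing_at IS NULL
    ORDER BY created_at ASC
    LIMIT $1
    FOR UPDATE SKIP LOCKED
  )
  RETURNING id, type, payload`;

// Assumed minimal client shape; node-postgres' Pool/Client expose a compatible query().
type Queryable = {
  query(text: string, values?: unknown[]): Promise<{ rows: any[] }>;
};

type ClaimedTask = { id: number; type: string; payload: Record<string, unknown> };

async function claimTasks(db: Queryable, limit: number): Promise<ClaimedTask[]> {
  const { rows } = await db.query(CLAIM_TASKS_SQL, [limit]);
  // RETURNING order is not guaranteed; ids arrive as strings for BIGSERIAL in pg.
  return rows.map((row) => ({ id: Number(row.id), type: row.type, payload: row.payload }));
}
```

This prevents two workers from picking up the same task at the same time, but a worker that crashes mid-task still leaves its rows in "processing". Recovering those rows would need something extra, such as a timeout on processing_at.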
Let's add optimizations when the time is right; there's no need to strap on a rocket while nobody is using our app yet.
Conclusion
So this is the simple task management I've built for my recent projects. I've been using NodeJS more lately and totally love it, from the performance to the ecosystem and the community.
Cheers!