Concurrence (BullMQ + cron)
Jobs async — queues, workers, retries, DLQ, Bull Board, @elysiajs/cron
Deux mécanismes complémentaires (ADR-04) :
@elysiajs/cronpour les tâches périodiques in-process, BullMQ + Redis pour les jobs déclenchés avec retries et persistence.
Les 4 queues BullMQ
| Queue | Rôle | Concurrency | Retries | DLQ après |
|---|---|---|---|---|
mews-sync | Sync full ou partiel depuis Mews | 3 | 3, exp 30s | 3 |
pms-bridge | Push vers PMS (check-in, check-out, charges, room-status) | 10 | 5, exp 5s | 5 |
email | Emails transactionnels (check-in, invitation, magic link, reset) | 20 | 3, exp 60s | 3 |
stripe-webhook | Process webhooks Stripe (capture, refund, dispute) | 5 | 5, exp 10s | 5 |
Scaler l'une sans les autres en ajoutant un worker dédié pour cette queue (env var WORKER_CONCURRENCY_<QUEUE>).
Architecture
Queues definition
import { Queue } from "bullmq";
import { redis } from "../lib/redis";
const defaultJobOptions = {
removeOnComplete: { age: 24 * 3600, count: 10000 }, // keep 24h ou 10k max
removeOnFail: false, // keep failed pour debug
};
export const queues = {
mewsSync: new Queue("mews-sync", {
connection: redis,
defaultJobOptions: {
...defaultJobOptions,
attempts: 3,
backoff: { type: "exponential", delay: 30_000 },
},
}),
pmsBridge: new Queue("pms-bridge", {
connection: redis,
defaultJobOptions: {
...defaultJobOptions,
attempts: 5,
backoff: { type: "exponential", delay: 5_000 },
},
}),
email: new Queue("email", {
connection: redis,
defaultJobOptions: {
...defaultJobOptions,
attempts: 3,
backoff: { type: "exponential", delay: 60_000 },
},
}),
stripeWebhook: new Queue("stripe-webhook", {
connection: redis,
defaultJobOptions: {
...defaultJobOptions,
attempts: 5,
backoff: { type: "exponential", delay: 10_000 },
},
}),
};Workers
Chaque worker dans un fichier dédié, compose dans apps/worker/src/index.ts.
import { initOtel } from "@bell/observability";
await initOtel({ serviceName: "bell-worker", environment: process.env.DEPLOYMENT_ENV });
import { mewsSyncWorker } from "./workers/mews-sync.worker";
import { pmsBridgeWorker } from "./workers/pms-bridge.worker";
import { emailWorker } from "./workers/email.worker";
import { stripeWebhookWorker } from "./workers/stripe-webhook.worker";
const workers = [mewsSyncWorker, pmsBridgeWorker, emailWorker, stripeWebhookWorker];
// Graceful shutdown
for (const sig of ["SIGINT", "SIGTERM"] as const) {
process.on(sig, async () => {
logger.info("Worker shutting down");
await Promise.all(workers.map((w) => w.close()));
process.exit(0);
});
}
logger.info("All workers ready");Exemple : pms-bridge.worker.ts
import { Worker } from "bullmq";
import { redis } from "@bell/api/lib/redis";
import { createAdapter } from "@bell/api/services/integrations";
import { loadIntegration } from "@bell/api/services/integrations/repo";
export const pmsBridgeWorker = new Worker(
"pms-bridge",
async (job) => {
const { type, organizationId, guestId, items, roomId, status } = job.data;
const integration = await loadIntegration(organizationId);
if (!integration || !integration.isActive) {
// Hôtel sans PMS — no-op, pas d'erreur
return { skipped: true, reason: "no-active-integration" };
}
const adapter = createAdapter(integration);
switch (type) {
case "check-in": {
const reservation = await findReservationByGuest(guestId);
if (!reservation) throw new Error(`No external reservation for guest ${guestId}`);
await adapter.startReservation!(reservation.externalId);
return { ok: true };
}
case "check-out": {
const reservation = await findReservationByGuest(guestId);
if (!reservation) throw new Error(`No external reservation for guest ${guestId}`);
await adapter.processReservation!(reservation.externalId);
return { ok: true };
}
case "post-charges": {
const guest = await loadGuest(guestId);
await adapter.addOrder!({
guestId: guest.externalGuestId!,
items,
reservationId: guest.externalReservationId ?? undefined,
});
return { ok: true };
}
case "update-room-status": {
const room = await loadRoom(roomId);
if (!room.externalRoomId) return { skipped: true };
await adapter.updateRoomStatus!({ roomId: room.externalRoomId, status });
return { ok: true };
}
default:
throw new Error(`Unknown bridge type: ${type}`);
}
},
{
connection: redis,
concurrency: Number(process.env.WORKER_CONCURRENCY_PMS_BRIDGE ?? 10),
},
);@elysiajs/cron
Les tâches périodiques tournent dans le process Elysia (apps/server), pas dans le worker. Elles enqueuent des jobs BullMQ pour tout ce qui est lourd.
import { cron } from "@elysiajs/cron";
import { Elysia } from "elysia";
import { queues } from "../jobs/queues";
import { db } from "@bell/db";
import { session } from "@bell/db/schema/auth";
import { integration } from "@bell/db/schema/integrations";
import { lt, and, eq } from "drizzle-orm";
export const cronPlugin = new Elysia()
// Cleanup sessions expirées
.use(cron({
name: "cleanup-expired-sessions",
pattern: "0 3 * * *", // 3h AM
run: async () => {
const deleted = await db.delete(session).where(lt(session.expiresAt, new Date()));
logger.info({ deleted }, "Expired sessions cleaned");
},
}))
// Sync Mews journalière
.use(cron({
name: "mews-daily-sync",
pattern: "0 4 * * *", // 4h AM
run: async () => {
const integrations = await db.select()
.from(integration)
.where(and(eq(integration.isActive, true), eq(integration.provider, "mews")));
for (const int of integrations) {
await queues.mewsSync.add("full-sync", { integrationId: int.id });
}
logger.info({ count: integrations.length }, "Enqueued Mews daily syncs");
},
}))
// Reconciliation Stripe (captures restées en pending)
.use(cron({
name: "stripe-reconciliation",
pattern: "*/30 * * * *", // toutes les 30 min
run: async () => {
await reconcileStalePaymentIntents();
},
}))
// Email digest hebdo pour les managers
.use(cron({
name: "weekly-digest",
pattern: "0 8 * * 1", // lundi 8h AM
run: async () => {
const managers = await getActiveManagers();
for (const m of managers) {
await queues.email.add("weekly-digest", { userId: m.id });
}
},
}));Règle
Le cron enqueue, il n'exécute pas. Pour 2 raisons :
- Si le process Elysia crash pendant le cron run, le job est perdu. BullMQ = persistent.
- Exécuter 50 orgs × sync en série in-process = bloque le serveur HTTP pendant la durée. BullMQ = background.
Bull Board
UI web pour voir les queues, jobs actifs, failed, DLQ :
import { createBullBoard } from "@bull-board/api";
import { BullMQAdapter } from "@bull-board/api/bullMQAdapter";
import { ElysiaAdapter } from "@bull-board/elysia";
const serverAdapter = new ElysiaAdapter("/admin/queues");
createBullBoard({
queues: [
new BullMQAdapter(queues.mewsSync),
new BullMQAdapter(queues.pmsBridge),
new BullMQAdapter(queues.email),
new BullMQAdapter(queues.stripeWebhook),
],
serverAdapter,
});
export const queuesAdmin = new Elysia().use(requireAdmin).use(serverAdapter.registerPlugin());Accès : bell-api.hoaiy.com/admin/queues — protégé par role admin, visible uniquement par l'équipe HOAIY.
Dead Letter Queue (DLQ)
Un job qui échoue N fois (selon attempts de la queue) est gardé en failed — pas supprimé. Bull Board affiche les failed, permet de les rejouer manuellement.
Alertes Signoz : si la DLQ (queue.failed.count) dépasse 0, ops HOAIY reçoit un email.
Idempotency
Les webhooks Stripe et Mews peuvent être reçus plusieurs fois (retry upstream). Chaque job utilise une clé d'idempotency pour éviter le double traitement :
// Webhook Stripe
await queues.stripeWebhook.add(event.type, { event }, {
jobId: event.id, // BullMQ rejette les doublons (même jobId)
});
// Webhook Mews
await queues.mewsSync.add("webhook-entity", { event }, {
jobId: `mews-${event.id}`,
});Côté DB : toutes les mutations sont idempotentes (upsert par externalId, UPDATE payment_status = captured idempotent).
Redis en production
- Redis 7 avec
appendonly yes(persistence AOF) → pas de perte de jobs en cas de restart - Password en prod via env (Redis ACL ou
requirepass) - Pas exposé au WAN — accessible uniquement via réseau Docker
- maxmemory 2GB,
maxmemory-policy allkeys-lru(évite OOM si explosion de jobs) - Snapshot hebdo sur R2 (backup)
Scaling
Verticalement
Augmenter la concurrency par queue via env :
WORKER_CONCURRENCY_MEWS_SYNC=5
WORKER_CONCURRENCY_PMS_BRIDGE=20
WORKER_CONCURRENCY_EMAIL=50Horizontalement
Ajouter un 2e container bell-worker-2. BullMQ gère naturellement le load balancing entre workers qui consomment la même queue.
Sharder par queue
Si une queue devient hot-spot (ex: email sature), dédier un container par queue :
docker-compose.prod.yml:
worker-email:
<<: *worker
command: ["bun", "apps/worker/dist/index.js", "--only", "email"]Le worker ne lance que les workers de la queue spécifiée.
Observabilité des jobs
- Métriques automatiques par BullMQ + instrumentation OTel :
bullmq.queue.size{queue}(waiting + active + delayed)bullmq.job.duration_ms{queue, status}bullmq.job.failed_total{queue}
- Traces : chaque job crée un span (attributs
job.id,job.name,queue) - Logs : worker logs filtré par
queuedans Signoz
Debug : un job en DLQ
- Ouvrir Bull Board (
bell-api.hoaiy.com/admin/queues) - Trouver le job failed → voir l'erreur + stack trace
- Cliquer "Show input data" pour voir les args
- Reproduire localement :
bun scripts/run-job.ts <queue> <jobId> - Fix, déployer, cliquer "Retry" dans Bull Board
Tests
- Unitaires : le job est une fonction
processor(job)qu'on teste comme n'importe quelle fonction avec un mock job - Intégration : push un vrai job dans une queue de test, attendre le résultat, vérifier la DB
// apps/worker/src/workers/pms-bridge.worker.test.ts
import { Queue, Worker } from "bullmq";
import { pmsBridgeWorker } from "./pms-bridge.worker";
test("bridgeCheckIn calls adapter.startReservation", async () => {
const testQueue = new Queue("pms-bridge-test", { connection: testRedis });
const job = await testQueue.add("check-in", {
type: "check-in",
organizationId: TEST_ORG_ID,
guestId: TEST_GUEST_ID,
});
// ... wait for completion
// ... assert adapter.startReservation was called
});