Queue System

UniPulse uses BullMQ with Redis 7 for asynchronous job processing. There are 27 queues handling everything from post publishing to AI generation to revenue attribution.


Architecture


Complete Queue Reference (27 Queues)

Content & Publishing

| Queue | Purpose | Trigger | Retry |
| --- | --- | --- | --- |
| publish | Publish posts to Facebook/Instagram/TikTok APIs | User action or schedule | 3x exponential |
| schedule | Trigger scheduled posts at the correct time | Cron / scheduler | 3x exponential |
| post-classify | Classify post content type and topic via AI | After post creation | 2x exponential |
| calendar-generate | Generate AI content calendars | User request | 2x exponential |

AI & Machine Learning

| Queue | Purpose | Trigger | Retry |
| --- | --- | --- | --- |
| ai-suggestions | Generate AI content suggestions | Background / periodic | 2x exponential |
| prediction-eval | Run engagement/revenue prediction models | After analytics sync | 2x exponential |
| ab-test-eval | Evaluate A/B test results when duration elapses | Timer | 1x |
| workspace-classify | Classify workspace industry/niche | Periodic | 2x exponential |

Analytics & Metrics

| Queue | Purpose | Trigger | Retry |
| --- | --- | --- | --- |
| analytics-sync | Sync metrics from social platform APIs | Periodic / manual | 3x exponential |
| benchmark-compute | Compute industry and niche benchmarks | Periodic | 2x exponential |
| ad-sync | Sync ad performance from Meta/TikTok Ads | Periodic | 3x exponential |

Conversation Engine (ICE)

| Queue | Purpose | Trigger | Retry |
| --- | --- | --- | --- |
| ice-process | Run the 3-step LLM pipeline on incoming messages | Message webhook | 3x exponential |
| ice-escalation | Route thread to human agent based on rules | Escalation rule match | 2x exponential |
| ice-experiment-eval | Evaluate reply experiment results | Experiment duration | 1x |

E-Commerce & Revenue

| Queue | Purpose | Trigger | Retry |
| --- | --- | --- | --- |
| ecommerce-sync | Sync products and inventory from stores | Webhook / periodic | 3x exponential |
| order-sync | Sync orders from e-commerce platforms | Webhook / periodic | 3x exponential |
| revenue-attribution | Match orders to posts via UTM parameters | After order sync | 2x exponential |

Audience & Segmentation

| Queue | Purpose | Trigger | Retry |
| --- | --- | --- | --- |
| audience-sync | Update audience graph and engagement scores | After interaction | 2x exponential |
| segment-eval | Re-evaluate audience segment membership | After audience update | 2x exponential |

Workflows & Automation

| Queue | Purpose | Trigger | Retry |
| --- | --- | --- | --- |
| workflow-engine | Execute workflow actions (notify, publish, reply, etc.) | Workflow trigger event | 3x exponential |

Integrations & System

| Queue | Purpose | Trigger | Retry |
| --- | --- | --- | --- |
| competitor-scan | Scrape competitor social profiles and metrics | Periodic schedule | 3x exponential |
| trend-scan | Detect trending topics via Gemini | Periodic schedule | 2x exponential |
| token-refresh | Refresh expiring social platform OAuth tokens | Before expiry | 5x exponential |
| payment-webhook | Process Stripe/Paymob subscription events | Payment webhook | 3x exponential |
| webhook-process | Process incoming external webhooks | External event | 3x exponential |
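
The Retry column can be translated into BullMQ job options mechanically. The sketch below is one way to do that, not the project's actual code: `retryOptionsFromCell` is a hypothetical helper, it assumes "Nx" means N total attempts (matching `attempts: 3` in the publish example on this page), and it assumes the 5000 ms base delay used in the default job options.

```typescript
// Hypothetical helper: turn a "Retry" table cell into BullMQ-style job options.
// Assumptions: "Nx" = N total attempts; base delay of 5000 ms unless overridden.
type Backoff = { type: 'exponential'; delay: number };
type RetryOptions = { attempts: number; backoff?: Backoff };

function retryOptionsFromCell(cell: string, baseDelayMs = 5000): RetryOptions {
  // Accepts "3x exponential" or a bare "1x"
  const match = /^(\d+)x(?:\s+(exponential))?$/.exec(cell.trim());
  if (!match) {
    throw new Error(`Unrecognized retry policy: "${cell}"`);
  }
  const attempts = Number(match[1]);
  return match[2]
    ? { attempts, backoff: { type: 'exponential', delay: baseDelayMs } }
    : { attempts };
}

// token-refresh is listed as "5x exponential"
const tokenRefreshRetry = retryOptionsFromCell('5x exponential');
// ab-test-eval is listed as "1x" (a single attempt, no backoff)
const abTestRetry = retryOptionsFromCell('1x');
```

The resulting object has the same shape as the `attempts` / `backoff` options passed to `queue.add` or `defaultJobOptions`.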

Adding a Job to a Queue

import { publishQueue } from '../queues/publish.queue';
// Assumed path, following the publish.queue naming pattern
import { analyticsSyncQueue } from '../queues/analytics-sync.queue';

// Add a single job
await publishQueue.add('publish-post', {
  postId: 'abc123',
  platforms: ['facebook', 'instagram'],
  workspaceId: 'ws_xyz',
}, {
  attempts: 3,
  backoff: { type: 'exponential', delay: 5000 },
});

// Add a delayed job (scheduled publishing)
await publishQueue.add('publish-post', {
  postId: 'abc456',
  platforms: ['tiktok'],
  workspaceId: 'ws_xyz',
}, {
  delay: 3600000, // 1 hour from now, in milliseconds
});

// Add a repeating job (periodic sync)
await analyticsSyncQueue.add('sync-metrics', {
  workspaceId: 'ws_xyz',
}, {
  repeat: {
    every: 3600000, // Every hour, in milliseconds
  },
});
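
The `delay` option is relative (milliseconds from now), while scheduled posts usually carry an absolute publish time. A minimal sketch of the conversion follows; `delayUntil` is a hypothetical helper name, not part of BullMQ or of this codebase.

```typescript
// Hypothetical helper: convert an absolute publish time into a relative delay in ms.
// Times in the past yield 0, so the job becomes eligible to run immediately.
function delayUntil(scheduledAt: Date, now: Date = new Date()): number {
  const ms = scheduledAt.getTime() - now.getTime();
  if (Number.isNaN(ms)) {
    throw new Error('Invalid date passed to delayUntil');
  }
  return Math.max(0, ms);
}

// Fixed reference time so the example is deterministic
const referenceTime = new Date('2025-01-01T12:00:00Z');
const publishAt = new Date('2025-01-01T13:00:00Z');
const delayMs = delayUntil(publishAt, referenceTime); // 3600000 (one hour)
```

The result would be passed as `{ delay: delayMs }` in the job options when adding the job.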

Processing Jobs (Worker)

import { Worker } from 'bullmq';
import { redis } from '../lib/redis';
import { postService } from '../services/post.service';

const publishWorker = new Worker('publish', async (job) => {
  const { postId, platforms } = job.data;

  // Report initial progress
  await job.updateProgress(10);

  // Publish to each platform in turn, reporting progress as a percentage.
  // entries() supplies the index directly, so duplicate platform names
  // cannot skew the calculation the way indexOf() would.
  for (const [index, platform] of platforms.entries()) {
    await postService.publishToPlatform(postId, platform);
    await job.updateProgress(Math.round(((index + 1) / platforms.length) * 100));
  }

  return { success: true, publishedAt: new Date() };
}, {
  connection: redis,
  concurrency: 5, // Process up to 5 jobs simultaneously
});

// Event handlers
publishWorker.on('completed', (job, result) => {
  console.log(`Job ${job.id} completed:`, result);
});

publishWorker.on('failed', (job, err) => {
  // job can be undefined here, hence the optional chaining
  console.error(`Job ${job?.id} failed:`, err.message);
});

Default Job Options

const defaultJobOptions = {
  attempts: 3, // Up to 3 attempts in total (1 initial run + 2 retries)
  backoff: { type: 'exponential', delay: 5000 }, // Retries wait 5s, then 10s
  removeOnComplete: 100, // Keep last 100 completed jobs
  removeOnFail: 500, // Keep last 500 failed jobs
};
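
To make the backoff timing concrete, the sketch below computes the wait before each retry, assuming BullMQ's built-in exponential strategy without jitter, where the n-th retry waits `delay * 2^(n - 1)` milliseconds. `backoffSchedule` is a hypothetical helper written for illustration only.

```typescript
// Hypothetical helper: list the wait (ms) before each retry.
// Assumes the n-th retry waits baseDelayMs * 2^(n - 1), with no jitter.
function backoffSchedule(attempts: number, baseDelayMs: number): number[] {
  // `attempts` counts the first run too, so there are attempts - 1 retries
  const retries = Math.max(0, attempts - 1);
  return Array.from({ length: retries }, (_, i) => baseDelayMs * 2 ** i);
}

const defaultSchedule = backoffSchedule(3, 5000);      // [5000, 10000]
const tokenRefreshSchedule = backoffSchedule(5, 5000); // [5000, 10000, 20000, 40000]
```

With `attempts: 3`, a job that keeps failing is marked failed roughly 15 seconds of waiting after its first run, plus processing time.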

Queue Configuration Pattern

// apps/api/src/queues/publish.queue.ts
import { Queue } from 'bullmq';
import { redis } from '../lib/redis';

export const publishQueue = new Queue('publish', {
  connection: redis,
  defaultJobOptions: {
    attempts: 3,
    backoff: { type: 'exponential', delay: 5000 },
    removeOnComplete: 100,
    removeOnFail: 500,
  },
});

Monitoring & Debugging

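| Method | Description |
| --- | --- |
| Admin API | Queue status endpoints (pending, active, completed, failed counts) |
| Control Center | Dashboard shows queue health and job counts |
| Redis CLI | redis-cli --scan --pattern 'bull:*' to inspect queue data (prefer this over redis-cli KEYS bull:*, which blocks Redis while it walks the whole keyspace) |
| Job Retention | Failed jobs are retained in Redis for post-mortem analysis |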

Useful BullMQ Admin Commands

// Get queue metrics
const counts = await publishQueue.getJobCounts();
// e.g. { waiting: 5, active: 2, completed: 150, failed: 3, delayed: 1 }

// Get failed jobs
const failed = await publishQueue.getFailed(0, 10);

// Retry a failed job (guard against an empty list)
if (failed.length > 0) {
  await failed[0].retry();
}

// Clean old jobs: remove up to 100 completed jobs older than 24h
await publishQueue.clean(86400000, 100, 'completed');

Failed Job Alerts

Monitor the failed count for each queue. A spike in failures usually indicates an external API outage (social platforms, Gemini, etc.) or a database issue. The Control Center dashboard surfaces these alerts automatically.
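
For setups that poll queues directly, a spike check can be sketched as a comparison of two job-count snapshots. Everything here is illustrative: `JobCounts`, `newFailures`, `isFailureSpike`, and the threshold of 10 are assumptions, not the Control Center's actual logic. Because `removeOnFail` caps how many failed jobs are retained, the failed count can plateau or drop, so the difference is clamped at zero.

```typescript
// Shape of the counts object shown in the admin commands section
type JobCounts = {
  waiting: number;
  active: number;
  completed: number;
  failed: number;
  delayed: number;
};

// Hypothetical: failures added between two snapshots (never negative,
// since retention limits can shrink the stored failed count)
function newFailures(previous: JobCounts, current: JobCounts): number {
  return Math.max(0, current.failed - previous.failed);
}

// Hypothetical threshold check; 10 new failures per polling interval is an arbitrary default
function isFailureSpike(previous: JobCounts, current: JobCounts, maxNewFailures = 10): boolean {
  return newFailures(previous, current) > maxNewFailures;
}

const earlier: JobCounts = { waiting: 5, active: 2, completed: 150, failed: 3, delayed: 1 };
const later: JobCounts = { waiting: 40, active: 5, completed: 152, failed: 28, delayed: 1 };
const spike = isFailureSpike(earlier, later); // true: 25 new failures exceeds 10
```

In practice the snapshots would come from `getJobCounts()` on each queue at a fixed polling interval.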


Cross-Reference