Queue System
UniPulse uses BullMQ with Redis 7 for asynchronous job processing. There are 27 queues handling everything from post publishing to AI generation to revenue attribution.
Architecture
Complete Queue Reference (27 Queues)
Content & Publishing
| Queue | Purpose | Trigger | Retry |
|---|---|---|---|
publish | Publish posts to Facebook/Instagram/TikTok APIs | User action or schedule | 3x exponential |
schedule | Trigger scheduled posts at the correct time | Cron / scheduler | 3x exponential |
post-classify | Classify post content type and topic via AI | After post creation | 2x exponential |
calendar-generate | Generate AI content calendars | User request | 2x exponential |
AI & Machine Learning
| Queue | Purpose | Trigger | Retry |
|---|---|---|---|
ai-suggestions | Generate AI content suggestions | Background / periodic | 2x exponential |
prediction-eval | Run engagement/revenue prediction models | After analytics sync | 2x exponential |
ab-test-eval | Evaluate A/B test results when duration elapses | Timer | 1x |
workspace-classify | Classify workspace industry/niche | Periodic | 2x exponential |
Analytics & Metrics
| Queue | Purpose | Trigger | Retry |
|---|---|---|---|
analytics-sync | Sync metrics from social platform APIs | Periodic / manual | 3x exponential |
benchmark-compute | Compute industry and niche benchmarks | Periodic | 2x exponential |
ad-sync | Sync ad performance from Meta/TikTok Ads | Periodic | 3x exponential |
Conversation Engine (ICE)
| Queue | Purpose | Trigger | Retry |
|---|---|---|---|
ice-process | Run the 3-step LLM pipeline on incoming messages | Message webhook | 3x exponential |
ice-escalation | Route thread to human agent based on rules | Escalation rule match | 2x exponential |
ice-experiment-eval | Evaluate reply experiment results | Experiment duration | 1x |
E-Commerce & Revenue
| Queue | Purpose | Trigger | Retry |
|---|---|---|---|
ecommerce-sync | Sync products and inventory from stores | Webhook / periodic | 3x exponential |
order-sync | Sync orders from e-commerce platforms | Webhook / periodic | 3x exponential |
revenue-attribution | Match orders to posts via UTM parameters | After order sync | 2x exponential |
Audience & Segmentation
| Queue | Purpose | Trigger | Retry |
|---|---|---|---|
audience-sync | Update audience graph and engagement scores | After interaction | 2x exponential |
segment-eval | Re-evaluate audience segment membership | After audience update | 2x exponential |
Workflows & Automation
| Queue | Purpose | Trigger | Retry |
|---|---|---|---|
workflow-engine | Execute workflow actions (notify, publish, reply, etc.) | Workflow trigger event | 3x exponential |
Integrations & System
| Queue | Purpose | Trigger | Retry |
|---|---|---|---|
competitor-scan | Scrape competitor social profiles and metrics | Periodic schedule | 3x exponential |
trend-scan | Detect trending topics via Gemini | Periodic schedule | 2x exponential |
token-refresh | Refresh expiring social platform OAuth tokens | Before expiry | 5x exponential |
payment-webhook | Process Stripe/Paymob subscription events | Payment webhook | 3x exponential |
webhook-process | Process incoming external webhooks | External event | 3x exponential |
Adding a Job to a Queue
import { publishQueue } from '../queues/publish.queue';
// Add a single job
await publishQueue.add('publish-post', {
postId: 'abc123',
platforms: ['facebook', 'instagram'],
workspaceId: 'ws_xyz',
}, {
attempts: 3,
backoff: { type: 'exponential', delay: 5000 },
});
// Add a delayed job (scheduled publishing)
await publishQueue.add('publish-post', {
postId: 'abc456',
platforms: ['tiktok'],
}, {
delay: 3600000, // 1 hour from now
});
// Add a repeating job (periodic sync)
await analyticsSyncQueue.add('sync-metrics', {
workspaceId: 'ws_xyz',
}, {
repeat: {
every: 3600000, // Every hour
},
});
Processing Jobs (Worker)
import { Worker } from 'bullmq';
import { redis } from '../lib/redis';
import { postService } from '../services/post.service';
const publishWorker = new Worker('publish', async (job) => {
const { postId, platforms, workspaceId } = job.data;
// Update progress
await job.updateProgress(10);
// Process each platform
for (const platform of platforms) {
await postService.publishToPlatform(postId, platform);
await job.updateProgress((platforms.indexOf(platform) + 1) / platforms.length * 100);
}
return { success: true, publishedAt: new Date() };
}, {
connection: redis,
concurrency: 5, // Process up to 5 jobs simultaneously
});
// Event handlers
publishWorker.on('completed', (job, result) => {
console.log(`Job ${job.id} completed:`, result);
});
publishWorker.on('failed', (job, err) => {
console.error(`Job ${job?.id} failed:`, err.message);
});
Default Job Options
const defaultJobOptions = {
attempts: 3, // Retry up to 3 times
backoff: { type: 'exponential', delay: 5000 }, // 5s, 10s, 20s...
removeOnComplete: 100, // Keep last 100 completed jobs
removeOnFail: 500, // Keep last 500 failed jobs
};
Queue Configuration Pattern
// apps/api/src/queues/publish.queue.ts
import { Queue } from 'bullmq';
import { redis } from '../lib/redis';
export const publishQueue = new Queue('publish', {
connection: redis,
defaultJobOptions: {
attempts: 3,
backoff: { type: 'exponential', delay: 5000 },
removeOnComplete: 100,
removeOnFail: 500,
},
});
Monitoring & Debugging
| Method | Description |
|---|---|
| Admin API | Queue status endpoints (pending, active, completed, failed counts) |
| Control Center | Dashboard shows queue health and job counts |
| Redis CLI | redis-cli KEYS bull:* to inspect queue data |
| Job Retention | Failed jobs are retained in Redis for post-mortem analysis |
Useful BullMQ Admin Commands
// Get queue metrics
const counts = await publishQueue.getJobCounts();
// { waiting: 5, active: 2, completed: 150, failed: 3, delayed: 1 }
// Get failed jobs
const failed = await publishQueue.getFailed(0, 10);
// Retry a failed job
await failed[0].retry();
// Clean old jobs
await publishQueue.clean(86400000, 100, 'completed'); // Remove completed jobs older than 24h
Failed Job Alerts
Monitor the failed count for each queue. A spike in failures usually indicates an external API outage (social platforms, Gemini, etc.) or a database issue. The Control Center dashboard surfaces these alerts automatically.
Cross-Reference
- Data Flow -- how queues fit into the overall architecture
- Redis & Queues Infrastructure -- Redis configuration
- Services -- service layer that queues delegate to
- Workflow Engine -- workflow-specific queue processing
- Conversation Engine -- ICE queue pipeline