AI Pipeline
Architecture of the NexusCommerce AI worker fleet, job queue, and result handling.
Overview
The AI Pipeline is the execution layer for all AI-powered features in NexusCommerce. It consists of the Python FastAPI worker fleet, the BullMQ job queue backed by Redis, the NestJS job dispatch and monitoring layer, and the Supabase result storage. Understanding this pipeline helps you optimize AI job performance, troubleshoot failures, and build custom workflows in AI Studio.
Key Concepts
Worker — A Python FastAPI process that executes AI jobs. Workers are stateless and horizontally scalable. Multiple worker instances share a single Redis queue.
Job — A unit of AI work. Jobs have a type, parameters, and a status lifecycle: queued → running → completed / failed.
Queue — A BullMQ queue backed by Redis. The NestJS API enqueues jobs; workers dequeue and execute them. The queue handles prioritization, retries, and dead-letter handling.
Dead Letter Queue (DLQ) — Failed jobs that have exhausted all retry attempts are moved to the DLQ for manual inspection.
Result Store — Supabase. All job results are written to the ai_jobs table in the result JSONB column. Secondary effects (e.g., price recommendations written to price_recommendations, sentiment scores written to review_sentiment) are also written by the worker.
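A minimal sketch of the write-back a worker performs against the ai_jobs table. The status and result columns come from the job lifecycle above; completed_at and the sample result shape are illustrative assumptions, not the actual schema:

```python
from datetime import datetime, timezone

def build_result_update(job_id: str, result: dict) -> dict:
    """Build the update payload a worker writes back to ai_jobs.

    status follows the queued -> running -> completed/failed lifecycle;
    result lands in the result JSONB column. completed_at is assumed.
    """
    return {
        "id": job_id,
        "status": "completed",
        "result": result,  # stored in the result JSONB column
        "completed_at": datetime.now(timezone.utc).isoformat(),
    }

payload = build_result_update("job-123", {"sentiment": "positive", "score": 0.92})
```

Secondary effects (price recommendations, sentiment rows) would be separate writes alongside this one.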
Getting Started
The AI pipeline starts automatically with the worker service. In Docker Compose:
```
docker compose up workers
```

Workers register themselves with the queue on startup. Verify workers are connected:

```
GET http://localhost:8000/health

{"status":"ok","queue_connected":true,"jobs_processed_24h":142}
```

Features
Job Lifecycle
```
NestJS API                    Redis Queue              Python Worker
     │                            │                        │
     │ POST /api/ai-jobs          │                        │
     │──────────────────────────> │                        │
     │ Create job record in       │      enqueue job       │
     │ Supabase (status=queued)   │──────────────────────> │
     │                            │                        │
     │                            │      dequeue job       │
     │                            │ <──────────────────────│
     │                            │                        │ execute
     │                            │                        │ (calls OpenAI,
     │                            │                        │  reads DB, etc.)
     │                            │                        │
     │ Update job in Supabase     │                        │
     │ (status=completed,         │                        │
     │  result={...})  <──────────────────────────────────│
     │                            │                        │
     │ Frontend polls or          │                        │
     │ receives Realtime update   │                        │
```

Worker Implementation
Each worker job type is implemented as a Python class in services/workers/:

```
services/workers/
  main.py                     FastAPI app entry point
  jobs/
    pricing_recalculate.py    Pricing engine
    review_sentiment.py       NLP sentiment analysis
    return_classify.py        Return reason classification
    llm_visibility_score.py   LLM visibility measurement
    competitor_scan.py        Competitor price fetching
    listing_optimize.py       Listing improvement generation
  core/
    supabase_client.py        Supabase Python client
    openai_client.py          OpenAI client with retry
    job_base.py               Base class with common patterns
```

Retry Logic
Failed jobs are retried with exponential backoff:
- Attempt 1: immediate
- Attempt 2: 30 seconds
- Attempt 3: 5 minutes
After 3 failed attempts, the job is marked failed and moved to the DLQ. The NestJS API fires an ai_job.failed audit log event.
Certain error types bypass retries:
- INVALID_PARAMS — Job parameters are wrong; retrying will produce the same failure
- QUOTA_EXCEEDED — OpenAI quota is exhausted; retrying until the quota resets is wasteful
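The retry policy above (backoff schedule, DLQ hand-off, non-retryable error codes) can be sketched as a small asyncio loop. JobError, execute, and move_to_dlq are hypothetical stand-ins for the worker internals, not the actual job_base.py API:

```python
import asyncio

# Backoff schedule from the policy above: immediate, 30 s, 5 min.
BACKOFF_SECONDS = [0, 30, 300]
MAX_RETRIES = 3

# Error codes that bypass retries entirely.
NON_RETRYABLE = {"INVALID_PARAMS", "QUOTA_EXCEEDED"}

class JobError(Exception):
    """Hypothetical job failure carrying an error code."""
    def __init__(self, code: str):
        super().__init__(code)
        self.code = code

async def run_with_retries(execute, move_to_dlq, backoff=BACKOFF_SECONDS):
    """Run a job with exponential backoff; exhausted or non-retryable
    failures are handed to the DLQ callback."""
    for attempt in range(MAX_RETRIES):
        try:
            await asyncio.sleep(backoff[attempt])  # 0 on the first attempt
            return await execute()
        except JobError as err:
            if err.code in NON_RETRYABLE or attempt == MAX_RETRIES - 1:
                move_to_dlq(err.code)  # job is marked failed and moved to the DLQ
                return None
```

The backoff list is injectable here only so the sketch is easy to exercise without real waits.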
Prioritization
Three priority levels map to separate BullMQ queues:
| Priority | Queue | Use Case |
|---|---|---|
| high | ai-jobs:high | Buy Box Recovery flows, time-sensitive repricing |
| normal | ai-jobs:normal | Standard job dispatch |
| low | ai-jobs:low | Scheduled bulk jobs, monthly reports |
Workers always drain the high-priority queue before pulling from the normal or low queues.
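A sketch of that dequeue order, using an in-memory dict in place of the three BullMQ queues; dequeue_next is a hypothetical helper, not the real worker loop:

```python
# Queue names follow the ai-jobs:<priority> convention from the table above.
PRIORITY_ORDER = ["ai-jobs:high", "ai-jobs:normal", "ai-jobs:low"]

def dequeue_next(queues: dict):
    """Pop the next job, always draining higher-priority queues first."""
    for name in PRIORITY_ORDER:
        if queues.get(name):
            return queues[name].pop(0)
    return None  # all queues empty
```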
Concurrency
Each worker instance processes jobs concurrently using Python's asyncio. Default concurrency per worker instance: 10 concurrent jobs.
For CPU-bound jobs (rare — most NexusCommerce jobs are I/O-bound), concurrency is limited to 2 per worker instance.
Configure concurrency via the WORKER_CONCURRENCY environment variable.
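The concurrency cap can be sketched with an asyncio.Semaphore sized from WORKER_CONCURRENCY; run_jobs is an illustrative helper under that assumption, not the actual worker code:

```python
import asyncio
import os

# Cap from the WORKER_CONCURRENCY variable (default 10 per worker instance).
CONCURRENCY = int(os.environ.get("WORKER_CONCURRENCY", "10"))

async def run_jobs(job_fns):
    """Run all jobs, allowing at most CONCURRENCY of them in flight."""
    semaphore = asyncio.Semaphore(CONCURRENCY)

    async def bounded(fn):
        async with semaphore:  # blocks when the cap is reached
            return await fn()

    return await asyncio.gather(*(bounded(fn) for fn in job_fns))
```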
Monitoring
Job queue metrics are available at GET http://worker:8000/metrics:
```
{
  "queue_depths": {
    "high": 0,
    "normal": 12,
    "low": 34
  },
  "active_jobs": 8,
  "completed_24h": 142,
  "failed_24h": 3,
  "avg_duration_seconds": {
    "pricing_recalculate": 27,
    "review_sentiment": 45,
    "return_classify": 31
  }
}
```

Expose these metrics to your monitoring system (Prometheus, Datadog, etc.) for alerting.
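One way to expose them is to flatten the /metrics JSON into Prometheus exposition format. The metric names below (ai_queue_depth, etc.) are assumptions for illustration, not an established naming scheme:

```python
def to_prometheus(metrics: dict) -> str:
    """Flatten the worker /metrics JSON into Prometheus exposition lines."""
    lines = []
    for queue, depth in metrics["queue_depths"].items():
        lines.append(f'ai_queue_depth{{queue="{queue}"}} {depth}')
    lines.append(f'ai_active_jobs {metrics["active_jobs"]}')
    lines.append(f'ai_jobs_completed_24h {metrics["completed_24h"]}')
    lines.append(f'ai_jobs_failed_24h {metrics["failed_24h"]}')
    for job_type, secs in metrics["avg_duration_seconds"].items():
        lines.append(f'ai_job_avg_duration_seconds{{type="{job_type}"}} {secs}')
    return "\n".join(lines)
```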
OpenAI Integration
The openai_client.py wrapper adds:
- Automatic retry on rate limit (429) and server errors (5xx)
- Timeout handling (30-second default)
- Token usage tracking (logged to the ai_job_metrics table)
- Model selection via the OPENAI_MODEL environment variable
Default model: gpt-4o-mini (good balance of speed and quality for sentiment and classification). Override to gpt-4o for higher-quality listing optimization proposals.
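A sketch of the retry-and-timeout behavior such a wrapper provides, with TransientAPIError standing in for real HTTP failures; this is not the actual openai_client.py implementation:

```python
import asyncio

class TransientAPIError(Exception):
    """Stand-in for a rate-limit (429) or server (5xx) error."""
    def __init__(self, status: int):
        super().__init__(str(status))
        self.status = status

RETRYABLE_STATUSES = {429, 500, 502, 503, 504}

async def call_with_retry(call, max_attempts=3, base_delay=1.0, timeout=30.0):
    """Retry a model call on 429/5xx with exponential backoff, enforcing
    the 30-second default timeout. `call` is any coroutine factory
    (e.g. a bound model request); names here are illustrative."""
    delay = base_delay
    for attempt in range(1, max_attempts + 1):
        try:
            return await asyncio.wait_for(call(), timeout=timeout)
        except TransientAPIError as err:
            if err.status not in RETRYABLE_STATUSES or attempt == max_attempts:
                raise  # non-retryable or out of attempts
            await asyncio.sleep(delay)
            delay *= 2
```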
Configuration
| Variable | Description | Default |
|---|---|---|
| REDIS_URL | Redis connection string | redis://localhost:6379 |
| SUPABASE_URL | Supabase project URL | Required |
| SUPABASE_SERVICE_ROLE_KEY | Supabase service role key | Required |
| OPENAI_API_KEY | OpenAI API key | Required |
| OPENAI_MODEL | Default GPT model | gpt-4o-mini |
| WORKER_CONCURRENCY | Max concurrent jobs per worker | 10 |
| JOB_TIMEOUT_SECONDS | Max job execution time | 300 |
| MAX_RETRIES | Retry attempts before DLQ | 3 |
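The table above can be loaded with a small fail-fast helper; load_config is an illustrative sketch, not the workers' actual configuration code:

```python
import os

# Variables with no default must be present at startup.
REQUIRED = ("SUPABASE_URL", "SUPABASE_SERVICE_ROLE_KEY", "OPENAI_API_KEY")

def load_config(env=None) -> dict:
    """Read the documented variables, applying defaults and failing fast
    on missing required values."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing required config: {', '.join(missing)}")
    return {
        "redis_url": env.get("REDIS_URL", "redis://localhost:6379"),
        "openai_model": env.get("OPENAI_MODEL", "gpt-4o-mini"),
        "worker_concurrency": int(env.get("WORKER_CONCURRENCY", "10")),
        "job_timeout_seconds": int(env.get("JOB_TIMEOUT_SECONDS", "300")),
        "max_retries": int(env.get("MAX_RETRIES", "3")),
    }
```

Failing fast on the required keys keeps a misconfigured worker from silently pulling jobs it cannot complete.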