
AI Pipeline

Architecture of the NexusCommerce AI worker fleet, job queue, and result handling.

Overview

The AI Pipeline is the execution layer for all AI-powered features in NexusCommerce. It consists of the Python FastAPI worker fleet, the BullMQ job queue backed by Redis, the NestJS job dispatch and monitoring layer, and the Supabase result storage. Understanding this pipeline helps you optimize AI job performance, troubleshoot failures, and build custom workflows in AI Studio.

Key Concepts

Worker — A Python FastAPI process that executes AI jobs. Workers are stateless and horizontally scalable. Multiple worker instances share a single Redis queue.

Job — A unit of AI work. Jobs have a type, parameters, and a status lifecycle: queued → running → completed / failed.

Queue — A BullMQ queue backed by Redis. The NestJS API enqueues jobs; workers dequeue and execute them. The queue handles prioritization, retries, and dead-letter handling.

Dead Letter Queue (DLQ) — Failed jobs that have exhausted all retry attempts are moved to the DLQ for manual inspection.

Result Store — Supabase. All job results are written to the ai_jobs table in the result JSONB column. Secondary effects (e.g., price recommendations written to price_recommendations, sentiment scores written to review_sentiment) are also written by the worker.

Getting Started

The AI pipeline starts automatically with the worker service. In Docker Compose:

docker compose up workers

Workers register themselves with the queue on startup. Verify workers are connected:

GET http://localhost:8000/health
{"status":"ok","queue_connected":true,"jobs_processed_24h":142}

Features

Job Lifecycle

NestJS API                   Redis Queue              Python Worker
    │                             │                        │
    │  POST /api/ai-jobs          │                        │
    │────────────────────────>    │                        │
    │  Create job record in       │  enqueue job           │
    │  Supabase (status=queued)   │─────────────────────>  │
    │                             │                        │
    │                             │  dequeue job           │
    │                             │ <─────────────────────│
    │                             │                        │  execute
    │                             │                        │  (calls OpenAI,
    │                             │                        │   reads DB, etc.)
    │                             │                        │
    │  Update job in Supabase     │                        │
    │  (status=completed,         │                        │
    │   result={...})   <─────────────────────────────────│
    │                             │                        │
    │  Frontend polls or          │                        │
    │  receives Realtime update   │                        │

Worker Implementation

Each worker job type is implemented as a Python class in services/workers/:

services/workers/
  main.py                    FastAPI app entry point
  jobs/
    pricing_recalculate.py   Pricing engine
    review_sentiment.py      NLP sentiment analysis
    return_classify.py       Return reason classification
    llm_visibility_score.py  LLM visibility measurement
    competitor_scan.py       Competitor price fetching
    listing_optimize.py      Listing improvement generation
  core/
    supabase_client.py       Supabase Python client
    openai_client.py         OpenAI client with retry
    job_base.py              Base class with common patterns
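The source for these modules isn't shown here, but a job class following this layout might look like the sketch below. JobBase, its execute/validate methods, and the stubbed ReviewSentimentJob logic are all assumptions for illustration; the real base class lives in services/workers/core/job_base.py and would call the OpenAI client and write to Supabase:

```python
from abc import ABC, abstractmethod


class JobBase(ABC):
    """Hypothetical base class capturing the common pattern:
    validate parameters, then run the job-specific logic."""

    job_type: str = ""

    def execute(self, params: dict) -> dict:
        self.validate(params)
        return self.run(params)

    def validate(self, params: dict) -> None:
        # Raise the non-retryable error code so the queue skips retries.
        if not isinstance(params, dict):
            raise ValueError("INVALID_PARAMS")

    @abstractmethod
    def run(self, params: dict) -> dict: ...


class ReviewSentimentJob(JobBase):
    """Illustrative stub; the real job calls OpenAI and writes
    scores to the review_sentiment table."""

    job_type = "review_sentiment"

    def run(self, params: dict) -> dict:
        text = params.get("review_text", "")
        return {"sentiment": "positive" if "great" in text.lower() else "neutral"}
```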

Retry Logic

Failed jobs are retried with exponential backoff:

  • Attempt 1: immediate
  • Attempt 2: 30 seconds
  • Attempt 3: 5 minutes

After 3 failed attempts, the job is marked failed and moved to the DLQ. The NestJS API fires an ai_job.failed audit log event.

Certain error types bypass retries:

  • INVALID_PARAMS — Job parameters are wrong; retrying will produce the same failure
  • QUOTA_EXCEEDED — OpenAI quota is exhausted; retrying until quota resets is wasteful
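This retry policy can be sketched as follows, assuming failures surface as exceptions whose message carries the error code. run_with_retries and the injectable sleep parameter are hypothetical names, not the actual worker API; in production BullMQ drives the backoff schedule:

```python
import time

NON_RETRYABLE = {"INVALID_PARAMS", "QUOTA_EXCEEDED"}
BACKOFF_SECONDS = [0, 30, 300]  # attempt 1 immediate, then 30 s, then 5 min


def run_with_retries(execute, params: dict, max_retries: int = 3, sleep=time.sleep):
    """Retry a job up to max_retries times with exponential backoff.

    Permanent error codes skip remaining retries; after the final
    failure the exception propagates (job → failed, moved to DLQ).
    """
    last_error = None
    for attempt in range(max_retries):
        sleep(BACKOFF_SECONDS[attempt])
        try:
            return execute(params)
        except Exception as exc:
            last_error = exc
            if str(exc) in NON_RETRYABLE:
                break  # retrying would produce the same failure
    raise last_error
```

The injectable sleep keeps the sketch testable; a real worker would rely on the queue's delayed-retry mechanism rather than blocking.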

Prioritization

Three priority levels map to separate BullMQ queues:

Priority   Queue             Use Case
high       ai-jobs:high      Buy Box Recovery flows, time-sensitive repricing
normal     ai-jobs:normal    Standard job dispatch
low        ai-jobs:low       Scheduled bulk jobs, monthly reports

Workers always drain the high-priority queue before pulling from normal, and normal before low.

Concurrency

Each worker instance processes jobs concurrently using Python's asyncio. Default concurrency per worker instance: 10 concurrent jobs.

For CPU-bound jobs (rare — most NexusCommerce jobs are I/O-bound), concurrency is limited to 2 per worker instance.

Configure concurrency via WORKER_CONCURRENCY environment variable.
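A minimal asyncio sketch of this concurrency cap, using a semaphore to bound in-flight jobs. process_all and its handler signature are illustrative, not the actual worker loop:

```python
import asyncio
import os

# Mirrors the WORKER_CONCURRENCY environment variable (default 10).
CONCURRENCY = int(os.environ.get("WORKER_CONCURRENCY", "10"))


async def process_all(jobs, handler, concurrency: int = CONCURRENCY):
    """Run jobs concurrently, never exceeding `concurrency` in flight."""
    sem = asyncio.Semaphore(concurrency)

    async def run_one(job):
        async with sem:
            return await handler(job)

    return await asyncio.gather(*(run_one(j) for j in jobs))
```

For the rare CPU-bound job types, the same pattern applies with the semaphore sized at 2.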

Monitoring

Job queue metrics are available at GET http://worker:8000/metrics:

{
  "queue_depths": {
    "high": 0,
    "normal": 12,
    "low": 34
  },
  "active_jobs": 8,
  "completed_24h": 142,
  "failed_24h": 3,
  "avg_duration_seconds": {
    "pricing_recalculate": 27,
    "review_sentiment": 45,
    "return_classify": 31
  }
}

Expose these metrics to your monitoring system (Prometheus, Datadog, etc.) for alerting.
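As one example of turning these metrics into an alertable signal, a hypothetical helper could derive the 24-hour failure rate from the /metrics payload (failure_rate is not part of the worker API):

```python
def failure_rate(metrics: dict) -> float:
    """Fraction of jobs that failed in the last 24 h, from GET /metrics."""
    done = metrics.get("completed_24h", 0) + metrics.get("failed_24h", 0)
    return metrics.get("failed_24h", 0) / done if done else 0.0
```

With the sample payload above (142 completed, 3 failed), this yields roughly 2%, a plausible threshold candidate for paging.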

OpenAI Integration

The openai_client.py wrapper adds:

  • Automatic retry on rate limit (429) and server errors (5xx)
  • Timeout handling (30 second default)
  • Token usage tracking (logged to ai_job_metrics table)
  • Model selection via OPENAI_MODEL environment variable

Default model: gpt-4o-mini (good balance of speed and quality for sentiment and classification). Override to gpt-4o for higher-quality listing optimization proposals.
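The retry-on-429/5xx behavior might be sketched as below, independent of the real openai_client.py (whose code isn't shown here). ApiError and call_with_retry are illustrative stand-ins, with the actual HTTP call delegated to a send callable:

```python
import time


class ApiError(Exception):
    """Illustrative error carrying the HTTP status of a failed call."""

    def __init__(self, status: int):
        super().__init__(f"HTTP {status}")
        self.status = status


def call_with_retry(send, max_attempts: int = 4, base_delay: float = 1.0, sleep=time.sleep):
    """Retry on rate limits (429) and server errors (5xx) with
    exponential backoff; client errors propagate immediately."""
    for attempt in range(max_attempts):
        try:
            return send()
        except ApiError as exc:
            retryable = exc.status == 429 or 500 <= exc.status < 600
            if not retryable or attempt == max_attempts - 1:
                raise
            sleep(base_delay * 2 ** attempt)
```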

Configuration

Variable                    Description                       Default
REDIS_URL                   Redis connection string           redis://localhost:6379
SUPABASE_URL                Supabase project URL              Required
SUPABASE_SERVICE_ROLE_KEY   Supabase service role key         Required
OPENAI_API_KEY              OpenAI API key                    Required
OPENAI_MODEL                Default GPT model                 gpt-4o-mini
WORKER_CONCURRENCY          Max concurrent jobs per worker    10
JOB_TIMEOUT_SECONDS         Max job execution time            300
MAX_RETRIES                 Retry attempts before DLQ         3