Integrating AI Workflows with Guardrails in DDD/Hexagonal Architecture and CQRS
How to embed LLM-powered agents into enterprise-grade architecture without losing control — ports and adapters, domain events, budget caps, state machines, and observable AI in production.
Large language models are powerful, unpredictable, expensive, and slow. Enterprise architectures are disciplined, predictable, cost-controlled, and observable. Putting LLMs inside a system built on Domain-Driven Design, hexagonal architecture, and CQRS is fundamentally a containment and integration problem: how do you use AI’s capabilities without letting its failure modes — hallucination, cost explosion, latency spikes, non-determinism — leak into the rest of your domain?
This article walks through how we solved this at Mesh-Sync, a 3D model processing platform where AI powers semantic analysis, model classification, and automated code review. Every pattern described here runs in production, handling thousands of LLM calls per day with hard budget caps, distributed locking, state machine validation, and full observability.
The Fundamental Tension
Traditional software has a contract: given input X, produce output Y, every time, within Z milliseconds. DDD reinforces this with bounded contexts, aggregates that enforce invariants, and domain events that represent facts.
LLMs break every part of that contract:
- Non-deterministic output — Same prompt, different result, every time.
- Unbounded latency — A response might take 2 seconds or 90 seconds.
- Cost per invocation — Every call costs money, and complex chains multiply costs.
- Failure modes — Rate limits, context window overflow, content filters, model degradation.
The architectural challenge: how do you give AI a seat at the table without letting it flip the table?
The answer, in our experience, is to treat AI as an infrastructure concern — not a domain concept. The domain layer neither knows nor cares that classification is powered by an LLM. It asks a port for a classification, and an adapter provides one. Everything else — model selection, retry logic, budget enforcement, fallback strategies — lives outside the domain.
DDD Foundations: Bounded Contexts and Domain Events
Before introducing AI, let’s establish the DDD structure it plugs into.
Mesh-Sync has several bounded contexts: Model Management (uploads, metadata, storage), Pipeline Orchestration (job scheduling, stage execution), Marketplace (publishing, search, transactions), and AI Classification (semantic analysis, model categorisation).
Each context communicates through domain events — immutable records of something that happened in the domain:
// Domain event definitions
interface DomainEvent {
type: string;
correlationId: string;
timestamp: Date;
payload: any;
}
// Concrete events
type ModelStatusUpdateRequested = DomainEvent & {
type: 'model.status.update_requested';
payload: {
modelId: string;
newStatus: 'processing' | 'completed' | 'failed';
};
};
type TechnicalMetadataSaveRequested = DomainEvent & {
type: 'model.technical_metadata.save_requested';
payload: {
modelId: string;
metadata: Record<string, unknown>;
};
};
type ModelClassificationCompleted = DomainEvent & {
type: 'model.classification.completed';
payload: {
modelId: string;
metamodelId: string;
confidence: number;
classifiedAt: Date;
};
};
Events are dispatched through a Mediator pattern — each event type has exactly one handler, and the dispatcher routes events without the emitter knowing who handles them:
class DomainEventDispatcher {
private handlers = new Map<string, DomainEventHandler<any>>();
register<T extends DomainEvent>(
eventType: string,
handler: DomainEventHandler<T>
): void {
this.handlers.set(eventType, handler);
}
async dispatch<T extends DomainEvent>(event: T): Promise<void> {
const handler = this.handlers.get(event.type);
if (!handler) {
throw new Error(`No handler for event type: ${event.type}`);
}
await handler.handle(event);
}
}
// Registration at startup
const dispatcher = new DomainEventDispatcher();
dispatcher.register(
'model.status.update_requested',
new ModelStatusUpdateHandler(webhookClient)
);
dispatcher.register(
'model.technical_metadata.save_requested',
new TechnicalMetadataSaveHandler(webhookClient)
);
Why single-handler-per-event? In our experience, fan-out (multiple handlers for one event) creates implicit coupling and makes it impossible to reason about side effects. If model status updates need to trigger notifications AND cache invalidation, those are two separate events in the domain, not one event with two handlers.
Hexagonal Architecture: Ports and Adapters for AI
The hexagonal architecture (ports and adapters) is the key pattern that makes AI integration clean. The domain defines ports — abstract interfaces that express what it needs. Adapters implement those ports with specific technology.
Here’s the LLM port:
from abc import ABC, abstractmethod
class LLMPort(ABC):
"""Domain port: the system needs text generation capability.
The domain doesn't know or care if it's GPT-4, Claude, or a local model."""
@abstractmethod
async def generate(
self,
prompt: str,
max_tokens: int = 2000,
temperature: float = 0.7,
) -> str:
pass
And the adapters that implement it:
import httpx

class OllamaAdapter(LLMPort):
"""Adapter: local inference via Ollama."""
def __init__(self, model: str = "llama3", base_url: str = "http://localhost:11434"):
self.model = model
self.base_url = base_url
async def generate(self, prompt: str, max_tokens: int = 2000,
temperature: float = 0.7) -> str:
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.base_url}/api/generate",
json={
"model": self.model,
"prompt": prompt,
"options": {"num_predict": max_tokens, "temperature": temperature},
},
timeout=120.0,
)
return response.json()["response"]
class OpenRouterAdapter(LLMPort):
"""Adapter: cloud inference via OpenRouter (GPT-4, Claude, etc.)."""
def __init__(self, api_key: str, model: str = "anthropic/claude-sonnet-4-20250514"):
self.api_key = api_key
self.model = model
async def generate(self, prompt: str, max_tokens: int = 2000,
temperature: float = 0.7) -> str:
async with httpx.AsyncClient() as client:
response = await client.post(
"https://openrouter.ai/api/v1/chat/completions",
headers={"Authorization": f"Bearer {self.api_key}"},
json={
"model": self.model,
"messages": [{"role": "user", "content": prompt}],
"max_tokens": max_tokens,
"temperature": temperature,
},
timeout=180.0,
)
return response.json()["choices"][0]["message"]["content"]
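A side benefit of the port abstraction: tests never need a live model. A deterministic test double can stand in for any adapter — the `FakeLLMAdapter` name below is ours, not from the Mesh-Sync codebase, and the port is repeated so the sketch is self-contained:

```python
from abc import ABC, abstractmethod


class LLMPort(ABC):
    """Repeated from above so this sketch stands alone."""

    @abstractmethod
    async def generate(self, prompt: str, max_tokens: int = 2000,
                       temperature: float = 0.7) -> str: ...


class FakeLLMAdapter(LLMPort):
    """Test double: canned replies keyed by substring match, plus call recording."""

    def __init__(self, responses: dict[str, str], default: str = "UNKNOWN"):
        self.responses = responses
        self.default = default
        self.calls: list[str] = []  # prompts seen, for assertions

    async def generate(self, prompt: str, max_tokens: int = 2000,
                       temperature: float = 0.7) -> str:
        self.calls.append(prompt)
        for needle, reply in self.responses.items():
            if needle in prompt:
                return reply
        return self.default
```

Code under test receives this through the same constructor slot as the real adapters, so domain-level tests stay fast, free, and deterministic.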
The same pattern applies to the worker backend — which is not an AI concern, but follows identical hexagonal principles:
from abc import ABC, abstractmethod
from typing import Any, Callable

class WorkerBackendPort(ABC):
"""Port: the system needs to dispatch and manage background jobs."""
@abstractmethod
async def add_job(self, queue_name: str, job_name: str,
data: dict, opts: dict | None = None) -> str:
pass
@abstractmethod
async def create_worker(self, queue_name: str,
processor: Callable, config: dict | None = None) -> Any:
pass
class BullMQAdapter(WorkerBackendPort):
"""Adapter: job dispatch via BullMQ/Redis."""
async def add_job(self, queue_name, job_name, data, opts=None):
queue = QueueManager.get_queue(queue_name)
job = await queue.add(job_name, data, opts or {})
return job.id
class ExternalWorkerAdapter(WorkerBackendPort):
"""Adapter: job dispatch via external HTTP service."""
def __init__(self, service_url: str):
self.service_url = service_url
async def add_job(self, queue_name, job_name, data, opts=None):
async with httpx.AsyncClient() as client:
response = await client.post(
f"{self.service_url}/jobs",
json={"queue": queue_name, "name": job_name, "data": data},
)
return response.json()["jobId"]
A factory selects the adapter at startup based on configuration:
import os

class WorkerBackendFactory:
@staticmethod
def get_adapter() -> WorkerBackendPort:
if os.getenv("USE_EXTERNAL_BACKEND") == "true":
return ExternalWorkerAdapter(os.getenv("BACKEND_SERVICE_URL"))
return BullMQAdapter()
The hexagonal diagram below shows how the domain core is insulated from both AI and infrastructure:
graph TB
subgraph domain[Domain Core]
BC1[Model Management<br/><i>Bounded Context</i>]
BC2[Pipeline Orchestration<br/><i>Bounded Context</i>]
BC3[AI Classification<br/><i>Bounded Context</i>]
DE[Domain Events<br/><i>Mediator Dispatch</i>]
end
subgraph ports[Ports — Interfaces]
LP[LLM Port<br/><i>generate(prompt)</i>]
WBP[Worker Backend Port<br/><i>add_job(), create_worker()</i>]
SP[Storage Port<br/><i>upload(), download()</i>]
EP[Event Port<br/><i>publish(event)</i>]
end
subgraph adapters[Adapters — Implementations]
OA[Ollama Adapter<br/><i>Local LLM</i>]
ORA[OpenRouter Adapter<br/><i>Cloud LLM</i>]
BMA[BullMQ Adapter<br/><i>Redis queues</i>]
EWA[External Worker Adapter<br/><i>HTTP service</i>]
S3A[S3 Adapter<br/><i>AWS storage</i>]
ELKA[ELK Adapter<br/><i>Elasticsearch events</i>]
end
BC1 & BC2 & BC3 --> DE
BC3 --> LP
BC2 --> WBP
BC1 --> SP
DE --> EP
LP --> OA
LP --> ORA
WBP --> BMA
WBP --> EWA
SP --> S3A
EP --> ELKA
style domain fill:#1e1e24,stroke:#5eead4,color:#e4e4e7
style ports fill:#1e1e24,stroke:#818cf8,color:#e4e4e7
style adapters fill:#1e1e24,stroke:#34d399,color:#e4e4e7
The critical insight: When you swap from OllamaAdapter (local, free, fast, lower quality) to OpenRouterAdapter (cloud, paid, slower, higher quality), the domain layer doesn’t change. Not one line. The factory handles selection, the port enforces the contract, and the adapter handles the details.
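Adapters also compose. One useful combinator is a fallback chain that tries a cheap local adapter first and only pays for cloud inference when it fails — a sketch under our own naming (`FallbackLLMAdapter` is illustrative, not necessarily how Mesh-Sync chains providers):

```python
class FallbackLLMAdapter:
    """Tries adapters in order; the first successful response wins.

    Duck-typed against the LLMPort contract: each adapter needs only
    an async generate(prompt, max_tokens, temperature) method.
    """

    def __init__(self, *adapters):
        self.adapters = adapters

    async def generate(self, prompt: str, max_tokens: int = 2000,
                       temperature: float = 0.7) -> str:
        last_error: Exception | None = None
        for adapter in self.adapters:
            try:
                return await adapter.generate(prompt, max_tokens, temperature)
            except Exception as exc:  # timeout, rate limit, connection error...
                last_error = exc
        raise RuntimeError("all LLM adapters failed") from last_error
```

Because the chain itself satisfies the port contract, the domain still sees a single `generate` call.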
CQRS: Separating Commands from Queries
Command-Query Responsibility Segregation (CQRS) in our system is implemented through webhooks — a deliberate architectural decision (ADR-010) that keeps the write path clean and the read path flexible.
Command side — Dispatching work:
// StageExecutor dispatches a worker job (command)
async executeWorkerStage(
stage: StageDefinition,
context: PipelineContext
): Promise<void> {
const interpolated = this.interpolate(stage.inputs, context);
// Validate inputs before dispatch
const validation = InterpolationValidator.validate(interpolated);
if (!validation.valid) {
throw new StageInputError(stage.name, validation.errors);
}
// Dispatch to queue — fire and forget
const queue = this.queues.get(stage.queue!);
await queue.add(stage.worker!, interpolated, {
attempts: stage.retry?.max_attempts ?? 1,
backoff: {
type: stage.retry?.backoff_type ?? 'fixed',
delay: stage.retry?.initial_delay_ms ?? 5000,
},
});
}
Query side — Workers report results via HMAC-signed webhooks:
// Worker completion webhook handler
async handleWebhook(req: Request): Promise<void> {
// Verify HMAC signature
const signature = req.headers['x-webhook-signature'];
const expected = computeHmacSignature(req.body, this.signingKey);
if (!timingSafeEqual(signature, expected)) {
throw new UnauthorizedError('Invalid webhook signature');
}
// Emit domain event — no direct DB write
await this.dispatcher.dispatch({
type: 'model.status.update_requested',
correlationId: req.body.pipelineId,
timestamp: new Date(),
payload: {
modelId: req.body.modelId,
newStatus: req.body.status,
},
});
}
The handler that receives the event is the only code that touches the persistence layer:
class ModelStatusUpdateHandler
implements DomainEventHandler<ModelStatusUpdateRequested>
{
constructor(private webhookClient: ModelWebhookClient) {}
async handle(event: ModelStatusUpdateRequested): Promise<void> {
await this.webhookClient.send({
type: 'model.status.update',
modelId: event.payload.modelId,
newStatus: event.payload.newStatus,
timestamp: event.timestamp.toISOString(),
signature: computeHmacSignature(event, this.secret),
});
}
}
sequenceDiagram
participant Client
participant Backend as NestJS Backend
participant Orchestrator as Worker Backend
participant Queue as Redis / BullMQ
participant Worker as Python Worker
participant LLM as LLM Provider
Client->>Backend: POST /models/{id}/process
Backend->>Orchestrator: Webhook: start pipeline
Orchestrator->>Queue: Enqueue: semantic-analysis job
Queue->>Worker: Job dispatched
Worker->>LLM: generate(classification prompt)
LLM-->>Worker: Classification result
Worker->>Orchestrator: Webhook: job completed (HMAC-signed)
Orchestrator->>Orchestrator: Dispatch domain event
Orchestrator->>Backend: Webhook: model.status.update
Backend->>Backend: Persist to PostgreSQL
Note over Worker,LLM: Worker-LLM interaction is<br/>invisible to the domain
Why webhooks instead of direct DB access? Three reasons:
- Single writer principle — Only the NestJS Backend writes to PostgreSQL. No schema coupling between services.
- Language independence — Python workers don’t need a TypeScript ORM or PostgreSQL driver.
- Audit trail — Every state change flows through the domain event system, making it observable and replayable.
The Guardrails Taxonomy
This is where most AI integrations fail. Without explicit guardrails, an AI agent will cheerfully burn through your entire API budget in an hour, spawn 50 concurrent tasks, and make decisions that violate domain invariants. Our guardrails are organised into seven categories:
1. Budget Caps
Every AI operation has a hard dollar limit. When the budget is exhausted, the operation is terminated — not paused, terminated. There’s no “just one more token” grace period.
class Settings(BaseSettings):
# Per-operation budget caps
llm_max_budget_usd: float = 8.0 # Max per pipeline execution
plan_max_budget_usd: float = 2.0 # Max for planning phase
subtask_max_budget_usd: float = 5.0 # Max per subtask
# Model-specific configuration
claude_model: str = "claude-opus-4-6"
claude_timeout: int = 1800 # 30 minutes
kimi_timeout: int = 600 # 10 minutes
Budget tracking is done at the adapter level, not in the domain. The ElkMetricTracker records token usage per invocation, and a budget monitor aggregates costs in real-time:
async with elk_wrapper.track_llm_execution(
model="claude-opus-4-6",
user_id=context.owner_id,
) as tracker:
result = await llm_port.generate(prompt)
tracker.set_token_usage(input_tokens=1200, output_tokens=800)
# Automatically emits to ELK: model, tokens, cost, duration
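The enforcement side can be sketched as a hard kill switch: cumulative spend is checked after every charge, and exceeding the cap raises rather than warns. Class and price figures below are illustrative, not Mesh-Sync's actual code:

```python
class BudgetExceededError(RuntimeError):
    """Raised when an operation hits its hard dollar cap; the operation dies."""


class BudgetGuard:
    def __init__(self, max_budget_usd: float):
        self.max_budget_usd = max_budget_usd
        self.spent_usd = 0.0

    def charge(self, input_tokens: int, output_tokens: int,
               usd_per_1k_input: float, usd_per_1k_output: float) -> float:
        cost = (input_tokens / 1000) * usd_per_1k_input \
             + (output_tokens / 1000) * usd_per_1k_output
        self.spent_usd += cost
        if self.spent_usd > self.max_budget_usd:
            # Hard stop: no grace period, no "just one more token"
            raise BudgetExceededError(
                f"spent ${self.spent_usd:.2f} of ${self.max_budget_usd:.2f} cap"
            )
        return cost
```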
2. Concurrency Limits
Unbounded parallelism is how you get a surprise $500 invoice and a rate-limited API key. Concurrency is capped at the system level and per-repository:
class Settings(BaseSettings):
max_concurrent_agents: int = 6 # System-wide cap
max_per_repo: int = 2 # Per-repository cap
These limits are enforced via Redis-based semaphores. If a seventh agent tries to start, it queues until a slot opens.
3. Timeout Enforcement
Every LLM call, every pipeline stage, and every subtask has a deadline. No operation runs indefinitely.
class Settings(BaseSettings):
claude_timeout: int = 1800 # 30 min per LLM call
subtask_timeout_cap: int = 3600 # 1 hour per subtask
lock_ttl_issue: int = 7200 # 2 hour lock timeout
lock_ttl_pr_review: int = 3600 # 1 hour PR review timeout
The TimeoutMonitor scans running operations every 15 seconds. Timed-out operations are forcibly terminated and escalated — either retried (if attempts remain) or raised as an alert.
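The per-call deadline itself is small: `asyncio.wait_for` cancels the task when time runs out. A sketch of the enforcement wrapper — the escalation callback is our illustration of the retry-or-alert behaviour described above, not the actual TimeoutMonitor:

```python
import asyncio


async def run_with_deadline(coro, timeout_s: float, on_timeout=None):
    """Hard deadline: the task is cancelled, not politely asked to finish."""
    try:
        return await asyncio.wait_for(coro, timeout=timeout_s)
    except asyncio.TimeoutError:
        if on_timeout is not None:
            on_timeout()  # escalate: retry if attempts remain, else alert
        raise
```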
4. Distributed Locking
Before an AI agent starts working on an issue or PR, it must acquire a distributed lock. This prevents two agents from reviewing the same PR simultaneously or making conflicting changes to the same codebase:
class PRReviewPipeline:
async def run(self, pr: PullRequest) -> dict:
# Acquire lock — blocks if already held
handle = await self._lock.acquire(
LockScope.PR_REVIEW,
f"{pr.org}:{pr.repo}:{pr.number}",
timeout=self._settings.lock_ttl_pr_review,
)
if handle is None:
self._discord.notify(pr, "PR already under review — skipping")
return {"status": "skipped", "reason": "locked"}
try:
# ... perform review ...
pass
finally:
await self._lock.release(handle)
Locks have TTLs (time-to-live). If an agent crashes without releasing a lock, the TTL expires and the lock becomes available again. This prevents permanent deadlocks.
5. State Machine Validation
AI agents transition entities through states (PR: open → in_review → approved → merged). A state machine enforces that only valid transitions occur — an agent cannot merge a PR that hasn’t been reviewed, regardless of what the LLM suggests:
stateDiagram-v2
[*] --> open
open --> in_review : Agent acquires lock
in_review --> changes_requested : Review finds issues
changes_requested --> in_review : Changes pushed
in_review --> approved : Consensus reached
approved --> merged : Auto-merge
in_review --> escalated : Budget exceeded / Timeout
changes_requested --> escalated : Max rounds exceeded
escalated --> [*] : Human intervention
merged --> [*]
class StateMachine:
VALID_TRANSITIONS = {
PRState.OPEN: {PRState.IN_REVIEW},
PRState.IN_REVIEW: {PRState.CHANGES_REQUESTED, PRState.APPROVED, PRState.ESCALATED},
PRState.CHANGES_REQUESTED: {PRState.IN_REVIEW, PRState.ESCALATED},
PRState.APPROVED: {PRState.MERGED},
}
async def transition_pr(
self, org: str, repo: str, pr_number: int,
new_state: PRState, reason: str
) -> None:
current = await self.get_pr_state(org, repo, pr_number)
allowed = self.VALID_TRANSITIONS.get(current, set())
if new_state not in allowed:
raise InvalidTransitionError(
f"Cannot transition PR from {current} to {new_state}. "
f"Allowed: {allowed}"
)
await self.set_pr_state(org, repo, pr_number, new_state, reason)
6. Consensus Requirements
For critical actions (merging a PR, publishing a model), a single AI opinion isn’t enough. The system requires multiple consecutive approvals before proceeding:
class Settings(BaseSettings):
min_approvals_for_consensus: int = 2 # At least 2 consecutive approvals
The review pipeline runs in rounds. Each round produces a review. If two consecutive rounds approve, the PR is merged. If any round requests changes, the counter resets. This prevents a single hallucinated “LGTM” from reaching production.
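The counter-reset logic is small enough to show in full (a sketch; the class and field names are ours):

```python
class ConsensusGate:
    """Requires N consecutive approvals; any rejection resets the streak."""

    def __init__(self, required: int = 2):
        self.required = required
        self.streak = 0

    def record(self, approved: bool) -> bool:
        """Record one review round; returns True once consensus is reached."""
        self.streak = self.streak + 1 if approved else 0
        return self.streak >= self.required
```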
7. Workspace Isolation
AI agents that modify code do so in isolated git worktrees — not on the main branch, not even on a shared development branch:
# Each agent gets a dedicated worktree
workspace_dir = f"{repo_dir}/.worktrees/{branch_name}"
await ssh_client.run(
f"git worktree add -B {branch_name} {workspace_dir} origin/{branch_name}"
)
If the agent corrupts the workspace, only its worktree is affected. The main repository is untouched. Worktrees are cleaned up after the agent completes (or after the lock TTL expires).
Three-Level Validation: From Schema to Runtime
AI outputs are untrusted by default. Every piece of data that flows from an AI adapter back into the domain passes through three validation layers:
Level 1: JSON Schema (Structural)
Pipeline definitions and job payloads are validated against JSON Schema before any processing begins:
// AJV schema validation at the API boundary
const validator = ajv.compile(pipelineSchema);
const valid = validator(payload);
if (!valid) {
throw new ValidationError(
validator.errors!.map(e => `${e.instancePath}: ${e.message}`)
);
}
Level 2: Semantic (Logical)
Structurally valid data can still be logically broken. Semantic validation catches dangling references, duplicate identifiers, and constraint violations:
// Example: verify all stage references exist
for (const stage of pipeline.stages) {
if (stage.on_success && !stageNames.has(stage.on_success)) {
errors.push(
`Stage '${stage.name}' references non-existent target '${stage.on_success}'`
);
}
}
Level 3: Runtime (Type Safety)
At the Python worker level, Pydantic models enforce type safety on every incoming job payload. If an AI-generated classification result has a missing field or wrong type, the worker rejects it immediately:
from pydantic import BaseModel, validator
class ClassificationResult(BaseModel):
model_id: str
metamodel_id: str
confidence: float
categories: list[str]
@validator('confidence')
def confidence_in_range(cls, v):
if not 0.0 <= v <= 1.0:
raise ValueError(f'Confidence must be 0-1, got {v}')
return v
@validator('model_id')
def model_id_not_empty(cls, v):
if not v.strip():
raise ValueError('model_id cannot be empty')
return v
# Untrusted AI output is validated at the boundary
result = ClassificationResult(**ai_response) # Raises on invalid data
Observable AI: ELK Metrics for LLM Operations
You cannot manage what you cannot measure. Every LLM invocation is tracked with structured metrics published to Elasticsearch:
class MetricCategory(Enum):
LLM_EXECUTION = "llm_execution"
BLENDER_OPERATION = "blender_operation"
FILE_PROCESSING = "file_processing"
MARKETPLACE_API = "marketplace_api"
class ElkMetricTracker:
"""Context manager for tracking operations with automatic metrics emission."""
async with elk_wrapper.track_llm_execution(
model="claude-opus-4-6",
user_id="user-123",
) as tracker:
result = await llm.generate(prompt)
tracker.set_token_usage(input_tokens=1500, output_tokens=600)
# Auto-emits: model, tokens, cost_usd, duration_ms, success, error
This data powers Kibana dashboards showing:
- Cost per pipeline — Total LLM spend per model processing run
- Token efficiency — Input/output token ratio (are our prompts too verbose?)
- Latency percentiles — p50, p95, p99 for each model provider
- Failure rates — Which models fail most? Rate limits? Timeouts? Content filters?
- Budget utilisation — How close are operations running to their caps?
graph LR
subgraph ai[AI Adapter Layer]
LLM[LLM Adapter<br/><i>OllamaAdapter / OpenRouterAdapter</i>]
MT[Metric Tracker<br/><i>ElkMetricTracker</i>]
end
subgraph guardrails[Guardrail Layer]
BC[Budget Cap<br/><i>$8 max per pipeline</i>]
CL[Concurrency Limiter<br/><i>6 agents max</i>]
TO[Timeout Monitor<br/><i>30min per call</i>]
DL[Distributed Lock<br/><i>Redis + TTL</i>]
SM[State Machine<br/><i>Valid transitions only</i>]
CR[Consensus Gate<br/><i>2+ approvals required</i>]
WI[Workspace Isolation<br/><i>Git worktrees</i>]
end
subgraph observe[Observability]
ELK[Elasticsearch<br/><i>Metrics + Events</i>]
KB[Kibana Dashboards<br/><i>Cost, Latency, Errors</i>]
end
LLM --> MT
MT --> ELK
ELK --> KB
LLM -.->|enforced by| BC
LLM -.->|enforced by| CL
LLM -.->|enforced by| TO
LLM -.->|requires| DL
LLM -.->|validated by| SM
LLM -.->|gated by| CR
LLM -.->|sandboxed in| WI
style ai fill:#1e1e24,stroke:#5eead4,color:#e4e4e7
style guardrails fill:#1e1e24,stroke:#fbbf24,color:#e4e4e7
style observe fill:#1e1e24,stroke:#818cf8,color:#e4e4e7
When Guardrails Slow You Down
Not every AI operation needs all seven guardrails. Over-constraining low-risk operations creates unnecessary latency and complexity. Here’s our risk-based approach:
| Operation | Risk Level | Guardrails Applied |
|---|---|---|
| Model classification (read-only) | Low | Budget cap, timeout, metrics |
| Metadata extraction (read-only) | Low | Budget cap, timeout, metrics |
| PR review (makes comments) | Medium | All except workspace isolation |
| Code generation (writes code) | High | All seven guardrails |
| Auto-merge PR | Critical | All seven + human approval for repos with >10 contributors |
The principle: guardrails should be proportional to the blast radius. If an AI classification is wrong, it’s a minor inconvenience — a human corrects it. If an AI merges broken code to main, it’s a production incident. Scale your controls accordingly.
Lessons Learned
AI Belongs in the Adapter Layer, Not the Domain
When we first integrated LLM classification, we put it inside the Model aggregate: model.classify(prompt). This was wrong. It coupled the domain to AI concerns (retries, model selection, token budgets) that have nothing to do with 3D models. Moving it to an adapter behind the LLMPort simplified both the domain and the AI integration independently.
Budget Caps Are Non-Negotiable
In our first week of production AI operations, a bug in prompt construction caused a feedback loop that spent $340 in 45 minutes. After that, we made budget caps a hard kill switch — not a soft warning, not a “please try to stay under.” When you hit the limit, the operation dies. Period.
State Machines Prevent the Worst AI Failures
The state machine saved us from catastrophic failures more than any other guardrail. Without it, an AI agent once tried to merge a PR that was in changes_requested state — skipping the review update entirely. The state machine rejected the transition, the operation was escalated to a human, and we added the scenario to our test suite.
Observability Pays for Itself Immediately
Within the first month of ELK metrics, we discovered that 40% of our semantic analysis calls were redundant — the same model was analysed with the same parameters multiple times. Adding MinIO result caching, a fix the metrics made obvious, cut our monthly LLM spend by 35%.
Conclusion
Integrating AI into enterprise architecture isn’t a plug-and-play operation. LLMs are powerful but undisciplined — they need the same engineering rigor we apply to any untrusted external dependency. The combination of DDD (for domain isolation), hexagonal architecture (for port/adapter abstraction), and CQRS (for clean command/query separation) provides the structural foundation. The seven guardrails — budget caps, concurrency limits, timeouts, distributed locks, state machines, consensus requirements, and workspace isolation — provide the runtime safety net.
The result is a system where AI handles the tasks it’s good at (classification, analysis, code review) while the architecture ensures it can’t do damage when it’s wrong. And that happens more often than you’d expect.