Integrating AI Workflows with Guardrails in DDD/Hexagonal Architecture and CQRS

How to embed LLM-powered agents into enterprise-grade architecture without losing control — ports and adapters, domain events, budget caps, state machines, and observable AI in production.

Large language models are powerful, unpredictable, expensive, and slow. Enterprise architectures are disciplined, predictable, cost-controlled, and observable. Putting LLMs inside a system built on Domain-Driven Design, hexagonal architecture, and CQRS is fundamentally a containment and integration problem: how do you use AI’s capabilities without letting its failure modes — hallucination, cost explosion, latency spikes, non-determinism — leak into the rest of your domain?

This article walks through how we solved this at Mesh-Sync, a 3D model processing platform where AI powers semantic analysis, model classification, and automated code review. Every pattern described here runs in production, handling thousands of LLM calls per day with hard budget caps, distributed locking, state machine validation, and full observability.

The Fundamental Tension

Traditional software has a contract: given input X, produce output Y, every time, within Z milliseconds. DDD reinforces this with bounded contexts, aggregates that enforce invariants, and domain events that represent facts.

LLMs break every part of that contract:

  • Non-deterministic output — Same prompt, different result, every time.
  • Unbounded latency — A response might take 2 seconds or 90 seconds.
  • Cost per invocation — Every call costs money, and complex chains multiply costs.
  • Failure modes — Rate limits, context window overflow, content filters, model degradation.

The architectural challenge is: how do you give AI a seat at the table without letting it flip the table?

The answer, in our experience, is to treat AI as an infrastructure concern — not a domain concept. The domain layer neither knows nor cares that classification is powered by an LLM. It asks a port for a classification, and an adapter provides one. Everything else — model selection, retry logic, budget enforcement, fallback strategies — lives outside the domain.

DDD Foundations: Bounded Contexts and Domain Events

Before introducing AI, let’s establish the DDD structure it plugs into.

Mesh-Sync has several bounded contexts: Model Management (uploads, metadata, storage), Pipeline Orchestration (job scheduling, stage execution), Marketplace (publishing, search, transactions), and AI Classification (semantic analysis, model categorisation).

Each context communicates through domain events — immutable records of something that happened in the domain:

// Domain event definitions
interface DomainEvent {
  type: string;
  correlationId: string;
  timestamp: Date;
  payload: unknown; // narrowed by each concrete event type below
}

// Concrete events
type ModelStatusUpdateRequested = DomainEvent & {
  type: 'model.status.update_requested';
  payload: {
    modelId: string;
    newStatus: 'processing' | 'completed' | 'failed';
  };
};

type TechnicalMetadataSaveRequested = DomainEvent & {
  type: 'model.technical_metadata.save_requested';
  payload: {
    modelId: string;
    metadata: Record<string, unknown>;
  };
};

type ModelClassificationCompleted = DomainEvent & {
  type: 'model.classification.completed';
  payload: {
    modelId: string;
    metamodelId: string;
    confidence: number;
    classifiedAt: Date;
  };
};

Events are dispatched through a Mediator pattern — each event type has exactly one handler, and the dispatcher routes events without the emitter knowing who handles them:

class DomainEventDispatcher {
  private handlers = new Map<string, DomainEventHandler<any>>();

  register<T extends DomainEvent>(
    eventType: string,
    handler: DomainEventHandler<T>
  ): void {
    this.handlers.set(eventType, handler);
  }

  async dispatch<T extends DomainEvent>(event: T): Promise<void> {
    const handler = this.handlers.get(event.type);
    if (!handler) {
      throw new Error(`No handler for event type: ${event.type}`);
    }
    await handler.handle(event);
  }
}

// Registration at startup
const dispatcher = new DomainEventDispatcher();
dispatcher.register(
  'model.status.update_requested',
  new ModelStatusUpdateHandler(webhookClient)
);
dispatcher.register(
  'model.technical_metadata.save_requested',
  new TechnicalMetadataSaveHandler(webhookClient)
);

Why single-handler-per-event? In our experience, fan-out (multiple handlers for one event) creates implicit coupling and makes it impossible to reason about side effects. If model status updates need to trigger notifications AND cache invalidation, those are two separate events in the domain, not one event with two handlers.

Hexagonal Architecture: Ports and Adapters for AI

The hexagonal architecture (ports and adapters) is the key pattern that makes AI integration clean. The domain defines ports — abstract interfaces that express what it needs. Adapters implement those ports with specific technology.

Here’s the LLM port:

from abc import ABC, abstractmethod

class LLMPort(ABC):
    """Domain port: the system needs text generation capability.
    The domain doesn't know or care if it's GPT-4, Claude, or a local model."""

    @abstractmethod
    async def generate(
        self,
        prompt: str,
        max_tokens: int = 2000,
        temperature: float = 0.7,
    ) -> str:
        pass

And the adapters that implement it:

import httpx

class OllamaAdapter(LLMPort):
    """Adapter: local inference via Ollama."""

    def __init__(self, model: str = "llama3", base_url: str = "http://localhost:11434"):
        self.model = model
        self.base_url = base_url

    async def generate(self, prompt: str, max_tokens: int = 2000,
                       temperature: float = 0.7) -> str:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{self.base_url}/api/generate",
                json={
                    "model": self.model,
                    "prompt": prompt,
                    "stream": False,  # single JSON body instead of NDJSON chunks
                    "options": {"num_predict": max_tokens, "temperature": temperature},
                },
                timeout=120.0,
            )
            response.raise_for_status()
            return response.json()["response"]


class OpenRouterAdapter(LLMPort):
    """Adapter: cloud inference via OpenRouter (GPT-4, Claude, etc.)."""

    def __init__(self, api_key: str, model: str = "anthropic/claude-sonnet-4-20250514"):
        self.api_key = api_key
        self.model = model

    async def generate(self, prompt: str, max_tokens: int = 2000,
                       temperature: float = 0.7) -> str:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                "https://openrouter.ai/api/v1/chat/completions",
                headers={"Authorization": f"Bearer {self.api_key}"},
                json={
                    "model": self.model,
                    "messages": [{"role": "user", "content": prompt}],
                    "max_tokens": max_tokens,
                    "temperature": temperature,
                },
                timeout=180.0,
            )
            response.raise_for_status()
            return response.json()["choices"][0]["message"]["content"]
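The "fallback strategies" mentioned earlier can themselves be expressed as an adapter. As a hedged sketch (this composition is not shown in the production code — the class name and error handling here are illustrative), a wrapper can implement the same `LLMPort` and delegate to a secondary adapter when the primary fails:

```python
from abc import ABC, abstractmethod

class LLMPort(ABC):
    @abstractmethod
    async def generate(self, prompt: str, max_tokens: int = 2000,
                       temperature: float = 0.7) -> str: ...

class FallbackLLMAdapter(LLMPort):
    """Try the primary adapter; on any failure, fall back to the secondary.
    The domain still sees a single LLMPort — the fallback policy stays
    entirely in the adapter layer."""

    def __init__(self, primary: LLMPort, secondary: LLMPort):
        self.primary = primary
        self.secondary = secondary

    async def generate(self, prompt: str, max_tokens: int = 2000,
                       temperature: float = 0.7) -> str:
        try:
            return await self.primary.generate(prompt, max_tokens, temperature)
        except Exception:
            # Primary failed (timeout, rate limit, outage): degrade gracefully
            return await self.secondary.generate(prompt, max_tokens, temperature)
```

Because it satisfies the port contract, this wrapper can be dropped in by the factory without the domain noticing.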

The same pattern applies to the worker backend — which is not an AI concern, but follows identical hexagonal principles:

class WorkerBackendPort(ABC):
    """Port: the system needs to dispatch and manage background jobs."""

    @abstractmethod
    async def add_job(self, queue_name: str, job_name: str,
                      data: dict, opts: dict | None = None) -> str:
        pass

    @abstractmethod
    async def create_worker(self, queue_name: str,
                            processor: Callable, config: dict | None = None) -> Any:
        pass


class BullMQAdapter(WorkerBackendPort):
    """Adapter: job dispatch via BullMQ/Redis."""

    async def add_job(self, queue_name, job_name, data, opts=None):
        queue = QueueManager.get_queue(queue_name)
        job = await queue.add(job_name, data, opts or {})
        return job.id

    # create_worker omitted for brevity


class ExternalWorkerAdapter(WorkerBackendPort):
    """Adapter: job dispatch via external HTTP service."""

    def __init__(self, service_url: str):
        self.service_url = service_url

    async def add_job(self, queue_name, job_name, data, opts=None):
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{self.service_url}/jobs",
                json={"queue": queue_name, "name": job_name, "data": data},
            )
            return response.json()["jobId"]

A factory selects the adapter at startup based on configuration:

class WorkerBackendFactory:
    @staticmethod
    def get_adapter() -> WorkerBackendPort:
        if os.getenv("USE_EXTERNAL_BACKEND") == "true":
            return ExternalWorkerAdapter(os.getenv("BACKEND_SERVICE_URL"))
        return BullMQAdapter()

The hexagonal diagram below shows how the domain core is insulated from both AI and infrastructure:

graph TB
    subgraph domain[Domain Core]
        BC1[Model Management<br/><i>Bounded Context</i>]
        BC2[Pipeline Orchestration<br/><i>Bounded Context</i>]
        BC3[AI Classification<br/><i>Bounded Context</i>]
        DE[Domain Events<br/><i>Mediator Dispatch</i>]
    end

    subgraph ports[Ports — Interfaces]
        LP[LLM Port<br/><i>generate&lpar;prompt&rpar;</i>]
        WBP[Worker Backend Port<br/><i>add_job&lpar;&rpar;, create_worker&lpar;&rpar;</i>]
        SP[Storage Port<br/><i>upload&lpar;&rpar;, download&lpar;&rpar;</i>]
        EP[Event Port<br/><i>publish&lpar;event&rpar;</i>]
    end

    subgraph adapters[Adapters — Implementations]
        OA[Ollama Adapter<br/><i>Local LLM</i>]
        ORA[OpenRouter Adapter<br/><i>Cloud LLM</i>]
        BMA[BullMQ Adapter<br/><i>Redis queues</i>]
        EWA[External Worker Adapter<br/><i>HTTP service</i>]
        S3A[S3 Adapter<br/><i>AWS storage</i>]
        ELKA[ELK Adapter<br/><i>Elasticsearch events</i>]
    end

    BC1 & BC2 & BC3 --> DE
    BC3 --> LP
    BC2 --> WBP
    BC1 --> SP
    DE --> EP

    LP --> OA
    LP --> ORA
    WBP --> BMA
    WBP --> EWA
    SP --> S3A
    EP --> ELKA

    style domain fill:#1e1e24,stroke:#5eead4,color:#e4e4e7
    style ports fill:#1e1e24,stroke:#818cf8,color:#e4e4e7
    style adapters fill:#1e1e24,stroke:#34d399,color:#e4e4e7

The critical insight: When you swap from OllamaAdapter (local, free, fast, lower quality) to OpenRouterAdapter (cloud, paid, slower, higher quality), the domain layer doesn’t change. Not one line. The factory handles selection, the port enforces the contract, and the adapter handles the details.

CQRS: Separating Commands from Queries

Command-Query Responsibility Segregation (CQRS) in our system is implemented through webhooks — a deliberate architectural decision (ADR-010) that keeps the write path clean and the read path flexible.

Command side — Dispatching work:

// StageExecutor dispatches a worker job (command)
async executeWorkerStage(
  stage: StageDefinition,
  context: PipelineContext
): Promise<void> {
  const interpolated = this.interpolate(stage.inputs, context);

  // Validate inputs before dispatch
  const validation = InterpolationValidator.validate(interpolated);
  if (!validation.valid) {
    throw new StageInputError(stage.name, validation.errors);
  }

  // Dispatch to queue — fire and forget
  const queue = this.queues.get(stage.queue!);
  await queue.add(stage.worker!, interpolated, {
    attempts: stage.retry?.max_attempts ?? 1,
    backoff: {
      type: stage.retry?.backoff_type ?? 'fixed',
      delay: stage.retry?.initial_delay_ms ?? 5000,
    },
  });
}

Query side — Workers report results via HMAC-signed webhooks:

// Worker completion webhook handler
async handleWebhook(req: Request): Promise<void> {
  // Verify HMAC signature
  const signature = req.headers['x-webhook-signature'] as string;
  const expected = computeHmacSignature(req.body, this.signingKey);
  // Length check first: timingSafeEqual throws on unequal-length buffers
  if (
    signature.length !== expected.length ||
    !timingSafeEqual(Buffer.from(signature), Buffer.from(expected))
  ) {
    throw new UnauthorizedError('Invalid webhook signature');
  }

  // Emit domain event — no direct DB write
  await this.dispatcher.dispatch({
    type: 'model.status.update_requested',
    correlationId: req.body.pipelineId,
    timestamp: new Date(),
    payload: {
      modelId: req.body.modelId,
      newStatus: req.body.status,
    },
  });
}

The handler that receives the event is the only code that touches the persistence layer:

class ModelStatusUpdateHandler
  implements DomainEventHandler<ModelStatusUpdateRequested>
{
  constructor(private webhookClient: ModelWebhookClient) {}

  async handle(event: ModelStatusUpdateRequested): Promise<void> {
    await this.webhookClient.send({
      type: 'model.status.update',
      modelId: event.payload.modelId,
      newStatus: event.payload.newStatus,
      timestamp: event.timestamp.toISOString(),
      signature: computeHmacSignature(event, this.secret),
    });
  }
}

sequenceDiagram
    participant Client
    participant Backend as NestJS Backend
    participant Orchestrator as Worker Backend
    participant Queue as Redis / BullMQ
    participant Worker as Python Worker
    participant LLM as LLM Provider

    Client->>Backend: POST /models/{id}/process
    Backend->>Orchestrator: Webhook: start pipeline
    Orchestrator->>Queue: Enqueue: semantic-analysis job
    Queue->>Worker: Job dispatched

    Worker->>LLM: generate(classification prompt)
    LLM-->>Worker: Classification result

    Worker->>Orchestrator: Webhook: job completed (HMAC-signed)
    Orchestrator->>Orchestrator: Dispatch domain event
    Orchestrator->>Backend: Webhook: model.status.update
    Backend->>Backend: Persist to PostgreSQL

    Note over Worker,LLM: Worker-LLM interaction is<br/>invisible to the domain

Why webhooks instead of direct DB access? Three reasons:

  1. Single writer principle — Only the NestJS Backend writes to PostgreSQL. No schema coupling between services.
  2. Language independence — Python workers don’t need a TypeScript ORM or PostgreSQL driver.
  3. Audit trail — Every state change flows through the domain event system, making it observable and replayable.
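The `computeHmacSignature` helper shown above is TypeScript; the same scheme in Python looks roughly like the sketch below. The canonicalisation choice (sorted-key compact JSON) is an assumption — the real requirement is only that signer and verifier serialise the body identically:

```python
import hmac
import hashlib
import json

def compute_hmac_signature(payload: dict, signing_key: bytes) -> str:
    """Sign a webhook body: canonical JSON -> HMAC-SHA256 hex digest."""
    body = json.dumps(payload, sort_keys=True, separators=(",", ":")).encode()
    return hmac.new(signing_key, body, hashlib.sha256).hexdigest()

def verify_signature(payload: dict, signature: str, signing_key: bytes) -> bool:
    expected = compute_hmac_signature(payload, signing_key)
    # Constant-time comparison, like Node's timingSafeEqual
    return hmac.compare_digest(signature, expected)
```

Any tampering with the payload changes the digest, so a forged or modified webhook is rejected before a domain event is ever emitted.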

The Guardrails Taxonomy

This is where most AI integrations fail. Without explicit guardrails, an AI agent will cheerfully burn through your entire API budget in an hour, spawn 50 concurrent tasks, and make decisions that violate domain invariants. Our guardrails are organised into seven categories:

1. Budget Caps

Every AI operation has a hard dollar limit. When the budget is exhausted, the operation is terminated — not paused, terminated. There’s no “just one more token” grace period.

class Settings(BaseSettings):
    # Per-operation budget caps
    llm_max_budget_usd: float = 8.0        # Max per pipeline execution
    plan_max_budget_usd: float = 2.0        # Max for planning phase
    subtask_max_budget_usd: float = 5.0     # Max per subtask

    # Model-specific configuration
    claude_model: str = "claude-opus-4-6"
    claude_timeout: int = 1800              # 30 minutes
    kimi_timeout: int = 600                 # 10 minutes

Budget tracking is done at the adapter level, not in the domain. The ElkMetricTracker records token usage per invocation, and a budget monitor aggregates costs in real-time:

async with elk_wrapper.track_llm_execution(
    model="claude-opus-4-6",
    user_id=context.owner_id,
) as tracker:
    result = await llm_port.generate(prompt)
    tracker.set_token_usage(input_tokens=1200, output_tokens=800)
    # Automatically emits to ELK: model, tokens, cost, duration
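The aggregation side can be sketched as a guard that converts token counts to dollars and kills the operation the moment the cap is crossed. This is an illustrative reconstruction, not the production budget monitor, and the per-1k-token prices are parameters, not real rates:

```python
class BudgetExceededError(RuntimeError):
    pass

class BudgetGuard:
    """Hard budget cap: every recorded cost is checked against the limit,
    and crossing it raises immediately — no grace period."""

    def __init__(self, max_budget_usd: float):
        self.max_budget_usd = max_budget_usd
        self.spent_usd = 0.0

    def record(self, input_tokens: int, output_tokens: int,
               usd_per_1k_in: float, usd_per_1k_out: float) -> None:
        self.spent_usd += (input_tokens / 1000) * usd_per_1k_in
        self.spent_usd += (output_tokens / 1000) * usd_per_1k_out
        if self.spent_usd > self.max_budget_usd:
            raise BudgetExceededError(
                f"spent ${self.spent_usd:.2f} of ${self.max_budget_usd:.2f} cap"
            )
```

The raise propagates out of the pipeline stage, which is exactly the "terminated, not paused" behaviour described above.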

2. Concurrency Limits

Unbounded parallelism is how you get a surprise $500 invoice and a rate-limited API key. Concurrency is capped at the system level and per-repository:

class Settings(BaseSettings):
    max_concurrent_agents: int = 6    # System-wide cap
    max_per_repo: int = 2             # Per-repository cap

These limits are enforced via Redis-based semaphores. If a seventh agent tries to start, it queues until a slot opens.
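A Redis-backed counting semaphore of this kind can be sketched as below. The client interface (async `incr`/`decr`) is an illustrative assumption — production would pass a real Redis client so the cap holds across processes:

```python
import asyncio

class ConcurrencySlots:
    """Counting-semaphore sketch over a Redis-like client.
    INCR is atomic in Redis, so racing agents cannot both claim the last slot."""

    def __init__(self, client, key: str, limit: int, poll_s: float = 0.5):
        self.client = client
        self.key = key
        self.limit = limit
        self.poll_s = poll_s

    async def acquire(self) -> None:
        while True:
            # Optimistically take a slot; if over the limit, give it back and wait
            if await self.client.incr(self.key) <= self.limit:
                return
            await self.client.decr(self.key)
            await asyncio.sleep(self.poll_s)

    async def release(self) -> None:
        await self.client.decr(self.key)
```

The seventh agent simply loops in `acquire` until another agent's `release` frees a slot.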

3. Timeout Enforcement

Every LLM call, every pipeline stage, and every subtask has a deadline. No operation runs indefinitely.

class Settings(BaseSettings):
    claude_timeout: int = 1800        # 30 min per LLM call
    subtask_timeout_cap: int = 3600   # 1 hour per subtask
    lock_ttl_issue: int = 7200        # 2 hour lock timeout
    lock_ttl_pr_review: int = 3600    # 1 hour PR review timeout

The TimeoutMonitor scans running operations every 15 seconds. Timed-out operations are forcibly terminated and escalated — either retried (if attempts remain) or raised as an alert.
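The scan-and-escalate logic can be sketched as follows. Names and the retry deadline are illustrative, and the clock is injected so the behaviour is testable:

```python
import time
from dataclasses import dataclass

@dataclass
class Operation:
    op_id: str
    deadline: float      # unix timestamp
    attempts_left: int

class TimeoutMonitor:
    """Periodic scan: anything past its deadline is terminated, then either
    retried (if attempts remain) or escalated for human attention."""

    def __init__(self, now=time.time):
        self.now = now
        self.running: dict[str, Operation] = {}
        self.escalated: list[str] = []

    def scan(self) -> None:
        for op in list(self.running.values()):
            if self.now() >= op.deadline:
                del self.running[op.op_id]
                if op.attempts_left > 0:
                    # Re-enqueue with one fewer attempt and a fresh deadline
                    self.running[op.op_id] = Operation(
                        op.op_id, self.now() + 1800, op.attempts_left - 1)
                else:
                    self.escalated.append(op.op_id)
```

In production this `scan` would run on the 15-second interval described above.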

4. Distributed Locking

Before an AI agent starts working on an issue or PR, it must acquire a distributed lock. This prevents two agents from reviewing the same PR simultaneously or making conflicting changes to the same codebase:

class PRReviewPipeline:
    async def run(self, pr: PullRequest) -> dict:
        # Acquire lock — blocks if already held
        handle = await self._lock.acquire(
            LockScope.PR_REVIEW,
            f"{pr.org}:{pr.repo}:{pr.number}",
            timeout=self._settings.lock_ttl_pr_review,
        )
        if handle is None:
            self._discord.notify(ctx, "PR already under review — skipping")
            return {"status": "skipped", "reason": "locked"}

        try:
            # ... perform review ...
            pass
        finally:
            await self._lock.release(handle)

Locks have TTLs (time-to-live). If an agent crashes without releasing a lock, the TTL expires and the lock becomes available again. This prevents permanent deadlocks.

5. State Machine Validation

AI agents transition entities through states (PR: open → in_review → approved → merged). A state machine enforces that only valid transitions occur — an agent cannot merge a PR that hasn’t been reviewed, regardless of what the LLM suggests:

stateDiagram-v2
    [*] --> open
    open --> in_review : Agent acquires lock
    in_review --> changes_requested : Review finds issues
    changes_requested --> in_review : Changes pushed
    in_review --> approved : Consensus reached
    approved --> merged : Auto-merge
    in_review --> escalated : Budget exceeded / Timeout
    changes_requested --> escalated : Max rounds exceeded
    escalated --> [*] : Human intervention
    merged --> [*]

class StateMachine:
    VALID_TRANSITIONS = {
        PRState.OPEN: {PRState.IN_REVIEW},
        PRState.IN_REVIEW: {PRState.CHANGES_REQUESTED, PRState.APPROVED, PRState.ESCALATED},
        PRState.CHANGES_REQUESTED: {PRState.IN_REVIEW, PRState.ESCALATED},
        PRState.APPROVED: {PRState.MERGED},
    }

    async def transition_pr(
        self, org: str, repo: str, pr_number: int,
        new_state: PRState, reason: str
    ) -> None:
        current = await self.get_pr_state(org, repo, pr_number)
        allowed = self.VALID_TRANSITIONS.get(current, set())

        if new_state not in allowed:
            raise InvalidTransitionError(
                f"Cannot transition PR from {current} to {new_state}. "
                f"Allowed: {allowed}"
            )

        await self.set_pr_state(org, repo, pr_number, new_state, reason)

6. Consensus Requirements

For critical actions (merging a PR, publishing a model), a single AI opinion isn’t enough. The system requires multiple consecutive approvals before proceeding:

class Settings(BaseSettings):
    min_approvals_for_consensus: int = 2  # At least 2 consecutive approvals

The review pipeline runs in rounds. Each round produces a review. If two consecutive rounds approve, the PR is merged. If any round requests changes, the counter resets. This prevents a single hallucinated “LGTM” from reaching production.
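The consecutive-approval rule reduces to a small amount of state. A minimal sketch of the gate (class and method names are illustrative, not the production pipeline):

```python
class ConsensusGate:
    """N approvals in a row are required; any 'changes requested'
    round resets the streak to zero."""

    def __init__(self, min_approvals: int = 2):
        self.min_approvals = min_approvals
        self.streak = 0

    def record_round(self, approved: bool) -> bool:
        """Record one review round; return True once consensus is reached."""
        self.streak = self.streak + 1 if approved else 0
        return self.streak >= self.min_approvals
```

A single spurious approval never reaches the threshold on its own, which is exactly the hallucinated-"LGTM" protection described above.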

7. Workspace Isolation

AI agents that modify code do so in isolated git worktrees — not on the main branch, not even on a shared development branch:

# Each agent gets a dedicated worktree
workspace_dir = f"{repo_dir}/.worktrees/{branch_name}"
await ssh_client.run(
    f"git worktree add -B {branch_name} {workspace_dir} origin/{branch_name}"
)

If the agent corrupts the workspace, only its worktree is affected. The main repository is untouched. Worktrees are cleaned up after the agent completes (or after the lock TTL expires).

Three-Level Validation: From Schema to Runtime

AI outputs are untrusted by default. Every piece of data that flows from an AI adapter back into the domain passes through three validation layers:

Level 1: JSON Schema (Structural)

Pipeline definitions and job payloads are validated against JSON Schema before any processing begins:

// AJV schema validation at the API boundary
const validator = ajv.compile(pipelineSchema);
const valid = validator(payload);
if (!valid) {
  throw new ValidationError(
    validator.errors!.map(e => `${e.instancePath}: ${e.message}`)
  );
}

Level 2: Semantic (Logical)

Structurally valid data can still be logically broken. Semantic validation catches dangling references, duplicate identifiers, and constraint violations:

// Example: verify all stage references exist
for (const stage of pipeline.stages) {
  if (stage.on_success && !stageNames.has(stage.on_success)) {
    errors.push(
      `Stage '${stage.name}' references non-existent target '${stage.on_success}'`
    );
  }
}

Level 3: Runtime (Type Safety)

At the Python worker level, Pydantic models enforce type safety on every incoming job payload. If an AI-generated classification result has a missing field or wrong type, the worker rejects it immediately:

from pydantic import BaseModel, validator

class ClassificationResult(BaseModel):
    model_id: str
    metamodel_id: str
    confidence: float
    categories: list[str]

    @validator('confidence')
    def confidence_in_range(cls, v):
        if not 0.0 <= v <= 1.0:
            raise ValueError(f'Confidence must be 0-1, got {v}')
        return v

    @validator('model_id')
    def model_id_not_empty(cls, v):
        if not v.strip():
            raise ValueError('model_id cannot be empty')
        return v

# Untrusted AI output is validated at the boundary
result = ClassificationResult(**ai_response)  # Raises on invalid data

Observable AI: ELK Metrics for LLM Operations

You cannot manage what you cannot measure. Every LLM invocation is tracked with structured metrics published to Elasticsearch:

class MetricCategory(Enum):
    LLM_EXECUTION = "llm_execution"
    BLENDER_OPERATION = "blender_operation"
    FILE_PROCESSING = "file_processing"
    MARKETPLACE_API = "marketplace_api"


class ElkMetricTracker:
    """Context manager for tracking operations with automatic metrics emission.
    Entering starts the timer; exiting computes duration, cost, and
    success/error status, then publishes the document to Elasticsearch."""


# Usage: wrap each LLM call so metrics are emitted even when it fails
async with elk_wrapper.track_llm_execution(
    model="claude-opus-4-6",
    user_id="user-123",
) as tracker:
    result = await llm.generate(prompt)
    tracker.set_token_usage(input_tokens=1500, output_tokens=600)
    # Auto-emits: model, tokens, cost_usd, duration_ms, success, error

This data powers Kibana dashboards showing:

  • Cost per pipeline — Total LLM spend per model processing run
  • Token efficiency — Input/output token ratio (are our prompts too verbose?)
  • Latency percentiles — p50, p95, p99 for each model provider
  • Failure rates — Which models fail most? Rate limits? Timeouts? Content filters?
  • Budget utilisation — How close are operations running to their caps?

graph LR
    subgraph ai[AI Adapter Layer]
        LLM[LLM Adapter<br/><i>OllamaAdapter / OpenRouterAdapter</i>]
        MT[Metric Tracker<br/><i>ElkMetricTracker</i>]
    end

    subgraph guardrails[Guardrail Layer]
        BC[Budget Cap<br/><i>$8 max per pipeline</i>]
        CL[Concurrency Limiter<br/><i>6 agents max</i>]
        TO[Timeout Monitor<br/><i>30min per call</i>]
        DL[Distributed Lock<br/><i>Redis + TTL</i>]
        SM[State Machine<br/><i>Valid transitions only</i>]
        CR[Consensus Gate<br/><i>2+ approvals required</i>]
        WI[Workspace Isolation<br/><i>Git worktrees</i>]
    end

    subgraph observe[Observability]
        ELK[Elasticsearch<br/><i>Metrics + Events</i>]
        KB[Kibana Dashboards<br/><i>Cost, Latency, Errors</i>]
    end

    LLM --> MT
    MT --> ELK
    ELK --> KB

    LLM -.->|enforced by| BC
    LLM -.->|enforced by| CL
    LLM -.->|enforced by| TO
    LLM -.->|requires| DL
    LLM -.->|validated by| SM
    LLM -.->|gated by| CR
    LLM -.->|sandboxed in| WI

    style ai fill:#1e1e24,stroke:#5eead4,color:#e4e4e7
    style guardrails fill:#1e1e24,stroke:#fbbf24,color:#e4e4e7
    style observe fill:#1e1e24,stroke:#818cf8,color:#e4e4e7

When Guardrails Slow You Down

Not every AI operation needs all seven guardrails. Over-constraining low-risk operations creates unnecessary latency and complexity. Here’s our risk-based approach:

Operation | Risk Level | Guardrails Applied
--- | --- | ---
Model classification (read-only) | Low | Budget cap, timeout, metrics
Metadata extraction (read-only) | Low | Budget cap, timeout, metrics
PR review (makes comments) | Medium | All except workspace isolation
Code generation (writes code) | High | All seven guardrails
Auto-merge PR | Critical | All seven + human approval for repos with >10 contributors

The principle: guardrails should be proportional to the blast radius. If an AI classification is wrong, it’s a minor inconvenience — a human corrects it. If an AI merges broken code to main, it’s a production incident. Scale your controls accordingly.

Lessons Learned

AI Belongs in the Adapter Layer, Not the Domain

When we first integrated LLM classification, we put it inside the Model aggregate: model.classify(prompt). This was wrong. It coupled the domain to AI concerns (retries, model selection, token budgets) that have nothing to do with 3D models. Moving it to an adapter behind the LLMPort simplified both the domain and the AI integration independently.

Budget Caps Are Non-Negotiable

In our first week of production AI operations, a bug in prompt construction caused a feedback loop that spent $340 in 45 minutes. After that, we made budget caps a hard kill switch — not a soft warning, not a “please try to stay under.” When you hit the limit, the operation dies. Period.

State Machines Prevent the Worst AI Failures

The state machine saved us from catastrophic failures more than any other guardrail. Without it, an AI agent once tried to merge a PR that was in changes_requested state — skipping the review update entirely. The state machine rejected the transition, the operation was escalated to a human, and we added the scenario to our test suite.

Observability Pays for Itself Immediately

Within the first month of ELK metrics, we discovered that 40% of our semantic analysis calls were redundant — the same model was analysed with the same parameters multiple times. Adding MinIO result caching (which we could identify as a fix because of the metrics) cut our monthly LLM spend by 35%.

Conclusion

Integrating AI into enterprise architecture isn’t a plug-and-play operation. LLMs are powerful but undisciplined — they need the same engineering rigor we apply to any untrusted external dependency. The combination of DDD (for domain isolation), hexagonal architecture (for port/adapter abstraction), and CQRS (for clean command/query separation) provides the structural foundation. The seven guardrails — budget caps, concurrency limits, timeouts, distributed locks, state machines, consensus requirements, and workspace isolation — provide the runtime safety net.

The result is a system where AI handles the tasks it’s good at (classification, analysis, code review) while the architecture ensures it can’t do damage when it’s wrong. And that happens more often than you’d expect.