Model-Based Systems Engineering: Pipeline Orchestration for Distributed Systems
How MBSE principles translate into YAML-driven DAG orchestration — dependency resolution, stage execution, validation layers, and observable infrastructure in production.
When you operate a platform that processes thousands of 3D models daily — each requiring thumbnail generation, semantic analysis, metadata extraction, and quality scoring — ad-hoc scripting breaks down fast. You need a systems engineering approach to orchestration: one where pipelines are declarative models, dependencies are resolved as directed acyclic graphs, execution is observable end-to-end, and failures are handled and recovered gracefully.
This article walks through how we applied Model-Based Systems Engineering (MBSE) principles to build a production pipeline orchestration engine in TypeScript, backed by BullMQ, Redis, MinIO, and Elasticsearch. Every pattern described here runs in production at Mesh-Sync, processing model ingestion workflows across a distributed worker pool.
What is MBSE — and Why Does It Matter Here?
Model-Based Systems Engineering replaces document-centric engineering with formal models as the primary artefact. Instead of writing prose specifications that drift from reality, you define the system’s structure and behaviour in a machine-readable format — and the runtime enforces it.
In traditional pipeline engineering, you see a lot of this:
// The "just call functions in order" approach
await generateThumbnail(model);
await extractMetadata(model);
await runSemanticAnalysis(model);
await publishToMarketplace(model);
This works until it doesn’t. What happens when extractMetadata fails but runSemanticAnalysis is independent? What if you need to add a quality gate between analysis and publishing? What about retry policies, timeouts, parallel branches, or conditional routing?
MBSE answers this by making the pipeline itself a model — a first-class artefact that can be validated, visualised, versioned, and executed by a generic engine.
Pipeline-as-Model: YAML-Driven DAG Definitions
Our pipelines are declared in YAML files that serve as the single source of truth. Each pipeline defines its stages, dependencies, routing logic, error handlers, and resource requirements:
name: model-processing-standard
version: "1.0.0"

trigger:
  event: "model.upload.requested"
  queue: "model-processing"

context:
  modelId: ""
  ownerId: ""
  storageLocation: {}

stages:
  - name: cache-check
    type: internal
    action: check_minio_cache
    inputs:
      cacheKey: "${context.modelId}-v1"
    on_success: thumbnail-generation
    on_failure: error-handler
    timeout: 30000

  - name: thumbnail-generation
    type: worker
    queue: thumbnail-generation
    worker: thumbnail-generator
    inputs:
      modelId: "${context.modelId}"
      storageLocation: "${context.storageLocation}"
    timeout: 300000
    retry:
      max_attempts: 3
      backoff_type: exponential
      initial_delay_ms: 1000
    on_success: parallel-processing

  - name: parallel-processing
    type: parallel
    parallel_stages:
      - semantic-analysis
      - technical-metadata
    on_success: quality-check
    wait_strategy: all

  - name: semantic-analysis
    type: worker
    queue: semantic-analysis
    worker: semantic-analyzer
    inputs:
      modelId: "${context.modelId}"
    timeout: 600000

  - name: technical-metadata
    type: worker
    queue: metadata-generation
    worker: metadata-generator
    inputs:
      modelId: "${context.modelId}"
    timeout: 180000

  - name: quality-check
    type: decision
    decision:
      - condition: "${stages.semantic-analysis.result.confidence > 0.8}"
        next: publish-marketplace
      - condition: "true"
        next: manual-review

  - name: publish-marketplace
    type: internal
    action: update_model_status
    inputs:
      modelId: "${context.modelId}"
      status: "published"

  - name: manual-review
    type: internal
    action: update_model_status
    inputs:
      modelId: "${context.modelId}"
      status: "review_required"

error_handlers:
  - name: error-handler
    type: internal
    action: log
    inputs:
      message: "Pipeline failed: ${context.error}"

finalize:
  - name: cleanup
    type: internal
    action: cleanup_workspace
This is not configuration — it’s a model. The YAML schema is validated against a JSON Schema definition, semantically checked for dangling references and cycles, and then compiled into an executable DAG.
The Four Stage Types
Our orchestration engine supports four atomic stage types, each with distinct execution semantics:
flowchart TD
A[Pipeline Triggered] --> B{Stage Type?}
B -->|worker| C[Dispatch to BullMQ Queue]
B -->|internal| D[Execute via ActionRegistry]
B -->|parallel| E[Fork: Execute All Branches]
B -->|decision| F[Evaluate JEXL Conditions]
C --> G[Worker Processes Job]
G --> H[Webhook: Result Callback]
H --> I[Next Stage]
D --> I
E --> J[Branch 1]
E --> K[Branch 2]
E --> L[Branch N]
J & K & L --> M{Wait Strategy}
M -->|all| I
M -->|any| I
F --> N[Condition 1: true] --> I
F --> O[Condition 2: false]
O --> P[Condition N: fallback] --> I
style A fill:#1e1e24,stroke:#5eead4,color:#e4e4e7
style I fill:#1e1e24,stroke:#34d399,color:#e4e4e7
Worker Stages
Worker stages dispatch jobs to BullMQ queues. A separate pool of Python or TypeScript workers consumes these jobs asynchronously. The orchestrator doesn’t wait synchronously — instead, workers report completion via HMAC-signed webhooks back to the orchestration API. This decoupling means workers can scale independently, crash without affecting the orchestrator, and be implemented in any language.
if (stage.type === 'worker') {
  const queue = this.queues.get(stage.queue!);
  await queue.add(stage.worker!, interpolatedInputs, {
    attempts: stage.retry?.max_attempts ?? 1,
    backoff: {
      type: stage.retry?.backoff_type ?? 'fixed',
      delay: stage.retry?.initial_delay_ms ?? 5000,
    },
  });
  // Execution continues when webhook callback arrives
}
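The webhook leg of this round trip is only mentioned above, so here is a minimal sketch of what HMAC signing and verification could look like on the orchestrator side. The function names (`signPayload`, `verifySignature`) and the SHA-256 choice are illustrative assumptions, not the production API:

```typescript
import { createHmac, timingSafeEqual } from "node:crypto";

// Hypothetical helper: sign a webhook body the way a worker would.
export function signPayload(secret: string, body: string): string {
  return createHmac("sha256", secret).update(body).digest("hex");
}

// Orchestrator-side check before trusting a result callback.
export function verifySignature(
  secret: string,
  body: string,
  signature: string
): boolean {
  const expected = Buffer.from(signPayload(secret, body), "hex");
  const received = Buffer.from(signature, "hex");
  // timingSafeEqual throws on length mismatch, so guard first
  return (
    expected.length === received.length && timingSafeEqual(expected, received)
  );
}
```

Using a constant-time comparison matters here: a naive `===` on signature strings can leak timing information to an attacker probing the callback endpoint.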
Internal Stages
Internal stages execute synchronously within the orchestrator process via the Action Registry — a command pattern implementation where each action is a named handler:
if (stage.type === 'internal') {
  const action = this.actionRegistry.get(stage.action!);
  return await action.execute(interpolatedInputs, context);
}
Actions include cache checks, status updates, cleanup operations, and domain event emission. They’re registered at startup and resolved by name, making the system extensible without modifying the execution engine.
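A minimal sketch of that command-pattern registry might look as follows. The interface and class names (`InternalAction`, `ActionRegistry`) are illustrative, not the production code:

```typescript
// Command pattern: each internal action is a named handler,
// registered at startup and resolved by name at execution time.
interface InternalAction {
  execute(
    inputs: Record<string, unknown>,
    context: Record<string, unknown>
  ): Promise<unknown>;
}

class ActionRegistry {
  private actions = new Map<string, InternalAction>();

  register(name: string, action: InternalAction): void {
    if (this.actions.has(name)) {
      throw new Error(`Action already registered: ${name}`);
    }
    this.actions.set(name, action);
  }

  get(name: string): InternalAction {
    const action = this.actions.get(name);
    if (!action) throw new Error(`Unknown action: ${name}`);
    return action;
  }
}

// Example registration for the update_model_status action from the YAML:
const registry = new ActionRegistry();
registry.register("update_model_status", {
  async execute(inputs) {
    return { modelId: inputs.modelId, status: inputs.status };
  },
});
```

New behaviour is added by registering another handler; the execution engine never changes.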
Parallel Stages
Parallel stages fork execution into multiple concurrent branches. The wait_strategy controls synchronisation:
- all — Wait for every branch to complete (default). The next stage receives aggregated results.
- any — Proceed as soon as the first branch completes. Remaining branches continue in the background.
if (stage.type === 'parallel') {
  const branches = await Promise.all(
    stage.parallel_stages!.map(name => {
      const branchStage = this.getStage(name);
      return this.executeStage(branchStage, context);
    })
  );
  return { allCompleted: true, branches };
}
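The snippet above implements the `all` strategy. The `any` strategy maps naturally onto `Promise.race`; here is a sketch of both join semantics side by side (the real engine tracks branch state in Redis, so `joinBranches` is illustrative only):

```typescript
type WaitStrategy = "all" | "any";

// Join logic over already-launched branch promises.
async function joinBranches<T>(
  branches: Promise<T>[],
  strategy: WaitStrategy
): Promise<T[] | T> {
  if (strategy === "all") {
    // Next stage receives aggregated results from every branch
    return Promise.all(branches);
  }
  // "any": proceed with the first branch to settle; the remaining
  // branches keep running in the background and their results are ignored
  return Promise.race(branches);
}
```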
Decision Stages
Decision stages evaluate JEXL expressions against the pipeline context and route execution to the first matching branch. This enables quality gates, A/B routing, and conditional workflows without hardcoding logic:
if (stage.type === 'decision') {
  for (const branch of stage.decision!) {
    const matches = await jexl.eval(branch.condition, {
      context: context.variables,
      stages: context.stageResults,
    });
    if (matches) {
      return { nextStage: branch.next };
    }
  }
}
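The first-match routing semantics can be shown without the expression engine. In this sketch, plain predicate functions stand in for parsed JEXL expressions, and the branch list mirrors the quality-check stage from the YAML:

```typescript
interface DecisionBranch {
  condition: (ctx: Record<string, any>) => boolean;
  next: string;
}

// Return the first branch whose condition matches, in declaration order.
function route(branches: DecisionBranch[], ctx: Record<string, any>): string | null {
  for (const branch of branches) {
    if (branch.condition(ctx)) return branch.next;
  }
  return null; // no branch matched; the engine would treat this as an error
}

// Equivalent of the quality-check decision stage above:
const qualityCheck: DecisionBranch[] = [
  {
    condition: ctx => ctx.stages["semantic-analysis"].result.confidence > 0.8,
    next: "publish-marketplace",
  },
  { condition: () => true, next: "manual-review" }, // fallback branch
];
```

The unconditional `true` branch at the end plays the same role as the `condition: "true"` fallback in the YAML: every decision stage terminates with a route.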
Dependency Resolution: Building the DAG
The DependencyResolver transforms a flat list of stage definitions into a directed acyclic graph by analysing on_success, on_failure, and parallel_stages references:
graph LR
CC[cache-check] --> TG[thumbnail-generation]
TG --> PP[parallel-processing]
PP --> SA[semantic-analysis]
PP --> TM[technical-metadata]
SA --> QC[quality-check]
TM --> QC
QC -->|confidence > 0.8| PM[publish-marketplace]
QC -->|fallback| MR[manual-review]
style CC fill:#1e1e24,stroke:#5eead4,color:#e4e4e7
style PP fill:#1e1e24,stroke:#818cf8,color:#e4e4e7
style QC fill:#1e1e24,stroke:#fbbf24,color:#e4e4e7
style PM fill:#1e1e24,stroke:#34d399,color:#e4e4e7
style MR fill:#1e1e24,stroke:#fbbf24,color:#e4e4e7
The resolver performs three key operations:
- Graph construction — Extract implicit edges from on_success/on_failure routing.
- Cycle detection — A topological sort verifies the graph is acyclic. Circular dependencies are rejected at validation time, not at runtime.
- Dependency satisfaction — Before executing any stage, the resolver checks that all upstream dependencies are in completed state.
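The cycle check can be sketched with Kahn's algorithm over the same `stage -> dependencies` map the resolver builds: if the topological order fails to visit every node, a cycle must exist. This is a minimal standalone version, not the production resolver:

```typescript
// graph maps each stage to the set of stages it depends on.
function hasCycle(graph: Map<string, Set<string>>): boolean {
  const inDegree = new Map<string, number>();
  for (const [node, deps] of graph) inDegree.set(node, deps.size);

  // Start from stages with no unmet dependencies (the entry points)
  const queue = [...inDegree].filter(([, d]) => d === 0).map(([n]) => n);
  let visited = 0;

  while (queue.length > 0) {
    const node = queue.shift()!;
    visited++;
    // "Remove" node: decrement the in-degree of every stage depending on it
    for (const [other, deps] of graph) {
      if (deps.has(node)) {
        const d = inDegree.get(other)! - 1;
        inDegree.set(other, d);
        if (d === 0) queue.push(other);
      }
    }
  }

  // If some nodes were never reachable with in-degree 0, they sit on a cycle
  return visited !== graph.size;
}
```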
class DependencyResolver {
  private graph = new Map<string, Set<string>>();

  buildGraph(pipeline: PipelineDefinition): Map<string, Set<string>> {
    const graph = new Map<string, Set<string>>();

    for (const stage of pipeline.stages) {
      if (!graph.has(stage.name)) {
        graph.set(stage.name, new Set());
      }

      // Implicit dependency: on_success target depends on this stage
      if (stage.on_success) {
        if (!graph.has(stage.on_success)) {
          graph.set(stage.on_success, new Set());
        }
        graph.get(stage.on_success)!.add(stage.name);
      }

      // Parallel branches: all depend on parent
      if (stage.type === 'parallel' && stage.parallel_stages) {
        for (const branch of stage.parallel_stages) {
          if (!graph.has(branch)) {
            graph.set(branch, new Set());
          }
          graph.get(branch)!.add(stage.name);
        }
      }
    }

    this.graph = graph; // retained for the dependency checks below
    return graph;
  }

  areDependenciesSatisfied(
    stage: StageDefinition,
    statuses: Map<string, StageStatus>
  ): boolean {
    const deps = this.graph.get(stage.name) ?? new Set();
    return [...deps].every(dep => statuses.get(dep) === 'completed');
  }
}
Three-Level Validation: Catching Errors Before Runtime
A core MBSE principle is validation at definition time, not discovery at runtime. Our pipeline validator enforces three layers:
Level 1: Schema Validation (JSON Schema via AJV)
Every pipeline YAML is validated against a JSON Schema that enforces structural correctness — required fields, allowed stage types, valid retry configurations:
const valid = this.ajvValidator(pipeline);
if (!valid) {
  return {
    valid: false,
    errors: this.ajvValidator.errors!.map(e =>
      `${e.instancePath}: ${e.message}`
    ),
  };
}
Level 2: Semantic Validation
Schema-valid pipelines can still be logically broken. Semantic validation catches:
- Duplicate stage names — Each stage must be uniquely identified.
- Dangling references — An on_success: "nonexistent-stage" is caught here.
- Orphaned stages — Stages with no incoming edges that are not the entry point.
- Cycle detection — Circular dependencies in the DAG.
private validateSemantics(pipeline: PipelineDefinition): string[] {
  const errors: string[] = [];
  const stageNames = new Set<string>();

  for (const stage of pipeline.stages) {
    if (stageNames.has(stage.name)) {
      errors.push(`Duplicate stage name: ${stage.name}`);
    }
    stageNames.add(stage.name);
  }

  for (const stage of pipeline.stages) {
    if (stage.on_success && !stageNames.has(stage.on_success)) {
      errors.push(
        `Stage '${stage.name}' references non-existent ` +
        `on_success target '${stage.on_success}'`
      );
    }
  }

  return errors;
}
Level 3: Interpolation Validation
Before dispatching any stage, the engine validates that all ${context.variable} expressions resolve to actual values in the pipeline context. Unresolved interpolations are caught and reported as errors, preventing workers from receiving incomplete payloads:
static validate(data: any): ValidationResult {
  const dataStr = JSON.stringify(data);
  const unresolved = dataStr.match(/\$\{(?:[^{}]|\{[^}]*\})*\}/g);

  if (unresolved && unresolved.length > 0) {
    return {
      valid: false,
      errors: unresolved.map(v => `Uninterpolated variable: ${v}`),
    };
  }
  return { valid: true, errors: [] };
}
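For context, here is a sketch of the interpolation pass that would run before this validator: resolve each `${path.to.value}` expression against the pipeline context and deliberately leave unresolvable ones in place so the validator above can flag them. The `interpolate` helper is illustrative, not the production implementation:

```typescript
// Resolve ${dotted.path} expressions against a scope object.
function interpolate(template: string, scope: Record<string, any>): string {
  return template.replace(/\$\{([^}]+)\}/g, (match, path: string) => {
    const value = path
      .split(".")
      .reduce<any>((obj, key) => (obj == null ? undefined : obj[key]), scope);
    // Leave unresolved expressions intact so validation can catch them
    return value === undefined ? match : String(value);
  });
}
```

Leaving misses unresolved rather than substituting an empty string is the important design choice: a blank value would silently produce a malformed worker payload, while a surviving `${...}` token is caught by the Level 3 check.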
Stage Lifecycle and State Management
Each stage progresses through a well-defined state machine. Transitions are enforced — you cannot jump from pending to completed without passing through running:
stateDiagram-v2
[*] --> pending
pending --> running : Dependencies satisfied
running --> completed : Success callback
running --> failed : Error / Exception
running --> timeout : Timeout exceeded
failed --> running : Retry (if attempts remain)
timeout --> running : Retry (if attempts remain)
failed --> [*] : Max retries exceeded
timeout --> [*] : Max retries exceeded
completed --> [*]
The orchestrator persists stage states in Redis using Lua scripts for atomic transitions. This prevents race conditions when multiple webhook callbacks arrive simultaneously:
-- Redis Lua: Atomic stage state transition
local currentState = redis.call('HGET', KEYS[1], 'state')

local allowedTransitions = {
  pending = { running = true },
  running = { completed = true, failed = true, timeout = true },
  failed  = { running = true },
  timeout = { running = true },
}

if allowedTransitions[currentState]
    and allowedTransitions[currentState][ARGV[1]] then
  redis.call('HSET', KEYS[1], 'state', ARGV[1])
  redis.call('HSET', KEYS[1], 'updated_at', ARGV[2])
  return 1
end

return 0
Observable Infrastructure: ELK Event Publishing
Every pipeline event is published to Elasticsearch for real-time monitoring and post-mortem analysis. The event taxonomy covers the full lifecycle:
export enum ELKEventType {
  PIPELINE_STARTED = 'pipeline.started',
  PIPELINE_COMPLETED = 'pipeline.completed',
  PIPELINE_FAILED = 'pipeline.failed',
  PIPELINE_TIMEOUT = 'pipeline.timeout',
  STAGE_STARTED = 'stage.started',
  STAGE_COMPLETED = 'stage.completed',
  STAGE_FAILED = 'stage.failed',
  STAGE_TIMEOUT = 'stage.timeout',
  STAGE_RETRY = 'stage.retry',
  CACHE_HIT = 'cache.hit',
  CACHE_MISS = 'cache.miss',
  AUTHORIZATION_GRANTED = 'authorization.granted',
  AUTHORIZATION_DENIED = 'authorization.denied',
}
Events are batched (default: 10 events or 5-second flush interval) to avoid overwhelming Elasticsearch with per-stage writes. Each event carries the pipelineId, stageName, duration, and arbitrary metadata — enabling Kibana dashboards that show pipeline throughput, stage latency distributions, failure hotspots, and cache hit ratios.
sequenceDiagram
participant Client
participant Orchestrator
participant Redis
participant Worker
participant ELK
Client->>Orchestrator: POST /pipeline/start
Orchestrator->>ELK: pipeline.started
Orchestrator->>Redis: Create pipeline state
Orchestrator->>Redis: Enqueue stage: cache-check
Redis->>Orchestrator: cache-check ready
Orchestrator->>ELK: stage.started (cache-check)
Orchestrator->>Orchestrator: Execute internal action
Orchestrator->>ELK: cache.miss
Orchestrator->>ELK: stage.completed (cache-check)
Orchestrator->>Redis: Enqueue stage: thumbnail-generation
Redis->>Worker: Job dispatched
Worker->>Orchestrator: Webhook: job completed
Orchestrator->>ELK: stage.completed (thumbnail-generation)
Orchestrator->>Redis: Enqueue parallel stages
Note over Worker: semantic-analysis + technical-metadata run concurrently
Worker->>Orchestrator: Webhook: semantic-analysis done
Worker->>Orchestrator: Webhook: technical-metadata done
Orchestrator->>ELK: stage.completed (parallel-processing)
Orchestrator->>ELK: pipeline.completed
Caching with MinIO
The MinIOCacheManager implements content-addressable caching for pipeline stage results. Before dispatching an expensive worker job (like 3D thumbnail rendering), the orchestrator checks if a cached result exists:
class MinIOCacheManager {
  async checkCache(cacheKey: string): Promise<CacheResult | null> {
    try {
      const object = await this.minioClient.getObject(
        this.bucket,
        `pipeline-cache/${cacheKey}`
      );
      return JSON.parse(await streamToString(object));
    } catch {
      return null; // Cache miss
    }
  }

  async storeResult(cacheKey: string, result: any, ttl: number): Promise<void> {
    await this.minioClient.putObject(
      this.bucket,
      `pipeline-cache/${cacheKey}`,
      JSON.stringify(result),
      { 'X-Amz-Meta-Ttl': String(ttl) }
    );
  }
}
Cache hits skip the worker entirely and inject the cached result into the pipeline context, cutting processing time for duplicate uploads from minutes to milliseconds.
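The YAML example above keys the cache by `modelId`, but a strictly content-addressable scheme would derive the key from the uploaded bytes, so a re-upload of identical geometry hits the cache regardless of filename or uploader. A sketch of such a derivation (the `cacheKeyFor` helper and the version suffix are assumptions for illustration):

```typescript
import { createHash } from "node:crypto";

// Identical content always hashes to the same key; bumping
// pipelineVersion invalidates every cached result at once.
function cacheKeyFor(content: Buffer, pipelineVersion = "v1"): string {
  const digest = createHash("sha256").update(content).digest("hex");
  return `${digest}-${pipelineVersion}`;
}
```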
Timeout Monitoring
The TimeoutMonitor runs as a background process that periodically scans for stages that have exceeded their configured timeout value. Timed-out stages are transitioned to the timeout state and either retried (if attempts remain) or escalated to the error handler:
class TimeoutMonitor {
  private readonly checkInterval = 15_000; // 15s

  start(): void {
    setInterval(() => this.scanForTimeouts(), this.checkInterval);
  }

  private async scanForTimeouts(): Promise<void> {
    const running = await this.getRunningStages();
    const now = Date.now();

    for (const stage of running) {
      const elapsed = now - stage.startedAt;
      if (elapsed > stage.timeout) {
        await this.transitionToTimeout(stage);
        await this.elkPublisher.publishEvent({
          type: ELKEventType.STAGE_TIMEOUT,
          pipelineId: stage.pipelineId,
          stageName: stage.name,
          duration: elapsed,
        });
      }
    }
  }
}
Lessons Learned
What Worked
Declarative pipelines are the right abstraction. When product requirements change — “add a virus scan before publishing” — we add a stage to the YAML. No orchestrator code changes. The engine is generic; the model carries the domain knowledge.
Three-level validation catches ~95% of configuration errors before any job is dispatched. In six months of production operation, we’ve had zero runtime failures caused by invalid pipeline definitions. Every failure traces back to worker-level issues (network timeouts, upstream API errors, resource exhaustion).
Observable by default saves hours. When a pipeline takes 4x longer than expected, the ELK dashboard immediately shows which stage is the bottleneck — no debugging, no log grepping.
What We’d Do Differently
Pipeline versioning needs a migration strategy. When you update a pipeline YAML while in-flight executions exist, you need either version pinning (each execution locks to the version it started with) or backward-compatible changes only. We learned this the hard way when a stage rename broke 200+ running pipelines.
The decision stage’s JEXL evaluation should be sandboxed. In theory, JEXL expressions are safe. In practice, complex expressions with deep property access on unvalidated context can cause unexpected behaviour. We’d add an expression complexity limit and a whitelist of allowed context paths.
Start with observability; don't bolt it on later. We added ELK publishing to the orchestrator in Phase 3. It would have been far cleaner if every state transition emitted events from day one. Retrofitting observability into an existing system means touching every code path that changes state.
Conclusion
Model-Based Systems Engineering isn’t just for aerospace and automotive. The core principle — make the model the authority, not the code — applies directly to software pipeline orchestration. Our YAML-defined DAGs are validated, versioned, and executed by a generic engine that neither knows nor cares about the domain specifics. The model carries the intent; the engine provides the mechanics.
If you’re building pipeline orchestration for a distributed system with more than a handful of stages, consider investing in a declarative model-first approach. The upfront cost of building the engine pays for itself the first time you need to modify a workflow at 2 AM and all it takes is a YAML change.