Executor
The WorkflowExecutor is the IO shell around the pure router: load → dispatch → save. It takes a router and a store, runs your middleware pipeline, and handles concurrency.
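To make the load → dispatch → save cycle concrete, here is a minimal sketch of what such a shell might look like. It is not the library's implementation: `Snapshot`, `Stored`, `Store`, `Router`, and `executeOnce` are hypothetical names introduced for illustration, the result shape is simplified, and the middleware pipeline is left out.

```typescript
// Hypothetical, simplified shapes; the real library types may differ.
type Snapshot = { state: string; data: Record<string, unknown> };
type Stored = { snapshot: Snapshot; version: number };
type Command = { type: string; payload?: unknown };

type Result =
  | { ok: true; snapshot: Snapshot; version: number }
  | { ok: false; error: { category: string } };

// The router is pure: snapshot in, new snapshot (or error) out. No IO.
type Router = (snapshot: Snapshot, command: Command) => Snapshot | { error: string };

interface Store {
  load(id: string): Promise<Stored | null>;
  save(opts: { id: string; snapshot: Snapshot; expectedVersion: number }): Promise<void>;
}

async function executeOnce(
  store: Store,
  route: Router,
  id: string,
  command: Command,
): Promise<Result> {
  // 1. load
  const stored = await store.load(id);
  if (stored === null) return { ok: false, error: { category: "not_found" } };

  // 2. dispatch (pure, no IO)
  const next = route(stored.snapshot, command);
  if ("error" in next) return { ok: false, error: { category: next.error } };

  // 3. save, guarded by the version that was loaded
  await store.save({ id, snapshot: next, expectedVersion: stored.version });
  return { ok: true, snapshot: next, version: stored.version + 1 };
}
```

Keeping the router free of IO means all persistence concerns live in one place. How the real executor turns a conflict thrown during the save step into a result is not shown in this sketch.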
Store Interface
The executor delegates persistence to the StoreAdapter interface:
| Method | Responsibility |
|---|---|
| load(id) | Load a workflow snapshot by ID |
| save(options) | Persist a snapshot with optimistic concurrency |
save() takes an expectedVersion for optimistic concurrency control. An adapter must throw ConcurrencyConflictError if the stored version doesn't match.
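The version check can be sketched against an in-memory Map. The types below (`StoredWorkflow`, `SaveOptions`, `ConcurrencyConflictError`) are local stand-ins that mirror the names used on this page; their exact fields in the library are an assumption, as are the helper name `createMapStore` and the choice to treat a never-saved workflow as version 0.

```typescript
// Local stand-ins for the library types; field names are assumptions.
type StoredWorkflow = { snapshot: unknown; version: number };
type SaveOptions = { id: string; snapshot: unknown; expectedVersion: number };

class ConcurrencyConflictError extends Error {
  readonly workflowId: string;
  readonly expectedVersion: number;
  readonly actualVersion: number;

  constructor(workflowId: string, expectedVersion: number, actualVersion: number) {
    super(`Workflow ${workflowId}: expected v${expectedVersion}, found v${actualVersion}`);
    this.name = "ConcurrencyConflictError";
    this.workflowId = workflowId;
    this.expectedVersion = expectedVersion;
    this.actualVersion = actualVersion;
  }
}

// Hypothetical helper: a Map-backed adapter with the same load/save shape.
function createMapStore() {
  const rows = new Map<string, StoredWorkflow>();
  return {
    async load(id: string): Promise<StoredWorkflow | null> {
      return rows.get(id) ?? null;
    },
    async save({ id, snapshot, expectedVersion }: SaveOptions): Promise<void> {
      // Assumption: a workflow that has never been saved is at version 0.
      const actualVersion = rows.get(id)?.version ?? 0;
      if (actualVersion !== expectedVersion) {
        throw new ConcurrencyConflictError(id, expectedVersion, actualVersion);
      }
      // The write only happens when the caller saw the latest version.
      rows.set(id, { snapshot, version: expectedVersion + 1 });
    },
  };
}
```

A SQL-backed adapter has the same shape, with the comparison pushed into the WHERE clause of the UPDATE: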
const pgStore: StoreAdapter = {
  async load(id: string): Promise<StoredWorkflow | null> {
    // SELECT snapshot, version FROM workflows WHERE id = $1
    throw new Error(`Not implemented: load(${id})`);
  },
  async save(options: SaveOptions): Promise<void> {
    // UPDATE workflows SET snapshot = $2, version = version + 1
    // WHERE id = $1 AND version = $3
    // Throw ConcurrencyConflictError if rowCount === 0
    throw new Error(`Not implemented: save(${options.id})`);
  },
};
Memory Store
For testing and prototyping, use the built-in memory store:
const store = memoryStore();
Creating an Executor
Pass a router and a store to the constructor:
const executor = new WorkflowExecutor(taskRouter, store);
Executing Commands
executor.execute() loads the workflow, runs the middleware pipeline, dispatches the command, saves the result, and returns an ExecutionResult:
(async () => {
  const result = await executor.execute("task-1", {
    type: "Start",
    payload: { assignee: "alice" },
  });
  if (result.ok) {
    console.log(result.snapshot); // WorkflowSnapshot with state "InProgress"
    console.log(result.events); // [{ type: "TaskStarted", ... }]
    console.log(result.version); // 2
  } else {
    console.log(result.error);
  }
})();
Optimistic Locking
Pass expectedVersion to reject stale writes early:
(async () => {
  const result = await executor.execute(
    "task-1",
    { type: "Start", payload: { assignee: "alice" } },
    { expectedVersion: 1 },
  );
  if (!result.ok && result.error.category === "conflict") {
    console.log("Stale version: reload and retry");
  }
})();
Middleware
Middleware runs after the workflow is loaded but before the save. Use it for auth, logging, rate limiting, or any cross-cutting concern that needs access to the stored workflow:
const authMiddleware: ExecutorMiddleware = async (ctx, next) => {
  // Middleware sees the loaded workflow, so it can check permissions
  const ownerField = (ctx.stored.snapshot.data as { owner?: string }).owner;
  if (ownerField !== "current-user") {
    ctx.result = {
      ok: false as const,
      error: { category: "not_found" as const, id: ctx.id },
    };
    return; // short-circuit: don't call next()
  }
  await next();
};
executor.use(authMiddleware);
Middleware executes in Koa-style onion order: the first middleware added is the outermost layer and wraps the rest.
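Onion order is easiest to see in a trace. The sketch below is a generic compose function in the Koa style, not the executor's own code; `compose`, `Ctx`, `outer`, `inner`, and the trace array are names made up for the illustration.

```typescript
type Ctx = { trace: string[] };
type Middleware = (ctx: Ctx, next: () => Promise<void>) => Promise<void>;

// Chain middleware so that stack[0] is outermost and `core` runs innermost.
function compose(stack: Middleware[], core: (ctx: Ctx) => Promise<void>) {
  return (ctx: Ctx): Promise<void> => {
    // Build from the inside out: each layer receives the next layer as `next`.
    const chain = stack.reduceRight<() => Promise<void>>(
      (next, mw) => () => mw(ctx, next),
      () => core(ctx),
    );
    return chain();
  };
}

const outer: Middleware = async (ctx, next) => {
  ctx.trace.push("outer:before");
  await next();
  ctx.trace.push("outer:after");
};

const inner: Middleware = async (ctx, next) => {
  ctx.trace.push("inner:before");
  await next();
  ctx.trace.push("inner:after");
};

async function demo(): Promise<string[]> {
  const ctx: Ctx = { trace: [] };
  // `outer` is added first, so it wraps `inner`, which wraps the dispatch step.
  await compose([outer, inner], async (c) => {
    c.trace.push("dispatch");
  })(ctx);
  return ctx.trace;
}

demo().then((trace) => console.log(trace.join(" > ")));
// outer:before > inner:before > dispatch > inner:after > outer:after
```

In this sketch, a middleware that returns without calling next() stops the chain at its own layer, so nothing further in runs. The auth example above relies on the same behavior.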
Error Handling
Dispatch errors (domain, validation, router) are returned inside ExecutionResult, never thrown. Store adapters throw ConcurrencyConflictError for optimistic locking failures at the database level:
(async () => {
  try {
    await pgStore.save({
      id: "task-1",
      snapshot: {} as Parameters<typeof pgStore.save>[0]["snapshot"],
      expectedVersion: 1,
    });
  } catch (err) {
    if (err instanceof ConcurrencyConflictError) {
      console.log("Conflict:", err.workflowId, err.expectedVersion, err.actualVersion);
    }
  }
})();
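When a command comes back with a conflict, the usual response is to reload and try again. A minimal retry wrapper might look like the following. `Outcome`, `retryOnConflict`, and `maxAttempts` are hypothetical names, and the only thing assumed about the result shape is the `error.category === "conflict"` check shown under Optimistic Locking.

```typescript
// Assumed minimal result shape, based on the fields used on this page.
type Outcome<T> =
  | { ok: true; value: T }
  | { ok: false; error: { category: string } };

// Re-run `attempt` while it reports a conflict, up to `maxAttempts` times in total.
async function retryOnConflict<T>(
  attempt: () => Promise<Outcome<T>>,
  maxAttempts = 3,
): Promise<Outcome<T>> {
  let result = await attempt();
  for (let tries = 1; tries < maxAttempts; tries++) {
    // Success and non-conflict errors are returned to the caller untouched.
    if (result.ok || result.error.category !== "conflict") break;
    // Each attempt is expected to reload the workflow before dispatching.
    result = await attempt();
  }
  return result;
}
```

Only conflicts are retried here: domain, validation, and router errors would fail the same way on every attempt, so repeating them gains nothing. A caller that pins an explicit expectedVersion needs to refresh that value between attempts, otherwise each retry conflicts again.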