
Executor

The WorkflowExecutor is the IO shell around the pure router: load → dispatch → save. It takes a router and a store, runs your middleware pipeline, and handles concurrency.

Store Interface

The executor delegates persistence to the StoreAdapter interface:

| Method | Responsibility |
| --- | --- |
| load(id) | Load a workflow snapshot by ID |
| save(options) | Persist a snapshot with optimistic concurrency |

save() takes an expectedVersion for optimistic concurrency control. An adapter should throw ConcurrencyConflictError if the stored version doesn't match expectedVersion.

ts
const pgStore: StoreAdapter = {
	async load(id: string): Promise<StoredWorkflow | null> {
		// SELECT snapshot, version FROM workflows WHERE id = $1
		throw new Error(`Not implemented: load(${id})`);
	},
	async save(options: SaveOptions): Promise<void> {
		// UPDATE workflows SET snapshot = $2, version = version + 1
		//   WHERE id = $1 AND version = $3
		// Throw ConcurrencyConflictError if rowCount === 0
		throw new Error(`Not implemented: save(${options.id})`);
	},
};
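
The version check that save() is expected to perform can be sketched without a database. The block below is a minimal illustration, not the library's implementation: StoredWorkflow and SaveOptions are redeclared in simplified form, VersionConflictError is a hypothetical stand-in for ConcurrencyConflictError, createMapStore is a made-up name, and treating a never-saved workflow as version 0 is an assumption.

```typescript
// Simplified, hypothetical shapes; the library's real types may differ.
interface StoredWorkflow {
	snapshot: unknown;
	version: number;
}

interface SaveOptions {
	id: string;
	snapshot: unknown;
	expectedVersion: number;
}

// Stand-in for the library's ConcurrencyConflictError.
class VersionConflictError extends Error {
	readonly workflowId: string;
	readonly expectedVersion: number;
	readonly actualVersion: number;

	constructor(workflowId: string, expectedVersion: number, actualVersion: number) {
		super(`Workflow ${workflowId}: expected v${expectedVersion}, found v${actualVersion}`);
		this.workflowId = workflowId;
		this.expectedVersion = expectedVersion;
		this.actualVersion = actualVersion;
	}
}

function createMapStore() {
	const rows = new Map<string, StoredWorkflow>();
	return {
		async load(id: string): Promise<StoredWorkflow | null> {
			return rows.get(id) ?? null;
		},
		async save({ id, snapshot, expectedVersion }: SaveOptions): Promise<void> {
			// Assumption: a workflow that has never been saved is at version 0.
			const actualVersion = rows.get(id)?.version ?? 0;
			if (actualVersion !== expectedVersion) {
				// Another writer got there first; reject instead of overwriting.
				throw new VersionConflictError(id, expectedVersion, actualVersion);
			}
			rows.set(id, { snapshot, version: actualVersion + 1 });
		},
	};
}
```

A SQL adapter would get the same effect from the WHERE id = $1 AND version = $3 clause shown above: if no row matches, the expected version was stale.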

Memory Store

For testing and prototyping, use the built-in memory store:

ts
const store = memoryStore();

Creating an Executor

Pass a router and a store to the constructor:

ts
const executor = new WorkflowExecutor(taskRouter, store);

Executing Commands

executor.execute() loads the workflow, runs the middleware pipeline, dispatches the command, saves the result, and returns an ExecutionResult:

ts
(async () => {
	const result = await executor.execute("task-1", {
		type: "Start",
		payload: { assignee: "alice" },
	});

	if (result.ok) {
		console.log(result.snapshot); // WorkflowSnapshot with state "InProgress"
		console.log(result.events); // [{ type: "TaskStarted", ... }]
		console.log(result.version); // 2
	} else {
		console.log(result.error);
	}
})();
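
To make the load, dispatch, save sequence concrete, here is a rough sketch of what such a shell might look like around a pure function. It is not the WorkflowExecutor source: the types are simplified, the dispatch function and the "Todo" state are invented for illustration, the "domain" category value is an assumption, and middleware and concurrency checks are left out.

```typescript
// Hypothetical, simplified shapes; not the library's real types.
type Snapshot = { state: string };
type Stored = { snapshot: Snapshot; version: number };
type Command = { type: string };
type Outcome =
	| { ok: true; snapshot: Snapshot; version: number }
	| { ok: false; error: { category: "not_found" | "domain"; message: string } };

// Pure router stand-in: no IO, old snapshot and command in, new snapshot out.
function dispatch(snapshot: Snapshot, command: Command): Snapshot | null {
	if (snapshot.state === "Todo" && command.type === "Start") {
		return { state: "InProgress" };
	}
	return null; // command is not valid in this state
}

// IO shell: all reads and writes happen here, never inside dispatch().
async function execute(
	rows: Map<string, Stored>,
	id: string,
	command: Command,
): Promise<Outcome> {
	const stored = rows.get(id); // load
	if (!stored) {
		return { ok: false, error: { category: "not_found", message: id } };
	}
	const updated = dispatch(stored.snapshot, command); // dispatch
	if (!updated) {
		// Rejected commands come back as values, not exceptions.
		return { ok: false, error: { category: "domain", message: `${command.type} rejected` } };
	}
	const version = stored.version + 1;
	rows.set(id, { snapshot: updated, version }); // save
	return { ok: true, snapshot: updated, version };
}
```

Keeping dispatch() free of IO is what lets the routing logic be tested without a store, while the shell stays small enough to audit by reading it.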

Optimistic Locking

Pass expectedVersion to reject stale writes early:

ts
(async () => {
	const result = await executor.execute(
		"task-1",
		{ type: "Start", payload: { assignee: "alice" } },
		{ expectedVersion: 1 },
	);

	if (!result.ok && result.error.category === "conflict") {
		console.log("Stale version — reload and retry");
	}
})();
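
The "reload and retry" step can be wrapped in a small helper. The sketch below is one possible shape, not part of the library: retryOnConflict and the reduced Result type are hypothetical, and only the "conflict" category is taken from the example above.

```typescript
// Hypothetical result shape, reduced to the fields this sketch needs.
type Result =
	| { ok: true; version: number }
	| { ok: false; error: { category: string } };

// Calls `attempt` until it succeeds, fails with a non-conflict error,
// or has been tried `maxAttempts` times. Returns the last result.
async function retryOnConflict(
	attempt: () => Promise<Result>,
	maxAttempts = 3,
): Promise<Result> {
	let result = await attempt();
	for (
		let tries = 1;
		tries < maxAttempts && !result.ok && result.error.category === "conflict";
		tries++
	) {
		result = await attempt();
	}
	return result;
}
```

For the retry to be meaningful, each call to attempt should re-read the current version before calling executor.execute() again. Retrying with the same stale expectedVersion would conflict every time.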

Middleware

Middleware runs after the workflow is loaded but before the save. Use it for auth, logging, rate limiting, or any cross-cutting concern that needs access to the stored workflow:

ts
const authMiddleware: ExecutorMiddleware = async (ctx, next) => {
	// Middleware sees the loaded workflow — check permissions
	const ownerField = (ctx.stored.snapshot.data as { owner?: string }).owner;
	if (ownerField !== "current-user") {
		ctx.result = {
			ok: false as const,
			error: { category: "not_found" as const, id: ctx.id },
		};
		return; // short-circuit — don't call next()
	}
	await next();
};

executor.use(authMiddleware);

Middleware executes in Koa-style onion order — the first middleware added wraps the rest.
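
Onion order is easiest to see by recording when each layer is entered and left. The following is a generic sketch of Koa-style composition, not the executor's internals; compose, trace, and the string log are hypothetical names used only for this illustration.

```typescript
type Handler = (log: string[]) => Promise<void>;
type Middleware = (log: string[], next: () => Promise<void>) => Promise<void>;

// Folds from the last middleware to the first, so index 0 ends up outermost.
function compose(middleware: Middleware[], handler: Handler): Handler {
	return middleware.reduceRight<Handler>(
		(inner, mw) => (log) => mw(log, () => inner(log)),
		handler,
	);
}

// Middleware that records when it is entered and when it is left.
const trace =
	(name: string): Middleware =>
	async (log, next) => {
		log.push(`${name}:before`);
		await next();
		log.push(`${name}:after`);
	};

async function demo(): Promise<string[]> {
	const log: string[] = [];
	const run = compose([trace("first"), trace("second")], async (l) => {
		l.push("dispatch");
	});
	await run(log);
	// first:before, second:before, dispatch, second:after, first:after
	return log;
}
```

A middleware that returns without calling next() stops the chain at its own layer, which is how the auth example above short-circuits before dispatch.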

Error Handling

Dispatch errors (domain, validation, router) are returned inside ExecutionResult, never thrown. Store adapters throw ConcurrencyConflictError for optimistic locking failures at the database level:

ts
(async () => {
	try {
		await pgStore.save({
			id: "task-1",
			snapshot: {} as Parameters<typeof pgStore.save>[0]["snapshot"],
			expectedVersion: 1,
		});
	} catch (err) {
		if (err instanceof ConcurrencyConflictError) {
			console.log("Conflict:", err.workflowId, err.expectedVersion, err.actualVersion);
		}
	}
})();