Skip to content

Integrations

Ryte is pure logic -- it has no opinion about where commands come from or where workflows are stored. Integrating with any runtime follows the same pattern.

The Pattern

Every integration is five steps:

ts
(async () => {
	const definition = taskWorkflow;
	const router = new WorkflowRouter(taskWorkflow);

	// 1. Receive a command (HTTP request, Kafka message, etc.)
	const { workflowId, command } = parseInput(request);

	// 2. Load the workflow from storage
	const snapshot = await store.get(workflowId);
	const restored = definition.deserialize(snapshot as Parameters<typeof definition.deserialize>[0]);
	if (!restored.ok) throw new Error("Invalid workflow data");

	// 3. Dispatch the command
	const result = await router.dispatch(restored.workflow, command);

	// 4. Persist the updated workflow
	if (result.ok) {
		await store.set(workflowId, definition.serialize(result.workflow));
	}

	// 5. Publish events
	if (result.ok) {
		for (const event of result.events) {
			console.log(`Event: ${event.type}`, event.data);
		}
	}
})();

The serialize() and deserialize() methods handle serialization -- dates become ISO strings, data is validated against Zod schemas on deserialize. You can store snapshots in any JSON-compatible database.

Express

ts
import express from "express";
import { defineWorkflow, WorkflowRouter, type WorkflowSnapshot } from "@rytejs/core";
import { z } from "zod";

const taskWorkflow = defineWorkflow("task", { /* ... */ });

const router = new WorkflowRouter(taskWorkflow)
  .state("Todo", ({ on }) => { /* ... */ })
  .state("InProgress", ({ on }) => { /* ... */ });

const store = new Map<string, WorkflowSnapshot>();
const app = express();
app.use(express.json());

// Create a workflow
app.post("/workflows", (req, res) => {
  const { id, title } = req.body;
  const workflow = taskWorkflow.createWorkflow(id, {
    initialState: "Todo",
    data: { title },
  });
  store.set(id, taskWorkflow.serialize(workflow));
  res.json(workflow);
});

// Dispatch a command
app.post("/workflows/:id/dispatch", async (req, res) => {
  const snapshot = store.get(req.params.id);
  if (!snapshot) return res.status(404).json({ error: "Not found" });

  const restored = taskWorkflow.deserialize(snapshot);
  if (!restored.ok) return res.status(500).json({ error: "Invalid data" });

  const result = await router.dispatch(restored.workflow, req.body);

  if (result.ok) {
    store.set(req.params.id, taskWorkflow.serialize(result.workflow));
    res.json({ workflow: result.workflow, events: result.events });
  } else {
    res.status(400).json({ error: result.error });
  }
});

// Get workflow state
app.get("/workflows/:id", (req, res) => {
  const snapshot = store.get(req.params.id);
  if (!snapshot) return res.status(404).json({ error: "Not found" });

  const restored = taskWorkflow.deserialize(snapshot);
  if (!restored.ok) return res.status(500).json({ error: "Invalid data" });

  res.json(restored.workflow);
});

app.listen(3000);

See the full example: examples/express

Hono

ts
import { Hono } from "hono";
import { serve } from "@hono/node-server";
import { defineWorkflow, WorkflowRouter, type WorkflowSnapshot } from "@rytejs/core";
import { z } from "zod";

const taskWorkflow = defineWorkflow("task", { /* ... */ });
const router = new WorkflowRouter(taskWorkflow)
  .state("Todo", ({ on }) => { /* ... */ });

const store = new Map<string, WorkflowSnapshot>();
const app = new Hono();

app.post("/workflows", async (c) => {
  const { id, title } = await c.req.json();
  const workflow = taskWorkflow.createWorkflow(id, {
    initialState: "Todo",
    data: { title },
  });
  store.set(id, taskWorkflow.serialize(workflow));
  return c.json(workflow, 201);
});

app.post("/workflows/:id/dispatch", async (c) => {
  const snapshot = store.get(c.req.param("id"));
  if (!snapshot) return c.json({ error: "Not found" }, 404);

  const restored = taskWorkflow.deserialize(snapshot);
  if (!restored.ok) return c.json({ error: "Invalid data" }, 500);

  const result = await router.dispatch(restored.workflow, await c.req.json());

  if (result.ok) {
    store.set(c.req.param("id"), taskWorkflow.serialize(result.workflow));
    return c.json({ workflow: result.workflow, events: result.events });
  }
  return c.json({ error: result.error }, 400);
});

app.get("/workflows/:id", (c) => {
  const snapshot = store.get(c.req.param("id"));
  if (!snapshot) return c.json({ error: "Not found" }, 404);

  const restored = taskWorkflow.deserialize(snapshot);
  if (!restored.ok) return c.json({ error: "Invalid data" }, 500);

  return c.json(restored.workflow);
});

serve({ fetch: app.fetch, port: 3000 });

See the full example: examples/hono

Kafka Consumer

ts
import { Kafka } from "kafkajs";
import { defineWorkflow, WorkflowRouter, type WorkflowSnapshot } from "@rytejs/core";
import { z } from "zod";

const taskWorkflow = defineWorkflow("task", { /* ... */ });
const router = new WorkflowRouter(taskWorkflow)
  .state("Todo", ({ on }) => { /* ... */ });

const store = new Map<string, WorkflowSnapshot>();

const kafka = new Kafka({ brokers: ["localhost:9092"] });
const consumer = kafka.consumer({ groupId: "workflow-processor" });

await consumer.connect();
await consumer.subscribe({ topic: "workflow-commands" });

await consumer.run({
  eachMessage: async ({ message }) => {
    const { workflowId, command } = JSON.parse(message.value!.toString());

    const snapshot = store.get(workflowId);
    if (!snapshot) {
      console.error(`Workflow ${workflowId} not found`);
      return;
    }

    const restored = taskWorkflow.deserialize(snapshot);
    if (!restored.ok) {
      console.error(`Invalid workflow data for ${workflowId}`);
      return;
    }

    const result = await router.dispatch(restored.workflow, command);

    if (result.ok) {
      store.set(workflowId, taskWorkflow.serialize(result.workflow));
      for (const event of result.events) {
        console.log(`Event: ${event.type}`, event.data);
      }
    } else {
      console.error(`Dispatch failed: ${result.error.category}`);
    }
  },
});

See the full example: examples/kafka

Hooks for Observability

Use hooks to add logging, metrics, or event publishing without touching handlers:

ts
const hooksRouter = new WorkflowRouter(taskWorkflow)
	.on("transition", (from, to, workflow) => {
		console.log(`[${workflow.id}] ${from} → ${to}`);
	})
	.on("event", (event, workflow) => {
		console.log(`[${workflow.id}] Event: ${event.type}`);
	})
	.on("error", (error) => {
		console.error(`Error: ${error.category}`, error);
	})
	.state("Todo", ({ on: _on }) => {
		/* ... */
	});

Choosing a Storage Backend

Snapshots are plain JSON objects. Store them anywhere:

BackendNotes
PostgreSQLStore as JSONB column, index on id and state
MongoDBStore as document, natural fit
RedisStore as JSON string, good for high-throughput
DynamoDBStore as item, partition key on id
File systemJSON.stringify / JSON.parse, good for prototyping

The deserialize() method validates data against Zod schemas on load, so you always get a valid workflow regardless of what's in storage.