Skip to main content

Advanced reactor usage

This page covers the low-level IReactor interface and the internal components you can access through ReactorModule. Most developers should use IReactorClient (see IReactorClient API Reference) — the information here is for advanced scenarios such as:

  • Building custom tooling or infrastructure on top of the reactor
  • Working with consistency tokens for read-after-write guarantees
  • Subscribing directly to internal event bus events
  • Constructing a reactor with custom storage or executor configurations
  • Writing integration tests that need access to internals

IReactor vs IReactorClient

IReactorClient is a high-level wrapper around IReactor. The table below summarizes the key differences:

AspectIReactorIReactorClient
Write return valuesReturns JobInfo immediately (fire-and-forget)Waits for job completion, returns the final document
SigningCaller passes an ISigner explicitlyClient manages signing internally
Document lookupSeparate get(), getBySlug(), getByIdOrSlug() methodsSingle get(identifier) that accepts either
Children/parentsReturns string[] (document IDs only)Returns PagedResults<PHDocument> (full documents)
Convenience methodsBasic CRUDPlus: createEmpty(), createDocumentInDrive(), rename(), moveChildren(), deleteDocuments()
SubscriptionsNot available (use the event bus directly)subscribe(search, callback, view?) for real-time document changes
Consistency tokensExplicit — pass tokens to reads after writesHandled internally by the client

When to use IReactor directly:

  • You need fire-and-forget job submission without waiting for completion
  • You want explicit control over consistency tokens
  • You are building infrastructure that manages its own signing
  • You need access to executeBatch() for multi-document atomic operations with dependency ordering

Building a reactor with ReactorBuilder

ReactorBuilder uses a fluent API to construct and wire all internal components.

import {
ReactorBuilder,
ConsoleLogger,
ChannelScheme,
} from "@powerhousedao/reactor";

const reactor = await new ReactorBuilder()
.withDocumentModels([todoListModule, invoiceModule])
.withLogger(new ConsoleLogger())
.withExecutorConfig({ maxConcurrency: 4, jobTimeoutMs: 30_000 })
.withWriteCacheConfig({ maxDocuments: 100, ringBufferSize: 10 })
.withMigrationStrategy("auto")
.withChannelScheme(ChannelScheme.CONNECT)
.build();

build() returns an IReactor. If you need access to internal components, use buildModule() instead — it returns a ReactorModule containing the reactor plus all its internals (see ReactorModule).

Builder methods

MethodDescription
withDocumentModels(models)Register document model modules
withUpgradeManifests(manifests)Register upgrade manifests for document model versioning
withLogger(logger)Set the logger (defaults to ConsoleLogger)
withExecutorConfig(config)Configure maxConcurrency and jobTimeoutMs
withWriteCacheConfig(config)Configure maxDocuments and ringBufferSize for the write cache
withMigrationStrategy(strategy)Set to "auto" to run database migrations on build
withKysely(kysely)Provide a custom Kysely database instance (defaults to in-memory PGlite)
withQueue(queue)Provide a custom job queue (defaults to InMemoryQueue)
withEventBus(eventBus)Provide a custom event bus
withExecutor(executor)Provide a custom job executor manager
withReadModel(readModel)Register an additional read model
withSync(syncBuilder)Enable synchronization with remote reactors
withChannelScheme(scheme)Set the sync channel scheme
withFeatures(features)Set feature flags
withSignatureVerifier(verifier)Set a signature verification handler
withJwtHandler(handler)Set a JWT handler for authentication
withDocumentModelLoader(loader)Set a custom document model loader
withSignalHandlers()Register OS signal handlers for graceful shutdown

IReactor API

Reading documents

// By exact ID
const doc = await reactor.get<MyDocument>(docId);

// By slug
const doc = await reactor.getBySlug<MyDocument>("my-document");

// By either ID or slug (throws if ambiguous)
const doc = await reactor.getByIdOrSlug<MyDocument>(identifier);

// With consistency token for read-after-write
const doc = await reactor.get<MyDocument>(docId, undefined, token);

// Children and parents (returns string[] of IDs, not full documents)
const childIds = await reactor.getChildren(parentId);
const parentIds = await reactor.getParents(childId);

// Search
const results = await reactor.find({ type: "powerhouse/todo-list" });

// Operations
const ops = await reactor.getOperations(docId);

Writing documents

All write methods return JobInfo immediately — they do not wait for the job to complete.

// Create a document
const job = await reactor.create(document, signer);

// Execute actions
const job = await reactor.execute(docId, "main", actions);

// Load pre-existing operations (e.g., from sync)
const job = await reactor.load(docId, "main", operations);

// Delete
const job = await reactor.deleteDocument(docId, signer);

// Manage relationships
const job = await reactor.addChildren(parentId, [childId1, childId2]);
const job = await reactor.removeChildren(parentId, [childId1]);

Batch operations

executeBatch lets you submit multiple mutation jobs with dependency ordering. Jobs are executed in the order dictated by their dependsOn keys.

const result = await reactor.executeBatch({
jobs: [
{
key: "create-drive",
documentId: driveId,
scope: "global",
branch: "main",
actions: [createDriveAction],
dependsOn: [],
},
{
key: "add-document",
documentId: driveId,
scope: "global",
branch: "main",
actions: [addFileAction],
dependsOn: ["create-drive"], // Waits for drive creation
},
],
});

// result.jobs["create-drive"] and result.jobs["add-document"] are JobInfo objects

Job tracking and shutdown

// Check job status
const info = await reactor.getJobStatus(jobId);
// info.status is PENDING | RUNNING | WRITE_READY | READ_READY | FAILED

// Graceful shutdown
const shutdown = reactor.kill();
// shutdown.isShutdown is true immediately
await shutdown.completed; // Resolves when all in-flight jobs finish

Consistency tokens

Every write operation returns a JobInfo that includes a ConsistencyToken. This token captures the exact operation coordinates that the write produced:

type ConsistencyToken = {
version: 1;
createdAtUtcIso: string;
coordinates: Array<{
documentId: string;
scope: string;
branch: string;
operationIndex: number;
}>;
};

Pass this token to subsequent reads to guarantee you see the effects of your write:

const job = await reactor.execute(docId, "main", actions);
const token = job.consistencyToken;

// This read is guaranteed to include the operations from the write above
const doc = await reactor.get(docId, undefined, token);

Without a consistency token, reads may return stale data if the read models have not yet indexed the latest operations. IReactorClient handles this automatically — it waits for READ_READY before returning — but when using IReactor directly, consistency tokens give you explicit control.

ReactorModule

ReactorBuilder.buildModule() returns a ReactorModule that exposes all internal components:

const module = await new ReactorBuilder()
.withDocumentModels([todoListModule])
.buildModule();

const { reactor, eventBus, processorManager, operationStore } = module;

Available components

ComponentInterfacePurpose
reactorIReactorThe reactor instance
eventBusIEventBusInternal pub/sub for reactor events
queueIQueueJob queue with per-document ordering
jobTrackerIJobTrackerTracks job lifecycle (PENDING through READ_READY/FAILED)
executorManagerIJobExecutorManagerManages job executor instances
operationStoreIOperationStoreAppend-only operation log
keyframeStoreIKeyframeStoreDocument state snapshots
writeCacheIWriteCacheWrite-path document cache
operationIndexIOperationIndexGlobal ordinal assignment
documentViewIDocumentViewMaintains document snapshots for reads
documentIndexerIDocumentIndexerTracks document relationships (parent/child graph)
readModelCoordinatorIReadModelCoordinatorDispatches operations to all read models
subscriptionManagerIReactorSubscriptionManagerManages document change subscriptions
processorManagerIProcessorManagerRoutes operations to user-defined processors
databaseKysely<Database>The underlying database connection
syncModuleSyncModule | undefinedSync infrastructure (if configured)

Subscribing to the event bus

The IEventBus lets you listen to internal reactor events. Subscribers are called sequentially in registration order.

import { ReactorEventTypes, SyncEventTypes } from "@powerhousedao/reactor";

// Listen for all completed jobs
const unsubscribe = module.eventBus.subscribe(
ReactorEventTypes.JOB_READ_READY,
(type, event) => {
console.log("Job completed:", event.jobId);
console.log("Operations:", event.operations.length);
},
);

// Listen for sync failures
module.eventBus.subscribe(SyncEventTypes.SYNC_FAILED, (type, event) => {
console.error("Sync failed for job:", event.jobId, event.errors);
});

See Reactor event system for the full list of event types.

Working with the operation store

The IOperationStore provides direct access to the append-only operation log.

const { operationStore } = module;

// Get operations since a specific revision
const ops = await operationStore.getSince(docId, "global", "main", 5);

// Get the latest revision per scope
const revisions = await operationStore.getRevisions(docId, "main");
warning

Writing directly to the operation store bypasses the job queue, reducers, and read models. In almost all cases, use reactor.execute() or reactor.load() instead. Direct store access is intended for read-only inspection, debugging, and testing.

Registering custom read models

A read model receives operations after each job's write phase completes and builds a derived view of the data.

import type { IReadModel } from "@powerhousedao/reactor";

class MyCustomReadModel implements IReadModel {
async indexOperations(operations: OperationWithContext[]): Promise<void> {
for (const { operation, context } of operations) {
// Build your derived view
}
}
}

const reactor = await new ReactorBuilder()
.withDocumentModels([todoListModule])
.withReadModel(new MyCustomReadModel())
.build();

Read models registered via withReadModel() run in the pre-ready phase — they complete before JOB_READ_READY fires. This is in contrast to processors (registered via ProcessorManager), which run in the post-ready phase.

Example: integration test setup

A common use case for the low-level API is writing integration tests that need full control over the reactor lifecycle:

import {
ReactorBuilder,
ConsoleLogger,
ReactorClientBuilder,
} from "@powerhousedao/reactor";

async function createTestReactor() {
// Build with full access to internals
const module = await new ReactorBuilder()
.withDocumentModels([todoListModule])
.withLogger(new ConsoleLogger())
.buildModule();

// Optionally wrap with a client for convenience
const client = await new ReactorClientBuilder()
.withReactorModule(module)
.build();

return { module, client };
}

// In your test
const { module, client } = await createTestReactor();

// Use client for high-level operations
const doc = await client.createEmpty("powerhouse/todo-list");

// Use module for low-level inspection
const ops = await module.operationStore.getSince(
doc.header.id,
"global",
"main",
0,
);
expect(ops.results.length).toBe(1);

// Cleanup
module.reactor.kill();