Reference
Kassette is published as four packages. @usekassette/core provides the journal API (start, resume, fork, Run.record) and storage backends. @usekassette/kassette provides the workflow wrapper (kassette(), Context, ctx.step, ctx.suspend, ctx.parallel, ctx.sleep). @usekassette/s3 provides an S3-compatible ObjectStoreClient for RemoteStorage. @usekassette/cli provides the kassette command for inspecting and forking journals. CLI usage and flags are documented in Operations.
Journal file layout
Each runId has one append-only journal. The journal is newline-delimited JSON: one JSON entry per line. LocalStorage writes {dir}/{runId}.jsonl. RemoteStorage writes {runId}/journal.jsonl, or {prefix}/{runId}/journal.jsonl when a prefix is set.
Values stored in the journal must be JSON-compatible. The metadata, step result, and resume value fields are written with JSON.stringify and read back with JSON.parse. Circular references and BigInt values throw. Date objects become ISO 8601 strings. Object fields whose value is undefined are omitted.
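The snippet below uses only the built-in JSON object to show what these rules mean in practice. It does not call Kassette. It assumes the journal round trip is exactly JSON.parse(JSON.stringify(value)), as described above. BigInt values fail the same way as the cycle shown here: JSON.stringify throws a TypeError.

```typescript
// Mirror the journal's write/read path: JSON.stringify on append, JSON.parse on read.
function roundTrip(value: unknown): unknown {
  return JSON.parse(JSON.stringify(value));
}

const stored = roundTrip({
  createdAt: new Date("2024-01-01T00:00:00.000Z"), // Date becomes an ISO 8601 string
  note: undefined, // undefined fields are dropped
  count: 3,
}) as Record<string, unknown>;

console.log(stored.createdAt); // 2024-01-01T00:00:00.000Z
console.log("note" in stored); // false

// Circular structures cannot be serialized; JSON.stringify throws a TypeError.
const cyclic: { self?: unknown } = {};
cyclic.self = cyclic;
let cycleRejected = false;
try {
  roundTrip(cyclic);
} catch (err) {
  cycleRejected = err instanceof TypeError;
}
console.log(cycleRejected); // true
```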
Entry envelope
interface BaseEntry {
readonly session: number;
readonly timestamp: string; // ISO 8601
}
type Entry = StartEntry | StepEntry | SuspendEntry | ResumeEntry | CompleteEntry | ErrorEntry | CancelEntry;
type JournalEntry = Entry & { readonly offset: number };
session is the session number of the writer that appended the entry. Each new session of a run gets a higher number than the one before it. timestamp is when the entry was appended. offset is the position assigned by storage; it starts at 0 and increases by 1 for each entry. Entries returned by readAll include offset. Entries passed to append do not, because storage assigns the offset.
Entry types
start
Marks the beginning of a session.
interface StartEntry extends BaseEntry {
readonly type: 'start';
readonly version?: string;
readonly source?: { readonly runId: string; readonly fromOffset: number };
readonly metadata?: unknown;
}
version is written only when the caller supplies one. Version checks compare the caller’s version to the first start entry that has a version. source is written on the session that continues a forked run, not on the initial start entry that creates the fork journal. metadata is written on the first start entry of a new run. For a fork, inherited metadata is written on that initial start entry.
step
Records the result of one record() call.
interface StepEntry extends BaseEntry {
readonly type: 'step';
readonly stepId: string;
readonly name: string;
readonly result: unknown;
}
stepId is the unique stored identifier (llm, llm#2, llm#3). name is the caller-supplied name before repeat numbering. Step names must not contain #; that character is reserved as the repeat separator. Both fields are stored so replay can check that the current call is asking for the same step result that was recorded earlier. A mismatch throws ReplayMismatchError. See Concurrency for how parallel branch names are prefixed.
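The numbering rule can be modelled in a few lines. StepIdAllocator is a hypothetical helper written for this page, not a Kassette export. It assumes a per-run counter keyed by name, which reproduces the llm, llm#2, llm#3 sequence described above.

```typescript
// Hypothetical model of repeat numbering for one run; not Kassette's internal code.
class StepIdAllocator {
  private readonly seen = new Map<string, number>();

  next(stepName: string): string {
    // '#' is reserved as the repeat separator, so reject it up front.
    if (stepName.includes("#")) {
      throw new Error(`step name must not contain '#': ${stepName}`);
    }
    const count = (this.seen.get(stepName) ?? 0) + 1;
    this.seen.set(stepName, count);
    // The first call keeps the plain name; repeats get #2, #3, ...
    return count === 1 ? stepName : `${stepName}#${count}`;
  }
}

const allocator = new StepIdAllocator();
console.log(allocator.next("llm")); // llm
console.log(allocator.next("tool")); // tool
console.log(allocator.next("llm")); // llm#2
console.log(allocator.next("llm")); // llm#3
```

Reserving # keeps the mapping unambiguous: without it, a step literally named llm#2 could collide with the second llm call.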
suspend
Records that the session is waiting for an external event.
interface SuspendEntry extends BaseEntry {
readonly type: 'suspend';
readonly reason: string;
readonly waitingFor: string;
readonly timeout?: string;
}
waitingFor is the event name. reason is a human-readable note (defaults to Waiting for event: ${eventName}). timeout is an ISO 8601 deadline. Kassette does not wake itself up when the deadline passes. The next start() or resume() checks the deadline; if it has expired, Kassette writes cancel and throws CancelledError.
resume
Carries an event payload into the run.
interface ResumeEntry extends BaseEntry {
readonly type: 'resume';
readonly eventName: string;
readonly value: unknown;
}
Written by resume() after it opens a new session, at which point storage rejects appends from older sessions. If a matching resume entry already exists, Kassette does not write a duplicate: the value already in the journal wins (Journal Authority). On replay, the matching waitForEvent call returns the stored value instead of suspending.
complete
Terminal. The workflow returned successfully.
interface CompleteEntry extends BaseEntry {
readonly type: 'complete';
}
After this entry is written, start() and resume() on the same run throw TerminalRunError. Forking from a complete source run is allowed because the fork gets its own journal. Use a new target runId for forks.
error
Terminal. The workflow threw an error that was not SuspendError.
interface ErrorEntry extends BaseEntry {
readonly type: 'error';
readonly name?: string;
readonly message: string;
readonly stack?: string;
}
Written by Run.fail(error). After this entry is written, start() and resume() on the same run throw TerminalRunError. Kassette stores the thrown error as name, message, and stack. There is no machine-readable error code.
cancel
Terminal. The run was cancelled before completing.
interface CancelEntry extends BaseEntry {
readonly type: 'cancel';
readonly reason?: string;
}
Today, only suspend-timeout auto-cancel writes this entry. When start() or resume() sees an expired suspend.timeout, it writes a fresh start and a cancel with reason: 'suspend_timeout_expired', then throws CancelledError. There is no public cancel API.
@usekassette/core
Storage interface
interface Storage {
append(runId: string, entry: Entry): Promise<number>;
readAll(runId: string): Promise<JournalEntry[]>;
list(): Promise<string[]>;
}
The Storage interface has three methods. append must write an entry atomically: the entry is fully written or not written at all. It assigns an increasing offset and rejects writes from older sessions with FencedError. readAll returns every entry in append order. list returns every runId present in storage. Core includes two storage backends. Third-party code can implement this interface for any backend that can append safely and reject stale writers.
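As an illustration of that contract, here is a minimal in-memory backend. It is suitable only for tests, since nothing is durable. The types are simplified local copies (the real Entry is the tagged union above), the interface is renamed JournalStorage to avoid the DOM's global Storage type, and FencedError is a local stand-in. The fencing rule is an assumption: an append is rejected when its session is lower than the highest session already written for that run.

```typescript
// Simplified local copies of the documented shapes.
type Entry = { readonly session: number; readonly timestamp: string; readonly type: string } & Record<string, unknown>;
type JournalEntry = Entry & { readonly offset: number };

// Same three methods as Kassette's Storage interface (renamed to avoid the DOM global).
interface JournalStorage {
  append(runId: string, entry: Entry): Promise<number>;
  readAll(runId: string): Promise<JournalEntry[]>;
  list(): Promise<string[]>;
}

// Local stand-in for Kassette's FencedError, with the fields listed in the Errors table.
class FencedError extends Error {
  constructor(readonly rejectedSession: number, readonly activeSession: number) {
    super(`session ${rejectedSession} is fenced; active session is ${activeSession}`);
    this.name = "FencedError";
    Object.setPrototypeOf(this, FencedError.prototype);
  }
}

class InMemoryStorage implements JournalStorage {
  private readonly journals = new Map<string, JournalEntry[]>();

  async append(runId: string, entry: Entry): Promise<number> {
    const journal = this.journals.get(runId) ?? [];
    // Highest session seen so far for this run (0 for an empty journal).
    const active = journal.reduce((max, e) => Math.max(max, e.session), 0);
    if (entry.session < active) {
      throw new FencedError(entry.session, active);
    }
    const offset = journal.length; // starts at 0, increases by 1 per entry
    journal.push({ ...entry, offset });
    this.journals.set(runId, journal);
    return offset;
  }

  async readAll(runId: string): Promise<JournalEntry[]> {
    return (this.journals.get(runId) ?? []).slice(); // copy, in append order
  }

  async list(): Promise<string[]> {
    return Array.from(this.journals.keys());
  }
}
```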
new LocalStorage(dir: string)
new RemoteStorage(client: ObjectStoreClient, options?: { prefix?: string })
LocalStorage writes one JSONL file per run under dir. It rejects stale writers with a lockfile held for the whole session. RemoteStorage writes one object per run at {runId}/journal.jsonl or {prefix}/{runId}/journal.jsonl. It rejects stale writers with conditional object writes: If-Match for updates and create-if-absent for first writes. When a conditional write fails with PreconditionFailedError, it retries up to 5 times and then throws WriteContentionError. @usekassette/s3 provides the S3-compatible adapter.
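The retry behaviour can be pictured as a loop around one read-modify-write attempt. withConditionalRetries is a hypothetical helper and the two error classes are local stand-ins. Reading "retries 5 times" as one initial attempt plus five retries is an assumption.

```typescript
// Local stand-ins for the Kassette error classes of the same names.
class PreconditionFailedError extends Error {
  constructor(message: string) {
    super(message);
    this.name = "PreconditionFailedError";
    Object.setPrototypeOf(this, PreconditionFailedError.prototype);
  }
}
class WriteContentionError extends Error {
  constructor(message: string) {
    super(message);
    this.name = "WriteContentionError";
    Object.setPrototypeOf(this, WriteContentionError.prototype);
  }
}

// `attempt` should re-read the object (and its ETag) on every call, then put conditionally.
async function withConditionalRetries<T>(attempt: () => Promise<T>, maxRetries = 5): Promise<T> {
  for (let retry = 0; ; retry++) {
    try {
      return await attempt();
    } catch (err) {
      // Only lost conditional writes are retried; anything else propagates unchanged.
      if (!(err instanceof PreconditionFailedError)) throw err;
      if (retry >= maxRetries) {
        throw new WriteContentionError(`conditional write failed after ${retry + 1} attempts`);
      }
    }
  }
}
```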
Run interface
interface Run {
readonly runId: string;
readonly metadata: unknown;
record<T>(name: string, fn: () => Promise<T>, options?: RecordOptions<T>): Promise<T>;
waitForEvent<T>(eventName: string, options?: WaitForEventOptions): Promise<T>;
complete(): Promise<void>;
fail(error: Error): Promise<void>;
}
Returned by start, resume, and fork. A Run represents the active session. metadata is the value passed to the first start() for the run and replayed on every later session.
record
record<T>(name: string, fn: () => Promise<T>, options?: RecordOptions<T>): Promise<T>
On first execution, fn runs and the result is written as a step entry. On replay, Kassette returns the journaled result and does not run fn. name is the caller-supplied identifier and must not contain #. The stored stepId is name for the first call, then name#2, name#3, etc. for repeats. Throws ReplayMismatchError if the stored step name does not match the current call’s name.
waitForEvent
waitForEvent<T>(eventName: string, options?: WaitForEventOptions): Promise<T>
If a resume entry for eventName already exists, returns its value. Otherwise writes a suspend entry, marks the session suspended, and throws SuspendError. Do not swallow SuspendError; let it escape so the process can stop or the wrapper can return suspended. Re-using an event name within the same run throws UsageError. Use unique names, such as approval:${itemId}, for repeated waits.
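The usual way this goes wrong is a broad try/catch around a wait. The sketch below shows the rethrow pattern. SuspendError, isSuspendError, and waitForEvent here are local stand-ins so the snippet runs on its own; in real code, use the isSuspendError helper exported by Kassette and the Run's waitForEvent. approvalStep is a hypothetical workflow function.

```typescript
// Local stand-ins so the example is self-contained.
class SuspendError extends Error {
  constructor(readonly eventName: string) {
    super(`waiting for event: ${eventName}`);
    this.name = "SuspendError";
    Object.setPrototypeOf(this, SuspendError.prototype);
  }
}
function isSuspendError(err: unknown): err is SuspendError {
  return err instanceof SuspendError;
}

// Simulates the first execution of waitForEvent: no resume entry yet, so it suspends.
async function waitForEvent<T>(eventName: string): Promise<T> {
  throw new SuspendError(eventName);
}

// Hypothetical workflow code with a catch-all error handler.
async function approvalStep(itemId: string): Promise<string> {
  try {
    // A unique event name per item, as recommended above.
    return await waitForEvent<string>(`approval:${itemId}`);
  } catch (err) {
    if (isSuspendError(err)) throw err; // never swallow suspension
    return "rejected"; // only genuine failures are handled here
  }
}
```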
complete / fail
complete(): Promise<void>
fail(error: Error): Promise<void>
Write a terminal entry and close the session. Later record or waitForEvent calls throw SessionClosedError. Calling complete or fail after the session has suspended throws SuspendedError. kassette() calls these for you. Direct callers of start, resume, or fork must call one of them explicitly.
RecordOptions
interface RecordOptions<T> {
onReplay?: (result: T) => void;
}
onReplay runs synchronously when replay returns a stored result. Streaming integrations use it to send stored events to the consumer when the step is replayed instead of recomputed. See examples/vercel-ai-sdk for the streaming pattern.
WaitForEventOptions
interface WaitForEventOptions {
timeout?: string;
reason?: string;
}
timeout is an ISO 8601 absolute deadline written into the suspend entry. reason is a human-readable note. Both are optional. Kassette does not wake itself up when the deadline passes. The next start() or resume() enforces the deadline.
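Because timeout is an absolute timestamp, callers compute it from a duration. deadlineFrom and isExpired are hypothetical helpers. Treating a deadline as expired once the current time reaches it (>=) is an assumption about the exact comparison Kassette makes.

```typescript
// Absolute ISO 8601 deadline `ms` milliseconds after `now`.
function deadlineFrom(now: Date, ms: number): string {
  return new Date(now.getTime() + ms).toISOString();
}

// Assumed check: a deadline counts as expired once the current time reaches it.
function isExpired(timeout: string | undefined, now: Date): boolean {
  return timeout !== undefined && now.getTime() >= Date.parse(timeout);
}

const DAY_MS = 24 * 60 * 60 * 1000;
// Shaped like WaitForEventOptions: both fields are optional strings.
const waitOptions = {
  timeout: deadlineFrom(new Date("2024-01-01T00:00:00.000Z"), DAY_MS),
  reason: "Waiting for manager approval",
};
console.log(waitOptions.timeout); // 2024-01-02T00:00:00.000Z
```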
Lifecycle functions
start
function start(storage: Storage, runId: string, options?: StartRunOptions): Promise<Run>;
interface StartRunOptions extends RunOptions {
metadata?: unknown;
}
interface RunOptions {
version?: string | undefined;
}
Opens a new session for runId. On a new run, writes the first start entry and stores metadata. On recovery, replays the existing journal, checks version against the first start entry that has a version, and checks metadata only when the caller provides it. If metadata is omitted, Kassette uses the value already in the journal. Throws TerminalRunError if the run is terminal, EventPendingError if the run is suspended (use resume), and MetadataMismatchError or VersionMismatchError on validation failures.
resume
function resume(storage: Storage, runId: string, eventName: string, value: unknown, options?: RunOptions): Promise<Run>;
Delivers an event to a suspended run and opens the next session. Writes a new start entry, then a resume entry carrying value. If a resume entry for eventName already exists, the value in the journal wins, the new value is ignored, and no duplicate resume entry is written. This makes retrying the same resume call safe after a crash. Throws UsageError if the run is not suspended, or if it is suspended on a different event name.
fork
function fork(
storage: Storage,
runId: string,
source: { runId: string } & ({ fromOffset: number } | { fromStepId: string }),
options?: RunOptions,
): Promise<Run>;
Creates a new run, runId, by copying part of an existing run's history, from its beginning up to a cut point. The cut point is fromOffset, or the offset of the first step entry matching fromStepId. Source entries with offsets lower than the cut point are copied; the entry at the cut point is not. Only step and resume entries are copied; start, suspend, and terminal entries are not.
The target first gets an initial start entry, with inherited metadata when present. A later session start records source: { runId, fromOffset }. version comes from the new caller.
The copy is not atomic. A crash during the copy leaves a partial target journal with no terminal entry. Reopening that target run treats it as crashed work and replays or re-executes from the first missing step. Use a new target runId. Throws UsageError if fromStepId does not match any step.
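The cut-point rules can be checked against a small journal. resolveCut and entriesToCopy are hypothetical helpers over a simplified entry type (ForkEntry), not Kassette exports. They encode the rules above: fromStepId resolves to the offset of the first matching step entry, only offsets below the cut are copied, and only step and resume entries qualify.

```typescript
// Simplified journal entry: just the fields the fork rules look at.
interface ForkEntry {
  readonly offset: number;
  readonly type: string;
  readonly stepId?: string;
}

type ForkSource = { runId: string } & ({ fromOffset: number } | { fromStepId: string });

// The cut point is fromOffset, or the offset of the first step entry with that stepId.
function resolveCut(entries: readonly ForkEntry[], source: ForkSource): number {
  if ("fromOffset" in source) return source.fromOffset;
  const stepId = source.fromStepId;
  const match = entries.find((e) => e.type === "step" && e.stepId === stepId);
  if (match === undefined) throw new Error(`no step entry matches ${stepId}`);
  return match.offset;
}

// Copy only step and resume entries strictly before the cut point.
function entriesToCopy(entries: readonly ForkEntry[], cut: number): ForkEntry[] {
  return entries.filter((e) => e.offset < cut && (e.type === "step" || e.type === "resume"));
}

const journal: ForkEntry[] = [
  { offset: 0, type: "start" },
  { offset: 1, type: "step", stepId: "llm" },
  { offset: 2, type: "suspend" },
  { offset: 3, type: "start" },
  { offset: 4, type: "resume" },
  { offset: 5, type: "step", stepId: "tool" },
  { offset: 6, type: "complete" },
];
const cut = resolveCut(journal, { runId: "run-a", fromStepId: "tool" });
console.log(cut); // 5
console.log(entriesToCopy(journal, cut).map((e) => e.offset).join(",")); // 1,4
```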
Validation order
start and resume share this order:
- Read the target journal with storage.readAll(runId).
- If the run is terminal (complete, error, or cancel), throw TerminalRunError.
- Rebuild replay state and check version. If the caller provided a version and it differs from the first journaled version, throw VersionMismatchError.
- If the run is suspended with an expired timeout, write a fresh start plus cancel, then throw CancelledError.
- start: if the run is still suspended, throw EventPendingError; otherwise check provided metadata against journaled metadata and write a fresh start.
- resume: require either a matching unresolved suspend or an existing matching resume entry from an earlier retry. Write a fresh start; write resume only if it was not already journaled.
fork follows a different path. It reads the source journal, resolves the cut point, writes an initial start to the target, copies eligible step and resume entries, then reads the target and opens a session with the normal terminal and version checks. It does not auto-cancel expired suspends in the source run.
Errors
All errors extend KassetteError, so a single instanceof KassetteError check catches every error Kassette throws.
| Error | When | Notable fields |
|---|---|---|
| UsageError | Caller misuse (bad input, wrong call order, misconfiguration). | |
| TerminalRunError (UsageError) | start/resume on a terminal run. | terminalState |
| MetadataMismatchError (UsageError) | Recovery start with provided metadata that differs from journaled metadata. | storedMetadata, providedMetadata |
| EventPendingError (UsageError) | start on a suspended run with an unresolved event. Use resume. | waitingFor |
| SuspendError | Control-flow error thrown by waitForEvent when the workflow suspends. | eventName |
| SuspendedError | Session method called after suspension, usually because a catch swallowed SuspendError. | |
| SessionClosedError | Session method called after complete() or fail(). | |
| VersionMismatchError | options.version disagrees with the first journaled version. | storedVersion, currentVersion |
| CancelledError | Suspend-timeout auto-cancel. | reason |
| ReplayMismatchError | Replay found a stored step name that does not match the current call. | stepId, expectedName, actualName |
| FencedError | Storage rejected an append from an older session. | rejectedSession, activeSession |
| WriteContentionError | Backend cannot get exclusive write access (lockfile held, conditional-write retries failed). | |
| PreconditionFailedError | ObjectStoreClient conditional put failed; RemoteStorage catches this during retries. | |
| JournalCorruptionError | Journal data cannot be parsed or fails structural validation. | line |
| InternalError | Library invariant violated. File a bug. | |
Every error instance carries runId (when known), defined on the base class.
Helpers
createRunId
function createRunId(): string;
Returns a fresh UUID via crypto.randomUUID(). Callers can create run IDs another way if they prefer. Every API takes runId as a string.
isSuspendError
function isSuspendError(err: unknown): err is SuspendError;
Cross-package-safe SuspendError check. Prefer this over instanceof when bundled code may contain more than one copy of @usekassette/core.
isTerminal
function isTerminal(entry: Entry): entry is CompleteEntry | ErrorEntry | CancelEntry;
Type guard. Returns true for complete, error, or cancel entries. Useful when walking a journal looking for the closing entry.
getMetadata
function getMetadata<T = unknown>(entries: readonly JournalEntry[]): T | undefined;
Pulls the caller-supplied metadata from the first start entry of a journal. Returns undefined if the journal is empty or no metadata was passed at start time.
runStatus / RunStatus
function runStatus(entries: readonly JournalEntry[]): RunStatus;
type RunStatus =
| { status: 'completed' }
| { status: 'failed'; message: string; name?: string; stack?: string }
| { status: 'cancelled'; reason?: string }
| { status: 'suspended'; waitingFor: string; timeout?: string }
| { status: 'unsettled' };
Stateless helper over JournalEntry[]. It is the shared answer to “what is this run doing right now?” unsettled covers a live session in another process, a crashed-and-abandoned session, and an empty journal. The journal alone cannot distinguish those cases, so live-process detection needs a separate signal.
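One plausible way to compute these states is to walk the journal backwards and let the first decisive entry win. runStatusSketch and its simplified StatusEntry type are assumptions made for illustration, not the library's implementation; use runStatus itself in real code.

```typescript
type RunStatus =
  | { status: "completed" }
  | { status: "failed"; message: string; name?: string; stack?: string }
  | { status: "cancelled"; reason?: string }
  | { status: "suspended"; waitingFor: string; timeout?: string }
  | { status: "unsettled" };

// Only the fields this sketch reads.
type StatusEntry =
  | { type: "start" | "step" | "complete" }
  | { type: "error"; message: string; name?: string; stack?: string }
  | { type: "cancel"; reason?: string }
  | { type: "suspend"; waitingFor: string; timeout?: string }
  | { type: "resume"; eventName: string };

function runStatusSketch(entries: readonly StatusEntry[]): RunStatus {
  for (let i = entries.length - 1; i >= 0; i--) {
    const entry = entries[i];
    switch (entry.type) {
      case "complete":
        return { status: "completed" };
      case "error":
        return { status: "failed", message: entry.message, name: entry.name, stack: entry.stack };
      case "cancel":
        return { status: "cancelled", reason: entry.reason };
      case "suspend":
        return { status: "suspended", waitingFor: entry.waitingFor, timeout: entry.timeout };
      case "resume":
        // The event was delivered, so whatever follows is live or abandoned work.
        return { status: "unsettled" };
      // start and step entries do not decide the status; keep walking back.
    }
  }
  return { status: "unsettled" }; // empty journal, or only start/step entries
}
```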
ObjectStoreClient / GetObjectResult / PreconditionFailedError
interface ObjectStoreClient {
getObject(key: string): Promise<GetObjectResult | null>;
putObject(key: string, content: string, etag: string | undefined): Promise<string>;
listPrefixes(prefix: string): Promise<string[]>;
}
interface GetObjectResult {
readonly content: string;
readonly etag: string;
}
class PreconditionFailedError extends KassetteError {}
function isPreconditionFailedError(err: unknown): err is PreconditionFailedError;
The extension point for RemoteStorage. Implementations wrap a specific object-store SDK and translate its “write only if the object version matches” feature into conditional writes. putObject with etag: string updates only if the current object still has that ETag. putObject with etag: undefined creates the object only if it does not already exist. Throw PreconditionFailedError on HTTP 412. listPrefixes(prefix) must return run IDs with the parent prefix removed and no trailing slash. isPreconditionFailedError is the cross-package-safe check used by RemoteStorage. @usekassette/s3 provides the S3 adapter.
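A test double makes the conditional-write contract concrete. MemoryObjectStore is a hypothetical in-memory ObjectStoreClient. The interfaces are local copies of the ones above, PreconditionFailedError is a local stand-in (the real class extends KassetteError), and the ETags are made-up version strings. It assumes RemoteStorage passes listPrefixes a prefix that is empty or ends in /, with keys shaped like {prefix}/{runId}/journal.jsonl.

```typescript
// Local copies of the documented shapes.
interface GetObjectResult {
  readonly content: string;
  readonly etag: string;
}
interface ObjectStoreClient {
  getObject(key: string): Promise<GetObjectResult | null>;
  putObject(key: string, content: string, etag: string | undefined): Promise<string>;
  listPrefixes(prefix: string): Promise<string[]>;
}

// Local stand-in; the real PreconditionFailedError extends KassetteError.
class PreconditionFailedError extends Error {
  constructor(message: string) {
    super(message);
    this.name = "PreconditionFailedError";
    Object.setPrototypeOf(this, PreconditionFailedError.prototype);
  }
}

// Hypothetical in-memory client with the same conditional-write semantics.
class MemoryObjectStore implements ObjectStoreClient {
  private readonly objects = new Map<string, GetObjectResult>();
  private version = 0;

  async getObject(key: string): Promise<GetObjectResult | null> {
    return this.objects.get(key) ?? null;
  }

  async putObject(key: string, content: string, etag: string | undefined): Promise<string> {
    const current = this.objects.get(key);
    // etag undefined: create only if absent. etag set: update only if it still matches.
    const allowed = etag === undefined ? current === undefined : current?.etag === etag;
    if (!allowed) {
      throw new PreconditionFailedError(`conditional put failed for ${key}`);
    }
    this.version += 1;
    const nextEtag = `"v${this.version}"`;
    this.objects.set(key, { content, etag: nextEtag });
    return nextEtag;
  }

  async listPrefixes(prefix: string): Promise<string[]> {
    // Return the first path segment after `prefix`, without the trailing slash.
    const runIds = new Set<string>();
    this.objects.forEach((_object, key) => {
      if (!key.startsWith(prefix)) return;
      const rest = key.slice(prefix.length);
      const slash = rest.indexOf("/");
      if (slash > 0) runIds.add(rest.slice(0, slash));
    });
    return Array.from(runIds);
  }
}
```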
@usekassette/s3
S3ObjectStoreClient
class S3ObjectStoreClient implements ObjectStoreClient {
constructor(options: S3ObjectStoreClientOptions);
}
interface S3ObjectStoreClientOptions {
readonly bucket: string;
readonly client?: S3Client; // pre-configured @aws-sdk/client-s3 instance
readonly clientConfig?: S3ClientConfig; // ignored if `client` is provided
}
Implements ObjectStoreClient using the AWS SDK v3 (@aws-sdk/client-s3). Pass it to new RemoteStorage(client, { prefix }) to journal runs into an S3-compatible bucket. It works with AWS S3, Cloudflare R2, DigitalOcean Spaces, GCS interop mode, Tigris, and MinIO. The backend must provide read-after-write consistency and conditional writes (If-Match / If-None-Match). Eventually consistent or CDN-fronted stores can let stale writers overwrite newer sessions.
Maps the three ObjectStoreClient operations to GetObjectCommand, PutObjectCommand, and ListObjectsV2Command. putObject sends If-Match: <etag> for updates and If-None-Match: * for first writes. HTTP 412 from either path is detected via $metadata.httpStatusCode, with an error.name === 'PreconditionFailed' fallback for non-AWS providers, and re-thrown as PreconditionFailedError for RemoteStorage’s conditional-write retry loop. getObject returns null on NoSuchKey. @aws-sdk/client-s3 is a peer dependency.
@usekassette/kassette
kassette()
function kassette<TInput, TOutput, TEvents extends Record<string, unknown> = Record<never, never>>(
fn: (ctx: Context<TEvents>, input: TInput) => Promise<TOutput>,
options: KassetteOptions<TOutput>,
): Kassette<TInput, TOutput, TEvents>;
Wraps a workflow function and returns an object with start, resume, and fork. The wrapper opens the session, runs fn, calls complete() on success, calls fail() on non-suspend errors, and returns suspended when it sees SuspendError. TEvents maps event names to payload types for ctx.suspend and Kassette.resume type checking.
KassetteOptions
interface KassetteOptions<TOutput> {
storage: Storage;
version?: string | undefined;
onFinish?: (result: RunResult<TOutput>) => void | Promise<void>;
onError?: (info: { runId: string; error: Error }) => void | Promise<void>;
}
storage is the journal backend. version is recorded when supplied and checked on later sessions. onFinish runs for settled wrapper results (success, suspended, or failed) from start, resume, or fork. onError runs only for failed. Both hooks are for logging or metrics. Hook errors are caught and logged, not passed back to the caller.
Core lifecycle errors thrown before the workflow function runs (TerminalRunError, VersionMismatchError, CancelledError, etc.) are passed to the caller as thrown errors. They are not converted into RunResult, and hooks are not called.
Context
Context<TEvents> is the API available inside the workflow function. Each method forwards to the core run or a small helper.
ctx.runId
ctx.runId: string
The current run’s identifier. Stable across sessions of the same run.
ctx.input
ctx.input: unknown
The input passed to the original start(). On replay, returns the value stored as metadata on the first start entry. The type is unknown because the journal stores it without TypeScript type information. The same value also arrives as the typed second argument to the workflow function, which is usually what you want.
ctx.step
ctx.step<T>(name: string, fn: () => Promise<T>, opts?: StepOptions<T>): Promise<T>
Thin wrapper over Run.record. On first execution, fn runs and the result is written to the journal. On replay, Kassette returns the journaled result and does not run fn. Names containing # throw UsageError.
StepOptions.retry wraps fn in a retry loop inside one record() call. All attempts run in memory before any journal write. A successful return is written to the journal exactly once. If every attempt fails, the last error throws out of ctx.step and no step entry is written. The workflow can catch it like any other exception.
ctx.suspend
ctx.suspend<K extends keyof TEvents & string>(eventName: K, opts?: WaitForEventOptions): Promise<TEvents[K]>
Calls Run.waitForEvent with payload typing from TEvents. If a resume entry for eventName already exists, returns the journaled payload. Otherwise writes a suspend entry and throws SuspendError. Let SuspendError escape. The wrapper turns it into RunResult with status: 'suspended'.
ctx.sleep
ctx.sleep(ms: number): Promise<void>
Crash-aware in-process delay. Writes an absolute wake deadline as a step (delay:${ms}ms) and waits the remaining time. On replay after a crash, it waits only the time that was still left. Use this only for short waits. If the wait could outlive the process, use ctx.suspend with a timeout and an external timer.
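The replay arithmetic is simple once the deadline is absolute. wakeDeadline, remainingSleep, and sleepUntil are hypothetical helpers. Storing the deadline as epoch milliseconds is an assumption, since the format of the journaled value is not specified here.

```typescript
// First execution: compute an absolute deadline to journal (as the delay:${ms}ms step).
function wakeDeadline(nowMs: number, ms: number): number {
  return nowMs + ms;
}

// Any execution: how long is still left before the deadline (never negative).
function remainingSleep(wakeAtMs: number, nowMs: number): number {
  return Math.max(0, wakeAtMs - nowMs);
}

// Wait only for the remaining time.
function sleepUntil(wakeAtMs: number, now: () => number = Date.now): Promise<void> {
  const left = remainingSleep(wakeAtMs, now());
  return new Promise<void>((resolve) => setTimeout(() => resolve(), left));
}

// A 5 s sleep started at t = 10 000 ms; the process crashes and replays at t = 12 000 ms.
const wakeAt = wakeDeadline(10_000, 5_000);
console.log(remainingSleep(wakeAt, 12_000)); // 3000
console.log(remainingSleep(wakeAt, 20_000)); // 0
```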
ctx.parallel
ctx.parallel<T extends Record<string, (ctx: Context<TEvents>) => Promise<unknown>>>(
branches: T,
): Promise<{ [K in keyof T]: Awaited<ReturnType<T[K]>> }>
Runs named branches concurrently with Promise.allSettled. Each branch receives a Context whose step calls are prefixed with the branch key (${key}:${name}), so step IDs from different branches do not collide. If any branch throws SuspendError, the whole call throws SuspendError and the parallel block suspends. If any branch throws a non-suspend error and no branch suspended, the first error is rethrown. Event names are not prefixed because they are part of the external resume API.
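The settle rules can be sketched on top of Promise.allSettled. settleBranches and branchStepName are hypothetical helpers and SuspendError is a local stand-in. Real branches receive a Context rather than no arguments, and picking "the first error" by branch key order is an assumption.

```typescript
// Local stand-in for Kassette's SuspendError.
class SuspendError extends Error {
  constructor(readonly eventName: string) {
    super(`waiting for event: ${eventName}`);
    this.name = "SuspendError";
    Object.setPrototypeOf(this, SuspendError.prototype);
  }
}

// Step names inside a branch are namespaced by the branch key.
function branchStepName(branchKey: string, stepName: string): string {
  return `${branchKey}:${stepName}`;
}

type Branches = Record<string, () => Promise<unknown>>;
type Settled<T extends Branches> = { [K in keyof T]: Awaited<ReturnType<T[K]>> };

// Suspension wins over failure; otherwise the first rejection (in key order) is rethrown.
async function settleBranches<T extends Branches>(branches: T): Promise<Settled<T>> {
  const keys = Object.keys(branches) as Array<keyof T & string>;
  const settled = await Promise.allSettled(keys.map((k) => branches[k]()));
  const rejected = settled.filter((r): r is PromiseRejectedResult => r.status === "rejected");
  const suspension = rejected.find((r) => r.reason instanceof SuspendError);
  if (suspension !== undefined) throw suspension.reason;
  if (rejected.length > 0) throw rejected[0].reason;
  const out: Record<string, unknown> = {};
  keys.forEach((k, i) => {
    out[k] = (settled[i] as PromiseFulfilledResult<unknown>).value;
  });
  return out as unknown as Settled<T>; // keys were copied one-to-one
}
```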
StepOptions
interface StepOptions<T> {
retry?: RetryConfig;
onReplay?: (result: T) => void;
}
retry activates the in-memory retry loop. onReplay mirrors RecordOptions.onReplay for streaming integrations.
RetryConfig
interface RetryConfig {
maxAttempts: number;
delay?: number; // initial delay in ms (default 1000)
backoffRate?: number; // multiplier per attempt (default 1)
maxDelay?: number; // cap on delay (default Infinity)
}
maxAttempts is the maximum number of tries. Wait time between attempts starts at delay, multiplies by backoffRate each round, and is capped at maxDelay. All retries run in-process. No journal entry is written until success.
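Under these rules the waits between attempts form a capped geometric series. retryDelays and withRetry are hypothetical helpers that assume a wait before every attempt after the first, so maxAttempts tries produce maxAttempts - 1 waits. They illustrate the configuration, not the library's code.

```typescript
interface RetryConfig {
  maxAttempts: number;
  delay?: number; // initial delay in ms (default 1000)
  backoffRate?: number; // multiplier per attempt (default 1)
  maxDelay?: number; // cap on delay (default Infinity)
}

// Waits between consecutive attempts: maxAttempts - 1 of them.
function retryDelays(config: RetryConfig): number[] {
  const { maxAttempts, delay = 1000, backoffRate = 1, maxDelay = Infinity } = config;
  const delays: number[] = [];
  let next = delay;
  for (let attempt = 1; attempt < maxAttempts; attempt++) {
    delays.push(Math.min(next, maxDelay));
    next *= backoffRate;
  }
  return delays;
}

// Hypothetical in-memory retry loop: nothing is journaled until fn succeeds.
async function withRetry<T>(fn: () => Promise<T>, config: RetryConfig): Promise<T> {
  const delays = retryDelays(config);
  for (let attempt = 0; ; attempt++) {
    try {
      return await fn();
    } catch (err) {
      if (attempt >= delays.length) throw err; // out of attempts: the last error escapes
      const wait = delays[attempt];
      await new Promise<void>((resolve) => setTimeout(() => resolve(), wait));
    }
  }
}

console.log(retryDelays({ maxAttempts: 5, delay: 100, backoffRate: 2, maxDelay: 500 }).join(", ")); // 100, 200, 400, 500
```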
Kassette object
kassette() returns an object with three methods. Each opens a session through the core lifecycle and runs the workflow function to completion, suspension, or failure.
workflow.start
start(
input: Record<string, never> extends TInput ? TInput | undefined : TInput,
options?: { runId?: string },
): Promise<RunResult<TOutput>>
Begins a run. Uses options.runId if provided; otherwise creates one with createRunId. input is stored as metadata on the first start entry. TypeScript allows omitting input (passing undefined) when an empty object is assignable to TInput, for example when every field of TInput is optional. At runtime, Kassette journals whatever value is passed. Returns once the wrapper has a settled result. Calling start again with the same runId after a crash recovers the run and replays it.
workflow.resume
resume(
runId: string,
event: { [K in keyof TEvents & string]: { eventName: K; value: TEvents[K] } }[keyof TEvents & string],
): Promise<RunResult<TOutput>>
Delivers an event to a suspended run. The wrapper opens a new session through core resume, replays to the suspend point, and continues. Retrying after a crash is safe: repeat calls with a different value for the same event ignore the new value and use the journaled value.
workflow.fork
fork(
source: { runId: string } & ({ fromOffset: number } | { fromStepId: string }),
options?: { runId?: string },
): Promise<RunResult<TOutput>>
Creates a new run by copying source.runId's history from its beginning up to the cut point. The new runId is options.runId or a fresh ID. The wrapper replays the copied entries and continues live from the cut point. The source run is unchanged.
RunResult
type RunResult<TOutput> =
| { status: 'success'; result: TOutput; runId: string }
| { status: 'failed'; error: Error; runId: string }
| { status: 'suspended'; event: string; runId: string };
Returned by start, resume, and fork. success carries the workflow’s return value. failed carries the thrown error, which has already been written to the journal. suspended carries the event name the workflow is waiting on. The next call should be workflow.resume(runId, { eventName, value }). Switch on status to handle each case.
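A typical caller switches on status. RunResult is copied locally so the sketch compiles on its own, and summarizeResult is a hypothetical helper. In the suspended branch, a real caller would store result.runId and later call workflow.resume(result.runId, { eventName: result.event, value }).

```typescript
type RunResult<TOutput> =
  | { status: "success"; result: TOutput; runId: string }
  | { status: "failed"; error: Error; runId: string }
  | { status: "suspended"; event: string; runId: string };

function summarizeResult(outcome: RunResult<string>): string {
  switch (outcome.status) {
    case "success":
      return `run ${outcome.runId} returned ${outcome.result}`;
    case "failed":
      // The run is terminal; start() on the same runId would throw TerminalRunError.
      return `run ${outcome.runId} failed: ${outcome.error.message}`;
    case "suspended":
      // Persist runId and event; deliver the payload later with workflow.resume.
      return `run ${outcome.runId} is waiting for ${outcome.event}`;
    default: {
      const unreachable: never = outcome;
      throw new Error(`unexpected result ${JSON.stringify(unreachable)}`);
    }
  }
}
```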
Re-exports from core
@usekassette/kassette re-exports the symbols workflow authors usually need, so most workflows do not need to import @usekassette/core directly.
- LocalStorage, RemoteStorage, createRunId, runStatus
- PreconditionFailedError, isPreconditionFailedError
- KassetteError, UsageError, SuspendError, isSuspendError, TerminalRunError, EventPendingError, MetadataMismatchError, ReplayMismatchError, FencedError, WriteContentionError, VersionMismatchError, CancelledError, JournalCorruptionError
- Types: Storage, WaitForEventOptions, RunStatus, ObjectStoreClient, GetObjectResult, and all journal entry types
For lower-level work, import from @usekassette/core: start, resume, fork, the Run interface, isTerminal, getMetadata, InternalError, SessionClosedError, SuspendedError, and custom storage internals.