Concurrency
kassette’s step ID scheme is order-dependent. At the core level, each record(name, fn) call gets a step ID from a per-name counter. In the workflow API, ctx.step(name, fn) delegates to record().
If concurrent branches call record() / step() with the same name on the same run context, the assigned step IDs depend on which branch reaches the call first. If that order changes between the original run and a replay, memoized results can be returned to the wrong branch. For same-name collisions, no error is thrown because the journaled name still matches the current call’s name.
This document explains when concurrency is safe, when it isn’t, and the protections that exist in the implementation.
How step IDs work
Each call to record(name, fn) generates a step ID using a counter:
- First call with name llm → step ID llm
- Second call with name llm → step ID llm#2
- Third call → llm#3
On replay, the same sequence of record() calls regenerates the same step IDs, which map to the correct memoized results. The counter is deterministic, so this works as long as the calls happen in the same order.
Step names must not contain #; it is reserved as the occurrence-counter separator.
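The counter scheme above can be pictured with a small model. This is a minimal sketch, not kassette's implementation: the StepIdCounter class and its next() method are hypothetical names, and only the ID format (llm, llm#2, llm#3) and the # restriction are taken from the description above.

```typescript
// Hypothetical model of a per-name occurrence counter (not kassette's code).
class StepIdCounter {
  private readonly counts = new Map<string, number>();

  // Returns the step ID for the next call made with `name`.
  next(name: string): string {
    if (name.includes('#')) {
      // '#' is reserved as the occurrence-counter separator.
      throw new Error(`step name must not contain '#': ${name}`);
    }
    const occurrence = (this.counts.get(name) ?? 0) + 1;
    this.counts.set(name, occurrence);
    // The first occurrence uses the bare name; later ones append #<n>.
    return occurrence === 1 ? name : `${name}#${occurrence}`;
  }
}

const counter = new StepIdCounter();
const ids = [
  counter.next('llm'),
  counter.next('llm'),
  counter.next('tool'),
  counter.next('llm'),
];
// ids is ['llm', 'llm#2', 'tool', 'llm#3']
```

Each name has its own counter, so the 'tool' call in the middle does not affect the numbering of the 'llm' calls.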
When concurrency is safe
Sequential code is always safe. The common agent pattern — a loop that calls record() or step() one at a time — is deterministic by construction:
while (!done) {
  const response = await session.record('llm', () => llm.chat(messages));
  const toolResult = await session.record('tool', () => executeTool(response));
  // ...
}
Each await completes before the next record() call. The counter always advances the same way.
Concurrent calls with distinct names are also safe. If every concurrent branch uses a different name, no two branches can produce the same step ID:
// Safe: distinct names → distinct step IDs
await Promise.all([
  session.record('tool:search', () => search(query)),
  session.record('tool:weather', () => getWeather(city)),
]);
tool:search and tool:weather are different counter keys.
When concurrency is dangerous
Trouble starts when concurrent calls share a name. A step ID collision can then cause silent corruption on replay, because the order in which the processItem calls execute may differ from the original run:
// Dangerous: same name, concurrent execution
const branches = items.map((item) => session.record('process', () => processItem(item)));
await Promise.allSettled(branches);
Nested concurrent calls with the same name also pose a risk for the same reason:
const branchA = async () => {
  const data = await session.record('fetch', () => fetchData('a'));
  return session.record('process', () => process(data));
};
const branchB = async () => {
  const data = await session.record('fetch', () => fetchData('b'));
  return session.record('process', () => process(data));
};
// 'fetch' and 'process' calls share counters across branches.
// Step ID assignment depends on execution order.
await Promise.all([branchA(), branchB()]);
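The failure mode in this section can be reproduced with a toy model. The sketch below is an illustration under stated assumptions, not kassette's code: the Journal type and runBranches() function are hypothetical, and the arrival order of the branches is passed in explicitly to stand in for nondeterministic scheduling.

```typescript
// Hypothetical journal: step ID -> journaled name and memoized result.
type Journal = Map<string, { name: string; result: string }>;

// Simulates branches reaching a same-name 'process' step in the given order.
// Returns the result each item ends up with.
function runBranches(arrivalOrder: string[], journal: Journal): Map<string, string> {
  const counts = new Map<string, number>(); // fresh counters for every run
  const received = new Map<string, string>();
  for (const item of arrivalOrder) {
    const n = (counts.get('process') ?? 0) + 1;
    counts.set('process', n);
    const stepId = n === 1 ? 'process' : `process#${n}`;
    const hit = journal.get(stepId);
    if (hit !== undefined) {
      // Memo hit: the journaled name is 'process' either way, so nothing fails.
      received.set(item, hit.result);
    } else {
      const result = `processed(${item})`;
      journal.set(stepId, { name: 'process', result });
      received.set(item, result);
    }
  }
  return received;
}

const sharedJournal: Journal = new Map();
const firstRun = runBranches(['a', 'b'], sharedJournal); // original run: a arrives first
const replay = runBranches(['b', 'a'], sharedJournal);   // replay: b arrives first
// replay.get('b') is 'processed(a)': item b silently receives item a's result.
```

No error is raised at any point, which is why this kind of corruption is hard to notice after the fact.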
Making concurrency safe
1. ctx.parallel() — branch-key step name prefixing
The workflow API provides ctx.parallel(). Each branch receives a child Context backed by a run wrapper that prefixes every record() name with the branch key, so identical step names in different branches get distinct step IDs.
const results = await ctx.parallel({
  a: async (ctx) => ctx.step('fetch', () => fetchData('a')), // step ID: a:fetch
  b: async (ctx) => ctx.step('fetch', () => fetchData('b')), // step ID: b:fetch
});
For the core Run API, use explicit branch-specific names yourself:
await Promise.all([
  session.record('a:fetch', () => fetchData('a')),
  session.record('b:fetch', () => fetchData('b')),
]);
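The name-prefixing wrapper can be sketched in a few lines, which may also be handy when namespacing by hand with the core Run API. This is a sketch under assumptions: the Recorder interface and withBranchPrefix() helper are hypothetical and only mirror the record(name, fn) shape used in this document; the real wrapper inside ctx.parallel() may differ.

```typescript
// Hypothetical minimal shape of anything that can record a step.
interface Recorder {
  record<T>(name: string, fn: () => Promise<T>): Promise<T>;
}

// Returns a Recorder that forwards to `inner` with "<branchKey>:" prepended.
function withBranchPrefix(inner: Recorder, branchKey: string): Recorder {
  return {
    record<T>(name: string, fn: () => Promise<T>): Promise<T> {
      return inner.record(`${branchKey}:${name}`, fn);
    },
  };
}

// Stand-in recorder that only logs the names it receives and runs fn.
const seenNames: string[] = [];
const baseRecorder: Recorder = {
  record<T>(name: string, fn: () => Promise<T>): Promise<T> {
    seenNames.push(name);
    return fn();
  },
};

const branchARecorder = withBranchPrefix(baseRecorder, 'a');
const branchBRecorder = withBranchPrefix(baseRecorder, 'b');
void branchARecorder.record('fetch', async () => 'data-a');
void branchBRecorder.record('fetch', async () => 'data-b');
// seenNames is now ['a:fetch', 'b:fetch']
```

Both branches call record('fetch', ...), yet the underlying recorder sees two different names and therefore two different counter keys.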
2. Replay mismatch detection
Every step entry in the journal records both the step ID and the raw name passed to record(). On replay, when a memo hit occurs, the stored name is compared against the current call’s name. If they don’t match, a ReplayMismatchError is thrown instead of returning the memoized result. This does not catch the same-name concurrency issues discussed above, but it acts as a defensive integrity check against code modifications.
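The name comparison described above might look roughly like the following. Everything here is an assumption made for illustration: the JournalEntry shape, the readMemo() helper, and the locally defined ReplayMismatchError stand-in are hypothetical, and the mismatching entry is hand-built rather than produced by a real run.

```typescript
// Local stand-in for the error type named in the text.
class ReplayMismatchError extends Error {}

// Hypothetical journal entry: step ID plus the raw name passed to record().
interface JournalEntry {
  stepId: string;
  name: string;
  result: unknown;
}

// Returns the memoized entry, or undefined when the step has not run yet.
// Throws when the journaled name differs from the current call's name.
function readMemo(
  entries: Map<string, JournalEntry>,
  stepId: string,
  name: string,
): JournalEntry | undefined {
  const entry = entries.get(stepId);
  if (entry === undefined) return undefined; // no memo: the caller executes fn
  if (entry.name !== name) {
    throw new ReplayMismatchError(
      `step ${stepId}: journaled name "${entry.name}", current name "${name}"`,
    );
  }
  return entry;
}

const memoEntries = new Map<string, JournalEntry>([
  ['s1', { stepId: 's1', name: 'process', result: 'result for item a' }],
  ['s2', { stepId: 's2', name: 'fetch', result: 'raw data' }],
]);

// Same name: passes the check even if the result belongs to another branch.
const sameNameHit = readMemo(memoEntries, 's1', 'process');

// Different name: rejected instead of returning the memoized result.
let mismatchCaught = false;
try {
  readMemo(memoEntries, 's2', 'process');
} catch (err) {
  mismatchCaught = err instanceof ReplayMismatchError;
}
```

The first lookup shows the limit of the check: the names agree, so a result journaled for a different branch would still be returned.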
You can also use the version option to catch code changes between sessions. If the version doesn’t match, start() throws a VersionMismatchError:
const session = await start(storage, runId, { version: 'v2' });
3. Microtask yield on replay
On first run, await fn() inside record() yields to the event loop, giving concurrent branches a chance to interleave. On replay, memo hits would otherwise return synchronously. To compensate, replay inserts await Promise.resolve() after each memo hit, which yields to the microtask queue and gives other branches a chance to start before the replayed call resolves.
This is another defensive measure, not a correctness guarantee.
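As a rough illustration of where that yield sits, consider the sketch below. The recordWithYield() function and its in-memory memo map are hypothetical stand-ins, not kassette's record(); the sketch only shows the two code paths described above.

```typescript
// Hypothetical record-like helper with an in-memory memo map.
async function recordWithYield<T>(
  memo: Map<string, T>,
  stepId: string,
  fn: () => Promise<T>,
): Promise<T> {
  if (memo.has(stepId)) {
    const cached = memo.get(stepId) as T;
    // Memo hit: yield to the microtask queue before handing back the result.
    await Promise.resolve();
    return cached;
  }
  // First run: awaiting fn() is the point where other branches can interleave.
  const result = await fn();
  memo.set(stepId, result);
  return result;
}

const memoStore = new Map<string, string>([['llm', 'cached answer']]);
let fnCalls = 0;
const countingFn = async (): Promise<string> => {
  fnCalls += 1;
  return 'fresh answer';
};

const hitPromise = recordWithYield(memoStore, 'llm', countingFn);   // memo hit: fn is skipped
const missPromise = recordWithYield(memoStore, 'tool', countingFn); // no memo: fn runs
// fnCalls is 1 here: only the 'tool' call executed fn.
```

Both calls return promises either way; the yield only affects when the memoized value becomes available relative to other queued work.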
Recommendations
For workflow branches that use the same step names, use ctx.parallel() so branch keys prefix step names:
const results = await ctx.parallel({
  a: async (ctx) => ctx.step('fetch', () => fetchData('a')),
  b: async (ctx) => ctx.step('fetch', () => fetchData('b')),
});
For dynamic branches, derive a unique branch key from a stable item identity:
await ctx.parallel(
  Object.fromEntries(items.map((item) => [item.id, async (ctx) => ctx.step('process', () => processItem(item))])),
);
For core Run / record() usage, manually namespace names with a stable branch identity:
await Promise.allSettled(items.map((item) => session.record(`process:${item.id}`, () => processItem(item))));
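Derived keys are only safe if they are actually unique. Object.fromEntries keeps just the last value for a repeated key, so two items with the same id would collapse into one branch without any error. A small guard, sketched below, can catch this before branching. The toBranchMap() helper is hypothetical, and rejecting # in keys assumes that prefixed names are subject to the same rule as step names.

```typescript
// Hypothetical helper: maps each item to a branch key and rejects unsafe keys.
function toBranchMap<T>(
  items: readonly T[],
  keyOf: (item: T) => string,
): Map<string, T> {
  const byKey = new Map<string, T>();
  for (const item of items) {
    const key = keyOf(item);
    if (key.includes('#')) {
      // Assumption: a key becomes part of the step name, where '#' is reserved.
      throw new Error(`branch key must not contain '#': ${key}`);
    }
    if (byKey.has(key)) {
      throw new Error(`duplicate branch key: ${key}`);
    }
    byKey.set(key, item);
  }
  return byKey;
}

const branchItems = toBranchMap([{ id: 'u1' }, { id: 'u2' }], (item) => item.id);
const branchKeys = Array.from(branchItems.keys());
// branchKeys is ['u1', 'u2']
```

The resulting map can then be turned into the object passed to ctx.parallel() or into namespaced record() names.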