agent-ecosystem/src/main/services/team/opencode/delivery/OpenCodePromptDeliveryLedger.ts
777genius 6e4f8ff8c4 fix: stabilize opencode mcp transport refresh
Keep OpenCode app MCP transport evidence durable, refresh stale sessions without consuming normal delivery attempts, and keep recoverable runtime diagnostics out of member card errors.

Cover stable MCP restart/fallback, forced session refresh, resolved_behavior_changed recovery, and renderer diagnostics with regression and safe e2e tests.
2026-05-18 13:08:34 +03:00

1143 lines
40 KiB
TypeScript

import { stableHash } from '../bridge/OpenCodeBridgeCommandContract';
import { VersionedJsonStore, VersionedJsonStoreError } from '../store/VersionedJsonStore';
import type {
OpenCodeDeliveryResponseObservation,
OpenCodeDeliveryResponseState,
OpenCodeDeliveryVisibleReplyCorrelation,
} from '../bridge/OpenCodeBridgeCommandContract';
import type { AgentActionMode, InboxMessage, InboxMessageKind, TaskRef } from '@shared/types/team';
export const OPENCODE_PROMPT_DELIVERY_LEDGER_SCHEMA_VERSION = 1;
export const OPENCODE_PROMPT_DELIVERY_RESPONDED_RETENTION_MS = 7 * 24 * 60 * 60 * 1000;
export const OPENCODE_PROMPT_DELIVERY_FAILED_RETENTION_MS = 30 * 24 * 60 * 60 * 1000;
export const OPENCODE_PROMPT_DELIVERY_SESSION_REFRESH_MAX_ATTEMPTS = 5;
export type OpenCodePromptDeliveryStatus =
| 'pending'
| 'accepted'
| 'responded'
| 'unanswered'
| 'retry_scheduled'
| 'retried'
| 'failed_retryable'
| 'failed_terminal';
export interface OpenCodePromptDeliveryLedgerRecord {
id: string;
teamName: string;
memberName: string;
laneId: string;
runId: string | null;
runtimeSessionId: string | null;
runtimePromptMessageId?: string | null;
runtimePromptMessageIds?: string[];
lastRuntimePromptMessageId?: string | null;
lastDeliveryAttemptIdWithAcceptedPrompt?: string | null;
inboxMessageId: string;
inboxTimestamp: string;
source: 'watcher' | 'ui-send' | 'manual' | 'watchdog' | 'member-work-sync-review-pickup';
messageKind: InboxMessageKind | null;
workSyncIntent?: InboxMessage['workSyncIntent'] | null;
replyRecipient: string;
actionMode: AgentActionMode | null;
taskRefs: TaskRef[];
payloadHash: string;
status: OpenCodePromptDeliveryStatus;
responseState: OpenCodeDeliveryResponseState;
attempts: number;
maxAttempts: number;
sessionRefreshAttempts?: number;
maxSessionRefreshAttempts?: number;
lastSessionRefreshReason?: string | null;
acceptanceUnknown: boolean;
nextAttemptAt: string | null;
lastAttemptAt: string | null;
lastObservedAt: string | null;
acceptedAt: string | null;
respondedAt: string | null;
failedAt: string | null;
inboxReadCommittedAt: string | null;
inboxReadCommitError: string | null;
prePromptCursor: string | null;
postPromptCursor: string | null;
deliveredUserMessageId: string | null;
observedAssistantMessageId: string | null;
observedAssistantPreview: string | null;
observedToolCallNames: string[];
observedVisibleMessageId: string | null;
visibleReplyMessageId: string | null;
visibleReplyInbox: string | null;
visibleReplyCorrelation: OpenCodeDeliveryVisibleReplyCorrelation | null;
lastReason: string | null;
diagnostics: string[];
createdAt: string;
updatedAt: string;
}
const OPENCODE_PROMPT_DELIVERY_STATUSES = new Set<OpenCodePromptDeliveryStatus>([
'pending',
'accepted',
'responded',
'unanswered',
'retry_scheduled',
'retried',
'failed_retryable',
'failed_terminal',
]);
const OPENCODE_DELIVERY_RESPONSE_STATES = new Set<OpenCodeDeliveryResponseState>([
'not_observed',
'pending',
'prompt_not_indexed',
'responded_tool_call',
'responded_visible_message',
'responded_non_visible_tool',
'responded_plain_text',
'permission_blocked',
'tool_error',
'empty_assistant_turn',
'prompt_delivered_no_assistant_message',
'session_stale',
'session_error',
'reconcile_failed',
]);
const OPENCODE_PROMPT_DELIVERY_SOURCES = new Set<OpenCodePromptDeliveryLedgerRecord['source']>([
'watcher',
'ui-send',
'manual',
'watchdog',
'member-work-sync-review-pickup',
]);
const OPENCODE_DELIVERY_VISIBLE_REPLY_CORRELATIONS =
new Set<OpenCodeDeliveryVisibleReplyCorrelation>([
'relayOfMessageId',
'direct_child_message_send',
'plain_assistant_text',
]);
const AGENT_ACTION_MODES = new Set<AgentActionMode>(['do', 'ask', 'delegate']);
export interface EnsureOpenCodePromptDeliveryInput {
teamName: string;
memberName: string;
laneId: string;
runId?: string | null;
inboxMessageId: string;
inboxTimestamp: string;
source: OpenCodePromptDeliveryLedgerRecord['source'];
messageKind?: InboxMessageKind | null;
workSyncIntent?: InboxMessage['workSyncIntent'] | null;
replyRecipient: string;
actionMode?: AgentActionMode | null;
taskRefs?: TaskRef[];
payloadHash: string;
maxAttempts?: number;
now: string;
}
export interface ApplyOpenCodePromptDeliveryResultInput {
id: string;
accepted: boolean;
attempted?: boolean;
responseObservation?: OpenCodeDeliveryResponseObservation;
sessionId?: string | null;
runtimePromptMessageId?: string | null;
deliveryAttemptId?: string | null;
runtimePid?: number;
prePromptCursor?: string | null;
diagnostics?: string[];
reason?: string | null;
now: string;
}
export interface ApplyOpenCodePromptDestinationProofInput {
id: string;
visibleReplyInbox: string;
visibleReplyMessageId: string;
visibleReplyCorrelation: OpenCodeDeliveryVisibleReplyCorrelation;
semanticallySufficient: boolean;
diagnostics?: string[];
observedAt: string;
}
export class OpenCodePromptDeliveryLedgerStore {
constructor(private readonly store: VersionedJsonStore<OpenCodePromptDeliveryLedgerRecord[]>) {}
async ensurePending(
input: EnsureOpenCodePromptDeliveryInput
): Promise<OpenCodePromptDeliveryLedgerRecord> {
const id = buildOpenCodePromptDeliveryRecordId(input);
let result: OpenCodePromptDeliveryLedgerRecord | null = null;
await this.store.updateLocked((records) => {
const existing = records.find((record) => record.id === id);
if (existing) {
if (existing.payloadHash !== input.payloadHash) {
const reason = 'opencode_prompt_delivery_payload_mismatch';
const updated: OpenCodePromptDeliveryLedgerRecord = {
...existing,
status: 'failed_terminal',
failedAt: input.now,
nextAttemptAt: null,
lastReason: reason,
diagnostics: mergeDiagnostics(existing.diagnostics, [
`${reason}: existing payload hash does not match current inbox row payload`,
]),
updatedAt: input.now,
};
result = updated;
return records.map((record) => (record.id === existing.id ? updated : record));
}
if (existing.messageKind == null && input.messageKind) {
const updated: OpenCodePromptDeliveryLedgerRecord = {
...existing,
messageKind: input.messageKind,
...(input.workSyncIntent ? { workSyncIntent: input.workSyncIntent } : {}),
updatedAt: input.now,
};
result = updated;
return records.map((record) => (record.id === existing.id ? updated : record));
}
if (existing.workSyncIntent == null && input.workSyncIntent) {
const updated: OpenCodePromptDeliveryLedgerRecord = {
...existing,
workSyncIntent: input.workSyncIntent,
updatedAt: input.now,
};
result = updated;
return records.map((record) => (record.id === existing.id ? updated : record));
}
result = existing;
return records;
}
const created: OpenCodePromptDeliveryLedgerRecord = {
id,
teamName: input.teamName,
memberName: input.memberName,
laneId: input.laneId,
runId: input.runId ?? null,
runtimeSessionId: null,
runtimePromptMessageId: null,
runtimePromptMessageIds: [],
lastRuntimePromptMessageId: null,
lastDeliveryAttemptIdWithAcceptedPrompt: null,
inboxMessageId: input.inboxMessageId,
inboxTimestamp: input.inboxTimestamp,
source: input.source,
messageKind: input.messageKind ?? null,
workSyncIntent: input.workSyncIntent ?? null,
replyRecipient: input.replyRecipient,
actionMode: input.actionMode ?? null,
taskRefs: input.taskRefs ?? [],
payloadHash: input.payloadHash,
status: 'pending',
responseState: 'not_observed',
attempts: 0,
maxAttempts: input.maxAttempts ?? 3,
sessionRefreshAttempts: 0,
maxSessionRefreshAttempts: OPENCODE_PROMPT_DELIVERY_SESSION_REFRESH_MAX_ATTEMPTS,
lastSessionRefreshReason: null,
acceptanceUnknown: false,
nextAttemptAt: null,
lastAttemptAt: null,
lastObservedAt: null,
acceptedAt: null,
respondedAt: null,
failedAt: null,
inboxReadCommittedAt: null,
inboxReadCommitError: null,
prePromptCursor: null,
postPromptCursor: null,
deliveredUserMessageId: null,
observedAssistantMessageId: null,
observedAssistantPreview: null,
observedToolCallNames: [],
observedVisibleMessageId: null,
visibleReplyMessageId: null,
visibleReplyInbox: null,
visibleReplyCorrelation: null,
lastReason: null,
diagnostics: [],
createdAt: input.now,
updatedAt: input.now,
};
result = created;
return [...records, created];
});
if (!result) {
throw new Error('OpenCode prompt delivery ensurePending failed');
}
return result;
}
async getByInboxMessage(input: {
teamName: string;
memberName: string;
laneId: string;
inboxMessageId: string;
}): Promise<OpenCodePromptDeliveryLedgerRecord | null> {
const records = await this.readRequired();
return (
records.find(
(record) =>
record.teamName === input.teamName &&
record.memberName.toLowerCase() === input.memberName.toLowerCase() &&
record.laneId === input.laneId &&
record.inboxMessageId === input.inboxMessageId
) ?? null
);
}
async getActiveForMember(input: {
teamName: string;
memberName: string;
laneId: string;
}): Promise<OpenCodePromptDeliveryLedgerRecord | null> {
const records = await this.readRequired();
return (
records
.filter(
(record) =>
record.teamName === input.teamName &&
record.memberName.toLowerCase() === input.memberName.toLowerCase() &&
record.laneId === input.laneId &&
!isTerminalForAutomaticSelection(record)
)
.sort((left, right) => Date.parse(left.createdAt) - Date.parse(right.createdAt))[0] ?? null
);
}
async applyDeliveryResult(
input: ApplyOpenCodePromptDeliveryResultInput
): Promise<OpenCodePromptDeliveryLedgerRecord> {
return await this.updateExisting(input.id, (record) => {
const observation = input.responseObservation;
const responseState =
observation?.state ?? (input.accepted ? record.responseState : 'not_observed');
const sessionRefreshState = isOpenCodeSessionRefreshResponseState({
responseState,
reason: input.reason ?? observation?.reason ?? record.lastReason,
diagnostics: input.diagnostics,
});
const responded = isOpenCodePromptResponseStateResponded(responseState);
const unanswered = isOpenCodePromptDeliveryUnansweredResponseState(responseState);
const acceptedRuntimePromptMessageId =
input.accepted && input.runtimePromptMessageId?.trim()
? input.runtimePromptMessageId.trim()
: null;
const previousRuntimePromptMessageIds = getOpenCodeRuntimePromptMessageIds(record);
const runtimePromptMessageIds =
acceptedRuntimePromptMessageId &&
!previousRuntimePromptMessageIds.includes(acceptedRuntimePromptMessageId)
? [...previousRuntimePromptMessageIds, acceptedRuntimePromptMessageId]
: previousRuntimePromptMessageIds;
const acceptedDeliveryAttemptId = input.deliveryAttemptId?.trim() || null;
const acceptedAttemptAlreadyRecorded = Boolean(
input.accepted &&
acceptedDeliveryAttemptId &&
record.lastDeliveryAttemptIdWithAcceptedPrompt === acceptedDeliveryAttemptId
);
const acceptedPromptAlreadyRecorded = Boolean(
input.accepted &&
acceptedRuntimePromptMessageId &&
previousRuntimePromptMessageIds.includes(acceptedRuntimePromptMessageId)
);
const shouldIncrementAttempts =
(input.accepted || input.attempted === true) &&
!acceptedAttemptAlreadyRecorded &&
!acceptedPromptAlreadyRecorded &&
!sessionRefreshState;
const lastRuntimePromptMessageId =
acceptedRuntimePromptMessageId ??
record.lastRuntimePromptMessageId ??
record.runtimePromptMessageId ??
runtimePromptMessageIds[runtimePromptMessageIds.length - 1] ??
null;
return {
...record,
status: input.accepted
? responded
? 'responded'
: unanswered
? 'unanswered'
: 'accepted'
: 'failed_retryable',
responseState,
attempts: shouldIncrementAttempts ? record.attempts + 1 : record.attempts,
runtimeSessionId: input.sessionId ?? record.runtimeSessionId,
runtimePromptMessageId: lastRuntimePromptMessageId,
runtimePromptMessageIds,
lastRuntimePromptMessageId,
lastDeliveryAttemptIdWithAcceptedPrompt:
input.accepted && acceptedDeliveryAttemptId
? acceptedDeliveryAttemptId
: (record.lastDeliveryAttemptIdWithAcceptedPrompt ?? null),
acceptanceUnknown: input.accepted ? false : record.acceptanceUnknown,
lastAttemptAt: input.now,
lastObservedAt: observation ? input.now : record.lastObservedAt,
acceptedAt: input.accepted ? (record.acceptedAt ?? input.now) : record.acceptedAt,
respondedAt: responded ? (record.respondedAt ?? input.now) : record.respondedAt,
prePromptCursor: input.prePromptCursor ?? record.prePromptCursor,
deliveredUserMessageId:
observation?.deliveredUserMessageId ?? record.deliveredUserMessageId,
observedAssistantMessageId:
observation?.assistantMessageId ?? record.observedAssistantMessageId,
observedAssistantPreview:
observation?.latestAssistantPreview ?? record.observedAssistantPreview,
observedToolCallNames: observation?.toolCallNames ?? record.observedToolCallNames,
observedVisibleMessageId:
observation?.visibleMessageToolCallId ?? record.observedVisibleMessageId,
visibleReplyMessageId: observation?.visibleReplyMessageId ?? record.visibleReplyMessageId,
visibleReplyCorrelation:
observation?.visibleReplyCorrelation ?? record.visibleReplyCorrelation,
lastReason: input.reason ?? observation?.reason ?? record.lastReason,
lastSessionRefreshReason: sessionRefreshState
? (input.reason ?? observation?.reason ?? record.lastSessionRefreshReason ?? null)
: (record.lastSessionRefreshReason ?? null),
diagnostics: mergeDiagnostics(record.diagnostics, input.diagnostics ?? []),
updatedAt: input.now,
};
});
}
async applyObservation(input: {
id: string;
responseObservation: OpenCodeDeliveryResponseObservation;
sessionId?: string | null;
runtimePromptMessageId?: string | null;
diagnostics?: string[];
observedAt: string;
}): Promise<OpenCodePromptDeliveryLedgerRecord> {
return await this.updateExisting(input.id, (record) => {
const responded = isOpenCodePromptResponseStateResponded(input.responseObservation.state);
const unanswered = isOpenCodePromptDeliveryUnansweredResponseState(
input.responseObservation.state
);
const sessionRefreshState = isOpenCodeSessionRefreshResponseState({
responseState: input.responseObservation.state,
reason: input.responseObservation.reason ?? record.lastReason,
diagnostics: input.diagnostics,
});
const previousRuntimePromptMessageIds = getOpenCodeRuntimePromptMessageIds(record);
const deliveredRuntimePromptMessageId =
input.responseObservation.deliveredUserMessageId?.trim() || null;
const requestedRuntimePromptMessageId = input.runtimePromptMessageId?.trim() || null;
const requestedRuntimePromptMessageIdIsKnown = Boolean(
requestedRuntimePromptMessageId &&
previousRuntimePromptMessageIds.includes(requestedRuntimePromptMessageId)
);
const observedRuntimePromptMessageId =
deliveredRuntimePromptMessageId ||
(requestedRuntimePromptMessageIdIsKnown ? requestedRuntimePromptMessageId : null);
const runtimePromptMessageIds =
observedRuntimePromptMessageId &&
!previousRuntimePromptMessageIds.includes(observedRuntimePromptMessageId)
? [...previousRuntimePromptMessageIds, observedRuntimePromptMessageId]
: previousRuntimePromptMessageIds;
const promptAcceptedByObservation = Boolean(deliveredRuntimePromptMessageId);
const lastRuntimePromptMessageId =
observedRuntimePromptMessageId ??
record.lastRuntimePromptMessageId ??
record.runtimePromptMessageId ??
runtimePromptMessageIds[runtimePromptMessageIds.length - 1] ??
null;
return {
...record,
status: responded
? 'responded'
: unanswered
? 'unanswered'
: record.status === 'pending' || promptAcceptedByObservation
? 'accepted'
: record.status,
responseState: input.responseObservation.state,
runtimeSessionId: input.sessionId ?? record.runtimeSessionId,
runtimePromptMessageId: lastRuntimePromptMessageId,
runtimePromptMessageIds,
lastRuntimePromptMessageId,
acceptanceUnknown: promptAcceptedByObservation ? false : record.acceptanceUnknown,
lastObservedAt: input.observedAt,
acceptedAt: promptAcceptedByObservation
? (record.acceptedAt ?? input.observedAt)
: record.acceptedAt,
respondedAt: responded ? (record.respondedAt ?? input.observedAt) : record.respondedAt,
deliveredUserMessageId:
input.responseObservation.deliveredUserMessageId ?? record.deliveredUserMessageId,
observedAssistantMessageId:
input.responseObservation.assistantMessageId ?? record.observedAssistantMessageId,
observedAssistantPreview:
input.responseObservation.latestAssistantPreview ?? record.observedAssistantPreview,
observedToolCallNames: input.responseObservation.toolCallNames,
observedVisibleMessageId:
input.responseObservation.visibleMessageToolCallId ?? record.observedVisibleMessageId,
visibleReplyMessageId:
input.responseObservation.visibleReplyMessageId ?? record.visibleReplyMessageId,
visibleReplyCorrelation:
input.responseObservation.visibleReplyCorrelation ?? record.visibleReplyCorrelation,
lastReason: input.responseObservation.reason ?? record.lastReason,
lastSessionRefreshReason: sessionRefreshState
? (input.responseObservation.reason ?? record.lastSessionRefreshReason ?? null)
: (record.lastSessionRefreshReason ?? null),
diagnostics: mergeDiagnostics(record.diagnostics, input.diagnostics ?? []),
updatedAt: input.observedAt,
};
});
}
async applyDestinationProof(
input: ApplyOpenCodePromptDestinationProofInput
): Promise<OpenCodePromptDeliveryLedgerRecord> {
const responseState =
input.visibleReplyCorrelation === 'plain_assistant_text'
? 'responded_plain_text'
: 'responded_visible_message';
return await this.updateExisting(input.id, (record) => ({
...record,
status: input.semanticallySufficient ? 'responded' : record.status,
responseState,
lastObservedAt: input.observedAt,
respondedAt: input.semanticallySufficient
? (record.respondedAt ?? input.observedAt)
: record.respondedAt,
visibleReplyInbox: input.visibleReplyInbox,
visibleReplyMessageId: input.visibleReplyMessageId,
visibleReplyCorrelation: input.visibleReplyCorrelation,
lastReason: input.semanticallySufficient
? record.lastReason
: selectOpenCodeDestinationProofInsufficientReason(input.diagnostics),
diagnostics: mergeDiagnostics(record.diagnostics, input.diagnostics ?? []),
updatedAt: input.observedAt,
}));
}
async markAcceptanceUnknown(input: {
id: string;
reason: string;
nextAttemptAt: string;
diagnostics?: string[];
markedAt: string;
}): Promise<OpenCodePromptDeliveryLedgerRecord> {
return await this.updateExisting(input.id, (record) => ({
...record,
status: 'failed_retryable',
responseState: 'not_observed',
acceptanceUnknown: true,
nextAttemptAt: input.nextAttemptAt,
lastReason: input.reason,
diagnostics: mergeDiagnostics(record.diagnostics, [
input.reason,
...(input.diagnostics ?? []),
]),
updatedAt: input.markedAt,
}));
}
async markNextAttemptScheduled(input: {
id: string;
status: Extract<OpenCodePromptDeliveryStatus, 'accepted' | 'retry_scheduled'>;
nextAttemptAt: string;
reason: string;
scheduledAt: string;
}): Promise<OpenCodePromptDeliveryLedgerRecord> {
return await this.updateExisting(input.id, (record) => ({
...record,
status: input.status,
nextAttemptAt: input.nextAttemptAt,
lastReason: input.reason,
updatedAt: input.scheduledAt,
}));
}
async markSessionRefreshScheduled(input: {
id: string;
nextAttemptAt: string;
reason: string;
scheduledAt: string;
maxSessionRefreshAttempts?: number;
diagnostics?: string[];
}): Promise<OpenCodePromptDeliveryLedgerRecord> {
return await this.updateExisting(input.id, (record) => {
const maxSessionRefreshAttempts =
record.maxSessionRefreshAttempts ??
input.maxSessionRefreshAttempts ??
OPENCODE_PROMPT_DELIVERY_SESSION_REFRESH_MAX_ATTEMPTS;
const sessionRefreshAttempts = (record.sessionRefreshAttempts ?? 0) + 1;
return {
...record,
status: 'retry_scheduled',
responseState: 'session_stale',
nextAttemptAt: input.nextAttemptAt,
sessionRefreshAttempts,
maxSessionRefreshAttempts,
lastSessionRefreshReason: input.reason,
lastReason: input.reason,
diagnostics: mergeDiagnostics(record.diagnostics, [
input.reason,
...(input.diagnostics ?? []),
]),
updatedAt: input.scheduledAt,
};
});
}
async markRetryAttempted(input: {
id: string;
attemptedAt: string;
reason?: string | null;
}): Promise<OpenCodePromptDeliveryLedgerRecord> {
return await this.updateExisting(input.id, (record) => ({
...record,
status: 'retried',
attempts: record.attempts + 1,
lastAttemptAt: input.attemptedAt,
nextAttemptAt: null,
lastReason: input.reason ?? record.lastReason,
updatedAt: input.attemptedAt,
}));
}
async markFailedTerminal(input: {
id: string;
reason: string;
diagnostics?: string[];
failedAt: string;
}): Promise<OpenCodePromptDeliveryLedgerRecord> {
return await this.updateExisting(input.id, (record) => ({
...record,
status: 'failed_terminal',
failedAt: input.failedAt,
nextAttemptAt: null,
lastReason: input.reason,
diagnostics: mergeDiagnostics(record.diagnostics, [
input.reason,
...(input.diagnostics ?? []),
]),
updatedAt: input.failedAt,
}));
}
async markInboxReadCommitted(input: {
id: string;
committedAt: string;
}): Promise<OpenCodePromptDeliveryLedgerRecord> {
return await this.updateExisting(input.id, (record) => ({
...record,
inboxReadCommittedAt: input.committedAt,
inboxReadCommitError: null,
updatedAt: input.committedAt,
}));
}
async markInboxReadCommitFailed(input: {
id: string;
error: string;
failedAt: string;
}): Promise<OpenCodePromptDeliveryLedgerRecord> {
return await this.updateExisting(input.id, (record) => ({
...record,
inboxReadCommitError: input.error,
diagnostics: mergeDiagnostics(record.diagnostics, [input.error]),
updatedAt: input.failedAt,
}));
}
async list(): Promise<OpenCodePromptDeliveryLedgerRecord[]> {
return await this.readRequired();
}
async listDue(input: {
teamName?: string;
now: Date;
limit: number;
}): Promise<OpenCodePromptDeliveryLedgerRecord[]> {
const nowMs = input.now.getTime();
const limit = Math.max(0, input.limit);
if (limit === 0) {
return [];
}
const teamName = input.teamName?.trim().toLowerCase() ?? null;
const records = await this.readRequired();
return records
.filter((record) => {
if (teamName && record.teamName.trim().toLowerCase() !== teamName) {
return false;
}
if (isTerminalForAutomaticSelection(record)) {
return false;
}
return isOpenCodePromptDeliveryAttemptDue(record, nowMs);
})
.sort(compareOpenCodePromptDeliveryDueOrder)
.slice(0, limit);
}
async pruneTerminalRecords(input: {
now: Date;
respondedRetentionMs?: number;
failedRetentionMs?: number;
}): Promise<{ pruned: number; remaining: number }> {
const nowMs = input.now.getTime();
const respondedRetentionMs =
input.respondedRetentionMs ?? OPENCODE_PROMPT_DELIVERY_RESPONDED_RETENTION_MS;
const failedRetentionMs =
input.failedRetentionMs ?? OPENCODE_PROMPT_DELIVERY_FAILED_RETENTION_MS;
let pruned = 0;
let remaining = 0;
await this.store.updateLocked((records) => {
const kept = records.filter((record) => {
if (
shouldPruneOpenCodePromptDeliveryRecord(
record,
nowMs,
respondedRetentionMs,
failedRetentionMs
)
) {
pruned += 1;
return false;
}
return true;
});
remaining = kept.length;
return kept;
});
return { pruned, remaining };
}
private async updateExisting(
id: string,
updater: (record: OpenCodePromptDeliveryLedgerRecord) => OpenCodePromptDeliveryLedgerRecord
): Promise<OpenCodePromptDeliveryLedgerRecord> {
let updated: OpenCodePromptDeliveryLedgerRecord | null = null;
await this.store.updateLocked((records) =>
records.map((record) => {
if (record.id !== id) {
return record;
}
updated = updater(record);
return updated;
})
);
if (!updated) {
throw new Error(`OpenCode prompt delivery record not found: ${id}`);
}
return updated;
}
private async readRequired(): Promise<OpenCodePromptDeliveryLedgerRecord[]> {
const result = await this.store.read();
if (!result.ok) {
throw new VersionedJsonStoreError(result.message, result.reason, result.quarantinePath);
}
return result.data;
}
}
export function createOpenCodePromptDeliveryLedgerStore(options: {
filePath: string;
clock?: () => Date;
}): OpenCodePromptDeliveryLedgerStore {
const clock = options.clock ?? (() => new Date());
return new OpenCodePromptDeliveryLedgerStore(
new VersionedJsonStore<OpenCodePromptDeliveryLedgerRecord[]>({
filePath: options.filePath,
schemaVersion: OPENCODE_PROMPT_DELIVERY_LEDGER_SCHEMA_VERSION,
defaultData: () => [],
validate: validateOpenCodePromptDeliveryLedgerRecords,
clock,
})
);
}
export function buildOpenCodePromptDeliveryRecordId(input: {
teamName: string;
memberName: string;
laneId: string;
inboxMessageId: string;
}): string {
return `opencode-prompt:${stableHash({
version: 1,
teamName: input.teamName,
memberName: input.memberName.toLowerCase(),
laneId: input.laneId,
inboxMessageId: input.inboxMessageId,
})}`;
}
export function hashOpenCodePromptDeliveryPayload(input: {
text: string;
replyRecipient: string;
actionMode?: AgentActionMode | null;
taskRefs?: TaskRef[];
attachments?: { id?: string; filename?: string; mimeType?: string; size?: number }[];
source?: string;
}): string {
return `sha256:${stableHash({
text: input.text,
replyRecipient: input.replyRecipient,
actionMode: input.actionMode ?? null,
taskRefs: input.taskRefs ?? [],
attachments:
input.attachments?.map((attachment) => ({
id: attachment.id ?? null,
filename: attachment.filename ?? null,
mimeType: attachment.mimeType ?? null,
size: attachment.size ?? null,
})) ?? [],
source: input.source ?? null,
})}`;
}
export function getOpenCodeRuntimePromptMessageIds(
record: Pick<
OpenCodePromptDeliveryLedgerRecord,
'runtimePromptMessageId' | 'runtimePromptMessageIds' | 'lastRuntimePromptMessageId'
>
): string[] {
const ids: string[] = [];
for (const value of [
...(Array.isArray(record.runtimePromptMessageIds) ? record.runtimePromptMessageIds : []),
record.runtimePromptMessageId,
record.lastRuntimePromptMessageId,
]) {
const id = typeof value === 'string' ? value.trim() : '';
if (id && !ids.includes(id)) {
ids.push(id);
}
}
return ids;
}
export function getLatestOpenCodeRuntimePromptMessageId(
record: Pick<
OpenCodePromptDeliveryLedgerRecord,
'runtimePromptMessageId' | 'runtimePromptMessageIds' | 'lastRuntimePromptMessageId'
>
): string | null {
const explicit =
record.lastRuntimePromptMessageId?.trim() || record.runtimePromptMessageId?.trim();
if (explicit) {
return explicit;
}
const ids = getOpenCodeRuntimePromptMessageIds(record);
return ids[ids.length - 1] ?? null;
}
export function buildOpenCodePromptDeliveryAttemptId(
record: Pick<
OpenCodePromptDeliveryLedgerRecord,
'id' | 'attempts' | 'payloadHash' | 'sessionRefreshAttempts'
>
): string {
const base = [record.id, record.attempts + 1, record.payloadHash.slice(0, 12)];
const sessionRefreshAttempts = record.sessionRefreshAttempts ?? 0;
if (sessionRefreshAttempts > 0) {
base.push(`refresh${sessionRefreshAttempts}`);
}
return base.join(':');
}
export function isOpenCodePromptResponseStateResponded(
state: OpenCodeDeliveryResponseState
): boolean {
return (
state === 'responded_visible_message' ||
state === 'responded_non_visible_tool' ||
state === 'responded_tool_call' ||
state === 'responded_plain_text'
);
}
function isOpenCodePromptDeliveryUnansweredResponseState(
state: OpenCodeDeliveryResponseState
): boolean {
return state === 'empty_assistant_turn' || state === 'prompt_delivered_no_assistant_message';
}
export function isOpenCodeResolvedBehaviorChangedReason(
reason: string | null | undefined
): boolean {
return /\bresolved_behavior_changed:[^\s]+->[^\s]+/i.test(reason?.trim() ?? '');
}
export function isOpenCodeSessionTransportChangedReason(
reason: string | null | undefined
): boolean {
return /\bopencode_app_mcp_transport_changed:[^\s]+->[^\s]+/i.test(reason?.trim() ?? '');
}
export function isOpenCodeSessionRefreshResponseState(input: {
responseState?: OpenCodeDeliveryResponseState;
reason?: string | null;
diagnostics?: readonly string[];
}): boolean {
if (input.responseState === 'session_stale') {
return true;
}
if (
isOpenCodeResolvedBehaviorChangedReason(input.reason) ||
isOpenCodeSessionTransportChangedReason(input.reason)
) {
return true;
}
return (input.diagnostics ?? []).some(
(diagnostic) =>
isOpenCodeResolvedBehaviorChangedReason(diagnostic) ||
isOpenCodeSessionTransportChangedReason(diagnostic)
);
}
export function isOpenCodePromptDeliveryAttemptDue(
record: OpenCodePromptDeliveryLedgerRecord,
nowMs: number = Date.now()
): boolean {
if (!record.nextAttemptAt) {
return true;
}
const dueMs = Date.parse(record.nextAttemptAt);
return !Number.isFinite(dueMs) || dueMs <= nowMs;
}
export function validateOpenCodePromptDeliveryLedgerRecords(
value: unknown
): OpenCodePromptDeliveryLedgerRecord[] {
if (!Array.isArray(value)) {
throw new Error('OpenCode prompt delivery ledger must be an array');
}
const seen = new Set<string>();
return value.map((record, index) => {
if (!isOpenCodePromptDeliveryLedgerRecord(record)) {
throw new Error(`Invalid OpenCode prompt delivery ledger record at index ${index}`);
}
if (seen.has(record.id)) {
throw new Error(`Duplicate OpenCode prompt delivery ledger id: ${record.id}`);
}
seen.add(record.id);
return record;
});
}
function isOpenCodePromptDeliveryLedgerRecord(
value: unknown
): value is OpenCodePromptDeliveryLedgerRecord {
const record = value && typeof value === 'object' ? (value as Record<string, unknown>) : null;
return Boolean(
record &&
typeof record.id === 'string' &&
typeof record.teamName === 'string' &&
typeof record.memberName === 'string' &&
typeof record.laneId === 'string' &&
isOptionalNullableString(record.runId) &&
isOptionalNullableString(record.runtimeSessionId) &&
isOptionalNullableString(record.runtimePromptMessageId) &&
isOptionalStringArray(record.runtimePromptMessageIds) &&
isOptionalNullableString(record.lastRuntimePromptMessageId) &&
isOptionalNullableString(record.lastDeliveryAttemptIdWithAcceptedPrompt) &&
typeof record.inboxMessageId === 'string' &&
typeof record.inboxTimestamp === 'string' &&
isOpenCodePromptDeliverySource(record.source) &&
isOptionalNullableInboxMessageKind(record.messageKind) &&
typeof record.replyRecipient === 'string' &&
isOptionalNullableActionMode(record.actionMode) &&
isTaskRefArray(record.taskRefs) &&
typeof record.payloadHash === 'string' &&
isOpenCodePromptDeliveryStatus(record.status) &&
isOpenCodeDeliveryResponseState(record.responseState) &&
isNonNegativeInteger(record.attempts) &&
isNonNegativeInteger(record.maxAttempts) &&
isOptionalNonNegativeInteger(record.sessionRefreshAttempts) &&
isOptionalNonNegativeInteger(record.maxSessionRefreshAttempts) &&
isOptionalNullableString(record.lastSessionRefreshReason) &&
typeof record.acceptanceUnknown === 'boolean' &&
isOptionalNullableString(record.nextAttemptAt) &&
isOptionalNullableString(record.lastAttemptAt) &&
isOptionalNullableString(record.lastObservedAt) &&
isOptionalNullableString(record.acceptedAt) &&
isOptionalNullableString(record.respondedAt) &&
isOptionalNullableString(record.failedAt) &&
isOptionalNullableString(record.inboxReadCommittedAt) &&
isOptionalNullableString(record.inboxReadCommitError) &&
isOptionalNullableString(record.prePromptCursor) &&
isOptionalNullableString(record.postPromptCursor) &&
isOptionalNullableString(record.deliveredUserMessageId) &&
isOptionalNullableString(record.observedAssistantMessageId) &&
isOptionalNullableString(record.observedAssistantPreview) &&
isStringArray(record.observedToolCallNames) &&
isOptionalNullableString(record.observedVisibleMessageId) &&
isOptionalNullableString(record.visibleReplyMessageId) &&
isOptionalNullableString(record.visibleReplyInbox) &&
isOptionalNullableVisibleReplyCorrelation(record.visibleReplyCorrelation) &&
isOptionalNullableString(record.lastReason) &&
isStringArray(record.diagnostics) &&
typeof record.createdAt === 'string' &&
typeof record.updatedAt === 'string'
);
}
function isOpenCodePromptDeliveryStatus(value: unknown): value is OpenCodePromptDeliveryStatus {
return (
typeof value === 'string' &&
OPENCODE_PROMPT_DELIVERY_STATUSES.has(value as OpenCodePromptDeliveryStatus)
);
}
function isOpenCodeDeliveryResponseState(value: unknown): value is OpenCodeDeliveryResponseState {
return (
typeof value === 'string' &&
OPENCODE_DELIVERY_RESPONSE_STATES.has(value as OpenCodeDeliveryResponseState)
);
}
function isOpenCodePromptDeliverySource(
value: unknown
): value is OpenCodePromptDeliveryLedgerRecord['source'] {
return (
typeof value === 'string' &&
OPENCODE_PROMPT_DELIVERY_SOURCES.has(value as OpenCodePromptDeliveryLedgerRecord['source'])
);
}
function isOptionalNullableVisibleReplyCorrelation(
value: unknown
): value is OpenCodeDeliveryVisibleReplyCorrelation | null | undefined {
return (
value === undefined ||
value === null ||
(typeof value === 'string' &&
OPENCODE_DELIVERY_VISIBLE_REPLY_CORRELATIONS.has(
value as OpenCodeDeliveryVisibleReplyCorrelation
))
);
}
function isOptionalNullableActionMode(value: unknown): value is AgentActionMode | null | undefined {
return (
value === undefined ||
value === null ||
(typeof value === 'string' && AGENT_ACTION_MODES.has(value as AgentActionMode))
);
}
function isOptionalNullableInboxMessageKind(
value: unknown
): value is InboxMessageKind | null | undefined {
return (
value === undefined ||
value === null ||
value === 'default' ||
value === 'slash_command' ||
value === 'slash_command_result' ||
value === 'task_comment_notification' ||
value === 'task_stall_remediation' ||
value === 'member_work_sync_nudge' ||
value === 'agent_error'
);
}
function isOptionalNullableString(value: unknown): value is string | null | undefined {
return value === undefined || value === null || typeof value === 'string';
}
function isStringArray(value: unknown): value is string[] {
return Array.isArray(value) && value.every((item) => typeof item === 'string');
}
function isOptionalStringArray(value: unknown): value is string[] | undefined {
return value === undefined || isStringArray(value);
}
function isNonNegativeInteger(value: unknown): value is number {
return Number.isInteger(value) && (value as number) >= 0;
}
function isOptionalNonNegativeInteger(value: unknown): value is number | undefined {
return value === undefined || isNonNegativeInteger(value);
}
function isTaskRefArray(value: unknown): value is TaskRef[] {
return (
Array.isArray(value) &&
value.every((item) => {
if (!item || typeof item !== 'object' || Array.isArray(item)) {
return false;
}
const taskRef = item as Record<string, unknown>;
return (
typeof taskRef.taskId === 'string' &&
typeof taskRef.displayId === 'string' &&
typeof taskRef.teamName === 'string'
);
})
);
}
function isTerminalForAutomaticSelection(record: OpenCodePromptDeliveryLedgerRecord): boolean {
if (
record.status === 'responded' &&
record.responseState === 'responded_plain_text' &&
!record.visibleReplyMessageId &&
!record.inboxReadCommittedAt
) {
return false;
}
return record.status === 'failed_terminal' || record.status === 'responded';
}
function compareOpenCodePromptDeliveryDueOrder(
left: OpenCodePromptDeliveryLedgerRecord,
right: OpenCodePromptDeliveryLedgerRecord
): number {
const leftDue = left.nextAttemptAt ? Date.parse(left.nextAttemptAt) : Date.parse(left.createdAt);
const rightDue = right.nextAttemptAt
? Date.parse(right.nextAttemptAt)
: Date.parse(right.createdAt);
const dueDelta = safeSortableTime(leftDue) - safeSortableTime(rightDue);
if (dueDelta !== 0) {
return dueDelta;
}
return Date.parse(left.createdAt) - Date.parse(right.createdAt);
}
function safeSortableTime(value: number): number {
return Number.isFinite(value) ? value : 0;
}
function shouldPruneOpenCodePromptDeliveryRecord(
record: OpenCodePromptDeliveryLedgerRecord,
nowMs: number,
respondedRetentionMs: number,
failedRetentionMs: number
): boolean {
if (record.status === 'responded' && record.inboxReadCommittedAt) {
const committedMs = Date.parse(record.inboxReadCommittedAt);
return Number.isFinite(committedMs) && nowMs - committedMs >= respondedRetentionMs;
}
if (record.status === 'failed_terminal') {
const failedMs = Date.parse(record.failedAt ?? record.updatedAt);
return Number.isFinite(failedMs) && nowMs - failedMs >= failedRetentionMs;
}
return false;
}
function selectOpenCodeDestinationProofInsufficientReason(
diagnostics: readonly string[] | undefined
): string {
const normalizedDiagnostics = (diagnostics ?? []).map((diagnostic) =>
diagnostic.trim().toLowerCase()
);
if (
normalizedDiagnostics.includes('visible_reply_missing_task_refs') ||
normalizedDiagnostics.includes('visible_reply_missing_task_refs_after_merge') ||
normalizedDiagnostics.includes('visible_reply_task_refs_merge_failed')
) {
return 'visible_reply_missing_task_refs';
}
return 'visible_reply_ack_only_still_requires_answer';
}
function mergeDiagnostics(existing: string[], next: string[]): string[] {
return [...new Set([...existing, ...next].filter((item) => item.trim()))];
}