fix(team): recover duplicate opencode send outcomes
This commit is contained in:
parent
9dc7858e22
commit
3e1695fd80
2 changed files with 100 additions and 7 deletions
|
|
@ -79,6 +79,8 @@ const DEFAULT_STOP_TIMEOUT_MS = 30_000;
|
|||
const DEFAULT_CLEANUP_TIMEOUT_MS = 10_000;
|
||||
const DEFAULT_BACKFILL_TIMEOUT_MS = 45_000;
|
||||
const DEFAULT_COMMAND_STATUS_TIMEOUT_MS = 5_000;
|
||||
const OPEN_CODE_COMPLETED_COMMAND_RECOVERY_MESSAGE =
|
||||
'OpenCode bridge command already completed; recover through commandStatus';
|
||||
|
||||
function buildSendPayloadHash(input: OpenCodeSendMessageCommandBody): string {
|
||||
const { payloadHash: _payloadHash, settlementMode: _settlementMode, ...hashable } = input;
|
||||
|
|
@ -277,6 +279,17 @@ export class OpenCodeReadinessBridge implements OpenCodeTeamRuntimeBridgePort {
|
|||
result = executed.result;
|
||||
activeRequestId = executed.requestId;
|
||||
} catch (error) {
|
||||
if (body.settlementMode === 'acceptance' && isOpenCodeCompletedCommandRecoveryError(error)) {
|
||||
const recovered = await this.recoverSendMessageOutcome({
|
||||
originalRequestId: null,
|
||||
body: activeBody,
|
||||
diagnosticCode: 'opencode_send_recovered_after_duplicate_completed_command',
|
||||
diagnosticMessage: 'OpenCode bridge outcome recovered after duplicate completed command.',
|
||||
});
|
||||
if (recovered) {
|
||||
return recovered;
|
||||
}
|
||||
}
|
||||
if (
|
||||
body.settlementMode !== 'acceptance' ||
|
||||
!isOpenCodeAcceptanceContractMissingError(error)
|
||||
|
|
@ -316,9 +329,11 @@ export class OpenCodeReadinessBridge implements OpenCodeTeamRuntimeBridgePort {
|
|||
: result.data;
|
||||
}
|
||||
if (result.error.kind === 'timeout') {
|
||||
const recovered = await this.recoverTimedOutSendMessage({
|
||||
const recovered = await this.recoverSendMessageOutcome({
|
||||
originalRequestId: activeRequestId,
|
||||
body: activeBody,
|
||||
diagnosticCode: 'opencode_send_recovered_after_bridge_timeout',
|
||||
diagnosticMessage: 'OpenCode bridge outcome recovered after timeout.',
|
||||
});
|
||||
if (recovered) {
|
||||
return usedObservedFallback ? withOpenCodeObservedFallbackDiagnostic(recovered) : recovered;
|
||||
|
|
@ -342,13 +357,17 @@ export class OpenCodeReadinessBridge implements OpenCodeTeamRuntimeBridgePort {
|
|||
};
|
||||
}
|
||||
|
||||
private async recoverTimedOutSendMessage(input: {
|
||||
originalRequestId: string;
|
||||
private async recoverSendMessageOutcome(input: {
|
||||
originalRequestId?: string | null;
|
||||
body: OpenCodeSendMessageCommandBody;
|
||||
diagnosticCode: string;
|
||||
diagnosticMessage: string;
|
||||
}): Promise<OpenCodeSendMessageCommandData | null> {
|
||||
if (!input.originalRequestId && !input.body.deliveryAttemptId) {
|
||||
return null;
|
||||
}
|
||||
const statusBody: OpenCodeCommandStatusCommandBody = {
|
||||
originalCommand: 'opencode.sendMessage',
|
||||
originalRequestId: input.originalRequestId,
|
||||
deliveryAttemptId: input.body.deliveryAttemptId,
|
||||
teamId: input.body.teamId,
|
||||
teamName: input.body.teamName,
|
||||
|
|
@ -358,6 +377,7 @@ export class OpenCodeReadinessBridge implements OpenCodeTeamRuntimeBridgePort {
|
|||
payloadHash: input.body.payloadHash,
|
||||
projectPath: input.body.projectPath,
|
||||
runId: input.body.runId,
|
||||
...(input.originalRequestId ? { originalRequestId: input.originalRequestId } : {}),
|
||||
};
|
||||
const statusResult = await this.bridge.execute<
|
||||
OpenCodeCommandStatusCommandBody,
|
||||
|
|
@ -370,7 +390,11 @@ export class OpenCodeReadinessBridge implements OpenCodeTeamRuntimeBridgePort {
|
|||
return null;
|
||||
}
|
||||
const status = statusResult.data;
|
||||
if (status.originalRequestId && status.originalRequestId !== input.originalRequestId) {
|
||||
if (
|
||||
input.originalRequestId &&
|
||||
status.originalRequestId &&
|
||||
status.originalRequestId !== input.originalRequestId
|
||||
) {
|
||||
return null;
|
||||
}
|
||||
if (
|
||||
|
|
@ -385,9 +409,9 @@ export class OpenCodeReadinessBridge implements OpenCodeTeamRuntimeBridgePort {
|
|||
}
|
||||
const diagnostics = [
|
||||
{
|
||||
code: 'opencode_send_recovered_after_bridge_timeout',
|
||||
code: input.diagnosticCode,
|
||||
severity: 'warning' as const,
|
||||
message: 'OpenCode bridge outcome recovered after timeout.',
|
||||
message: input.diagnosticMessage,
|
||||
},
|
||||
...status.diagnostics.map((message) => ({
|
||||
code: 'opencode_command_status',
|
||||
|
|
@ -620,6 +644,11 @@ function isOpenCodeAcceptanceContractMissingError(error: unknown): boolean {
|
|||
return message.includes('OpenCode delivery acceptance mode is required');
|
||||
}
|
||||
|
||||
function isOpenCodeCompletedCommandRecoveryError(error: unknown): boolean {
|
||||
const message = error instanceof Error ? error.message : String(error);
|
||||
return message.includes(OPEN_CODE_COMPLETED_COMMAND_RECOVERY_MESSAGE);
|
||||
}
|
||||
|
||||
function withOpenCodeObservedFallbackDiagnostic(
|
||||
data: OpenCodeSendMessageCommandData
|
||||
): OpenCodeSendMessageCommandData {
|
||||
|
|
|
|||
|
|
@ -625,6 +625,70 @@ describe('OpenCodeReadinessBridge', () => {
|
|||
expect(executor.execute).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('recovers duplicate completed guarded send through commandStatus without resending', async () => {
|
||||
const executor = fakeExecutor(
|
||||
bridgeCommandSuccess({
|
||||
command: 'opencode.commandStatus',
|
||||
requestId: 'status-req-duplicate',
|
||||
data: {
|
||||
status: 'prompt_accepted',
|
||||
safeToRetry: false,
|
||||
accepted: true,
|
||||
deliveryAttemptId: 'ledger-1:1:payload',
|
||||
sessionId: 'session-bob',
|
||||
runtimePromptMessageId: 'msg_prompt_1',
|
||||
diagnostics: ['OpenCode prompt acceptance recovered from completed idempotent command.'],
|
||||
},
|
||||
})
|
||||
);
|
||||
const stateChangingExecute = vi.fn(async () => {
|
||||
throw new Error('OpenCode bridge command already completed; recover through commandStatus');
|
||||
});
|
||||
const bridge = new OpenCodeReadinessBridge(executor, {
|
||||
stateChangingCommands: { execute: stateChangingExecute },
|
||||
});
|
||||
const executeMock = executor.execute as unknown as ReturnType<typeof vi.fn>;
|
||||
|
||||
await expect(
|
||||
bridge.sendOpenCodeTeamMessage({
|
||||
runId: 'run-1',
|
||||
teamId: 'team-a',
|
||||
teamName: 'team-a',
|
||||
laneId: 'secondary:opencode:bob',
|
||||
projectPath: '/repo',
|
||||
memberName: 'bob',
|
||||
text: 'hello',
|
||||
messageId: 'message-1',
|
||||
deliveryAttemptId: 'ledger-1:1:payload',
|
||||
settlementMode: 'acceptance',
|
||||
})
|
||||
).resolves.toMatchObject({
|
||||
accepted: true,
|
||||
sessionId: 'session-bob',
|
||||
runtimePromptMessageId: 'msg_prompt_1',
|
||||
diagnostics: expect.arrayContaining([
|
||||
expect.objectContaining({
|
||||
code: 'opencode_send_recovered_after_duplicate_completed_command',
|
||||
}),
|
||||
]),
|
||||
});
|
||||
|
||||
expect(stateChangingExecute).toHaveBeenCalledTimes(1);
|
||||
expect(executeMock).toHaveBeenCalledTimes(1);
|
||||
const [command, body, options] = executeMock.mock.calls[0] ?? [];
|
||||
expect(command).toBe('opencode.commandStatus');
|
||||
expect(body).toMatchObject({
|
||||
originalCommand: 'opencode.sendMessage',
|
||||
deliveryAttemptId: 'ledger-1:1:payload',
|
||||
payloadHash: expect.any(String),
|
||||
});
|
||||
expect(body).not.toHaveProperty('originalRequestId');
|
||||
expect(options).toMatchObject({
|
||||
cwd: '/repo',
|
||||
timeoutMs: 5_000,
|
||||
});
|
||||
});
|
||||
|
||||
it('falls back to observed send mode when guarded acceptance contract validation fails', async () => {
|
||||
const executor = fakeExecutor(
|
||||
bridgeCommandSuccess<OpenCodeSendMessageCommandData>({
|
||||
|
|
|
|||
Loading…
Reference in a new issue