agent-ecosystem/test/main/services/team/RuntimeDeliveryService.test.ts
2026-04-21 20:28:22 +03:00

356 lines
11 KiB
TypeScript

import { promises as fs } from 'fs';
import * as os from 'os';
import * as path from 'path';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import {
buildRuntimeDestinationMessageId,
createRuntimeDeliveryJournalStore,
hashRuntimeDeliveryEnvelope,
resolveRuntimeDeliveryDestination,
type RuntimeDeliveryDestinationRef,
type RuntimeDeliveryEnvelope,
type RuntimeDeliveryLocation,
} from '../../../../src/main/services/team/opencode/delivery/RuntimeDeliveryJournal';
import {
RuntimeDeliveryDestinationRegistry,
RuntimeDeliveryReconciler,
RuntimeDeliveryService,
type RuntimeDeliveryDestinationPort,
type RuntimeDeliveryDiagnosticsSink,
type RuntimeDeliveryRunStateReader,
type RuntimeDeliveryTeamChangeEmitter,
type RuntimeDeliveryTeamChangeEvent,
type RuntimeDeliveryVerifyResult,
} from '../../../../src/main/services/team/opencode/delivery/RuntimeDeliveryService';
// Shared fixture state for the 'RuntimeDeliveryService' suite. Every binding
// below is (re)assigned in that suite's beforeEach, so each test starts from
// fresh instances.

// Temporary directory that holds the journal file; removed in afterEach.
let tempDir: string;
// Timestamp returned by the clock callbacks given to the journal and service.
let now: Date;
// Journal store backed by a file inside tempDir.
let journal: ReturnType<typeof createRuntimeDeliveryJournalStore>;
// In-memory destination fake; tests inspect its messages and writeCalls.
let destination: FakeDestinationPort;
// Diagnostics fake whose append mock records the calls it receives.
let diagnostics: FakeDiagnosticsSink;
// Emitter fake that collects emitted change events in an array.
let emitter: FakeTeamChangeEmitter;
// Run-state fake; tests mutate currentRunId to simulate a newer run.
let runState: FakeRunStateReader;
/**
 * Tests for RuntimeDeliveryService.deliver, run against a file-backed journal
 * in a per-test temp directory and in-memory fakes for the destination,
 * diagnostics sink, change emitter and run-state reader.
 */
describe('RuntimeDeliveryService', () => {
  // Rebuild every fixture so no journal record or fake state leaks between tests.
  beforeEach(async () => {
    tempDir = await fs.mkdtemp(path.join(os.tmpdir(), 'opencode-runtime-delivery-'));
    now = new Date('2026-04-21T12:00:00.000Z');
    journal = createRuntimeDeliveryJournalStore({
      filePath: path.join(tempDir, 'delivery-journal.json'),
      clock: () => now,
    });
    destination = new FakeDestinationPort('member_inbox');
    diagnostics = new FakeDiagnosticsSink();
    emitter = new FakeTeamChangeEmitter();
    runState = new FakeRunStateReader('run-1');
  });

  // Remove the temp directory (and the journal file inside it) after each test.
  afterEach(async () => {
    await fs.rm(tempDir, { recursive: true, force: true });
  });

  // A write that throws must leave the record retryable, so that delivering the
  // same envelope again commits it and produces exactly one stored message.
  it('does not poison idempotency when crash happens before destination write', async () => {
    destination.writeImpl = async () => {
      throw new Error('simulated crash before write');
    };
    const service = createService();

    await expect(service.deliver(envelope())).rejects.toThrow('simulated crash before write');
    await expect(journal.get('delivery-1')).resolves.toMatchObject({
      status: 'failed_retryable',
      attempts: 1,
    });

    // Restore the fake's default write behaviour before retrying.
    destination.writeImpl = undefined;
    const retry = await service.deliver(envelope());

    expect(retry).toMatchObject({
      ok: true,
      delivered: true,
      reason: null,
    });
    await expect(journal.get('delivery-1')).resolves.toMatchObject({
      status: 'committed',
      attempts: 2,
      committedLocation: expect.objectContaining({
        kind: 'member_inbox',
        memberName: 'Reviewer',
      }),
    });
    // `messages` is a Map: it exposes `size`, not `length`, so assert on `size`
    // directly instead of depending on matcher-specific handling of Map.
    expect(destination.messages.size).toBe(1);
  });

  // A journal record that was begun but never committed is committed by the
  // reconciler when the destination already holds the deterministic message id.
  it('commits pending journal when destination already contains deterministic message id', async () => {
    const message = envelope();
    const destinationRef = resolveRuntimeDeliveryDestination(message);
    const destinationMessageId = buildRuntimeDestinationMessageId(message);
    await journal.begin({
      idempotencyKey: message.idempotencyKey,
      payloadHash: hashRuntimeDeliveryEnvelope(message),
      runId: message.runId,
      teamName: message.teamName,
      fromMemberName: message.fromMemberName,
      providerId: message.providerId,
      runtimeSessionId: message.runtimeSessionId,
      destination: destinationRef,
      destinationMessageId,
      now: now.toISOString(),
    });
    // Seed the destination directly, bypassing write().
    destination.messages.set(destinationMessageId, {
      kind: 'member_inbox',
      teamName: 'team-a',
      memberName: 'Reviewer',
      messageId: destinationMessageId,
    });
    const reconciler = new RuntimeDeliveryReconciler(
      journal,
      new RuntimeDeliveryDestinationRegistry([destination]),
      diagnostics,
      () => now
    );

    await reconciler.reconcileTeam('team-a');

    await expect(journal.get(message.idempotencyKey)).resolves.toMatchObject({
      status: 'committed',
      committedLocation: expect.objectContaining({
        messageId: destinationMessageId,
      }),
    });
    expect(diagnostics.append).not.toHaveBeenCalled();
  });

  // When the destination already contains the message, deliver() reports a
  // duplicate and commits the journal without calling write().
  it('commits duplicate destination found without writing a second message', async () => {
    const message = envelope();
    const destinationMessageId = buildRuntimeDestinationMessageId(message);
    destination.messages.set(destinationMessageId, {
      kind: 'member_inbox',
      teamName: 'team-a',
      memberName: 'Reviewer',
      messageId: destinationMessageId,
    });
    const service = createService();

    const ack = await service.deliver(message);

    expect(ack).toMatchObject({
      ok: true,
      delivered: false,
      reason: 'duplicate_destination_found',
    });
    expect(destination.writeCalls).toBe(0);
    await expect(journal.get(message.idempotencyKey)).resolves.toMatchObject({
      status: 'committed',
    });
  });

  // Reusing an idempotency key with different text is rejected as a conflict,
  // reported to diagnostics, and does not add a second stored message.
  it('rejects same idempotency key with different payload hash', async () => {
    const service = createService();
    await expect(service.deliver(envelope())).resolves.toMatchObject({
      ok: true,
      delivered: true,
    });

    await expect(
      service.deliver({
        ...envelope(),
        text: 'different text',
      })
    ).resolves.toMatchObject({
      ok: false,
      delivered: false,
      reason: 'idempotency_conflict',
    });

    expect(diagnostics.append).toHaveBeenCalledWith(
      expect.objectContaining({
        type: 'runtime_delivery_conflict',
        severity: 'error',
      })
    );
    // Map size, not length (see the first test).
    expect(destination.messages.size).toBe(1);
  });

  // An envelope whose runId differs from the current run is refused; the
  // journal stays empty and write() is never called.
  it('rejects stale run before journal reservation', async () => {
    runState.currentRunId = 'new-run';
    const service = createService();

    await expect(service.deliver(envelope())).resolves.toEqual({
      ok: false,
      delivered: false,
      reason: 'stale_run',
      idempotencyKey: 'delivery-1',
    });

    await expect(journal.list()).resolves.toEqual([]);
    expect(destination.writeCalls).toBe(0);
  });

  // A successful delivery emits exactly one event, shaped as the destination
  // fake's buildChangeEvent() produces it.
  it('emits a bounded change event after verified commit', async () => {
    const service = createService();

    await service.deliver(envelope());

    expect(emitter.events).toEqual([
      {
        type: 'runtime-delivery',
        teamName: 'team-a',
        data: {
          kind: 'member_inbox',
        },
      },
    ]);
  });
});
/**
 * Reconciler test that builds its own journal, clock and fakes locally
 * instead of using the module-level fixtures.
 */
describe('RuntimeDeliveryReconciler', () => {
  // A begun record with no matching message in the destination should be
  // reported to diagnostics as needing recovery.
  it('diagnoses pending records that are not visible in destination', async () => {
    const scratchDir = await fs.mkdtemp(
      path.join(os.tmpdir(), 'opencode-runtime-delivery-reconcile-')
    );
    try {
      const fixedNow = new Date('2026-04-21T12:00:00.000Z');
      const journalPath = path.join(scratchDir, 'delivery-journal.json');
      const store = createRuntimeDeliveryJournalStore({
        filePath: journalPath,
        clock: () => fixedNow,
      });

      // Reserve a journal entry without ever writing to a destination.
      const pending = envelope();
      const { idempotencyKey, runId, teamName, fromMemberName, providerId, runtimeSessionId } =
        pending;
      await store.begin({
        idempotencyKey,
        payloadHash: hashRuntimeDeliveryEnvelope(pending),
        runId,
        teamName,
        fromMemberName,
        providerId,
        runtimeSessionId,
        destination: resolveRuntimeDeliveryDestination(pending),
        destinationMessageId: buildRuntimeDestinationMessageId(pending),
        now: fixedNow.toISOString(),
      });

      const sink = new FakeDiagnosticsSink();
      // Brand-new fake: its message map is empty, so verify() finds nothing.
      const emptyInbox = new FakeDestinationPort('member_inbox');
      const registry = new RuntimeDeliveryDestinationRegistry([emptyInbox]);
      const reconciler = new RuntimeDeliveryReconciler(store, registry, sink, () => fixedNow);

      await reconciler.reconcileTeam('team-a');

      const expectedDiagnostic = expect.objectContaining({
        type: 'runtime_delivery_recovery_needed',
        teamName: 'team-a',
        runId: 'run-1',
        severity: 'warning',
      });
      expect(sink.append).toHaveBeenCalledWith(expectedDiagnostic);
    } finally {
      // Always remove the scratch directory, even when an assertion fails.
      await fs.rm(scratchDir, { recursive: true, force: true });
    }
  });
});
/**
 * Builds a RuntimeDeliveryService from the module-level fixtures.
 *
 * The registry contains only the shared `destination` fake, and the clock
 * callback returns whatever `now` holds when it is invoked.
 */
function createService(): RuntimeDeliveryService {
  const registry = new RuntimeDeliveryDestinationRegistry([destination]);
  const clock = (): Date => now;
  return new RuntimeDeliveryService(runState, journal, registry, diagnostics, emitter, clock);
}
/**
 * Creates the delivery envelope used across the tests.
 *
 * @param overrides Fields that replace the defaults; applied last, so any
 *   key present here wins.
 * @returns A new envelope object on every call (nested values are not shared
 *   between calls).
 */
function envelope(overrides: Partial<RuntimeDeliveryEnvelope> = {}): RuntimeDeliveryEnvelope {
  // Defaults are rebuilt per call so tests cannot mutate each other's data.
  const defaults: RuntimeDeliveryEnvelope = {
    idempotencyKey: 'delivery-1',
    runId: 'run-1',
    teamName: 'team-a',
    fromMemberName: 'Builder',
    providerId: 'opencode',
    runtimeSessionId: 'session-1',
    to: { memberName: 'Reviewer' },
    text: 'Please review this',
    createdAt: '2026-04-21T12:00:00.000Z',
    taskRefs: ['task-1'],
  };
  return { ...defaults, ...overrides };
}
/**
 * Run-state reader fake that reports a mutable, in-memory run id.
 */
class FakeRunStateReader implements RuntimeDeliveryRunStateReader {
  /** Value returned by getCurrentRunId(); tests may reassign it at any time. */
  currentRunId: string | null;

  /**
   * @param currentRunId Initial run id to report, or null.
   */
  constructor(currentRunId: string | null) {
    this.currentRunId = currentRunId;
  }

  /**
   * @returns A promise resolving to the current value of `currentRunId`.
   */
  async getCurrentRunId(): Promise<string | null> {
    const { currentRunId } = this;
    return currentRunId;
  }
}
/**
 * In-memory destination fake: stores written locations in a Map keyed by
 * destination message id and counts write() invocations.
 */
class FakeDestinationPort implements RuntimeDeliveryDestinationPort {
  /** Locations stored so far, keyed by destination message id. */
  readonly messages = new Map<string, RuntimeDeliveryLocation>();

  /** Number of write() calls, including calls routed to `writeImpl`. */
  writeCalls = 0;

  /** Optional replacement for the default write behaviour. */
  writeImpl:
    | ((input: {
        envelope: RuntimeDeliveryEnvelope;
        destinationMessageId: string;
      }) => Promise<RuntimeDeliveryLocation>)
    | undefined;

  /**
   * @param kind Destination kind this fake is registered under.
   */
  constructor(readonly kind: RuntimeDeliveryDestinationRef['kind']) {}

  /**
   * Counts the call, then either delegates to `writeImpl` (when set) or
   * stores a 'member_inbox' location under the given message id.
   *
   * @returns The stored location, or whatever `writeImpl` resolves to.
   */
  async write(input: {
    envelope: RuntimeDeliveryEnvelope;
    destinationMessageId: string;
  }): Promise<RuntimeDeliveryLocation> {
    // Counted first so overridden (and throwing) writes are still recorded.
    this.writeCalls += 1;
    if (this.writeImpl) {
      return this.writeImpl(input);
    }

    // Fall back to 'unknown' when the recipient carries no memberName.
    const recipient = input.envelope.to;
    let memberName = 'unknown';
    if (typeof recipient === 'object' && 'memberName' in recipient) {
      memberName = recipient.memberName;
    }

    const stored: RuntimeDeliveryLocation = {
      kind: 'member_inbox',
      teamName: input.envelope.teamName,
      memberName,
      messageId: input.destinationMessageId,
    };
    this.messages.set(input.destinationMessageId, stored);
    return stored;
  }

  /**
   * Looks the message id up in `messages`; the `destination` field of the
   * input is not consulted.
   *
   * @returns `found` with the stored location, or `found: false` and a null
   *   location; `diagnostics` is always an empty array.
   */
  async verify(input: {
    destination: RuntimeDeliveryDestinationRef;
    destinationMessageId: string;
  }): Promise<RuntimeDeliveryVerifyResult> {
    const hit = this.messages.get(input.destinationMessageId);
    const location = hit == null ? null : hit;
    const found = location !== null;
    return { found, location, diagnostics: [] };
  }

  /**
   * Builds a 'runtime-delivery' event carrying only the team name and the
   * location's kind.
   */
  buildChangeEvent(input: {
    teamName: string;
    location: RuntimeDeliveryLocation;
  }): RuntimeDeliveryTeamChangeEvent {
    const { teamName, location } = input;
    const data = { kind: location.kind };
    return { type: 'runtime-delivery', teamName, data };
  }
}
/**
 * Diagnostics sink fake whose `append` is a vitest mock, so tests can assert
 * on the calls it received.
 */
class FakeDiagnosticsSink implements RuntimeDeliveryDiagnosticsSink {
  /** Mock that resolves with no value. */
  readonly append = vi.fn(async (): Promise<void> => undefined);
}
/**
 * Change emitter fake that records every emitted event in call order.
 */
class FakeTeamChangeEmitter implements RuntimeDeliveryTeamChangeEmitter {
  /** Events passed to emit(), oldest first. */
  readonly events: Array<RuntimeDeliveryTeamChangeEvent> = [];

  /**
   * Appends the event to `events`.
   *
   * @param event Event to record.
   */
  emit(event: RuntimeDeliveryTeamChangeEvent): void {
    const { events } = this;
    events.push(event);
  }
}