fix(member-work-sync): recover stale nudge payload conflicts
This commit is contained in:
parent
2cee9cabaf
commit
8f4a4dd502
2 changed files with 227 additions and 23 deletions
|
|
@ -11,6 +11,7 @@ import type { MemberWorkSyncOutboxEnsureInput, MemberWorkSyncStatus } from '../.
|
|||
import type { MemberWorkSyncUseCaseDeps } from './ports';
|
||||
|
||||
const STATUS_ONLY_RECOVERY_INTENT_PREFIX = 'status-only';
|
||||
const AGENDA_SYNC_REFRESH_INTENT_PREFIX = 'agenda-sync-refresh';
|
||||
|
||||
function getReviewRequestEventIds(status: MemberWorkSyncStatus): string[] {
|
||||
return [
|
||||
|
|
@ -59,6 +60,22 @@ function shouldPlanStatusOnlyRecovery(input: {
|
|||
);
|
||||
}
|
||||
|
||||
function shouldPlanAgendaSyncRefreshRecovery(input: {
|
||||
status: MemberWorkSyncStatus;
|
||||
baseInput: MemberWorkSyncOutboxEnsureInput;
|
||||
existingItem: { agendaFingerprint: string; status: string };
|
||||
}): boolean {
|
||||
return (
|
||||
input.status.state === 'needs_sync' &&
|
||||
input.status.shadow?.wouldNudge === true &&
|
||||
input.baseInput.payload.workSyncIntent === 'agenda_sync' &&
|
||||
input.baseInput.payload.workSyncIntentKey === undefined &&
|
||||
input.existingItem.status === 'delivered' &&
|
||||
input.existingItem.agendaFingerprint === input.baseInput.agendaFingerprint &&
|
||||
input.status.report?.accepted !== true
|
||||
);
|
||||
}
|
||||
|
||||
export interface MemberWorkSyncNudgeOutboxPlanResult {
|
||||
planned: boolean;
|
||||
code:
|
||||
|
|
@ -106,6 +123,65 @@ export class MemberWorkSyncNudgeOutboxPlanner {
|
|||
};
|
||||
}
|
||||
|
||||
private buildAgendaSyncRefreshRecoveryInput(
|
||||
status: MemberWorkSyncStatus,
|
||||
baseInput: MemberWorkSyncOutboxEnsureInput
|
||||
): MemberWorkSyncOutboxEnsureInput {
|
||||
const intentKey = `${AGENDA_SYNC_REFRESH_INTENT_PREFIX}:${status.agenda.fingerprint}:${baseInput.payloadHash}`;
|
||||
const payload = {
|
||||
...baseInput.payload,
|
||||
workSyncIntentKey: intentKey,
|
||||
text: [
|
||||
'Work sync refresh: the previous work-sync nudge was delivered before the current required report instructions.',
|
||||
'Use this latest nudge as the current required sync action.',
|
||||
baseInput.payload.text,
|
||||
].join('\n'),
|
||||
};
|
||||
|
||||
return {
|
||||
...baseInput,
|
||||
id: buildMemberWorkSyncNudgeId({
|
||||
teamName: status.teamName,
|
||||
memberName: status.memberName,
|
||||
agendaFingerprint: status.agenda.fingerprint,
|
||||
intentKey,
|
||||
}),
|
||||
payload,
|
||||
payloadHash: buildMemberWorkSyncNudgePayloadHash(this.deps.hash, payload),
|
||||
};
|
||||
}
|
||||
|
||||
private async planStatusOnlyRecovery(
|
||||
status: MemberWorkSyncStatus,
|
||||
baseInput: MemberWorkSyncOutboxEnsureInput
|
||||
): Promise<MemberWorkSyncNudgeOutboxPlanResult> {
|
||||
const outboxStore = this.deps.outboxStore;
|
||||
if (!outboxStore) {
|
||||
return { planned: false, code: 'outbox_unavailable' };
|
||||
}
|
||||
const recoveryInput = this.buildStatusOnlyRecoveryInput(status, baseInput);
|
||||
const recoveryResult = await outboxStore.ensurePending(recoveryInput);
|
||||
if (!recoveryResult.ok) {
|
||||
this.deps.logger?.warn('member work sync status-only recovery payload conflict', {
|
||||
teamName: status.teamName,
|
||||
memberName: status.memberName,
|
||||
outboxId: recoveryInput.id,
|
||||
existingPayloadHash: recoveryResult.existingPayloadHash,
|
||||
requestedPayloadHash: recoveryResult.requestedPayloadHash,
|
||||
});
|
||||
await this.appendPlanAudit(status, { planned: false, code: 'payload_conflict' });
|
||||
return { planned: false, code: 'payload_conflict' };
|
||||
}
|
||||
|
||||
const recoveryPlanned = recoveryResult.item.status !== 'delivered';
|
||||
const recoveryPlanResult = {
|
||||
planned: recoveryPlanned,
|
||||
code: recoveryResult.outcome,
|
||||
} as const;
|
||||
await this.appendPlanAudit(status, recoveryPlanResult);
|
||||
return recoveryPlanResult;
|
||||
}
|
||||
|
||||
async plan(status: MemberWorkSyncStatus): Promise<MemberWorkSyncNudgeOutboxPlanResult> {
|
||||
if (!this.deps.outboxStore) {
|
||||
return { planned: false, code: 'outbox_unavailable' };
|
||||
|
|
@ -196,6 +272,44 @@ export class MemberWorkSyncNudgeOutboxPlanner {
|
|||
await this.appendPlanAudit(status, { planned: false, code });
|
||||
return { planned: false, code };
|
||||
}
|
||||
if (
|
||||
shouldPlanAgendaSyncRefreshRecovery({
|
||||
status,
|
||||
baseInput: input,
|
||||
existingItem: result.item,
|
||||
})
|
||||
) {
|
||||
const recoveryInput = this.buildAgendaSyncRefreshRecoveryInput(status, input);
|
||||
const recoveryResult = await this.deps.outboxStore.ensurePending(recoveryInput);
|
||||
if (!recoveryResult.ok) {
|
||||
this.deps.logger?.warn('member work sync agenda-sync refresh payload conflict', {
|
||||
teamName: status.teamName,
|
||||
memberName: status.memberName,
|
||||
outboxId: recoveryInput.id,
|
||||
existingPayloadHash: recoveryResult.existingPayloadHash,
|
||||
requestedPayloadHash: recoveryResult.requestedPayloadHash,
|
||||
});
|
||||
await this.appendPlanAudit(status, { planned: false, code: 'payload_conflict' });
|
||||
return { planned: false, code: 'payload_conflict' };
|
||||
}
|
||||
if (
|
||||
shouldPlanStatusOnlyRecovery({
|
||||
status,
|
||||
baseInput: input,
|
||||
existingItemStatus: recoveryResult.item.status,
|
||||
})
|
||||
) {
|
||||
return this.planStatusOnlyRecovery(status, input);
|
||||
}
|
||||
|
||||
const recoveryPlanned = recoveryResult.item.status !== 'delivered';
|
||||
const recoveryPlanResult = {
|
||||
planned: recoveryPlanned,
|
||||
code: recoveryResult.outcome,
|
||||
} as const;
|
||||
await this.appendPlanAudit(status, recoveryPlanResult);
|
||||
return recoveryPlanResult;
|
||||
}
|
||||
this.deps.logger?.warn('member work sync nudge outbox payload conflict', {
|
||||
teamName: status.teamName,
|
||||
memberName: status.memberName,
|
||||
|
|
@ -220,27 +334,7 @@ export class MemberWorkSyncNudgeOutboxPlanner {
|
|||
existingItemStatus: result.item.status,
|
||||
})
|
||||
) {
|
||||
const recoveryInput = this.buildStatusOnlyRecoveryInput(status, input);
|
||||
const recoveryResult = await this.deps.outboxStore.ensurePending(recoveryInput);
|
||||
if (!recoveryResult.ok) {
|
||||
this.deps.logger?.warn('member work sync status-only recovery payload conflict', {
|
||||
teamName: status.teamName,
|
||||
memberName: status.memberName,
|
||||
outboxId: recoveryInput.id,
|
||||
existingPayloadHash: recoveryResult.existingPayloadHash,
|
||||
requestedPayloadHash: recoveryResult.requestedPayloadHash,
|
||||
});
|
||||
await this.appendPlanAudit(status, { planned: false, code: 'payload_conflict' });
|
||||
return { planned: false, code: 'payload_conflict' };
|
||||
}
|
||||
|
||||
const recoveryPlanned = recoveryResult.item.status !== 'delivered';
|
||||
const recoveryPlanResult = {
|
||||
planned: recoveryPlanned,
|
||||
code: recoveryResult.outcome,
|
||||
} as const;
|
||||
await this.appendPlanAudit(status, recoveryPlanResult);
|
||||
return recoveryPlanResult;
|
||||
return this.planStatusOnlyRecovery(status, input);
|
||||
}
|
||||
if (
|
||||
input.payload.workSyncIntent === 'review_pickup' &&
|
||||
|
|
|
|||
|
|
@ -172,11 +172,21 @@ class InMemoryStatusStore implements MemberWorkSyncStatusStorePort {
|
|||
class InMemoryOutboxStore implements MemberWorkSyncOutboxStorePort {
|
||||
readonly ensures: MemberWorkSyncOutboxEnsureInput[] = [];
|
||||
readonly items = new Map<string, MemberWorkSyncOutboxItem>();
|
||||
rejectPayloadConflicts = false;
|
||||
|
||||
async ensurePending(input: MemberWorkSyncOutboxEnsureInput) {
|
||||
this.ensures.push(input);
|
||||
const current = this.items.get(input.id);
|
||||
if (current) {
|
||||
if (this.rejectPayloadConflicts && current.payloadHash !== input.payloadHash) {
|
||||
return {
|
||||
ok: false as const,
|
||||
outcome: 'payload_conflict' as const,
|
||||
item: current,
|
||||
existingPayloadHash: current.payloadHash,
|
||||
requestedPayloadHash: input.payloadHash,
|
||||
};
|
||||
}
|
||||
if (current.status === 'superseded') {
|
||||
const revived = {
|
||||
...current,
|
||||
|
|
@ -569,7 +579,7 @@ describe('MemberWorkSync use cases', () => {
|
|||
messageId: 'unused',
|
||||
}),
|
||||
};
|
||||
const { auditEvents, deps } = createDeps({
|
||||
const { deps } = createDeps({
|
||||
items: [reviewPickupItem],
|
||||
providerId: 'opencode',
|
||||
outboxStore: outbox,
|
||||
|
|
@ -898,6 +908,106 @@ describe('MemberWorkSync use cases', () => {
|
|||
expect(inbox.inserted[1]?.messageId).toContain('status-only');
|
||||
});
|
||||
|
||||
it('creates an agenda-sync refresh recovery when a delivered nudge has a stale payload hash', async () => {
|
||||
const outbox = new InMemoryOutboxStore();
|
||||
const inbox = new InMemoryInboxNudge();
|
||||
outbox.rejectPayloadConflicts = true;
|
||||
const { auditEvents, deps, store } = createDeps({ outboxStore: outbox, inboxNudge: inbox });
|
||||
store.phase2ReadinessState = 'shadow_ready';
|
||||
|
||||
const firstStatus = await new MemberWorkSyncReconciler(deps).execute(
|
||||
{
|
||||
teamName: 'team-a',
|
||||
memberName: 'bob',
|
||||
},
|
||||
{ reconciledBy: 'queue', triggerReasons: ['task_changed'] }
|
||||
);
|
||||
await new MemberWorkSyncNudgeDispatcher(deps).dispatchDue({
|
||||
teamNames: ['team-a'],
|
||||
claimedBy: 'test-dispatcher',
|
||||
});
|
||||
|
||||
const baseId = `member-work-sync:team-a:bob:${firstStatus.agenda.fingerprint}`;
|
||||
const delivered = outbox.items.get(baseId);
|
||||
expect(delivered).toMatchObject({ status: 'delivered' });
|
||||
outbox.items.set(baseId, {
|
||||
...delivered!,
|
||||
payloadHash: 'legacy-payload-hash',
|
||||
payload: {
|
||||
...delivered!.payload,
|
||||
text: 'Legacy delivered work-sync nudge text.',
|
||||
},
|
||||
});
|
||||
|
||||
await new MemberWorkSyncReconciler(deps).execute(
|
||||
{
|
||||
teamName: 'team-a',
|
||||
memberName: 'bob',
|
||||
},
|
||||
{ reconciledBy: 'queue', triggerReasons: ['task_changed'] }
|
||||
);
|
||||
|
||||
const recoveryItems = [...outbox.items.values()].filter((item) =>
|
||||
item.payload.workSyncIntentKey?.startsWith('agenda-sync-refresh:')
|
||||
);
|
||||
expect(recoveryItems).toHaveLength(1);
|
||||
expect(recoveryItems[0]).toMatchObject({
|
||||
status: 'pending',
|
||||
agendaFingerprint: firstStatus.agenda.fingerprint,
|
||||
payload: {
|
||||
workSyncIntent: 'agenda_sync',
|
||||
taskRefs: [{ teamName: 'team-a', taskId: 'task-1', displayId: '11111111' }],
|
||||
},
|
||||
});
|
||||
expect(recoveryItems[0]?.id).toContain(firstStatus.agenda.fingerprint);
|
||||
expect(recoveryItems[0]?.payload.text).toContain('Work sync refresh');
|
||||
expect(recoveryItems[0]?.payload.text).toContain('current required sync action');
|
||||
expect(outbox.items.get(baseId)).toMatchObject({
|
||||
status: 'delivered',
|
||||
payloadHash: 'legacy-payload-hash',
|
||||
});
|
||||
expect(
|
||||
auditEvents.filter(
|
||||
(event) => event.event === 'nudge_skipped' && event.reason === 'payload_conflict'
|
||||
)
|
||||
).toHaveLength(0);
|
||||
|
||||
await new MemberWorkSyncReconciler(deps).execute(
|
||||
{
|
||||
teamName: 'team-a',
|
||||
memberName: 'bob',
|
||||
},
|
||||
{ reconciledBy: 'queue', triggerReasons: ['task_changed'] }
|
||||
);
|
||||
|
||||
expect(
|
||||
[...outbox.items.values()].filter((item) =>
|
||||
item.payload.workSyncIntentKey?.startsWith('agenda-sync-refresh:')
|
||||
)
|
||||
).toHaveLength(1);
|
||||
|
||||
await expect(
|
||||
new MemberWorkSyncNudgeDispatcher(deps).dispatchDue({
|
||||
teamNames: ['team-a'],
|
||||
claimedBy: 'test-dispatcher',
|
||||
})
|
||||
).resolves.toMatchObject({ claimed: 1, delivered: 1, superseded: 0 });
|
||||
|
||||
await new MemberWorkSyncReconciler(deps).execute(
|
||||
{
|
||||
teamName: 'team-a',
|
||||
memberName: 'bob',
|
||||
},
|
||||
{ reconciledBy: 'queue', triggerReasons: ['turn_settled'] }
|
||||
);
|
||||
|
||||
const statusOnlyItems = [...outbox.items.values()].filter((item) =>
|
||||
item.payload.workSyncIntentKey?.startsWith('status-only:')
|
||||
);
|
||||
expect(statusOnlyItems).toHaveLength(1);
|
||||
expect(statusOnlyItems[0]?.payload.text).toContain('Status-only recovery');
|
||||
});
|
||||
|
||||
it('marks review pickup delivered only after the delivery port confirms prompt acceptance', async () => {
|
||||
const outbox = new InMemoryOutboxStore();
|
||||
const inbox = new InMemoryInboxNudge();
|
||||
|
|
@ -918,7 +1028,7 @@ describe('MemberWorkSync use cases', () => {
|
|||
};
|
||||
},
|
||||
};
|
||||
const { auditEvents, deps } = createDeps({
|
||||
const { deps } = createDeps({
|
||||
items: [reviewPickupItem],
|
||||
providerId: 'opencode',
|
||||
outboxStore: outbox,
|
||||
|
|
|
|||
Loading…
Reference in a new issue