From 8f4a4dd50251ab7b1677be16a64521240c6ce8a2 Mon Sep 17 00:00:00 2001 From: 777genius Date: Mon, 25 May 2026 13:50:59 +0300 Subject: [PATCH] fix(member-work-sync): recover stale nudge payload conflicts --- .../MemberWorkSyncNudgeOutboxPlanner.ts | 136 +++++++++++++++--- .../core/MemberWorkSyncUseCases.test.ts | 114 ++++++++++++++- 2 files changed, 227 insertions(+), 23 deletions(-) diff --git a/src/features/member-work-sync/core/application/MemberWorkSyncNudgeOutboxPlanner.ts b/src/features/member-work-sync/core/application/MemberWorkSyncNudgeOutboxPlanner.ts index a531819a..03f44fbd 100644 --- a/src/features/member-work-sync/core/application/MemberWorkSyncNudgeOutboxPlanner.ts +++ b/src/features/member-work-sync/core/application/MemberWorkSyncNudgeOutboxPlanner.ts @@ -11,6 +11,7 @@ import type { MemberWorkSyncOutboxEnsureInput, MemberWorkSyncStatus } from '../. import type { MemberWorkSyncUseCaseDeps } from './ports'; const STATUS_ONLY_RECOVERY_INTENT_PREFIX = 'status-only'; +const AGENDA_SYNC_REFRESH_INTENT_PREFIX = 'agenda-sync-refresh'; function getReviewRequestEventIds(status: MemberWorkSyncStatus): string[] { return [ @@ -59,6 +60,22 @@ function shouldPlanStatusOnlyRecovery(input: { ); } +function shouldPlanAgendaSyncRefreshRecovery(input: { + status: MemberWorkSyncStatus; + baseInput: MemberWorkSyncOutboxEnsureInput; + existingItem: { agendaFingerprint: string; status: string }; +}): boolean { + return ( + input.status.state === 'needs_sync' && + input.status.shadow?.wouldNudge === true && + input.baseInput.payload.workSyncIntent === 'agenda_sync' && + input.baseInput.payload.workSyncIntentKey === undefined && + input.existingItem.status === 'delivered' && + input.existingItem.agendaFingerprint === input.baseInput.agendaFingerprint && + input.status.report?.accepted !== true + ); +} + export interface MemberWorkSyncNudgeOutboxPlanResult { planned: boolean; code: @@ -106,6 +123,65 @@ export class MemberWorkSyncNudgeOutboxPlanner { }; } + private buildAgendaSyncRefreshRecoveryInput( + status: MemberWorkSyncStatus, + baseInput: MemberWorkSyncOutboxEnsureInput + ): MemberWorkSyncOutboxEnsureInput { + const intentKey = `${AGENDA_SYNC_REFRESH_INTENT_PREFIX}:${status.agenda.fingerprint}:${baseInput.payloadHash}`; + const payload = { + ...baseInput.payload, + workSyncIntentKey: intentKey, + text: [ + 'Work sync refresh: the previous work-sync nudge was delivered before the current required report instructions.', + 'Use this latest nudge as the current required sync action.', + baseInput.payload.text, + ].join('\n'), + }; + + return { + ...baseInput, + id: buildMemberWorkSyncNudgeId({ + teamName: status.teamName, + memberName: status.memberName, + agendaFingerprint: status.agenda.fingerprint, + intentKey, + }), + payload, + payloadHash: buildMemberWorkSyncNudgePayloadHash(this.deps.hash, payload), + }; + } + + private async planStatusOnlyRecovery( + status: MemberWorkSyncStatus, + baseInput: MemberWorkSyncOutboxEnsureInput + ): Promise { + const outboxStore = this.deps.outboxStore; + if (!outboxStore) { + return { planned: false, code: 'outbox_unavailable' }; + } + const recoveryInput = this.buildStatusOnlyRecoveryInput(status, baseInput); + const recoveryResult = await outboxStore.ensurePending(recoveryInput); + if (!recoveryResult.ok) { + this.deps.logger?.warn('member work sync status-only recovery payload conflict', { + teamName: status.teamName, + memberName: status.memberName, + outboxId: recoveryInput.id, + existingPayloadHash: recoveryResult.existingPayloadHash, + requestedPayloadHash: recoveryResult.requestedPayloadHash, + }); + await this.appendPlanAudit(status, { planned: false, code: 'payload_conflict' }); + return { planned: false, code: 'payload_conflict' }; + } + + const recoveryPlanned = recoveryResult.item.status !== 'delivered'; + const recoveryPlanResult = { + planned: recoveryPlanned, + code: recoveryResult.outcome, + } as const; + await this.appendPlanAudit(status, recoveryPlanResult); + return recoveryPlanResult; + } + async plan(status: MemberWorkSyncStatus): Promise { if (!this.deps.outboxStore) { return { planned: false, code: 'outbox_unavailable' }; @@ -196,6 +272,44 @@ export class MemberWorkSyncNudgeOutboxPlanner { await this.appendPlanAudit(status, { planned: false, code }); return { planned: false, code }; } + if ( + shouldPlanAgendaSyncRefreshRecovery({ + status, + baseInput: input, + existingItem: result.item, + }) + ) { + const recoveryInput = this.buildAgendaSyncRefreshRecoveryInput(status, input); + const recoveryResult = await this.deps.outboxStore.ensurePending(recoveryInput); + if (!recoveryResult.ok) { + this.deps.logger?.warn('member work sync agenda-sync refresh payload conflict', { + teamName: status.teamName, + memberName: status.memberName, + outboxId: recoveryInput.id, + existingPayloadHash: recoveryResult.existingPayloadHash, + requestedPayloadHash: recoveryResult.requestedPayloadHash, + }); + await this.appendPlanAudit(status, { planned: false, code: 'payload_conflict' }); + return { planned: false, code: 'payload_conflict' }; + } + if ( + shouldPlanStatusOnlyRecovery({ + status, + baseInput: input, + existingItemStatus: recoveryResult.item.status, + }) + ) { + return this.planStatusOnlyRecovery(status, input); + } + + const recoveryPlanned = recoveryResult.item.status !== 'delivered'; + const recoveryPlanResult = { + planned: recoveryPlanned, + code: recoveryResult.outcome, + } as const; + await this.appendPlanAudit(status, recoveryPlanResult); + return recoveryPlanResult; + } this.deps.logger?.warn('member work sync nudge outbox payload conflict', { teamName: status.teamName, memberName: status.memberName, @@ -220,27 +334,7 @@ export class MemberWorkSyncNudgeOutboxPlanner { existingItemStatus: result.item.status, }) ) { - const recoveryInput = this.buildStatusOnlyRecoveryInput(status, input); - const recoveryResult = await this.deps.outboxStore.ensurePending(recoveryInput); - if (!recoveryResult.ok) { - this.deps.logger?.warn('member work sync status-only recovery payload conflict', { - teamName: status.teamName, - memberName: status.memberName, - outboxId: recoveryInput.id, - existingPayloadHash: recoveryResult.existingPayloadHash, - requestedPayloadHash: recoveryResult.requestedPayloadHash, - }); - await this.appendPlanAudit(status, { planned: false, code: 'payload_conflict' }); - return { planned: false, code: 'payload_conflict' }; - } - - const recoveryPlanned = recoveryResult.item.status !== 'delivered'; - const recoveryPlanResult = { - planned: recoveryPlanned, - code: recoveryResult.outcome, - } as const; - await this.appendPlanAudit(status, recoveryPlanResult); - return recoveryPlanResult; + return this.planStatusOnlyRecovery(status, input); } if ( input.payload.workSyncIntent === 'review_pickup' && diff --git a/test/features/member-work-sync/core/MemberWorkSyncUseCases.test.ts b/test/features/member-work-sync/core/MemberWorkSyncUseCases.test.ts index 621c9b21..66041ad4 100644 --- a/test/features/member-work-sync/core/MemberWorkSyncUseCases.test.ts +++ b/test/features/member-work-sync/core/MemberWorkSyncUseCases.test.ts @@ -172,11 +172,21 @@ class InMemoryStatusStore implements MemberWorkSyncStatusStorePort { class InMemoryOutboxStore implements MemberWorkSyncOutboxStorePort { readonly ensures: MemberWorkSyncOutboxEnsureInput[] = []; readonly items = new Map(); + rejectPayloadConflicts = false; async ensurePending(input: MemberWorkSyncOutboxEnsureInput) { this.ensures.push(input); const current = this.items.get(input.id); if (current) { + if (this.rejectPayloadConflicts && current.payloadHash !== input.payloadHash) { + return { + ok: false as const, + outcome: 'payload_conflict' as const, + item: current, + existingPayloadHash: current.payloadHash, + requestedPayloadHash: input.payloadHash, + }; + } if (current.status === 'superseded') { const revived = { ...current, @@ -569,7 +579,7 @@ describe('MemberWorkSync use cases', () => { messageId: 'unused', }), }; - const { auditEvents, deps } = createDeps({ + const { deps } = createDeps({ items: [reviewPickupItem], providerId: 'opencode', outboxStore: outbox, @@ -898,6 +908,106 @@ describe('MemberWorkSync use cases', () => { expect(inbox.inserted[1]?.messageId).toContain('status-only'); }); + it('creates an agenda-sync refresh recovery when a delivered nudge has a stale payload hash', async () => { + const outbox = new InMemoryOutboxStore(); + const inbox = new InMemoryInboxNudge(); + outbox.rejectPayloadConflicts = true; + const { auditEvents, deps, store } = createDeps({ outboxStore: outbox, inboxNudge: inbox }); + store.phase2ReadinessState = 'shadow_ready'; + + const firstStatus = await new MemberWorkSyncReconciler(deps).execute( + { + teamName: 'team-a', + memberName: 'bob', + }, + { reconciledBy: 'queue', triggerReasons: ['task_changed'] } + ); + await new MemberWorkSyncNudgeDispatcher(deps).dispatchDue({ + teamNames: ['team-a'], + claimedBy: 'test-dispatcher', + }); + + const baseId = `member-work-sync:team-a:bob:${firstStatus.agenda.fingerprint}`; + const delivered = outbox.items.get(baseId); + expect(delivered).toMatchObject({ status: 'delivered' }); + outbox.items.set(baseId, { + ...delivered!, + payloadHash: 'legacy-payload-hash', + payload: { + ...delivered!.payload, + text: 'Legacy delivered work-sync nudge text.', + }, + }); + + await new MemberWorkSyncReconciler(deps).execute( + { + teamName: 'team-a', + memberName: 'bob', + }, + { reconciledBy: 'queue', triggerReasons: ['task_changed'] } + ); + + const recoveryItems = [...outbox.items.values()].filter((item) => + item.payload.workSyncIntentKey?.startsWith('agenda-sync-refresh:') + ); + expect(recoveryItems).toHaveLength(1); + expect(recoveryItems[0]).toMatchObject({ + status: 'pending', + agendaFingerprint: firstStatus.agenda.fingerprint, + payload: { + workSyncIntent: 'agenda_sync', + taskRefs: [{ teamName: 'team-a', taskId: 'task-1', displayId: '11111111' }], + }, + }); + expect(recoveryItems[0]?.id).toContain(firstStatus.agenda.fingerprint); + expect(recoveryItems[0]?.payload.text).toContain('Work sync refresh'); + expect(recoveryItems[0]?.payload.text).toContain('current required sync action'); + expect(outbox.items.get(baseId)).toMatchObject({ + status: 'delivered', + payloadHash: 'legacy-payload-hash', + }); + expect( + auditEvents.filter( + (event) => event.event === 'nudge_skipped' && event.reason === 'payload_conflict' + ) + ).toHaveLength(0); + + await new MemberWorkSyncReconciler(deps).execute( + { + teamName: 'team-a', + memberName: 'bob', + }, + { reconciledBy: 'queue', triggerReasons: ['task_changed'] } + ); + + expect( + [...outbox.items.values()].filter((item) => + item.payload.workSyncIntentKey?.startsWith('agenda-sync-refresh:') + ) + ).toHaveLength(1); + + await expect( + new MemberWorkSyncNudgeDispatcher(deps).dispatchDue({ + teamNames: ['team-a'], + claimedBy: 'test-dispatcher', + }) + ).resolves.toMatchObject({ claimed: 1, delivered: 1, superseded: 0 }); + + await new MemberWorkSyncReconciler(deps).execute( + { + teamName: 'team-a', + memberName: 'bob', + }, + { reconciledBy: 'queue', triggerReasons: ['turn_settled'] } + ); + + const statusOnlyItems = [...outbox.items.values()].filter((item) => + item.payload.workSyncIntentKey?.startsWith('status-only:') + ); + expect(statusOnlyItems).toHaveLength(1); + expect(statusOnlyItems[0]?.payload.text).toContain('Status-only recovery'); + }); + it('marks review pickup delivered only after the delivery port confirms prompt acceptance', async () => { const outbox = new InMemoryOutboxStore(); const inbox = new InMemoryInboxNudge(); @@ -918,7 +1028,7 @@ describe('MemberWorkSync use cases', () => { }; }, }; - const { auditEvents, deps } = createDeps({ + const { deps } = createDeps({ items: [reviewPickupItem], providerId: 'opencode', outboxStore: outbox,