diff --git a/src/features/member-work-sync/core/application/MemberWorkSyncNudgeOutboxPlanner.ts b/src/features/member-work-sync/core/application/MemberWorkSyncNudgeOutboxPlanner.ts index db6c3a8d..310c2f14 100644 --- a/src/features/member-work-sync/core/application/MemberWorkSyncNudgeOutboxPlanner.ts +++ b/src/features/member-work-sync/core/application/MemberWorkSyncNudgeOutboxPlanner.ts @@ -56,6 +56,29 @@ function isTurnSettledReconcile(status: MemberWorkSyncStatus): boolean { return status.shadow?.triggerReasons?.includes('turn_settled') === true; } +function parseTime(value: string | undefined): number | null { + if (!value) { + return null; + } + const time = Date.parse(value); + return Number.isFinite(time) ? time : null; +} + +function hasActiveAcceptedWorkLease(status: MemberWorkSyncStatus): boolean { + const report = status.report; + if ( + report?.accepted !== true || + report.agendaFingerprint !== status.agenda.fingerprint || + (report.state !== 'still_working' && report.state !== 'blocked') + ) { + return false; + } + + const evaluatedAtMs = parseTime(status.evaluatedAt); + const expiresAtMs = parseTime(report.expiresAt); + return evaluatedAtMs != null && expiresAtMs != null && expiresAtMs > evaluatedAtMs; +} + function shouldPlanStatusOnlyRecovery(input: { status: MemberWorkSyncStatus; baseInput: MemberWorkSyncOutboxEnsureInput; @@ -68,7 +91,7 @@ function shouldPlanStatusOnlyRecovery(input: { input.baseInput.payload.workSyncIntent === 'agenda_sync' && input.baseInput.payload.workSyncIntentKey === undefined && input.existingItemStatus === 'delivered' && - input.status.report?.accepted !== true + !hasActiveAcceptedWorkLease(input.status) ); } @@ -84,18 +107,10 @@ function shouldPlanAgendaSyncRefreshRecovery(input: { input.baseInput.payload.workSyncIntentKey === undefined && input.existingItem.status === 'delivered' && input.existingItem.agendaFingerprint === input.baseInput.agendaFingerprint && - input.status.report?.accepted !== true + !hasActiveAcceptedWorkLease(input.status) ); } -function parseTime(value: string | undefined): number | null { - if (!value) { - return null; - } - const time = Date.parse(value); - return Number.isFinite(time) ? time : null; -} - function isDeliveredStillStuckRecoveryReason(reason: MemberWorkSyncNudgeActivationReason): boolean { return ( reason === 'shadow_ready' || @@ -125,7 +140,7 @@ function shouldPlanDeliveredStillStuckRecovery(input: { input.baseInput.payload.workSyncIntentKey !== undefined || !recoverableExistingItem || input.existingItem.agendaFingerprint !== input.baseInput.agendaFingerprint || - input.status.report?.accepted === true || + hasActiveAcceptedWorkLease(input.status) || !isDeliveredStillStuckRecoveryReason(input.activationReason) ) { return false; diff --git a/src/features/member-work-sync/core/application/ports.ts b/src/features/member-work-sync/core/application/ports.ts index 64e2c4fa..e779e92c 100644 --- a/src/features/member-work-sync/core/application/ports.ts +++ b/src/features/member-work-sync/core/application/ports.ts @@ -71,6 +71,7 @@ export type MemberWorkSyncAuditEventName = | 'turn_settled_ignored' | 'queue_enqueued' | 'queue_coalesced' + | 'queue_retry_scheduled' | 'queue_reconciled' | 'queue_dropped' | 'reconcile_started' diff --git a/src/features/member-work-sync/main/infrastructure/JsonMemberWorkSyncStore.ts b/src/features/member-work-sync/main/infrastructure/JsonMemberWorkSyncStore.ts index 4904265a..c0017130 100644 --- a/src/features/member-work-sync/main/infrastructure/JsonMemberWorkSyncStore.ts +++ b/src/features/member-work-sync/main/infrastructure/JsonMemberWorkSyncStore.ts @@ -110,6 +110,7 @@ interface OutboxIndexFile { type OutboxIndexRoute = OutboxIndexFile['items'][string]; type OutboxDueRoute = [string, OutboxIndexRoute]; +const MEMBER_WORK_SYNC_OUTBOX_CLAIM_STALE_MS = 5 * 60 * 1000; export interface JsonMemberWorkSyncStoreDeps { auditJournal?: MemberWorkSyncAuditJournalPort; @@ -117,8 +118,12 @@ export interface JsonMemberWorkSyncStoreDeps { now?: () => Date; } -function normalizeMemberKey(memberName: string): string { - return memberName.trim().toLowerCase(); +function normalizeMemberKey(memberName: unknown): string { + return typeof memberName === 'string' ? memberName.trim().toLowerCase() : ''; +} + +function normalizeTeamKey(teamName: unknown): string { + return typeof teamName === 'string' ? teamName.trim().toLowerCase() : ''; } function emptyMetricsIndex(): MetricsIndexFile { @@ -242,6 +247,46 @@ function canReviveOutboxItem(status: MemberWorkSyncOutboxItem['status']): boolea return status === 'superseded' || (!isOutboxTerminal(status) && status !== 'pending'); } +function isReportIntentOwnedBy( + teamName: string, + memberName: string, + intent: MemberWorkSyncReportIntent +): boolean { + return ( + normalizeTeamKey(intent.teamName) === normalizeTeamKey(teamName) && + normalizeMemberKey(intent.memberName) === normalizeMemberKey(memberName) + ); +} + +function isOutboxItemOwnedBy( + teamName: string, + memberName: string, + item: MemberWorkSyncOutboxItem +): boolean { + return ( + normalizeTeamKey(item.teamName) === normalizeTeamKey(teamName) && + normalizeMemberKey(item.memberName) === normalizeMemberKey(memberName) + ); +} + +function parseIsoMs(value: string | undefined): number | null { + if (!value) { + return null; + } + const ms = Date.parse(value); + return Number.isFinite(ms) ? ms : null; +} + +function isStaleClaim(claimedAt: string | undefined, nowIso: string): boolean { + const claimedAtMs = parseIsoMs(claimedAt); + const nowMs = parseIsoMs(nowIso); + return ( + claimedAtMs != null && + nowMs != null && + nowMs - claimedAtMs >= MEMBER_WORK_SYNC_OUTBOX_CLAIM_STALE_MS + ); +} + function applyOptionalNextAttemptAt( item: MemberWorkSyncOutboxItem, nextAttemptAt: string | undefined @@ -254,6 +299,9 @@ function applyOptionalNextAttemptAt( } function canClaimOutboxItem(item: MemberWorkSyncOutboxItem, nowIso: string): boolean { + if (item.status === 'claimed') { + return isStaleClaim(item.claimedAt ?? item.updatedAt, nowIso); + } if (item.status !== 'pending' && item.status !== 'failed_retryable') { return false; } @@ -263,14 +311,23 @@ function canClaimOutboxItem(item: MemberWorkSyncOutboxItem, nowIso: string): boo return item.nextAttemptAt <= nowIso; } +function canClaimOutboxRoute(route: OutboxIndexRoute, nowIso: string): boolean { + if (route.status === 'claimed') { + return isStaleClaim(route.updatedAt, nowIso); + } + return ( + (route.status === 'pending' || route.status === 'failed_retryable') && + (!route.nextAttemptAt || route.nextAttemptAt <= nowIso) + ); +} + function getDueOutboxRoutes( index: OutboxIndexFile, nowIso: string, limit: number ): OutboxDueRoute[] { return Object.entries(index.items) - .filter(([, route]) => route.status === 'pending' || route.status === 'failed_retryable') - .filter(([, route]) => !route.nextAttemptAt || route.nextAttemptAt <= nowIso) + .filter(([, route]) => canClaimOutboxRoute(route, nowIso)) .sort((left, right) => { const leftTime = left[1].nextAttemptAt ?? left[1].updatedAt; const rightTime = right[1].nextAttemptAt ?? right[1].updatedAt; @@ -623,10 +680,10 @@ export class JsonMemberWorkSyncStore staleIndex = true; } } - const missingIndexedPending = staleIndex + const unindexedOrStaleIndexedPending = staleIndex ? false - : await this.hasMissingIndexedPendingReport(teamName, index); - if (staleIndex || missingIndexedPending) { + : await this.hasUnindexedOrStaleIndexedPendingReport(teamName, index); + if (staleIndex || unindexedOrStaleIndexedPending) { await this.enqueue(teamName, async () => { await withFileLock(this.paths.getPendingReportsIndexPath(teamName), async () => { index = await this.repairPendingReportsIndex(teamName); @@ -666,29 +723,58 @@ export class JsonMemberWorkSyncStore if (!route) { return; } - await withFileLock( - this.paths.getMemberReportsPath(teamName, route.memberName), - async () => { - const reports = await this.readMemberReportsFile(teamName, route.memberName); - const current = reports.intents[id]; - if (current?.status !== 'pending') { - return; + const updateRoute = async ( + targetRoute: PendingReportsIndexFile['items'][string] + ): Promise => { + let staleRoute = false; + await withFileLock( + this.paths.getMemberReportsPath(teamName, targetRoute.memberName), + async () => { + const reports = await this.readMemberReportsFile(teamName, targetRoute.memberName); + const current = reports.intents[id]; + if (!current) { + delete index.items[id]; + staleRoute = true; + return; + } + if (!isReportIntentOwnedBy(teamName, targetRoute.memberName, current)) { + delete index.items[id]; + staleRoute = true; + return; + } + if (current.status !== 'pending') { + return; + } + const next: MemberWorkSyncReportIntent = { + ...current, + status: result.status, + resultCode: result.resultCode, + processedAt: result.processedAt, + }; + reports.intents[id] = next; + await this.writeMemberReportsFile(teamName, targetRoute.memberName, reports); + index.items[id] = toPendingReportIndexItem( + next, + this.paths.getMemberKey(next.memberName) + ); + await this.writePendingReportsIndexFile(teamName, index); } - reports.intents[id] = { - ...current, - status: result.status, - resultCode: result.resultCode, - processedAt: result.processedAt, - }; - await this.writeMemberReportsFile(teamName, route.memberName, reports); - index.items[id] = { - ...route, - status: result.status, - processedAt: result.processedAt, - }; - await this.writePendingReportsIndexFile(teamName, index); + ); + return staleRoute; + }; + + let staleRoute = await updateRoute(route); + if (staleRoute) { + index = await this.repairPendingReportsIndex(teamName); + const repairedRoute = index.items[id]; + if (!repairedRoute) { + return; } - ); + staleRoute = await updateRoute(repairedRoute); + if (staleRoute) { + await this.repairPendingReportsIndex(teamName); + } + } }); }); } @@ -801,45 +887,67 @@ export class JsonMemberWorkSyncStore } let dueRoutes = getDueOutboxRoutes(index, input.nowIso, input.limit); if ( - dueRoutes.length > 0 && dueRoutes.length < Math.max(0, input.limit) && - (await this.hasMissingIndexedDueOutboxItem(input.teamName, index, input.nowIso)) + (await this.hasUnindexedOrStaleIndexedDueOutboxItem(input.teamName, index, input.nowIso)) ) { index = await this.repairOutboxIndex(input.teamName); dueRoutes = getDueOutboxRoutes(index, input.nowIso, input.limit); } - let staleIndex = false; - for (const [id, route] of dueRoutes) { - await withFileLock( - this.paths.getMemberOutboxPath(input.teamName, route.memberName), - async () => { - const outbox = await this.readMemberOutboxFile(input.teamName, route.memberName); - const item = outbox.items[id]; - if (!item || !canClaimOutboxItem(item, input.nowIso)) { - delete index.items[id]; - staleIndex = true; - return; - } - const next: MemberWorkSyncOutboxItem = { - ...item, - status: 'claimed', - attemptGeneration: item.attemptGeneration + 1, - claimedBy: input.claimedBy, - claimedAt: input.nowIso, - updatedAt: input.nowIso, - }; - delete next.lastError; - outbox.items[id] = next; - await this.writeMemberOutboxFile(input.teamName, route.memberName, outbox); - index.items[id] = toOutboxIndexItem(next, route.memberKey); - claimed.push(next); + const claimRoutes = async (routes: OutboxDueRoute[]): Promise => { + let staleIndex = false; + for (const [id, route] of routes) { + if (claimed.length >= Math.max(0, input.limit)) { + break; } - ); - } + await withFileLock( + this.paths.getMemberOutboxPath(input.teamName, route.memberName), + async () => { + const outbox = await this.readMemberOutboxFile(input.teamName, route.memberName); + const item = outbox.items[id]; + if (!item || !canClaimOutboxItem(item, input.nowIso)) { + delete index.items[id]; + staleIndex = true; + return; + } + const memberKey = this.paths.getMemberKey(item.memberName); + if (!isOutboxItemOwnedBy(input.teamName, route.memberName, item)) { + delete index.items[id]; + staleIndex = true; + return; + } + const next: MemberWorkSyncOutboxItem = { + ...item, + status: 'claimed', + attemptGeneration: item.attemptGeneration + 1, + claimedBy: input.claimedBy, + claimedAt: input.nowIso, + updatedAt: input.nowIso, + }; + delete next.nextAttemptAt; + delete next.lastError; + outbox.items[id] = next; + await this.writeMemberOutboxFile(input.teamName, route.memberName, outbox); + index.items[id] = toOutboxIndexItem(next, memberKey); + claimed.push(next); + } + ); + } + return staleIndex; + }; + let staleIndex = await claimRoutes(dueRoutes); if (staleIndex) { index = await this.repairOutboxIndex(input.teamName); + const remainingLimit = Math.max(0, input.limit) - claimed.length; + dueRoutes = + remainingLimit > 0 ? getDueOutboxRoutes(index, input.nowIso, remainingLimit) : []; + staleIndex = dueRoutes.length > 0 ? await claimRoutes(dueRoutes) : false; + if (staleIndex) { + await this.repairOutboxIndex(input.teamName); + } else if (dueRoutes.length > 0) { + await this.writeOutboxIndexFile(input.teamName, index); + } } else if (dueRoutes.length > 0) { await this.writeOutboxIndexFile(input.teamName, index); } @@ -996,7 +1104,8 @@ export class JsonMemberWorkSyncStore (item) => item.payload.workSyncIntentKey === intentKey && item.updatedAt >= input.sinceIso && - item.status !== 'failed_terminal' + item.status !== 'failed_terminal' && + item.status !== 'superseded' ) .sort((left, right) => right.updatedAt.localeCompare(left.updatedAt)); const latest = matches[0]; @@ -1171,17 +1280,48 @@ export class JsonMemberWorkSyncStore if (!route) { return; } - await withFileLock(this.paths.getMemberOutboxPath(teamName, route.memberName), async () => { - const outbox = await this.readMemberOutboxFile(teamName, route.memberName); - const next = updater(outbox.items[id]); - if (!next) { + const updateRoute = async (targetRoute: OutboxIndexRoute): Promise => { + let staleRoute = false; + await withFileLock( + this.paths.getMemberOutboxPath(teamName, targetRoute.memberName), + async () => { + const outbox = await this.readMemberOutboxFile(teamName, targetRoute.memberName); + const current = outbox.items[id]; + if (!current) { + delete index.items[id]; + staleRoute = true; + return; + } + if (!isOutboxItemOwnedBy(teamName, targetRoute.memberName, current)) { + delete index.items[id]; + staleRoute = true; + return; + } + const next = updater(current); + if (!next) { + return; + } + outbox.items[id] = next; + await this.writeMemberOutboxFile(teamName, targetRoute.memberName, outbox); + index.items[id] = toOutboxIndexItem(next, this.paths.getMemberKey(next.memberName)); + await this.writeOutboxIndexFile(teamName, index); + } + ); + return staleRoute; + }; + + let staleRoute = await updateRoute(route); + if (staleRoute) { + index = await this.repairOutboxIndex(teamName); + const repairedRoute = index.items[id]; + if (!repairedRoute) { return; } - outbox.items[id] = next; - await this.writeMemberOutboxFile(teamName, route.memberName, outbox); - index.items[id] = toOutboxIndexItem(next, route.memberKey); - await this.writeOutboxIndexFile(teamName, index); - }); + staleRoute = await updateRoute(repairedRoute); + if (staleRoute) { + await this.repairOutboxIndex(teamName); + } + } }); }); } @@ -1251,11 +1391,17 @@ export class JsonMemberWorkSyncStore for (const { memberName, reports } of await this.scanMemberReports(teamName)) { const memberKey = this.paths.getMemberKey(memberName); for (const intent of Object.values(reports.intents)) { + if (!isReportIntentOwnedBy(teamName, memberName, intent)) { + continue; + } index.items[intent.id] = toPendingReportIndexItem(intent, memberKey); repairedMembers.add(intent.memberName); } } for (const intent of Object.values((await this.readLegacyPendingFile(teamName)).intents)) { + if (!isReportIntentOwnedBy(teamName, intent.memberName, intent)) { + continue; + } const memberKey = this.paths.getMemberKey(intent.memberName); if (!index.items[intent.id]) { await withFileLock( @@ -1300,11 +1446,17 @@ export class JsonMemberWorkSyncStore for (const { memberName, outbox } of await this.scanMemberOutboxes(teamName)) { const memberKey = this.paths.getMemberKey(memberName); for (const item of Object.values(outbox.items)) { + if (!isOutboxItemOwnedBy(teamName, memberName, item)) { + continue; + } index.items[item.id] = toOutboxIndexItem(item, memberKey); repairedMembers.add(item.memberName); } } for (const item of Object.values((await this.readLegacyOutboxFile(teamName)).items)) { + if (!isOutboxItemOwnedBy(teamName, item.memberName, item)) { + continue; + } const memberKey = this.paths.getMemberKey(item.memberName); if (!index.items[item.id]) { await withFileLock(this.paths.getMemberOutboxPath(teamName, item.memberName), async () => { @@ -1382,26 +1534,54 @@ export class JsonMemberWorkSyncStore return reports; } - private async hasMissingIndexedPendingReport( + private async hasUnindexedOrStaleIndexedPendingReport( teamName: string, index: PendingReportsIndexFile ): Promise { - const indexedIds = new Set(Object.keys(index.items)); - for (const { reports } of await this.scanMemberReports(teamName)) { + const routes = index.items; + for (const { memberName, reports } of await this.scanMemberReports(teamName)) { for (const intent of Object.values(reports.intents)) { - if (intent.status === 'pending' && !indexedIds.has(intent.id)) { + if (!isReportIntentOwnedBy(teamName, memberName, intent)) { + continue; + } + const route = routes[intent.id]; + if ( + intent.status === 'pending' && + !this.isCurrentPendingReportRoute(teamName, route, intent) + ) { return true; } } } for (const intent of Object.values((await this.readLegacyPendingFile(teamName)).intents)) { - if (intent.status === 'pending' && !indexedIds.has(intent.id)) { + if (!isReportIntentOwnedBy(teamName, intent.memberName, intent)) { + continue; + } + const route = routes[intent.id]; + if ( + intent.status === 'pending' && + !this.isCurrentPendingReportRoute(teamName, route, intent) + ) { return true; } } return false; } + private isCurrentPendingReportRoute( + teamName: string, + route: PendingReportsIndexFile['items'][string] | undefined, + intent: MemberWorkSyncReportIntent + ): boolean { + return ( + !!route && + normalizeTeamKey(intent.teamName) === normalizeTeamKey(teamName) && + route.status === 'pending' && + normalizeMemberKey(route.memberName) === normalizeMemberKey(intent.memberName) && + route.memberKey === this.paths.getMemberKey(intent.memberName) + ); + } + private async scanMemberOutboxes( teamName: string ): Promise<{ memberName: string; outbox: MemberOutboxFile }[]> { @@ -1412,27 +1592,56 @@ export class JsonMemberWorkSyncStore return outboxes; } - private async hasMissingIndexedDueOutboxItem( + private async hasUnindexedOrStaleIndexedDueOutboxItem( teamName: string, index: OutboxIndexFile, nowIso: string ): Promise { - const indexedIds = new Set(Object.keys(index.items)); - for (const { outbox } of await this.scanMemberOutboxes(teamName)) { + const routes = index.items; + for (const { memberName, outbox } of await this.scanMemberOutboxes(teamName)) { for (const item of Object.values(outbox.items)) { - if (canClaimOutboxItem(item, nowIso) && !indexedIds.has(item.id)) { + if (!isOutboxItemOwnedBy(teamName, memberName, item)) { + continue; + } + const route = routes[item.id]; + if ( + canClaimOutboxItem(item, nowIso) && + !this.isCurrentDueOutboxRoute(teamName, route, item, nowIso) + ) { return true; } } } for (const item of Object.values((await this.readLegacyOutboxFile(teamName)).items)) { - if (canClaimOutboxItem(item, nowIso) && !indexedIds.has(item.id)) { + if (!isOutboxItemOwnedBy(teamName, item.memberName, item)) { + continue; + } + const route = routes[item.id]; + if ( + canClaimOutboxItem(item, nowIso) && + !this.isCurrentDueOutboxRoute(teamName, route, item, nowIso) + ) { return true; } } return false; } + private isCurrentDueOutboxRoute( + teamName: string, + route: OutboxIndexRoute | undefined, + item: MemberWorkSyncOutboxItem, + nowIso: string + ): boolean { + return ( + !!route && + normalizeTeamKey(item.teamName) === normalizeTeamKey(teamName) && + normalizeMemberKey(route.memberName) === normalizeMemberKey(item.memberName) && + route.memberKey === this.paths.getMemberKey(item.memberName) && + canClaimOutboxRoute(route, nowIso) + ); + } + private async appendAudit(input: { teamName: string; memberName: string; diff --git a/src/features/member-work-sync/main/infrastructure/MemberWorkSyncEventQueue.ts b/src/features/member-work-sync/main/infrastructure/MemberWorkSyncEventQueue.ts index 7365c92f..446bb726 100644 --- a/src/features/member-work-sync/main/infrastructure/MemberWorkSyncEventQueue.ts +++ b/src/features/member-work-sync/main/infrastructure/MemberWorkSyncEventQueue.ts @@ -61,6 +61,7 @@ interface QueueItem { maxRunAt: number; triggerReasons: Set; triggerReasonCounts: Map; + retryCount: number; recovery?: MemberWorkSyncReconcileContext['recovery']; } @@ -87,6 +88,8 @@ export interface MemberWorkSyncEventQueueDeps { quietWindowMs?: number; triggerTiming?: Partial>>; concurrency?: number; + retryDelayMs?: number; + maxRetryAttempts?: number; now?: () => number; nowIso?: () => string; auditJournal?: MemberWorkSyncAuditJournalPort; @@ -107,6 +110,8 @@ export class MemberWorkSyncEventQueue { private readonly inFlight = new Set>(); private readonly quietWindowMs: number; private readonly concurrency: number; + private readonly retryDelayMs: number; + private readonly maxRetryAttempts: number; private readonly now: () => number; private readonly nowIso: () => string; private timer: ReturnType | null = null; @@ -122,6 +127,8 @@ export class MemberWorkSyncEventQueue { constructor(private readonly deps: MemberWorkSyncEventQueueDeps) { this.quietWindowMs = deps.quietWindowMs ?? 90_000; this.concurrency = Math.max(1, deps.concurrency ?? 2); + this.retryDelayMs = Math.max(0, deps.retryDelayMs ?? 30_000); + this.maxRetryAttempts = Math.max(0, deps.maxRetryAttempts ?? 3); this.now = deps.now ?? Date.now; this.nowIso = deps.nowIso ?? (() => new Date().toISOString()); } @@ -209,6 +216,7 @@ export class MemberWorkSyncEventQueue { ? Math.min(existing.runAt, runAt) : Math.min(Math.max(existing.runAt, runAt), existing.maxRunAt); incrementReasonCount(existing.triggerReasonCounts, input.triggerReason); + existing.retryCount = 0; this.counters.coalesced += 1; this.appendAudit({ teamName, @@ -230,6 +238,7 @@ export class MemberWorkSyncEventQueue { maxRunAt: now + timing.maxCoalesceWaitMs, triggerReasons: new Set([input.triggerReason]), triggerReasonCounts: new Map([[input.triggerReason, 1]]), + retryCount: 0, ...(input.recovery ? { recovery: input.recovery } : {}), }); this.counters.enqueued += 1; @@ -366,8 +375,10 @@ export class MemberWorkSyncEventQueue { }; this.running.set(key, running); + let failed = false; const promise = this.executeItem(key, item, running) .catch((error: unknown) => { + failed = true; this.counters.failed += 1; this.deps.logger?.warn('member work sync queue reconcile failed', { teamName: item.teamName, @@ -380,6 +391,8 @@ export class MemberWorkSyncEventQueue { this.inFlight.delete(promise); if (running.rerunRequested && !this.stopped) { this.enqueueFollowUp(item, running); + } else if (failed && !this.stopped) { + this.enqueueRetryAfterFailure(key, item, running); } this.pump(); }); @@ -387,6 +400,53 @@ export class MemberWorkSyncEventQueue { this.inFlight.add(promise); } + private enqueueRetryAfterFailure(key: string, item: QueueItem, running: RunningItem): void { + if (item.retryCount >= this.maxRetryAttempts) { + this.counters.dropped += 1; + this.appendAudit({ + teamName: item.teamName, + memberName: item.memberName, + event: 'queue_dropped', + source: 'event_queue', + reason: 'reconcile_failed_max_retries', + triggerReasons: [...running.triggerReasons].sort(), + metadata: { + retryCount: item.retryCount, + maxRetryAttempts: this.maxRetryAttempts, + }, + }); + return; + } + + const now = this.now(); + const retryCount = item.retryCount + 1; + const recovery = running.recovery ?? item.recovery; + this.items.set(key, { + ...item, + lastQueuedAt: now, + runAt: now + this.retryDelayMs, + maxRunAt: now + this.retryDelayMs, + triggerReasons: new Set(running.triggerReasons), + triggerReasonCounts: new Map(item.triggerReasonCounts), + retryCount, + ...(recovery ? { recovery } : {}), + }); + this.appendAudit({ + teamName: item.teamName, + memberName: item.memberName, + event: 'queue_retry_scheduled', + source: 'event_queue', + reason: 'reconcile_failed', + triggerReasons: [...running.triggerReasons].sort(), + metadata: { + retryCount, + retryDelayMs: this.retryDelayMs, + maxRetryAttempts: this.maxRetryAttempts, + }, + }); + this.schedule(); + } + private enqueueFollowUp(item: QueueItem, running: RunningItem): void { const reasons = [...running.triggerReasons].sort(); const recovery = running.recovery ?? item.recovery; diff --git a/src/main/index.ts b/src/main/index.ts index af6bb697..91ae9cc8 100644 --- a/src/main/index.ts +++ b/src/main/index.ts @@ -2146,17 +2146,8 @@ async function initializeServices(): Promise { return Number.isFinite(expiresAtMs) && expiresAtMs > Date.now(); }); scheduleStartupTask(() => { - void teamDataService - .listTeams() - .then(async (teams) => { - const lifecycleActiveTeamNames = teams - .filter( - (team) => - !team.deletedAt && - (teamProvisioningService.isTeamAlive(team.teamName) || - teamProvisioningService.hasProvisioningRun(team.teamName)) - ) - .map((team) => team.teamName); + void listMemberWorkSyncLifecycleActiveTeamNames() + .then(async (lifecycleActiveTeamNames) => { await memberWorkSyncFeature?.replayPendingReports(lifecycleActiveTeamNames); await memberWorkSyncFeature?.enqueueStartupScan(lifecycleActiveTeamNames); }) diff --git a/src/main/services/team/TeamProvisioningService.ts b/src/main/services/team/TeamProvisioningService.ts index 9d44b02c..5466b0ed 100644 --- a/src/main/services/team/TeamProvisioningService.ts +++ b/src/main/services/team/TeamProvisioningService.ts @@ -6162,6 +6162,27 @@ export class TeamProvisioningService { return enabled; } + private async markOpenCodePromptLedgerFailedTerminal(input: { + ledger: OpenCodePromptDeliveryLedgerStore; + id: string; + reason: string; + diagnostics?: string[]; + failedAt: string; + eventContext?: Record; + }): Promise { + const failed = await input.ledger.markFailedTerminal({ + id: input.id, + reason: input.reason, + ...(input.diagnostics ? { diagnostics: input.diagnostics } : {}), + failedAt: input.failedAt, + }); + this.logOpenCodePromptDeliveryEvent('opencode_prompt_delivery_terminal_failure', failed, { + reason: input.reason, + ...(input.eventContext ?? {}), + }); + return failed; + } + private async findOpenCodeVisibleReplyByRelayOfMessageId(input: { teamName: string; replyRecipient?: string | null; @@ -7243,7 +7264,8 @@ export class TeamProvisioningService { input.ledgerRecord.maxSessionRefreshAttempts ?? OPENCODE_PROMPT_DELIVERY_SESSION_REFRESH_MAX_ATTEMPTS; if ((input.ledgerRecord.sessionRefreshAttempts ?? 0) >= maxSessionRefreshAttempts) { - return await input.ledger.markFailedTerminal({ + return await this.markOpenCodePromptLedgerFailedTerminal({ + ledger: input.ledger, id: input.ledgerRecord.id, reason: 'opencode_session_stale_observe_loop_after_accepted_prompt', diagnostics: [ @@ -7251,6 +7273,11 @@ export class TeamProvisioningService { `OpenCode session stayed stale while observing an accepted prompt after ${maxSessionRefreshAttempts} attempt(s).`, ], failedAt: now, + eventContext: { + observeOnlyAfterAcceptedPrompt: true, + sessionRefreshAttempts: input.ledgerRecord.sessionRefreshAttempts ?? 0, + maxSessionRefreshAttempts, + }, }); } const delayMs = OPENCODE_PROMPT_DELIVERY_RETRY_DELAY_MS; @@ -7287,7 +7314,8 @@ export class TeamProvisioningService { input.ledgerRecord.maxSessionRefreshAttempts ?? OPENCODE_PROMPT_DELIVERY_SESSION_REFRESH_MAX_ATTEMPTS; if ((input.ledgerRecord.sessionRefreshAttempts ?? 0) >= maxSessionRefreshAttempts) { - return await input.ledger.markFailedTerminal({ + return await this.markOpenCodePromptLedgerFailedTerminal({ + ledger: input.ledger, id: input.ledgerRecord.id, reason: 'opencode_session_refresh_loop_after_resolved_behavior_changed', diagnostics: [ @@ -7295,6 +7323,11 @@ export class TeamProvisioningService { `OpenCode session stayed stale after ${maxSessionRefreshAttempts} session refresh attempt(s).`, ], failedAt: now, + eventContext: { + retry: true, + sessionRefreshAttempts: input.ledgerRecord.sessionRefreshAttempts ?? 0, + maxSessionRefreshAttempts, + }, }); } const delayMs = this.getOpenCodeDeliveryNextDelayMs({ @@ -7338,10 +7371,12 @@ export class TeamProvisioningService { input.ledgerRecord.attempts >= input.ledgerRecord.maxAttempts && !canScheduleNoAssistantRecoveryRetry ) { - return await input.ledger.markFailedTerminal({ + return await this.markOpenCodePromptLedgerFailedTerminal({ + ledger: input.ledger, id: input.ledgerRecord.id, reason: input.reason, failedAt: now, + eventContext: { retry: input.retry }, }); } const delayMs = this.getOpenCodeDeliveryNextDelayMs({ @@ -23493,6 +23528,29 @@ export class TeamProvisioningService { .catch(() => []); const targetMessage = inboxMessages.find((message) => message.messageId === onlyMessageId); if (targetMessage?.read) { + if (targetMessage.messageKind === 'member_work_sync_nudge') { + this.scheduleOpenCodeMemberInboxDeliveryWake({ + teamName, + memberName, + messageId: onlyMessageId, + delayMs: 500, + }); + const diagnostic = `opencode_work_sync_read_commit_waiting_for_active_relay: ${onlyMessageId}`; + return { + relayed: 0, + attempted: 1, + delivered: 0, + failed: 0, + lastDelivery: { + delivered: true, + accepted: false, + responsePending: true, + reason: 'opencode_work_sync_read_commit_waiting_for_active_relay', + diagnostics: [diagnostic], + }, + diagnostics: [diagnostic], + }; + } return { relayed: 0, attempted: 1, @@ -23576,7 +23634,7 @@ export class TeamProvisioningService { const onlyMessageId = options.onlyMessageId?.trim(); if (onlyMessageId) { const targetMessage = inboxMessages.find((message) => message.messageId === onlyMessageId); - if (targetMessage?.read) { + if (targetMessage?.read && targetMessage.messageKind !== 'member_work_sync_nudge') { return { relayed: 0, attempted: 1, @@ -23603,8 +23661,13 @@ export class TeamProvisioningService { } const unread = inboxMessages .filter((message): message is InboxMessage & { messageId: string } => { - if (message.read) return false; if (onlyMessageId && message.messageId !== onlyMessageId) return false; + if ( + message.read && + (!onlyMessageId || message.messageKind !== 'member_work_sync_nudge') + ) { + return false; + } if (typeof message.text !== 'string' || message.text.trim().length === 0) return false; return this.hasStableMessageId(message); }) @@ -23813,17 +23876,14 @@ export class TeamProvisioningService { pendingRecord ); } - failedRecord = await promptLedger.markFailedTerminal({ + failedRecord = await this.markOpenCodePromptLedgerFailedTerminal({ + ledger: promptLedger, id: pendingRecord.id, reason: attachmentPayloads.reason, diagnostics: attachmentPayloads.diagnostics, failedAt: nowIso(), + eventContext: { attachmentPayloadUnavailable: true }, }); - this.logOpenCodePromptDeliveryEvent( - 'opencode_prompt_delivery_response_observed', - failedRecord, - { attachmentPayloadUnavailable: true } - ); } catch (error) { const diagnostic = `opencode_inbox_attachment_terminal_ledger_failed: ${getErrorMessage( error diff --git a/test/features/member-work-sync/core/MemberWorkSyncUseCases.test.ts b/test/features/member-work-sync/core/MemberWorkSyncUseCases.test.ts index 92b983a7..25f747de 100644 --- a/test/features/member-work-sync/core/MemberWorkSyncUseCases.test.ts +++ b/test/features/member-work-sync/core/MemberWorkSyncUseCases.test.ts @@ -902,9 +902,7 @@ describe('MemberWorkSync use cases', () => { busySignal: { isBusy: async () => { busyChecks += 1; - return busyChecks > 1 - ? { busy: true, reason: 'recent_tool_activity' } - : { busy: false }; + return busyChecks > 1 ? { busy: true, reason: 'recent_tool_activity' } : { busy: false }; }, }, }); @@ -1093,6 +1091,31 @@ describe('MemberWorkSync use cases', () => { expect(recoverySummary).toMatchObject({ claimed: 1, delivered: 1, retryable: 0 }); expect(inbox.inserted).toHaveLength(2); expect(inbox.inserted[1]?.messageId).toContain('agenda-sync-still-stuck'); + + clock.set('2026-04-29T01:02:00.000Z'); + store.metricsGeneratedAt = '2026-04-29T01:02:00.000Z'; + await reconciler.execute( + { + teamName: 'team-a', + memberName: 'team-lead', + }, + { reconciledBy: 'queue', triggerReasons: ['manual_refresh'] } + ); + + const recoveryItems = [...outbox.items.values()].filter((item) => + item.payload.workSyncIntentKey?.startsWith('agenda-sync-still-stuck:') + ); + expect(recoveryItems).toHaveLength(2); + expect(new Set(recoveryItems.map((item) => item.id)).size).toBe(2); + + const secondRecoverySummary = await new MemberWorkSyncNudgeDispatcher(deps).dispatchDue({ + teamNames: ['team-a'], + claimedBy: 'test-dispatcher', + }); + + expect(secondRecoverySummary).toMatchObject({ claimed: 1, delivered: 1, retryable: 0 }); + expect(inbox.inserted).toHaveLength(3); + expect(inbox.inserted[2]?.messageId).toContain('agenda-sync-still-stuck'); }); it('creates an agenda-sync refresh recovery when a delivered nudge has a stale payload hash', async () => { @@ -1396,6 +1419,184 @@ describe('MemberWorkSync use cases', () => { expect(inbox.inserted).toHaveLength(3); }); + it('creates a delivered-still-stuck recovery after an accepted still_working lease expires', async () => { + const outbox = new InMemoryOutboxStore(); + const inbox = new InMemoryInboxNudge(); + const { clock, deps, store } = createDeps({ + providerId: 'codex', + outboxStore: outbox, + inboxNudge: inbox, + }); + store.phase2ReadinessState = 'shadow_ready'; + + const reconciler = new MemberWorkSyncReconciler(deps); + const reporter = new MemberWorkSyncReporter(deps); + const firstStatus = await reconciler.execute( + { + teamName: 'team-a', + memberName: 'bob', + }, + { reconciledBy: 'queue', triggerReasons: ['task_changed'] } + ); + await new MemberWorkSyncNudgeDispatcher(deps).dispatchDue({ + teamNames: ['team-a'], + claimedBy: 'test-dispatcher', + }); + + const baseId = `member-work-sync:team-a:bob:${firstStatus.agenda.fingerprint}`; + expect(outbox.items.get(baseId)).toMatchObject({ status: 'delivered' }); + + await reporter.execute({ + teamName: 'team-a', + memberName: 'bob', + state: 'still_working', + agendaFingerprint: firstStatus.agenda.fingerprint, + reportToken: firstStatus.reportToken, + taskIds: ['task-1'], + leaseTtlMs: 120_000, + source: 'test', + }); + + clock.set('2026-04-29T00:10:00.000Z'); + store.phase2ReadinessState = 'blocked'; + store.phase2ReadinessReasons = ['would_nudge_rate_high']; + store.metricsGeneratedAt = '2026-04-29T00:10:00.000Z'; + store.recentEvents = [ + { + id: 'old-report-accepted', + teamName: 'team-a', + memberName: 'bob', + kind: 'report_accepted', + state: 'still_working', + agendaFingerprint: firstStatus.agenda.fingerprint, + recordedAt: '2026-04-29T00:01:00.000Z', + actionableCount: 1, + providerId: 'codex', + }, + { + id: 'needs-sync-after-lease-expired', + teamName: 'team-a', + memberName: 'bob', + kind: 'status_evaluated', + state: 'needs_sync', + agendaFingerprint: firstStatus.agenda.fingerprint, + recordedAt: '2026-04-29T00:04:00.000Z', + actionableCount: 1, + providerId: 'codex', + }, + ]; + + const expiredStatus = await reconciler.execute( + { + teamName: 'team-a', + memberName: 'bob', + }, + { reconciledBy: 'queue', triggerReasons: ['manual_refresh'] } + ); + + expect(expiredStatus).toMatchObject({ + state: 'needs_sync', + diagnostics: expect.arrayContaining(['report_lease_expired']), + }); + expect(expiredStatus.report).toBeUndefined(); + const recovery = [...outbox.items.values()].find((item) => + item.payload.workSyncIntentKey?.startsWith('agenda-sync-still-stuck:') + ); + expect(recovery).toMatchObject({ + status: 'pending', + agendaFingerprint: firstStatus.agenda.fingerprint, + }); + + const summary = await new MemberWorkSyncNudgeDispatcher(deps).dispatchDue({ + teamNames: ['team-a'], + claimedBy: 'test-dispatcher', + }); + + expect(summary).toMatchObject({ claimed: 1, delivered: 1, retryable: 0 }); + expect(inbox.inserted).toHaveLength(2); + expect(inbox.inserted[1]?.messageId).toContain('agenda-sync-still-stuck'); + }); + + it('creates a delivered-still-stuck recovery for mixed review pickup and native work under noisy metrics', async () => { + const outbox = new InMemoryOutboxStore(); + const inbox = new InMemoryInboxNudge(); + const inProgressItem: MemberWorkSyncActionableWorkItem = { + ...workItem, + reason: 'owned_in_progress_task', + evidence: { + status: 'in_progress', + owner: 'bob', + }, + }; + const { clock, deps, store } = createDeps({ + items: [reviewPickupItem, inProgressItem], + providerId: 'codex', + outboxStore: outbox, + inboxNudge: inbox, + }); + store.phase2ReadinessState = 'shadow_ready'; + + const reconciler = new MemberWorkSyncReconciler(deps); + const firstStatus = await reconciler.execute( + { + teamName: 'team-a', + memberName: 'bob', + }, + { reconciledBy: 'queue', triggerReasons: ['task_changed'] } + ); + await new MemberWorkSyncNudgeDispatcher(deps).dispatchDue({ + teamNames: ['team-a'], + claimedBy: 'test-dispatcher', + }); + + const baseId = `member-work-sync:team-a:bob:${firstStatus.agenda.fingerprint}`; + expect(outbox.items.get(baseId)).toMatchObject({ status: 'delivered' }); + + clock.set('2026-04-29T00:10:00.000Z'); + store.phase2ReadinessState = 'blocked'; + store.phase2ReadinessReasons = ['would_nudge_rate_high']; + store.metricsGeneratedAt = '2026-04-29T00:10:00.000Z'; + store.recentEvents = [ + { + id: 'mixed-needs-sync-stable', + teamName: 'team-a', + memberName: 'bob', + kind: 'status_evaluated', + state: 'needs_sync', + agendaFingerprint: firstStatus.agenda.fingerprint, + recordedAt: '2026-04-29T00:02:00.000Z', + actionableCount: 2, + providerId: 'codex', + }, + ]; + + await reconciler.execute( + { + teamName: 'team-a', + memberName: 'bob', + }, + { reconciledBy: 'queue', triggerReasons: ['manual_refresh'] } + ); + + const recovery = [...outbox.items.values()].find((item) => + item.payload.workSyncIntentKey?.startsWith('agenda-sync-still-stuck:') + ); + expect(recovery).toMatchObject({ + status: 'pending', + agendaFingerprint: firstStatus.agenda.fingerprint, + }); + expect(recovery?.payload.text).toContain('still no accepted member_work_sync_report'); + + const summary = await new MemberWorkSyncNudgeDispatcher(deps).dispatchDue({ + teamNames: ['team-a'], + claimedBy: 'test-dispatcher', + }); + + expect(summary).toMatchObject({ claimed: 1, delivered: 1, retryable: 0 }); + expect(inbox.inserted).toHaveLength(2); + expect(inbox.inserted[1]?.messageId).toContain('agenda-sync-still-stuck'); + }); + it('records an existing delivered agenda nudge as skipped before still-stuck recovery age', async () => { const outbox = new InMemoryOutboxStore(); const inbox = new InMemoryInboxNudge(); diff --git a/test/features/member-work-sync/main/JsonMemberWorkSyncStore.test.ts b/test/features/member-work-sync/main/JsonMemberWorkSyncStore.test.ts index 87553f82..74dc094b 100644 --- a/test/features/member-work-sync/main/JsonMemberWorkSyncStore.test.ts +++ b/test/features/member-work-sync/main/JsonMemberWorkSyncStore.test.ts @@ -305,6 +305,107 @@ describe('JsonMemberWorkSyncStore', () => { ).toEqual(['bob', 'tom']); }); + it('repairs a stale processed pending-report index route when member report is pending', async () => { + const request = { + teamName: 'team-a', + memberName: 'bob', + state: 'still_working' as const, + agendaFingerprint: 'agenda:v1:abc', + reportToken: 'wrs:v1.test', + source: 'mcp' as const, + }; + + await store.appendPendingReport(request, 'control_api_unavailable'); + const [intent] = await store.listPendingReports('team-a'); + await store.markPendingReportProcessed('team-a', intent!.id, { + status: 'accepted', + resultCode: 'accepted', + processedAt: '2026-04-29T00:01:00.000Z', + }); + + const reportsPath = join(memberWorkSyncDir(root, 'team-a', 'bob'), 'reports.json'); + const reports = JSON.parse(await readFile(reportsPath, 'utf8')); + reports.intents[intent!.id] = { + ...reports.intents[intent!.id], + status: 'pending', + }; + delete reports.intents[intent!.id].resultCode; + delete reports.intents[intent!.id].processedAt; + await writeFile(reportsPath, JSON.stringify(reports), 'utf8'); + + const pending = await store.listPendingReports('team-a'); + expect(pending).toHaveLength(1); + expect(pending[0]).toMatchObject({ + id: intent!.id, + memberName: 'bob', + status: 'pending', + }); + }); + + it('repairs stale pending-report update routes before marking processed', async () => { + const request = { + teamName: 'team-a', + memberName: 'bob', + state: 'still_working' as const, + agendaFingerprint: 'agenda:v1:abc', + reportToken: 'wrs:v1.test', + source: 'mcp' as const, + }; + + await store.appendPendingReport(request, 'control_api_unavailable'); + await mkdir(memberWorkSyncDir(root, 'team-a', 'tom'), { recursive: true }); + const [intent] = await store.listPendingReports('team-a'); + await writeFile( + join(memberWorkSyncDir(root, 'team-a', 'tom'), 'reports.json'), + JSON.stringify({ + schemaVersion: 2, + intents: { + [intent!.id]: { + ...intent!, + teamName: 'other-team', + memberName: 'tom', + }, + }, + }), + 'utf8' + ); + const indexPath = join( + root, + 'team-a', + '.member-work-sync', + 'indexes', + 'pending-reports-index.json' + ); + const index = JSON.parse(await readFile(indexPath, 'utf8')); + index.items[intent!.id] = { + ...index.items[intent!.id], + memberKey: 'tom', + memberName: 'tom', + }; + await writeFile(indexPath, JSON.stringify(index), 'utf8'); + + await store.markPendingReportProcessed('team-a', intent!.id, { + status: 'accepted', + resultCode: 'accepted', + processedAt: '2026-04-29T00:01:00.000Z', + }); + + const reports = JSON.parse( + await readFile(join(memberWorkSyncDir(root, 'team-a', 'bob'), 'reports.json'), 'utf8') + ); + expect(reports.intents[intent!.id]).toMatchObject({ + memberName: 'bob', + status: 'accepted', + resultCode: 'accepted', + }); + const repaired = JSON.parse(await readFile(indexPath, 'utf8')); + expect(repaired.items[intent!.id]).toMatchObject({ + memberKey: 'bob', + memberName: 'bob', + status: 'accepted', + }); + }); + it('records bounded shadow metrics from status writes', async () => { await store.write(makeStatus({})); await store.write( @@ -761,6 +862,34 @@ describe('JsonMemberWorkSyncStore', () => { ).resolves.toBeNull(); }); + it('ignores superseded rows for logical recovery lookup', async () => { + const input = { + id: 'member-work-sync:team-a:bob:agenda:v1:superseded', + teamName: 'team-a', + memberName: 'bob', + agendaFingerprint: 'agenda:v1:superseded', + payloadHash: 'hash-a', + payload: makeNudgePayload({ workSyncIntentKey: 'proof-missing:message-1' }), + nowIso: '2026-04-29T00:00:00.000Z', + }; + await store.ensurePending(input); + await store.markSuperseded({ + teamName: 'team-a', + id: input.id, + reason: 'status_no_longer_matches_outbox', + nowIso: '2026-04-29T00:01:00.000Z', + }); + + await expect( + store.findRecentRecoveryByIntent({ + teamName: 'team-a', + memberName: 'bob', + intentKey: 'proof-missing:message-1', + sinceIso: '2026-04-29T00:00:00.000Z', + }) + ).resolves.toBeNull(); + }); + it('claims due outbox items and fences terminal updates by attempt generation', async () => { const input = { id: 'member-work-sync:team-a:bob:agenda:v1:abc', @@ -838,6 +967,56 @@ describe('JsonMemberWorkSyncStore', () => { }); }); + it('reclaims stale claimed outbox items without waiting for a fresh reconcile', async () => { + const input = { + id: 'member-work-sync:team-a:bob:agenda:v1:stale-claim', + teamName: 'team-a', + memberName: 'bob', + agendaFingerprint: 'agenda:v1:stale-claim', + payloadHash: 'hash-a', + payload: makeNudgePayload(), + nowIso: '2026-04-29T00:00:00.000Z', + }; + await store.ensurePending(input); + + const [claimed] = await store.claimDue({ + teamName: 'team-a', + claimedBy: 'dispatcher-a', + nowIso: '2026-04-29T00:01:00.000Z', + limit: 1, + }); + expect(claimed).toMatchObject({ + id: input.id, + status: 'claimed', + attemptGeneration: 1, + claimedBy: 'dispatcher-a', + claimedAt: '2026-04-29T00:01:00.000Z', + }); + + await expect( + store.claimDue({ + teamName: 'team-a', + claimedBy: 'dispatcher-b', + nowIso: '2026-04-29T00:05:59.000Z', + limit: 1, + }) + ).resolves.toEqual([]); + + const [reclaimed] = await store.claimDue({ + teamName: 'team-a', + claimedBy: 'dispatcher-b', + nowIso: '2026-04-29T00:06:00.000Z', + limit: 1, + }); + expect(reclaimed).toMatchObject({ + id: input.id, + status: 'claimed', + attemptGeneration: 2, + claimedBy: 'dispatcher-b', + claimedAt: '2026-04-29T00:06:00.000Z', + }); + }); + it('claims due outbox items from the index without scanning unrelated member outboxes', async () => { const bobInput = { id: 'member-work-sync:team-a:bob:agenda:v1:abc', @@ -1210,6 +1389,221 @@ describe('JsonMemberWorkSyncStore', () => { expect(claimed.map((item) => item.memberName).sort()).toEqual(['bob', 'tom']); }); + it('rewrites stale due outbox member keys while claiming', async () => { + const input = { + id: 'member-work-sync:team-a:bob:agenda:v1:abc', + teamName: 'team-a', + memberName: 'bob', + agendaFingerprint: 'agenda:v1:abc', + payloadHash: 'hash-a', + payload: makeNudgePayload(), + nowIso: '2026-04-29T00:00:00.000Z', + }; + await store.ensurePending(input); + const indexPath = join(root, 'team-a', '.member-work-sync', 'indexes', 'outbox-index.json'); + const index = JSON.parse(await readFile(indexPath, 'utf8')); + index.items[input.id] = { + ...index.items[input.id], + memberKey: 'tom', + memberName: 'bob', + }; + await writeFile(indexPath, JSON.stringify(index), 'utf8'); + + const [claimed] = await store.claimDue({ + teamName: 'team-a', + claimedBy: 'dispatcher-a', + nowIso: '2026-04-29T00:01:00.000Z', + limit: 1, + }); + + expect(claimed).toMatchObject({ + id: input.id, + memberName: 'bob', + status: 'claimed', + }); + const repaired = JSON.parse(await readFile(indexPath, 'utf8')); + expect(repaired.items[input.id]).toMatchObject({ + memberKey: 'bob', + memberName: 'bob', + status: 'claimed', + }); + }); + + it('repairs stale outbox update routes before marking failures', async () => { + const input = { + id: 'member-work-sync:team-a:bob:agenda:v1:abc', + teamName: 'team-a', + memberName: 'bob', + agendaFingerprint: 'agenda:v1:abc', + payloadHash: 'hash-a', + payload: makeNudgePayload(), + nowIso: '2026-04-29T00:00:00.000Z', + }; + await store.ensurePending(input); + await mkdir(memberWorkSyncDir(root, 'team-a', 'tom'), { recursive: true }); + const [claimed] = await store.claimDue({ + teamName: 'team-a', + claimedBy: 'dispatcher-a', + nowIso: '2026-04-29T00:01:00.000Z', + limit: 1, + }); + await writeFile( + join(memberWorkSyncDir(root, 'team-a', 'tom'), 'outbox.json'), + JSON.stringify({ + schemaVersion: 2, + items: { + [input.id]: { + ...input, + teamName: 'other-team', + memberName: 'tom', + status: 'claimed', + attemptGeneration: claimed!.attemptGeneration, + claimedBy: 'dispatcher-a', + claimedAt: '2026-04-29T00:01:00.000Z', + updatedAt: '2026-04-29T00:01:00.000Z', + }, + }, + }), + 'utf8' + ); + const indexPath = join(root, 'team-a', '.member-work-sync', 'indexes', 'outbox-index.json'); + const index = JSON.parse(await readFile(indexPath, 'utf8')); + index.items[input.id] = { + ...index.items[input.id], + memberKey: 'tom', + memberName: 'tom', + }; + await writeFile(indexPath, JSON.stringify(index), 'utf8'); + + await store.markFailed({ + teamName: 'team-a', + id: input.id, + attemptGeneration: claimed!.attemptGeneration, + error: 'delivery failed', + retryable: true, + nextAttemptAt: '2026-04-29T00:10:00.000Z', + nowIso: '2026-04-29T00:02:00.000Z', + }); + + const memberOutbox = JSON.parse( + await readFile(join(memberWorkSyncDir(root, 'team-a', 'bob'), 'outbox.json'), 'utf8') + ); + expect(memberOutbox.items[input.id]).toMatchObject({ + status: 'failed_retryable', + lastError: 'delivery failed', + nextAttemptAt: '2026-04-29T00:10:00.000Z', + }); + const repaired = JSON.parse(await readFile(indexPath, 'utf8')); + expect(repaired.items[input.id]).toMatchObject({ + memberKey: 'bob', + memberName: 'bob', + status: 'failed_retryable', + }); + }); + + it('repairs wrong-member due outbox index routes before returning a limited claim', async () => { + const bobInput = { + id: 'member-work-sync:team-a:bob:agenda:v1:abc', + teamName: 'team-a', + memberName: 'bob', + agendaFingerprint: 'agenda:v1:abc', + payloadHash: 'hash-a', + payload: makeNudgePayload(), + nowIso: '2026-04-29T00:00:00.000Z', + }; + await store.ensurePending(bobInput); + await mkdir(memberWorkSyncDir(root, 'team-a', 'tom'), { recursive: true }); + await writeFile( + join(memberWorkSyncDir(root, 'team-a', 'tom'), 'outbox.json'), + JSON.stringify({ + schemaVersion: 2, + items: { + [bobInput.id]: { + ...bobInput, + teamName: 'other-team', + memberName: 'tom', + status: 'pending', + createdAt: '2026-04-29T00:00:00.000Z', + updatedAt: '2026-04-29T00:00:00.000Z', + }, + }, + }), + 'utf8' + ); + const indexPath = join(root, 'team-a', '.member-work-sync', 'indexes', 'outbox-index.json'); + const index = JSON.parse(await readFile(indexPath, 'utf8')); + index.items[bobInput.id] = { + ...index.items[bobInput.id], + memberKey: 'tom', + memberName: 'tom', + }; + await writeFile(indexPath, JSON.stringify(index), 'utf8'); + + const claimed = await store.claimDue({ + teamName: 'team-a', + claimedBy: 'dispatcher-a', + nowIso: '2026-04-29T00:01:00.000Z', + limit: 1, + }); + + expect(claimed.map((item) => item.memberName)).toEqual(['bob']); + const repaired = JSON.parse(await readFile(indexPath, 'utf8')); + expect(repaired.items[bobInput.id]).toMatchObject({ + memberKey: 'bob', + memberName: 'bob', + status: 'claimed', + }); + }); + + it('repairs stale terminal outbox index routes when member-scoped item is due', async () => { + const input = { + id: 'member-work-sync:team-a:bob:agenda:v1:abc', + teamName: 'team-a', + memberName: 'bob', + agendaFingerprint: 'agenda:v1:abc', + payloadHash: 'hash-a', + payload: makeNudgePayload(), + nowIso: '2026-04-29T00:00:00.000Z', + }; + await store.ensurePending(input); + const [claimed] = await store.claimDue({ + teamName: 'team-a', + claimedBy: 'dispatcher-a', + nowIso: '2026-04-29T00:01:00.000Z', + limit: 1, + }); + await store.markDelivered({ + teamName: 'team-a', + id: input.id, + attemptGeneration: claimed!.attemptGeneration, + deliveredMessageId: input.id, + nowIso: '2026-04-29T00:02:00.000Z', + }); + + const memberOutboxPath = join(memberWorkSyncDir(root, 'team-a', 'bob'), 'outbox.json'); + const memberOutbox = JSON.parse(await readFile(memberOutboxPath, 'utf8')); + memberOutbox.items[input.id] = { + ...memberOutbox.items[input.id], + status: 'pending', + updatedAt: '2026-04-29T00:03:00.000Z', + }; + delete memberOutbox.items[input.id].deliveredMessageId; + await writeFile(memberOutboxPath, JSON.stringify(memberOutbox), 'utf8'); + + const [reclaimed] = await store.claimDue({ + teamName: 'team-a', + claimedBy: 'dispatcher-b', + nowIso: '2026-04-29T00:04:00.000Z', + limit: 1, + }); + expect(reclaimed).toMatchObject({ + id: input.id, + status: 'claimed', + attemptGeneration: 2, + claimedBy: 'dispatcher-b', + }); + }); + it('falls back to legacy v1 status and materializes legacy outbox during claim', async () => { const auditEvents: MemberWorkSyncAuditEvent[] = []; store = new JsonMemberWorkSyncStore(new MemberWorkSyncStorePaths(root), { diff --git a/test/features/member-work-sync/main/MemberWorkSyncEventQueue.test.ts b/test/features/member-work-sync/main/MemberWorkSyncEventQueue.test.ts index 9185af3a..99b56878 100644 --- a/test/features/member-work-sync/main/MemberWorkSyncEventQueue.test.ts +++ b/test/features/member-work-sync/main/MemberWorkSyncEventQueue.test.ts @@ -1,6 +1,5 @@ -import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; - import { MemberWorkSyncEventQueue } from '@features/member-work-sync/main/infrastructure/MemberWorkSyncEventQueue'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; describe('MemberWorkSyncEventQueue', () => { beforeEach(() => { @@ -370,4 +369,100 @@ describe('MemberWorkSyncEventQueue', () => { expect(reconciles).toHaveLength(2); await queue.stop(); }); + + it('retries a failed reconcile with bounded backoff', async () => { + const reconciles: unknown[] = []; + const auditEvents: string[] = []; + const queue = new MemberWorkSyncEventQueue({ + quietWindowMs: 1, + retryDelayMs: 10, + maxRetryAttempts: 2, + reconcile: async (request) => { + reconciles.push(request); + if (reconciles.length === 1) { + throw new Error('transient'); + } + }, + isTeamActive: () => true, + auditJournal: { + append: async (event) => { + auditEvents.push(event.event); + }, + }, + }); + + queue.enqueue({ teamName: 'team-a', memberName: 'bob', triggerReason: 'turn_settled' }); + await vi.advanceTimersByTimeAsync(1); + + expect(reconciles).toHaveLength(1); + expect(queue.getDiagnostics()).toMatchObject({ failed: 1, queued: 1, reconciled: 0 }); + expect(auditEvents).toEqual(['queue_enqueued', 'queue_retry_scheduled']); + + await vi.advanceTimersByTimeAsync(9); + expect(reconciles).toHaveLength(1); + + await vi.advanceTimersByTimeAsync(1); + expect(reconciles).toHaveLength(2); + expect(queue.getDiagnostics()).toMatchObject({ failed: 1, queued: 0, reconciled: 1 }); + + await queue.stop(); + }); + + it('drops a failed reconcile after the retry budget is exhausted', async () => { + const reconcile = vi.fn(async () => { + throw new Error('still failing'); + }); + const queue = new MemberWorkSyncEventQueue({ + quietWindowMs: 1, + retryDelayMs: 10, + maxRetryAttempts: 1, + reconcile, + isTeamActive: () => true, + }); + + queue.enqueue({ teamName: 'team-a', memberName: 'bob', triggerReason: 'turn_settled' }); + await vi.advanceTimersByTimeAsync(1); + await vi.advanceTimersByTimeAsync(10); + await vi.advanceTimersByTimeAsync(1_000); + + expect(reconcile).toHaveBeenCalledTimes(2); + expect(queue.getDiagnostics()).toMatchObject({ + dropped: 1, + failed: 2, + queued: 0, + reconciled: 0, + }); + + await queue.stop(); + }); + + it('resets retry budget when a fresh event joins a queued retry item', async () => { + const reconcile = vi.fn(async () => { + throw new Error('still failing'); + }); + const queue = new MemberWorkSyncEventQueue({ + quietWindowMs: 1, + retryDelayMs: 10, + maxRetryAttempts: 1, + reconcile, + isTeamActive: () => true, + }); + + queue.enqueue({ teamName: 'team-a', memberName: 'bob', triggerReason: 'turn_settled' }); + await vi.advanceTimersByTimeAsync(1); + expect(queue.getDiagnostics()).toMatchObject({ failed: 1, queued: 1, dropped: 0 }); + + queue.enqueue({ teamName: 'team-a', memberName: 'bob', triggerReason: 'task_changed' }); + await vi.advanceTimersByTimeAsync(10); + + expect(reconcile).toHaveBeenCalledTimes(2); + expect(queue.getDiagnostics()).toMatchObject({ + dropped: 0, + failed: 2, + queued: 1, + reconciled: 0, + }); + + await queue.stop(); + }); }); diff --git a/test/features/member-work-sync/main/MemberWorkSyncNudgeDispatchScheduler.test.ts b/test/features/member-work-sync/main/MemberWorkSyncNudgeDispatchScheduler.test.ts index 90cc0a30..729ece23 100644 --- a/test/features/member-work-sync/main/MemberWorkSyncNudgeDispatchScheduler.test.ts +++ b/test/features/member-work-sync/main/MemberWorkSyncNudgeDispatchScheduler.test.ts @@ -1,6 +1,5 @@ -import { describe, expect, it, vi } from 'vitest'; - import { MemberWorkSyncNudgeDispatchScheduler } from '@features/member-work-sync/main/infrastructure/MemberWorkSyncNudgeDispatchScheduler'; +import { describe, expect, it, vi } from 'vitest'; describe('MemberWorkSyncNudgeDispatchScheduler', () => { it('dispatches due nudges for unique active teams without overlapping runs', async () => { diff --git a/test/main/services/team/TeamProvisioningService.test.ts b/test/main/services/team/TeamProvisioningService.test.ts index 86609500..0554258f 100644 --- a/test/main/services/team/TeamProvisioningService.test.ts +++ b/test/main/services/team/TeamProvisioningService.test.ts @@ -711,9 +711,7 @@ type TeamProvisioningServicePrivateHarness = { applyProcessBootstrapTransportOverlay: ( input: Record ) => Record; - reconcilePersistedLaunchState: ( - teamName: string - ) => Promise<{ + reconcilePersistedLaunchState: (teamName: string) => Promise<{ snapshot: null; statuses: Record; }>; @@ -1152,6 +1150,104 @@ describe('TeamProvisioningService', () => { expect(nextRecord.status).toBe('retry_scheduled'); }); + it('emits a terminal failure event when exhausted work-sync proof retries fail', async () => { + const svc = new TeamProvisioningService(); + const taskRefs = [{ taskId: 'task-1', displayId: 'task-1', teamName: 'team-a' }]; + const record = { + id: 'opencode-prompt:work-sync-proof-missing', + teamName: 'team-a', + memberName: 'atlas', + laneId: 'secondary:opencode:atlas', + runId: 'run-1', + runtimeSessionId: 'ses-1', + inboxMessageId: 'msg-work-sync-proof-missing', + inboxTimestamp: '2026-05-18T08:31:00.000Z', + source: 'watcher', + messageKind: 'member_work_sync_nudge', + workSyncIntent: 'agenda_sync', + replyRecipient: 'team-lead', + actionMode: 'do', + taskRefs, + payloadHash: 'sha256:work-sync', + status: 'retry_scheduled', + responseState: 'responded_non_visible_tool', + attempts: 3, + maxAttempts: 3, + acceptanceUnknown: false, + nextAttemptAt: null, + lastAttemptAt: '2026-05-18T08:31:30.000Z', + lastObservedAt: '2026-05-18T08:31:45.000Z', + acceptedAt: '2026-05-18T08:31:30.000Z', + respondedAt: '2026-05-18T08:31:45.000Z', + failedAt: null, + inboxReadCommittedAt: null, + inboxReadCommitError: null, + prePromptCursor: null, + postPromptCursor: null, + deliveredUserMessageId: 'delivered-1', + observedAssistantMessageId: 'assistant-1', + observedAssistantPreview: null, + observedToolCallNames: ['member_work_sync_status'], + observedVisibleMessageId: null, + visibleReplyMessageId: null, + visibleReplyInbox: null, + visibleReplyCorrelation: null, + lastReason: 'member_work_sync_report_required', + diagnostics: ['member_work_sync_report_required'], + createdAt: '2026-05-18T08:31:00.000Z', + updatedAt: '2026-05-18T08:31:45.000Z', + }; + const failedRecord = { + ...record, + status: 'failed_terminal', + failedAt: '2026-05-18T08:32:00.000Z', + updatedAt: '2026-05-18T08:32:00.000Z', + }; + const ledger = { + markFailedTerminal: vi.fn(async () => failedRecord), + markNextAttemptScheduled: vi.fn(), + }; + const harness = svc as unknown as { + scheduleOpenCodePromptDeliveryWatchdog: ReturnType; + logOpenCodePromptDeliveryEvent: ReturnType; + scheduleOpenCodePromptLedgerFollowUp(input: { + ledger: typeof ledger; + ledgerRecord: typeof record; + teamName: string; + memberName: string; + retry: boolean; + reason: string; + }): Promise; + }; + harness.scheduleOpenCodePromptDeliveryWatchdog = vi.fn(); + harness.logOpenCodePromptDeliveryEvent = vi.fn(); + + const nextRecord = await harness.scheduleOpenCodePromptLedgerFollowUp({ + ledger, + ledgerRecord: record, + teamName: 'team-a', + memberName: 'atlas', + retry: true, + reason: 'member_work_sync_report_required', + }); + + expect(nextRecord).toBe(failedRecord); + expect(ledger.markFailedTerminal).toHaveBeenCalledWith( + expect.objectContaining({ + id: record.id, + reason: 'member_work_sync_report_required', + }) + ); + expect(harness.logOpenCodePromptDeliveryEvent).toHaveBeenCalledWith( + 'opencode_prompt_delivery_terminal_failure', + failedRecord, + expect.objectContaining({ + reason: 'member_work_sync_report_required', + retry: true, + }) + ); + }); + it('uses stamped OpenCode session-refresh evidence instead of stale historical diagnostics', async () => { const svc = new TeamProvisioningService(); (svc as any).scheduleOpenCodePromptDeliveryWatchdog = vi.fn(); @@ -16725,8 +16821,7 @@ describe('TeamProvisioningService', () => { return launchIdentity; }); (svc as any).buildTeamRuntimeLaunchArgsPlan = vi.fn(async (input) => ({ - fastModeArgs: - input.launchIdentity === launchIdentity ? ['--test-codex-fast-mode'] : [], + fastModeArgs: input.launchIdentity === launchIdentity ? ['--test-codex-fast-mode'] : [], runtimeTurnSettledHookArgs: [], providerArgs: [], settingsArgs: [], @@ -22260,9 +22355,10 @@ describe('TeamProvisioningService', () => { expect(bobOutcome).toBeNull(); // The transcript tail is parsed once and shared: a single cache entry for the // file rather than one parse per member. - expect((svc as unknown as Record>).parsedBootstrapTranscriptTailCache.size).toBe( - 1 - ); + expect( + (svc as unknown as Record>).parsedBootstrapTranscriptTailCache + .size + ).toBe(1); }); it('caches persisted bootstrap transcript outcome lookup between close polling reads', async () => { @@ -24523,12 +24619,10 @@ describe('TeamProvisioningService', () => { scheduled: true, reason: 'scheduled', })); - const sendMessageToRun = vi.fn( - async (targetRun: LeadRelayPriorityTestRun, message: string) => { - deliveredPrompt = message; - targetRun.leadRelayCapture?.resolveOnce(''); - } - ); + const sendMessageToRun = vi.fn(async (targetRun: LeadRelayPriorityTestRun, message: string) => { + deliveredPrompt = message; + targetRun.leadRelayCapture?.resolveOnce(''); + }); harness.runs.set(run.runId, run); harness.aliveRunByTeam.set(teamName, run.runId); @@ -25854,23 +25948,22 @@ describe('TeamProvisioningService', () => { it('does not keep healed confirmed-bootstrap status alive when refreshed runtime metadata is an error', async () => { const svc = new TeamProvisioningService(); const harness = privateHarness(svc); - harness.getLiveTeamAgentRuntimeMetadata = vi.fn( - () => - Promise.resolve( - new Map([ - [ - 'tom', - { - alive: false, - model: 'sonnet', - livenessKind: 'not_found', - pidSource: 'process_table', - runtimeDiagnostic: 'Runtime process crashed', - runtimeDiagnosticSeverity: 'error', - }, - ], - ]) - ) + harness.getLiveTeamAgentRuntimeMetadata = vi.fn(() => + Promise.resolve( + new Map([ + [ + 'tom', + { + alive: false, + model: 'sonnet', + livenessKind: 'not_found', + pidSource: 'process_table', + runtimeDiagnostic: 'Runtime process crashed', + runtimeDiagnosticSeverity: 'error', + }, + ], + ]) + ) ); const result = await harness.attachLiveRuntimeMetadataToStatuses('signal-ops', { diff --git a/test/main/services/team/TeamProvisioningServiceRelay.test.ts b/test/main/services/team/TeamProvisioningServiceRelay.test.ts index 288bc228..5c16db1c 100644 --- a/test/main/services/team/TeamProvisioningServiceRelay.test.ts +++ b/test/main/services/team/TeamProvisioningServiceRelay.test.ts @@ -3445,6 +3445,9 @@ Messages: }, ]); const deliverSpy = vi.spyOn(service, 'deliverOpenCodeMemberMessage'); + const logSpy = vi + .spyOn(service as any, 'logOpenCodePromptDeliveryEvent') + .mockImplementation(() => undefined); const relay = await service.relayOpenCodeMemberInboxMessages(teamName, 'jack'); const expectedReason = 'opencode_inbox_attachment_payload_unavailable: att-1'; @@ -3469,6 +3472,18 @@ Messages: status: 'failed_terminal', lastReason: expectedReason, }); + expect(logSpy).toHaveBeenCalledWith( + 'opencode_prompt_delivery_terminal_failure', + expect.objectContaining({ + inboxMessageId: 'opencode-attachment-1', + status: 'failed_terminal', + lastReason: expectedReason, + }), + expect.objectContaining({ + attachmentPayloadUnavailable: true, + reason: expectedReason, + }) + ); }); it('rebuilds missing OpenCode prompt ledger rows from unread inbox on startup scan', async () => { @@ -3721,6 +3736,101 @@ Messages: } }); + it('keeps an already-read work-sync nudge pending when it is queued behind an active relay', async () => { + vi.useFakeTimers(); + const service = new TeamProvisioningService(); + const teamName = 'my-team'; + try { + hoisted.files.set( + `/mock/teams/${teamName}/config.json`, + JSON.stringify({ + name: teamName, + projectPath: '/tmp/my-team', + members: [ + { name: 'team-lead', agentType: 'team-lead' }, + { name: 'jack', role: 'developer', providerId: 'opencode', model: 'openrouter/test' }, + ], + }) + ); + seedMemberInbox(teamName, 'jack', [ + { + from: 'bob', + to: 'jack', + text: 'Older watcher message.', + timestamp: '2026-02-23T17:00:00.000Z', + read: false, + messageId: 'opencode-inflight-old', + }, + ]); + + const oldDeliveryStarted = createDeferred(); + const releaseOldDelivery = createDeferred(); + vi.spyOn(service, 'deliverOpenCodeMemberMessage').mockImplementation( + async (_teamName, input) => { + if (input.messageId === 'opencode-inflight-old') { + oldDeliveryStarted.resolve(undefined); + await releaseOldDelivery.promise; + } + return { delivered: true, diagnostics: [] }; + } + ); + const wakeSpy = vi + .spyOn(service, 'scheduleOpenCodeMemberInboxDeliveryWake') + .mockImplementation(() => undefined); + + const watcherRelay = service.relayOpenCodeMemberInboxMessages(teamName, 'jack'); + await oldDeliveryStarted.promise; + seedMemberInbox(teamName, 'jack', [ + { + from: 'bob', + to: 'jack', + text: 'Older watcher message.', + timestamp: '2026-02-23T17:00:00.000Z', + read: false, + messageId: 'opencode-inflight-old', + }, + { + from: 'system', + to: 'jack', + text: 'Call member_work_sync_status, then member_work_sync_report.', + timestamp: '2026-02-23T17:00:01.000Z', + read: true, + messageId: 'work-sync-read-queued', + messageKind: 'member_work_sync_nudge', + workSyncIntent: 'agenda_sync', + }, + ]); + + await expect( + service.relayOpenCodeMemberInboxMessages(teamName, 'jack', { + onlyMessageId: 'work-sync-read-queued', + source: 'watchdog', + }) + ).resolves.toMatchObject({ + attempted: 1, + delivered: 0, + failed: 0, + lastDelivery: { + delivered: true, + accepted: false, + responsePending: true, + reason: 'opencode_work_sync_read_commit_waiting_for_active_relay', + }, + }); + expect(wakeSpy).toHaveBeenCalledWith({ + teamName, + memberName: 'jack', + messageId: 'work-sync-read-queued', + delayMs: 500, + }); + + releaseOldDelivery.resolve(undefined); + await watcherRelay; + } finally { + vi.useRealTimers(); + } + }); + it('treats an already-read specific OpenCode inbox row as delivered for UI-send relay', async () => { const service = new TeamProvisioningService(); const teamName = 'my-team'; @@ -3762,6 +3872,68 @@ Messages: expect(deliverSpy).not.toHaveBeenCalled(); }); + it('does not treat an already-read work-sync nudge as delivered without the work-sync proof path', async () => { + const service = new TeamProvisioningService(); + const teamName = 'my-team'; + hoisted.files.set( + `/mock/teams/${teamName}/config.json`, + JSON.stringify({ + name: teamName, + projectPath: '/tmp/my-team', + members: [ + { name: 'team-lead', agentType: 'team-lead' }, + { name: 'jack', role: 'developer', providerId: 'opencode', model: 'openrouter/test' }, + ], + }) + ); + seedMemberInbox(teamName, 'jack', [ + { + from: 'system', + to: 'jack', + text: 'Call member_work_sync_status, then member_work_sync_report.', + timestamp: '2026-02-23T17:02:00.000Z', + read: true, + messageId: 'work-sync-read-1', + messageKind: 'member_work_sync_nudge', + workSyncIntent: 'agenda_sync', + taskRefs: [{ taskId: 'task-1', teamName }], + }, + ]); + const deliverSpy = vi.spyOn(service, 'deliverOpenCodeMemberMessage').mockResolvedValue({ + delivered: true, + accepted: false, + responsePending: true, + reason: 'member_work_sync_report_required', + diagnostics: ['member_work_sync_report_required'], + }); + + const relay = await service.relayOpenCodeMemberInboxMessages(teamName, 'jack', { + onlyMessageId: 'work-sync-read-1', + source: 'watchdog', + }); + + expect(deliverSpy).toHaveBeenCalledWith( + teamName, + expect.objectContaining({ + memberName: 'jack', + messageId: 'work-sync-read-1', + messageKind: 'member_work_sync_nudge', + workSyncIntent: 'agenda_sync', + }) + ); + expect(relay).toMatchObject({ + attempted: 1, + delivered: 0, + failed: 0, + lastDelivery: { + delivered: true, + accepted: false, + responsePending: true, + reason: 'member_work_sync_report_required', + }, + }); + }); + it('routes watcher inbox changes for OpenCode members through direct runtime relay', async () => { const service = new TeamProvisioningService(); const teamName = 'my-team'; @@ -4357,7 +4529,10 @@ Messages: ], }) ); - hoisted.files.set(`${teamsBasePath}/${teamName}/inboxes/${memberName}.json`, JSON.stringify([])); + hoisted.files.set( + `${teamsBasePath}/${teamName}/inboxes/${memberName}.json`, + JSON.stringify([]) + ); (service as any).resolveOpenCodeMemberDeliveryIdentity = vi.fn(async () => ({ ok: true, canonicalMemberName: memberName, @@ -4415,7 +4590,10 @@ Messages: ], }) ); - hoisted.files.set(`${teamsBasePath}/${teamName}/inboxes/${memberName}.json`, JSON.stringify([])); + hoisted.files.set( + `${teamsBasePath}/${teamName}/inboxes/${memberName}.json`, + JSON.stringify([]) + ); (service as any).resolveOpenCodeMemberDeliveryIdentity = vi.fn(async () => ({ ok: true, canonicalMemberName: memberName,