From c8a3ad07acdc7600bda1a94274d960d5994dfa74 Mon Sep 17 00:00:00 2001 From: 777genius Date: Sat, 6 Jun 2026 21:19:52 +0300 Subject: [PATCH] fix(member-work-sync): repair nudges and stale report tokens --- .../MemberWorkSyncDiagnosticsReader.ts | 9 +- .../MemberWorkSyncNudgeOutboxPlanner.ts | 54 ++ ...mberWorkSyncPendingReportIntentReplayer.ts | 63 +- .../core/application/ports.ts | 7 + .../TeamInboxMemberWorkSyncNudgeSink.ts | 114 +++- .../TeamTaskStallJournalWorkSyncCooldown.ts | 5 +- .../memberWorkSyncTeamActivity.test.ts | 55 ++ .../createMemberWorkSyncFeature.ts | 48 ++ .../composition/memberWorkSyncTeamActivity.ts | 81 ++- src/features/member-work-sync/main/index.ts | 1 + src/main/index.ts | 11 +- src/main/services/team/TeamInboxWriter.ts | 86 +++ .../core/MemberWorkSyncUseCases.test.ts | 146 +++++ .../TeamInboxMemberWorkSyncNudgeSink.test.ts | 398 +++++++++++- .../main/createMemberWorkSyncFeature.test.ts | 476 +++++++++++++- .../team/MemberWorkSyncCodex.live.test.ts | 593 ++++++++++++++++++ ...penCodeAgendaSyncRecovery.safe-e2e.test.ts | 20 +- .../services/team/TeamInboxWriter.test.ts | 49 ++ 18 files changed, 2166 insertions(+), 50 deletions(-) diff --git a/src/features/member-work-sync/core/application/MemberWorkSyncDiagnosticsReader.ts b/src/features/member-work-sync/core/application/MemberWorkSyncDiagnosticsReader.ts index c329abe1..748cf1c8 100644 --- a/src/features/member-work-sync/core/application/MemberWorkSyncDiagnosticsReader.ts +++ b/src/features/member-work-sync/core/application/MemberWorkSyncDiagnosticsReader.ts @@ -1,6 +1,9 @@ import { decideMemberWorkSyncStatus } from '../domain'; -import { finalizeMemberWorkSyncAgenda } from './MemberWorkSyncReconciler'; +import { + attachMemberWorkSyncReportToken, + finalizeMemberWorkSyncAgenda, +} from './MemberWorkSyncReconciler'; import { resolveMemberWorkSyncRuntimeActivity } from './MemberWorkSyncRuntimeActivity'; import type { MemberWorkSyncStatus, MemberWorkSyncStatusRequest } from '../../contracts'; @@ -28,7 +31,7 @@ export class MemberWorkSyncDiagnosticsReader { inactive: source.inactive || runtimeActivity.inactive, }); - return { + return attachMemberWorkSyncReportToken(this.deps, { teamName: agenda.teamName, memberName: agenda.memberName, state: decision.state, @@ -46,6 +49,6 @@ export class MemberWorkSyncDiagnosticsReader { 'status_snapshot_not_persisted', ], ...(source.providerId ? { providerId: source.providerId } : {}), - }; + }); } } diff --git a/src/features/member-work-sync/core/application/MemberWorkSyncNudgeOutboxPlanner.ts b/src/features/member-work-sync/core/application/MemberWorkSyncNudgeOutboxPlanner.ts index 310c2f14..4b2caad8 100644 --- a/src/features/member-work-sync/core/application/MemberWorkSyncNudgeOutboxPlanner.ts +++ b/src/features/member-work-sync/core/application/MemberWorkSyncNudgeOutboxPlanner.ts @@ -155,6 +155,21 @@ function shouldPlanDeliveredStillStuckRecovery(input: { ); } +function shouldRepairDeliveredAgendaSyncNudge(input: { + status: MemberWorkSyncStatus; + requestedInput: MemberWorkSyncOutboxEnsureInput; + existingItem: MemberWorkSyncOutboxItem; +}): boolean { + return ( + input.status.state === 'needs_sync' && + input.requestedInput.payload.workSyncIntent === 'agenda_sync' && + input.existingItem.status === 'delivered' && + input.existingItem.agendaFingerprint === input.requestedInput.agendaFingerprint && + input.existingItem.payloadHash === input.requestedInput.payloadHash && + !hasActiveAcceptedWorkLease(input.status) + ); +} + function isOutboxItemAwaitingDelivery(item: MemberWorkSyncOutboxItem): boolean { return item.status !== 'delivered' && item.status !== 'failed_terminal'; } @@ -296,6 +311,7 @@ export class MemberWorkSyncNudgeOutboxPlanner { await this.appendPlanAudit(status, { planned: false, code: 'payload_conflict' }); return { planned: false, code: 'payload_conflict' }; } + await this.repairDeliveredAgendaSyncNudgeIfNeeded(status, recoveryInput, recoveryResult.item); if (activationReason) { const deliveredStillStuckRecovery = await this.planDeliveredStillStuckRecovery( @@ -371,6 +387,7 @@ export class MemberWorkSyncNudgeOutboxPlanner { await this.appendPlanAudit(status, { planned: false, code: 'payload_conflict' }); return { planned: false, code: 'payload_conflict' }; } + await this.repairDeliveredAgendaSyncNudgeIfNeeded(status, recoveryInput, recoveryResult.item); const recoveryPlanned = isOutboxItemAwaitingDelivery(recoveryResult.item); const recoveryPlanResult = { @@ -491,6 +508,11 @@ export class MemberWorkSyncNudgeOutboxPlanner { await this.appendPlanAudit(status, { planned: false, code: 'payload_conflict' }); return { planned: false, code: 'payload_conflict' }; } + await this.repairDeliveredAgendaSyncNudgeIfNeeded( + status, + recoveryInput, + recoveryResult.item + ); if ( shouldPlanStatusOnlyRecovery({ status, @@ -544,6 +566,7 @@ export class MemberWorkSyncNudgeOutboxPlanner { await this.appendPlanAudit(status, { planned: false, code }); return { planned: false, code }; } + await this.repairDeliveredAgendaSyncNudgeIfNeeded(status, input, result.item); if ( shouldPlanStatusOnlyRecovery({ status, @@ -580,6 +603,37 @@ export class MemberWorkSyncNudgeOutboxPlanner { return planResult; } + private async repairDeliveredAgendaSyncNudgeIfNeeded( + status: MemberWorkSyncStatus, + requestedInput: MemberWorkSyncOutboxEnsureInput, + existingItem: MemberWorkSyncOutboxItem + ): Promise { + const inboxNudge = this.deps.inboxNudge; + if ( + !inboxNudge?.repairIfPresent || + !shouldRepairDeliveredAgendaSyncNudge({ status, requestedInput, existingItem }) + ) { + return; + } + + try { + await inboxNudge.repairIfPresent({ + teamName: status.teamName, + memberName: status.memberName, + messageId: existingItem.deliveredMessageId ?? existingItem.id, + payloadHash: existingItem.payloadHash, + payload: existingItem.payload, + }); + } catch (error) { + this.deps.logger?.warn('member work sync delivered nudge repair failed', { + teamName: status.teamName, + memberName: status.memberName, + outboxId: existingItem.id, + error: String(error), + }); + } + } + private async appendReviewPickupEscalationAudit( status: MemberWorkSyncStatus, reason: string diff --git a/src/features/member-work-sync/core/application/MemberWorkSyncPendingReportIntentReplayer.ts b/src/features/member-work-sync/core/application/MemberWorkSyncPendingReportIntentReplayer.ts index a878e463..d1913dc9 100644 --- a/src/features/member-work-sync/core/application/MemberWorkSyncPendingReportIntentReplayer.ts +++ b/src/features/member-work-sync/core/application/MemberWorkSyncPendingReportIntentReplayer.ts @@ -1,6 +1,10 @@ import { MemberWorkSyncReporter } from './MemberWorkSyncReporter'; -import type { MemberWorkSyncReportIntentStatus } from '../../contracts'; +import type { + MemberWorkSyncReportIntent, + MemberWorkSyncReportIntentStatus, + MemberWorkSyncReportResult, +} from '../../contracts'; import type { MemberWorkSyncUseCaseDeps } from './ports'; export interface MemberWorkSyncPendingReportReplaySummary { @@ -52,10 +56,7 @@ export class MemberWorkSyncPendingReportIntentReplayer { let status: MemberWorkSyncReportIntentStatus = 'rejected'; let resultCode = 'replay_failed'; try { - const result = await this.reporter.execute({ - ...intent.request, - source: intent.request.source ?? 'mcp', - }); + const result = await this.executeReplay(intent); status = statusForResult(result); resultCode = result.code; } catch (error) { @@ -83,4 +84,56 @@ export class MemberWorkSyncPendingReportIntentReplayer { return summary; } + + private async executeReplay( + intent: MemberWorkSyncReportIntent + ): Promise { + const result = await this.reporter.execute({ + ...intent.request, + source: intent.request.source ?? 'mcp', + }); + const freshToken = await this.getFreshTokenForExpiredFallbackReport(intent, result); + if (!freshToken) { + return result; + } + return this.reporter.execute({ + ...intent.request, + agendaFingerprint: freshToken.agendaFingerprint, + reportToken: freshToken.reportToken, + source: intent.request.source ?? 'mcp', + }); + } + + private async getFreshTokenForExpiredFallbackReport( + intent: MemberWorkSyncReportIntent, + result: MemberWorkSyncReportResult + ): Promise<{ agendaFingerprint: string; reportToken: string } | null> { + if ( + result.accepted || + result.code !== 'invalid_report_token' || + intent.reason !== 'control_api_unavailable' || + !intent.request.reportToken || + !result.status.reportToken || + result.status.agenda.fingerprint !== intent.request.agendaFingerprint || + !this.deps.reportToken + ) { + return null; + } + + const validation = await this.deps.reportToken.verify({ + token: intent.request.reportToken, + teamName: result.status.teamName, + memberName: result.status.memberName, + agendaFingerprint: result.status.agenda.fingerprint, + nowIso: this.deps.clock.now().toISOString(), + }); + if (validation.ok || validation.reason !== 'expired') { + return null; + } + + return { + agendaFingerprint: result.status.agenda.fingerprint, + reportToken: result.status.reportToken, + }; + } } diff --git a/src/features/member-work-sync/core/application/ports.ts b/src/features/member-work-sync/core/application/ports.ts index 58400bce..a7ac4a5f 100644 --- a/src/features/member-work-sync/core/application/ports.ts +++ b/src/features/member-work-sync/core/application/ports.ts @@ -190,6 +190,13 @@ export interface MemberWorkSyncInboxNudgePort { payload: MemberWorkSyncOutboxItem['payload']; timestamp: string; }): Promise<{ inserted: boolean; messageId: string; conflict?: boolean }>; + repairIfPresent?(input: { + teamName: string; + memberName: string; + messageId: string; + payloadHash: string; + payload: MemberWorkSyncOutboxItem['payload']; + }): Promise<{ found: boolean; repaired: boolean; conflict?: boolean }>; } export interface MemberWorkSyncWatchdogCooldownPort { diff --git a/src/features/member-work-sync/main/adapters/output/TeamInboxMemberWorkSyncNudgeSink.ts b/src/features/member-work-sync/main/adapters/output/TeamInboxMemberWorkSyncNudgeSink.ts index d3c0c949..f3447230 100644 --- a/src/features/member-work-sync/main/adapters/output/TeamInboxMemberWorkSyncNudgeSink.ts +++ b/src/features/member-work-sync/main/adapters/output/TeamInboxMemberWorkSyncNudgeSink.ts @@ -3,24 +3,48 @@ import { TeamInboxWriter } from '@main/services/team/TeamInboxWriter'; import type { MemberWorkSyncInboxNudgePort } from '../../../core/application'; +type TeamInboxMemberWorkSyncNudgeInput = Parameters< + MemberWorkSyncInboxNudgePort['insertIfAbsent'] +>[0]; +type TeamInboxMemberWorkSyncNudgeRepairInput = Parameters< + NonNullable +>[0]; + +type TeamInboxMemberWorkSyncNudgeWriter = Pick & + Partial>; + +function isStoredMemberWorkSyncNudge( + message: Awaited>[number] +): boolean { + return message.messageKind === 'member_work_sync_nudge'; +} + export class TeamInboxMemberWorkSyncNudgeSink implements MemberWorkSyncInboxNudgePort { constructor( private readonly inboxReader: Pick = new TeamInboxReader(), - private readonly inboxWriter: Pick = new TeamInboxWriter(), + private readonly inboxWriter: TeamInboxMemberWorkSyncNudgeWriter = new TeamInboxWriter(), private readonly controlUrlResolver?: () => Promise | string | null ) {} - async insertIfAbsent(input: Parameters[0]) { + async insertIfAbsent(input: TeamInboxMemberWorkSyncNudgeInput) { const existing = await this.inboxReader.getMessagesFor(input.teamName, input.memberName); const existingMessage = existing.find((message) => message.messageId === input.messageId); if (existingMessage) { - if (existingMessage.workSyncPayloadHash !== input.payloadHash) { + if ( + existingMessage.workSyncPayloadHash !== input.payloadHash || + !isStoredMemberWorkSyncNudge(existingMessage) + ) { return { inserted: false, messageId: input.messageId, conflict: true }; } + await this.repairExistingControlUrlIfNeeded(input, existingMessage.text, { + required: Boolean(this.controlUrlResolver), + }); return { inserted: false, messageId: input.messageId }; } - const controlUrl = await this.resolveControlUrl(); + const controlUrl = await this.resolveControlUrl({ + required: Boolean(this.controlUrlResolver), + }); const text = controlUrl ? this.withControlUrl(input.payload.text, controlUrl) : input.payload.text; @@ -48,27 +72,89 @@ export class TeamInboxMemberWorkSyncNudgeSink implements MemberWorkSyncInboxNudg }; } - private async resolveControlUrl(): Promise { + async repairIfPresent(input: TeamInboxMemberWorkSyncNudgeRepairInput) { + const existing = await this.inboxReader.getMessagesFor(input.teamName, input.memberName); + const existingMessage = existing.find((message) => message.messageId === input.messageId); + if (!existingMessage) { + return { found: false, repaired: false }; + } + if ( + existingMessage.workSyncPayloadHash !== input.payloadHash || + !isStoredMemberWorkSyncNudge(existingMessage) + ) { + return { found: true, repaired: false, conflict: true }; + } + const repaired = await this.repairExistingControlUrlIfNeeded(input, existingMessage.text, { + required: Boolean(this.controlUrlResolver), + }); + return { found: true, repaired }; + } + + private async repairExistingControlUrlIfNeeded( + input: TeamInboxMemberWorkSyncNudgeRepairInput, + existingText: string | undefined, + options: { required?: boolean } = {} + ): Promise { + const controlUrl = await this.resolveControlUrl(options); + if (!controlUrl) { + return false; + } + const currentText = existingText ?? input.payload.text; + const repairedText = this.withControlUrl(currentText, controlUrl); + if (repairedText === currentText) { + return false; + } + if (typeof this.inboxWriter.updateMessageText !== 'function') { + if (options.required) { + throw new Error('member work sync inbox text update unavailable'); + } + return false; + } + const result = await this.inboxWriter.updateMessageText(input.teamName, { + member: input.memberName, + messageId: input.messageId, + text: repairedText, + expectedMessageKind: 'member_work_sync_nudge', + expectedWorkSyncPayloadHash: input.payloadHash, + }); + return result.updated; + } + + private async resolveControlUrl(options: { required?: boolean } = {}): Promise { if (!this.controlUrlResolver) { return null; } + let value: string | null | undefined; try { - const value = await this.controlUrlResolver(); - const trimmed = value?.trim(); - return trimmed ? trimmed : null; - } catch { + value = await this.controlUrlResolver(); + } catch (error) { + if (options.required) { + throw new Error(`member work sync control URL unavailable: ${String(error)}`); + } return null; } + + const trimmed = value?.trim(); + if (trimmed) { + return trimmed; + } + if (options.required) { + throw new Error('member work sync control URL unavailable'); + } + return null; } private withControlUrl(text: string, controlUrl: string): string { - if (text.includes('controlUrl')) { + const controlLine = `Required control API: pass controlUrl "${controlUrl}" in both member_work_sync_status and member_work_sync_report.`; + const existingControlLine = + /^Required control API: pass controlUrl "[^"\n]+" in both member_work_sync_status and member_work_sync_report\.$/m; + if (existingControlLine.test(text)) { + return text.replace(existingControlLine, controlLine); + } + if (text.includes(`controlUrl "${controlUrl}"`)) { return text; } - return [ - text, - `Required control API: pass controlUrl "${controlUrl}" in both member_work_sync_status and member_work_sync_report.`, - ].join('\n'); + return [text, controlLine].join('\n'); } } diff --git a/src/features/member-work-sync/main/adapters/output/TeamTaskStallJournalWorkSyncCooldown.ts b/src/features/member-work-sync/main/adapters/output/TeamTaskStallJournalWorkSyncCooldown.ts index c5dd44b4..654519db 100644 --- a/src/features/member-work-sync/main/adapters/output/TeamTaskStallJournalWorkSyncCooldown.ts +++ b/src/features/member-work-sync/main/adapters/output/TeamTaskStallJournalWorkSyncCooldown.ts @@ -12,7 +12,10 @@ interface StallJournalEntry { alertedAt?: string; } -type WatchdogCooldownResult = { active: boolean; retryAfterIso?: string }; +interface WatchdogCooldownResult { + active: boolean; + retryAfterIso?: string; +} function parseTime(value: string | undefined): number | null { if (!value) { diff --git a/src/features/member-work-sync/main/composition/__tests__/memberWorkSyncTeamActivity.test.ts b/src/features/member-work-sync/main/composition/__tests__/memberWorkSyncTeamActivity.test.ts index 7079c5e2..51b543a9 100644 --- a/src/features/member-work-sync/main/composition/__tests__/memberWorkSyncTeamActivity.test.ts +++ b/src/features/member-work-sync/main/composition/__tests__/memberWorkSyncTeamActivity.test.ts @@ -3,6 +3,7 @@ import { describe, expect, it } from 'vitest'; import { hasUncertainWorkSyncRuntimeActivity, hasWorkSyncActiveRuntime, + hasWorkSyncReachableRuntime, isRuntimeEntryActiveForWorkSync, isRuntimeMemberActiveForWorkSync, isRuntimeMemberActivityUncertainForWorkSync, @@ -87,6 +88,60 @@ describe('member work sync team activity', () => { ).toBe(false); }); + it('does not treat lead process evidence as active for ordinary teammates', () => { + for (const livenessKind of [undefined, 'runtime_process', 'confirmed_bootstrap'] as const) { + const snapshot = createRuntimeSnapshot({ + alice: createRuntimeEntry({ + memberName: 'alice', + backendType: 'process', + livenessKind, + pidSource: 'lead_process', + }), + }); + + expect(isRuntimeEntryActiveForWorkSync(snapshot.members.alice)).toBe(false); + expect(hasWorkSyncActiveRuntime(snapshot)).toBe(false); + expect(hasWorkSyncReachableRuntime(snapshot)).toBe(false); + expect(isRuntimeMemberActiveForWorkSync(snapshot, 'alice')).toBe(false); + } + }); + + it('keeps active lead processes reachable for targeted lead work-sync', () => { + const snapshot = createRuntimeSnapshot({ + 'team-lead': createRuntimeEntry({ + memberName: 'team-lead', + backendType: 'lead', + livenessKind: undefined, + pidSource: 'lead_process', + }), + alice: createRuntimeEntry({ + memberName: 'alice', + alive: false, + livenessKind: 'stale_metadata', + }), + }); + + expect(hasWorkSyncActiveRuntime(snapshot)).toBe(false); + expect(hasWorkSyncReachableRuntime(snapshot)).toBe(true); + expect(isRuntimeMemberActiveForWorkSync(snapshot, 'team-lead')).toBe(true); + expect(isRuntimeMemberActiveForWorkSync(snapshot, 'alice')).toBe(false); + }); + + it('keeps ordinary teammates named lead active from normal agent process evidence', () => { + const snapshot = createRuntimeSnapshot({ + lead: createRuntimeEntry({ + memberName: 'lead', + backendType: 'process', + livenessKind: 'confirmed_bootstrap', + pidSource: 'agent_process_table', + }), + }); + + expect(hasWorkSyncActiveRuntime(snapshot)).toBe(true); + expect(hasWorkSyncReachableRuntime(snapshot)).toBe(true); + expect(isRuntimeMemberActiveForWorkSync(snapshot, 'lead')).toBe(true); + }); + it('does not treat inactive liveness diagnostics as active by themselves', () => { for (const livenessKind of [ 'permission_blocked', diff --git a/src/features/member-work-sync/main/composition/createMemberWorkSyncFeature.ts b/src/features/member-work-sync/main/composition/createMemberWorkSyncFeature.ts index c617e653..06ef962e 100644 --- a/src/features/member-work-sync/main/composition/createMemberWorkSyncFeature.ts +++ b/src/features/member-work-sync/main/composition/createMemberWorkSyncFeature.ts @@ -88,6 +88,22 @@ function getAcceptedWorkLeaseStaleness( return reportExpiresAtMs <= nowMs ? 'expired' : null; } +function getReportTokenStaleness( + status: MemberWorkSyncStatus, + nowMs: number +): 'missing' | 'expired' | null { + if (!status.reportToken?.trim()) { + return 'missing'; + } + + const tokenExpiresAtMs = Date.parse(status.reportTokenExpiresAt ?? ''); + if (!Number.isFinite(tokenExpiresAtMs) || !Number.isFinite(nowMs)) { + return 'missing'; + } + + return tokenExpiresAtMs <= nowMs ? 'expired' : null; +} + function isEmptyAgendaStaleState(status: MemberWorkSyncStatus): boolean { return ( status.agenda.items.length === 0 && @@ -99,6 +115,10 @@ function isEmptyAgendaStaleState(status: MemberWorkSyncStatus): boolean { } function statusNeedsBackgroundRefresh(status: MemberWorkSyncStatus, nowMs: number): boolean { + if (getReportTokenStaleness(status, nowMs) !== null) { + return true; + } + if (isEmptyAgendaStaleState(status)) { return true; } @@ -125,6 +145,13 @@ function statusNeedsBackgroundRefresh(status: MemberWorkSyncStatus, nowMs: numbe function getStatusStalenessDiagnostics(status: MemberWorkSyncStatus, nowMs: number): string[] { const diagnostics: string[] = []; + const tokenStaleness = getReportTokenStaleness(status, nowMs); + if (tokenStaleness === 'missing') { + diagnostics.push('report_token_missing_refresh_enqueued'); + } else if (tokenStaleness === 'expired') { + diagnostics.push('report_token_expired_refresh_enqueued'); + } + const evaluatedAtMs = Date.parse(status.evaluatedAt); if (!Number.isFinite(evaluatedAtMs)) { diagnostics.push('status_evaluated_at_invalid'); @@ -150,6 +177,12 @@ function getStatusStalenessDiagnostics(status: MemberWorkSyncStatus, nowMs: numb return [...new Set(diagnostics)]; } +function shouldRefreshStatusSynchronously(stalenessDiagnostics: string[]): boolean { + return stalenessDiagnostics.some( + (diagnostic) => diagnostic !== 'caught_up_stale_refresh_enqueued' + ); +} + export function buildMemberWorkSyncRuntimeTurnSettledEnvironment(input: { teamsBasePath: string; provider: RuntimeTurnSettledProvider; @@ -505,6 +538,21 @@ export function createMemberWorkSyncFeature(deps: { if (stalenessDiagnostics.length === 0) { return status; } + if (shouldRefreshStatusSynchronously(stalenessDiagnostics)) { + try { + return await reconciler.execute(request, { + reconciledBy: 'request', + triggerReasons: ['manual_refresh'], + }); + } catch (error) { + deps.logger?.warn('member work sync synchronous status refresh failed', { + teamName: status.teamName, + memberName: status.memberName, + diagnostics: stalenessDiagnostics, + error: String(error), + }); + } + } queue.enqueue({ teamName: status.teamName, memberName: status.memberName, diff --git a/src/features/member-work-sync/main/composition/memberWorkSyncTeamActivity.ts b/src/features/member-work-sync/main/composition/memberWorkSyncTeamActivity.ts index a739f201..e4672d97 100644 --- a/src/features/member-work-sync/main/composition/memberWorkSyncTeamActivity.ts +++ b/src/features/member-work-sync/main/composition/memberWorkSyncTeamActivity.ts @@ -26,11 +26,46 @@ const WORK_SYNC_BOOTSTRAP_ONLY_PID_SOURCES = new Set( 'persisted_metadata', ]); -const WORK_SYNC_CONFIRMED_BOOTSTRAP_ACTIVE_PID_SOURCES = new Set([ +const WORK_SYNC_MEMBER_CONFIRMED_BOOTSTRAP_ACTIVE_PID_SOURCES = new Set([ 'agent_process_table', 'opencode_bridge', ]); +const WORK_SYNC_LEAD_CONFIRMED_BOOTSTRAP_ACTIVE_PID_SOURCES = new Set([ + 'lead_process', +]); + +function isWorkSyncLeadLikeMemberName(memberName: string): boolean { + const normalized = normalizeMemberName(memberName).replace(/[\s_]+/g, '-'); + return ( + normalized === 'lead' || + normalized === 'team-lead' || + normalized === 'teamlead' || + normalized === 'team-leader' + ); +} + +function hasActiveWorkSyncProcessEvidence( + entry: Pick | null | undefined, + confirmedBootstrapActivePidSources: ReadonlySet +): boolean { + if (entry?.alive !== true) { + return false; + } + if ( + entry.livenessKind === 'confirmed_bootstrap' && + (!entry.pidSource || + WORK_SYNC_BOOTSTRAP_ONLY_PID_SOURCES.has(entry.pidSource) || + !confirmedBootstrapActivePidSources.has(entry.pidSource)) + ) { + return false; + } + if (!entry.livenessKind) { + return true; + } + return !WORK_SYNC_INACTIVE_LIVENESS_KINDS.has(entry.livenessKind); +} + export function isRuntimeEntryActiveForWorkSync( entry: | Pick< @@ -40,7 +75,7 @@ export function isRuntimeEntryActiveForWorkSync( | null | undefined ): boolean { - if (entry?.alive !== true) { + if (!entry) { return false; } if ( @@ -50,17 +85,33 @@ export function isRuntimeEntryActiveForWorkSync( return false; } if ( - entry.livenessKind === 'confirmed_bootstrap' && - (!entry.pidSource || - WORK_SYNC_BOOTSTRAP_ONLY_PID_SOURCES.has(entry.pidSource) || - !WORK_SYNC_CONFIRMED_BOOTSTRAP_ACTIVE_PID_SOURCES.has(entry.pidSource)) + entry.pidSource && + WORK_SYNC_LEAD_CONFIRMED_BOOTSTRAP_ACTIVE_PID_SOURCES.has(entry.pidSource) ) { return false; } - if (!entry.livenessKind) { - return true; + return hasActiveWorkSyncProcessEvidence( + entry, + WORK_SYNC_MEMBER_CONFIRMED_BOOTSTRAP_ACTIVE_PID_SOURCES + ); +} + +function isRuntimeLeadEntryActiveForWorkSync( + entry: + | Pick< + TeamAgentRuntimeEntry, + 'alive' | 'backendType' | 'livenessKind' | 'memberName' | 'pidSource' + > + | null + | undefined +): boolean { + if (!entry || !isWorkSyncLeadLikeMemberName(entry.memberName)) { + return false; } - return !WORK_SYNC_INACTIVE_LIVENESS_KINDS.has(entry.livenessKind); + return ( + entry.backendType === 'lead' && + hasActiveWorkSyncProcessEvidence(entry, WORK_SYNC_LEAD_CONFIRMED_BOOTSTRAP_ACTIVE_PID_SOURCES) + ); } function isRuntimeEntryRelevantForWorkSync( @@ -95,6 +146,14 @@ export function hasWorkSyncActiveRuntime( return Object.values(snapshot?.members ?? {}).some(isRuntimeEntryActiveForWorkSync); } +export function hasWorkSyncReachableRuntime( + snapshot: Pick | null | undefined +): boolean { + return Object.values(snapshot?.members ?? {}).some( + (entry) => isRuntimeEntryActiveForWorkSync(entry) || isRuntimeLeadEntryActiveForWorkSync(entry) + ); +} + export function isRuntimeMemberActiveForWorkSync( snapshot: Pick | null | undefined, memberName: string @@ -106,7 +165,9 @@ export function isRuntimeMemberActiveForWorkSync( return Object.values(snapshot?.members ?? {}).some( (entry) => normalizeMemberName(entry.memberName) === normalizedMemberName && - isRuntimeEntryActiveForWorkSync(entry) + (isRuntimeEntryActiveForWorkSync(entry) || + (isWorkSyncLeadLikeMemberName(normalizedMemberName) && + isRuntimeLeadEntryActiveForWorkSync(entry))) ); } diff --git a/src/features/member-work-sync/main/index.ts b/src/features/member-work-sync/main/index.ts index f451680f..fa0ac4e4 100644 --- a/src/features/member-work-sync/main/index.ts +++ b/src/features/member-work-sync/main/index.ts @@ -11,6 +11,7 @@ export { export { hasUncertainWorkSyncRuntimeActivity, hasWorkSyncActiveRuntime, + hasWorkSyncReachableRuntime, isRuntimeEntryActiveForWorkSync, isRuntimeMemberActiveForWorkSync, isRuntimeMemberActivityUncertainForWorkSync, diff --git a/src/main/index.ts b/src/main/index.ts index 46d16e8d..f19e0349 100644 --- a/src/main/index.ts +++ b/src/main/index.ts @@ -41,7 +41,7 @@ import { buildMemberWorkSyncRuntimeTurnSettledEnvironment, createMemberWorkSyncFeature, hasUncertainWorkSyncRuntimeActivity, - hasWorkSyncActiveRuntime, + hasWorkSyncReachableRuntime, isRuntimeMemberActivityUncertainForWorkSync, isRuntimeMemberActiveForWorkSync, type MemberWorkSyncFeatureFacade, @@ -1919,7 +1919,7 @@ async function initializeServices(): Promise { if (!snapshot) { return null; } - const active = hasWorkSyncActiveRuntime(snapshot); + const active = hasWorkSyncReachableRuntime(snapshot); if (!active && hasUncertainWorkSyncRuntimeActivity(snapshot)) { return null; } @@ -2037,7 +2037,12 @@ async function initializeServices(): Promise { isBusy: (input) => teamProvisioningService.getOpenCodeMemberDeliveryBusyStatus(input), }, ], - resolveControlUrl: async () => getTeamControlApiBaseUrl(), + resolveControlUrl: async () => { + if (!httpServer.isRunning()) { + await startHttpServer(handleModeSwitch); + } + return getTeamControlApiBaseUrl(); + }, proofMissingRecoveryGuard: { shouldDispatch: async (input) => { const isOpenCodeRecipient = await teamProvisioningService diff --git a/src/main/services/team/TeamInboxWriter.ts b/src/main/services/team/TeamInboxWriter.ts index 63326acf..c26a1e61 100644 --- a/src/main/services/team/TeamInboxWriter.ts +++ b/src/main/services/team/TeamInboxWriter.ts @@ -6,9 +6,23 @@ import * as path from 'path'; import { atomicWriteAsync } from './atomicWrite'; import { withFileLock } from './fileLock'; import { withInboxLock } from './inboxLock'; +import { getEffectiveInboxMessageId } from './inboxMessageIdentity'; import type { InboxMessage, SendMessageRequest, SendMessageResult, TaskRef } from '@shared/types'; +export interface UpdateInboxMessageTextRequest { + member: string; + messageId: string; + text: string; + expectedMessageKind?: InboxMessage['messageKind']; + expectedWorkSyncPayloadHash?: string; +} + +export interface UpdateInboxMessageTextResult { + found: boolean; + updated: boolean; +} + export interface MergeRuntimeDeliveryTaskRefsRequest { inboxName: string; messageId: string; @@ -137,6 +151,78 @@ export class TeamInboxWriter { }; } + async updateMessageText( + teamName: string, + request: UpdateInboxMessageTextRequest + ): Promise { + const messageId = request.messageId.trim(); + if (!messageId) { + return { found: false, updated: false }; + } + + const inboxPath = path.join(getTeamsBasePath(), teamName, 'inboxes', `${request.member}.json`); + let result: UpdateInboxMessageTextResult = { found: false, updated: false }; + + await withFileLock(inboxPath, async () => { + await withInboxLock(inboxPath, async () => { + let raw: string; + try { + raw = await fs.promises.readFile(inboxPath, 'utf8'); + } catch (error) { + if ((error as NodeJS.ErrnoException).code === 'ENOENT') { + return; + } + throw error; + } + + let parsed: unknown; + try { + parsed = JSON.parse(raw) as unknown; + } catch { + return; + } + if (!Array.isArray(parsed)) { + return; + } + + let changed = false; + for (const item of parsed) { + if (!item || typeof item !== 'object') { + continue; + } + const row = item as Record; + const rowMessageId = getEffectiveInboxMessageId(row); + if (rowMessageId !== messageId) { + continue; + } + result = { found: true, updated: changed }; + if (request.expectedMessageKind && row.messageKind !== request.expectedMessageKind) { + continue; + } + if ( + request.expectedWorkSyncPayloadHash && + row.workSyncPayloadHash !== request.expectedWorkSyncPayloadHash + ) { + continue; + } + if (row.text === request.text) { + continue; + } + row.text = request.text; + changed = true; + result = { found: true, updated: true }; + } + + if (!changed) { + return; + } + await atomicWriteAsync(inboxPath, JSON.stringify(parsed, null, 2)); + }); + }); + + return result; + } + async mergeRuntimeDeliveryTaskRefs( teamName: string, request: MergeRuntimeDeliveryTaskRefsRequest diff --git a/test/features/member-work-sync/core/MemberWorkSyncUseCases.test.ts b/test/features/member-work-sync/core/MemberWorkSyncUseCases.test.ts index af3d67cf..71217843 100644 --- a/test/features/member-work-sync/core/MemberWorkSyncUseCases.test.ts +++ b/test/features/member-work-sync/core/MemberWorkSyncUseCases.test.ts @@ -311,8 +311,13 @@ class InMemoryOutboxStore implements MemberWorkSyncOutboxStorePort { class InMemoryInboxNudge implements MemberWorkSyncInboxNudgePort { readonly inserted: Array[0]> = []; + readonly repaired: Array< + Parameters>[0] + > = []; fail = false; conflict = false; + repairFail = false; + repairConflict = false; async insertIfAbsent(input: Parameters[0]) { if (this.fail) { @@ -324,6 +329,19 @@ class InMemoryInboxNudge implements MemberWorkSyncInboxNudgePort { this.inserted.push(input); return { inserted: true, messageId: input.messageId }; } + + async repairIfPresent( + input: Parameters>[0] + ) { + if (this.repairFail) { + throw new Error('inbox repair unavailable'); + } + if (this.repairConflict) { + return { found: true, repaired: false, conflict: true }; + } + this.repaired.push(input); + return { found: true, repaired: true }; + } } function createDeps(options?: { @@ -1885,6 +1903,19 @@ describe('MemberWorkSync use cases', () => { item.payload.workSyncIntentKey?.startsWith('agenda-sync-still-stuck:') ) ).toHaveLength(1); + expect(inbox.inserted).toHaveLength(2); + expect(inbox.repaired).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + messageId: baseId, + payloadHash: outbox.items.get(baseId)?.payloadHash, + }), + expect.objectContaining({ + messageId: recovery?.id, + payloadHash: recovery?.payloadHash, + }), + ]) + ); clock.set('2026-04-29T01:02:00.000Z'); store.metricsGeneratedAt = '2026-04-29T01:02:00.000Z'; @@ -2007,6 +2038,33 @@ describe('MemberWorkSync use cases', () => { expect(summary).toMatchObject({ claimed: 1, delivered: 1, retryable: 0 }); expect(inbox.inserted).toHaveLength(2); expect(inbox.inserted[1]?.messageId).toContain('agenda-sync-still-stuck'); + + clock.set('2026-04-29T01:02:00.000Z'); + store.phase2ReadinessState = 'shadow_ready'; + store.phase2ReadinessReasons = []; + store.metricsGeneratedAt = '2026-04-29T01:02:00.000Z'; + await reconciler.execute( + { + teamName: 'team-a', + memberName: 'bob', + }, + { reconciledBy: 'queue', triggerReasons: ['config_changed', 'task_changed'] } + ); + + const recoveryItems = [...outbox.items.values()].filter((item) => + item.payload.workSyncIntentKey?.startsWith('agenda-sync-still-stuck:') + ); + expect(recoveryItems).toHaveLength(2); + expect(new Set(recoveryItems.map((item) => item.id)).size).toBe(2); + + const secondSummary = await new MemberWorkSyncNudgeDispatcher(deps).dispatchDue({ + teamNames: ['team-a'], + claimedBy: 'test-dispatcher', + }); + + expect(secondSummary).toMatchObject({ claimed: 1, delivered: 1, retryable: 0 }); + expect(inbox.inserted).toHaveLength(3); + expect(inbox.inserted[2]?.messageId).toContain('agenda-sync-still-stuck'); }); it('creates a delivered-still-stuck recovery for mixed review pickup and native work under noisy metrics', async () => { @@ -2130,6 +2188,15 @@ describe('MemberWorkSync use cases', () => { item.payload.workSyncIntentKey?.startsWith('agenda-sync-still-stuck:') ) ).toHaveLength(0); + expect(inbox.inserted).toHaveLength(1); + expect(inbox.repaired).toEqual([ + expect.objectContaining({ + teamName: 'team-a', + memberName: 'bob', + messageId: baseId, + payloadHash: outbox.items.get(baseId)?.payloadHash, + }), + ]); expect(auditEvents).toEqual( expect.arrayContaining([ expect.objectContaining({ @@ -2945,6 +3012,85 @@ describe('MemberWorkSync use cases', () => { expect(store.writes.at(-1)?.state).toBe('still_working'); }); + it('refreshes expired fallback pending report tokens during replay', async () => { + const { deps, store } = createDeps(); + const reader = new MemberWorkSyncReconciler(deps); + const current = await reader.execute({ teamName: 'team-a', memberName: 'bob' }); + const baseReportToken = deps.reportToken!; + deps.reportToken = { + create: baseReportToken.create, + verify: async (input) => + input.token === 'expired-token' + ? { ok: false, reason: 'expired' } + : baseReportToken.verify(input), + }; + store.pendingIntents.set('intent-1', { + id: 'intent-1', + teamName: 'team-a', + memberName: 'bob', + status: 'pending', + reason: 'control_api_unavailable', + recordedAt: '2026-04-29T00:16:00.000Z', + request: { + teamName: 'team-a', + memberName: 'bob', + state: 'still_working', + agendaFingerprint: current.agenda.fingerprint, + reportToken: 'expired-token', + leaseTtlMs: 120_000, + source: 'mcp', + }, + }); + + const summary = await new MemberWorkSyncPendingReportIntentReplayer(deps).replayTeam('team-a'); + + expect(summary).toEqual({ processed: 1, accepted: 1, rejected: 0, superseded: 0 }); + expect(store.pendingIntents.get('intent-1')).toMatchObject({ + status: 'accepted', + resultCode: 'accepted', + }); + expect(store.writes.at(-1)?.report).toMatchObject({ + accepted: true, + source: 'mcp', + state: 'still_working', + }); + }); + + it('rejects invalid fallback pending report tokens without refreshing identity', async () => { + const { deps, store } = createDeps(); + const reader = new MemberWorkSyncReconciler(deps); + const current = await reader.execute({ teamName: 'team-a', memberName: 'bob' }); + store.pendingIntents.set('intent-1', { + id: 'intent-1', + teamName: 'team-a', + memberName: 'bob', + status: 'pending', + reason: 'control_api_unavailable', + recordedAt: '2026-04-29T00:00:01.000Z', + request: { + teamName: 'team-a', + memberName: 'bob', + state: 'still_working', + agendaFingerprint: current.agenda.fingerprint, + reportToken: 'invalid-token', + leaseTtlMs: 120_000, + source: 'mcp', + }, + }); + + const summary = await new MemberWorkSyncPendingReportIntentReplayer(deps).replayTeam('team-a'); + + expect(summary).toEqual({ processed: 1, accepted: 0, rejected: 1, superseded: 0 }); + expect(store.pendingIntents.get('intent-1')).toMatchObject({ + status: 'rejected', + resultCode: 'invalid_report_token', + }); + expect(store.writes.at(-1)?.report).toMatchObject({ + accepted: false, + rejectionCode: 'invalid_report_token', + }); + }); + it('supersedes pending controller intents when the member runtime is inactive', async () => { const { deps, store } = createDeps(); const reader = new MemberWorkSyncReconciler(deps); diff --git a/test/features/member-work-sync/main/TeamInboxMemberWorkSyncNudgeSink.test.ts b/test/features/member-work-sync/main/TeamInboxMemberWorkSyncNudgeSink.test.ts index 021036c9..3ab07ef7 100644 --- a/test/features/member-work-sync/main/TeamInboxMemberWorkSyncNudgeSink.test.ts +++ b/test/features/member-work-sync/main/TeamInboxMemberWorkSyncNudgeSink.test.ts @@ -1,6 +1,5 @@ -import { describe, expect, it, vi } from 'vitest'; - import { TeamInboxMemberWorkSyncNudgeSink } from '@features/member-work-sync/main/adapters/output/TeamInboxMemberWorkSyncNudgeSink'; +import { describe, expect, it, vi } from 'vitest'; import type { MemberWorkSyncInboxNudgePort } from '@features/member-work-sync/core/application'; @@ -32,7 +31,11 @@ describe('TeamInboxMemberWorkSyncNudgeSink', () => { const input = makeInput(); const inboxReader = { getMessagesFor: vi.fn(async () => [ - { messageId: input.messageId, workSyncPayloadHash: input.payloadHash }, + { + messageId: input.messageId, + messageKind: 'member_work_sync_nudge', + workSyncPayloadHash: input.payloadHash, + }, ]), }; const inboxWriter = { @@ -49,6 +52,309 @@ describe('TeamInboxMemberWorkSyncNudgeSink', () => { expect(inboxWriter.sendMessage).not.toHaveBeenCalled(); }); + it('repairs an existing idempotent nudge row that is missing the current controlUrl', async () => { + const input = makeInput(); + const inboxReader = { + getMessagesFor: vi.fn(async () => [ + { + messageId: input.messageId, + text: input.payload.text, + messageKind: 'member_work_sync_nudge', + workSyncPayloadHash: input.payloadHash, + }, + ]), + }; + const inboxWriter = { + sendMessage: vi.fn(), + updateMessageText: vi.fn(async () => ({ found: true, updated: true })), + }; + const sink = new TeamInboxMemberWorkSyncNudgeSink( + inboxReader as never, + inboxWriter as never, + () => 'http://127.0.0.1:43123' + ); + + await expect(sink.insertIfAbsent(input)).resolves.toEqual({ + inserted: false, + messageId: input.messageId, + }); + + expect(inboxWriter.sendMessage).not.toHaveBeenCalled(); + expect(inboxWriter.updateMessageText).toHaveBeenCalledWith('team-a', { + member: 'bob', + messageId: input.messageId, + text: `${input.payload.text}\nRequired control API: pass controlUrl "http://127.0.0.1:43123" in both member_work_sync_status and member_work_sync_report.`, + expectedMessageKind: 'member_work_sync_nudge', + expectedWorkSyncPayloadHash: input.payloadHash, + }); + }); + + it('refreshes a stale controlUrl on an existing idempotent nudge row', async () => { + const input = makeInput(); + const existingText = `${input.payload.text}\nRequired control API: pass controlUrl "http://127.0.0.1:11111" in both member_work_sync_status and member_work_sync_report.`; + const inboxReader = { + getMessagesFor: vi.fn(async () => [ + { + messageId: input.messageId, + text: existingText, + messageKind: 'member_work_sync_nudge', + workSyncPayloadHash: input.payloadHash, + }, + ]), + }; + const inboxWriter = { + sendMessage: vi.fn(), + updateMessageText: vi.fn(async () => ({ found: true, updated: true })), + }; + const sink = new TeamInboxMemberWorkSyncNudgeSink( + inboxReader as never, + inboxWriter as never, + () => 'http://127.0.0.1:43123' + ); + + await expect(sink.insertIfAbsent(input)).resolves.toEqual({ + inserted: false, + messageId: input.messageId, + }); + + expect(inboxWriter.updateMessageText).toHaveBeenCalledWith( + 'team-a', + expect.objectContaining({ + text: `${input.payload.text}\nRequired control API: pass controlUrl "http://127.0.0.1:43123" in both member_work_sync_status and member_work_sync_report.`, + }) + ); + }); + + it('fails closed when an existing idempotent nudge needs controlUrl repair but resolver is unavailable', async () => { + const input = makeInput(); + const inboxReader = { + getMessagesFor: vi.fn(async () => [ + { + messageId: input.messageId, + text: input.payload.text, + messageKind: 'member_work_sync_nudge', + workSyncPayloadHash: input.payloadHash, + }, + ]), + }; + const inboxWriter = { + sendMessage: vi.fn(), + updateMessageText: vi.fn(), + }; + const sink = new TeamInboxMemberWorkSyncNudgeSink( + inboxReader as never, + inboxWriter as never, + () => null + ); + + await expect(sink.insertIfAbsent(input)).rejects.toThrow( + 'member work sync control URL unavailable' + ); + + expect(inboxWriter.sendMessage).not.toHaveBeenCalled(); + expect(inboxWriter.updateMessageText).not.toHaveBeenCalled(); + }); + + it('fails closed when an existing idempotent nudge needs controlUrl repair but writer cannot update text', async () => { + const input = makeInput(); + const inboxReader = { + getMessagesFor: vi.fn(async () => [ + { + messageId: input.messageId, + text: input.payload.text, + messageKind: 'member_work_sync_nudge', + workSyncPayloadHash: input.payloadHash, + }, + ]), + }; + const inboxWriter = { + sendMessage: vi.fn(), + }; + const sink = new TeamInboxMemberWorkSyncNudgeSink( + inboxReader as never, + inboxWriter as never, + () => 'http://127.0.0.1:43123' + ); + + await expect(sink.insertIfAbsent(input)).rejects.toThrow( + 'member work sync inbox text update unavailable' + ); + + expect(inboxWriter.sendMessage).not.toHaveBeenCalled(); + }); + + it('repairs a delivered nudge row by stable messageId without inserting a duplicate', async () => { + const input = makeInput(); + const inboxReader = { + getMessagesFor: vi.fn(async () => [ + { + messageId: input.messageId, + text: input.payload.text, + messageKind: 'member_work_sync_nudge', + workSyncPayloadHash: input.payloadHash, + }, + ]), + }; + const inboxWriter = { + sendMessage: vi.fn(), + updateMessageText: vi.fn(async () => ({ found: true, updated: true })), + }; + const sink = new TeamInboxMemberWorkSyncNudgeSink( + inboxReader as never, + inboxWriter as never, + () => 'http://127.0.0.1:43123' + ); + + await expect( + sink.repairIfPresent({ + teamName: input.teamName, + memberName: input.memberName, + messageId: input.messageId, + payloadHash: input.payloadHash, + payload: input.payload, + }) + ).resolves.toEqual({ found: true, repaired: true }); + + expect(inboxWriter.sendMessage).not.toHaveBeenCalled(); + expect(inboxWriter.updateMessageText).toHaveBeenCalledWith( + 'team-a', + expect.objectContaining({ + messageId: input.messageId, + expectedWorkSyncPayloadHash: input.payloadHash, + }) + ); + }); + + it('reports direct repair as unrepaired when the guarded writer refuses the update', async () => { + const input = makeInput(); + const inboxReader = { + getMessagesFor: vi.fn(async () => [ + { + messageId: input.messageId, + text: input.payload.text, + messageKind: 'member_work_sync_nudge', + workSyncPayloadHash: input.payloadHash, + }, + ]), + }; + const inboxWriter = { + sendMessage: vi.fn(), + updateMessageText: vi.fn(async () => ({ found: true, updated: false })), + }; + const sink = new TeamInboxMemberWorkSyncNudgeSink( + inboxReader as never, + inboxWriter as never, + () => 'http://127.0.0.1:43123' + ); + + await expect( + sink.repairIfPresent({ + teamName: input.teamName, + memberName: input.memberName, + messageId: input.messageId, + payloadHash: input.payloadHash, + payload: input.payload, + }) + ).resolves.toEqual({ found: true, repaired: false }); + + expect(inboxWriter.sendMessage).not.toHaveBeenCalled(); + expect(inboxWriter.updateMessageText).toHaveBeenCalledWith( + 'team-a', + expect.objectContaining({ + messageId: input.messageId, + expectedWorkSyncPayloadHash: input.payloadHash, + }) + ); + }); + + it('reports missing delivered rows during direct repair without inserting', async () => { + const input = makeInput(); + const inboxReader = { + getMessagesFor: vi.fn(async () => []), + }; + const inboxWriter = { + sendMessage: vi.fn(), + updateMessageText: vi.fn(), + }; + const sink = new TeamInboxMemberWorkSyncNudgeSink(inboxReader as never, inboxWriter as never); + + await expect( + sink.repairIfPresent({ + teamName: input.teamName, + memberName: input.memberName, + messageId: input.messageId, + payloadHash: input.payloadHash, + payload: input.payload, + }) + ).resolves.toEqual({ found: false, repaired: false }); + + expect(inboxWriter.sendMessage).not.toHaveBeenCalled(); + expect(inboxWriter.updateMessageText).not.toHaveBeenCalled(); + }); + + it('fails closed when direct repair finds a different payload hash', async () => { + const input = makeInput(); + const inboxReader = { + getMessagesFor: vi.fn(async () => [ + { + messageId: input.messageId, + text: input.payload.text, + messageKind: 'member_work_sync_nudge', + workSyncPayloadHash: 'different-payload-hash', + }, + ]), + }; + const inboxWriter = { + sendMessage: vi.fn(), + updateMessageText: vi.fn(), + }; + const sink = new TeamInboxMemberWorkSyncNudgeSink(inboxReader as never, inboxWriter as never); + + await expect( + sink.repairIfPresent({ + teamName: input.teamName, + memberName: input.memberName, + messageId: input.messageId, + payloadHash: input.payloadHash, + payload: input.payload, + }) + ).resolves.toEqual({ found: true, repaired: false, conflict: true }); + + expect(inboxWriter.sendMessage).not.toHaveBeenCalled(); + expect(inboxWriter.updateMessageText).not.toHaveBeenCalled(); + }); + + it('does not rewrite an existing idempotent nudge row with the current controlUrl', async () => { + const input = makeInput(); + const existingText = `${input.payload.text}\nRequired control API: pass controlUrl "http://127.0.0.1:43123" in both member_work_sync_status and member_work_sync_report.`; + const inboxReader = { + getMessagesFor: vi.fn(async () => [ + { + messageId: input.messageId, + text: existingText, + messageKind: 'member_work_sync_nudge', + workSyncPayloadHash: input.payloadHash, + }, + ]), + }; + const inboxWriter = { + sendMessage: vi.fn(), + updateMessageText: vi.fn(async () => ({ found: true, updated: true })), + }; + const sink = new TeamInboxMemberWorkSyncNudgeSink( + inboxReader as never, + inboxWriter as never, + () => 'http://127.0.0.1:43123' + ); + + await expect(sink.insertIfAbsent(input)).resolves.toEqual({ + inserted: false, + messageId: input.messageId, + }); + + expect(inboxWriter.updateMessageText).not.toHaveBeenCalled(); + }); + it('fails closed when the existing stable messageId has a different payload hash', async () => { const input = makeInput(); const inboxReader = { @@ -70,6 +376,48 @@ describe('TeamInboxMemberWorkSyncNudgeSink', () => { expect(inboxWriter.sendMessage).not.toHaveBeenCalled(); }); + it('fails closed when the existing stable messageId is not a work-sync nudge row', async () => { + const input = makeInput(); + const inboxReader = { + getMessagesFor: vi.fn(async () => [ + { + messageId: input.messageId, + messageKind: 'task_comment_notification', + workSyncPayloadHash: input.payloadHash, + text: input.payload.text, + }, + ]), + }; + const inboxWriter = { + sendMessage: vi.fn(), + updateMessageText: vi.fn(), + }; + const sink = new TeamInboxMemberWorkSyncNudgeSink( + inboxReader as never, + inboxWriter as never, + () => 'http://127.0.0.1:43123' + ); + + await expect(sink.insertIfAbsent(input)).resolves.toEqual({ + inserted: false, + messageId: input.messageId, + conflict: true, + }); + + await expect( + sink.repairIfPresent({ + teamName: input.teamName, + memberName: input.memberName, + messageId: input.messageId, + payloadHash: input.payloadHash, + payload: input.payload, + }) + ).resolves.toEqual({ found: true, repaired: false, conflict: true }); + + expect(inboxWriter.sendMessage).not.toHaveBeenCalled(); + expect(inboxWriter.updateMessageText).not.toHaveBeenCalled(); + }); + it('treats legacy work-sync rows without payload hash as conflicts', async () => { const input = makeInput(); const inboxReader = { @@ -129,6 +477,50 @@ describe('TeamInboxMemberWorkSyncNudgeSink', () => { }); }); + it('does not insert a new nudge when a configured controlUrl resolver returns null', async () => { + const input = makeInput(); + const inboxReader = { + getMessagesFor: vi.fn(async () => []), + }; + const inboxWriter = { + sendMessage: vi.fn(), + }; + const sink = new TeamInboxMemberWorkSyncNudgeSink( + inboxReader as never, + inboxWriter as never, + () => null + ); + + await expect(sink.insertIfAbsent(input)).rejects.toThrow( + 'member work sync control URL unavailable' + ); + + expect(inboxWriter.sendMessage).not.toHaveBeenCalled(); + }); + + it('does not insert a new nudge when a configured controlUrl resolver fails', async () => { + const input = makeInput(); + const inboxReader = { + getMessagesFor: vi.fn(async () => []), + }; + const inboxWriter = { + sendMessage: vi.fn(), + }; + const sink = new TeamInboxMemberWorkSyncNudgeSink( + inboxReader as never, + inboxWriter as never, + () => { + throw new Error('sidecar failed'); + } + ); + + await expect(sink.insertIfAbsent(input)).rejects.toThrow( + 'member work sync control URL unavailable' + ); + + expect(inboxWriter.sendMessage).not.toHaveBeenCalled(); + }); + it('propagates reader failures so dispatch can classify the attempt', async () => { const input = makeInput(); const inboxReader = { diff --git a/test/features/member-work-sync/main/createMemberWorkSyncFeature.test.ts b/test/features/member-work-sync/main/createMemberWorkSyncFeature.test.ts index 45a4d82c..692cc2cd 100644 --- a/test/features/member-work-sync/main/createMemberWorkSyncFeature.test.ts +++ b/test/features/member-work-sync/main/createMemberWorkSyncFeature.test.ts @@ -3,6 +3,7 @@ import { buildMemberWorkSyncRuntimeTurnSettledEnvironment, createMemberWorkSyncFeature, } from '@features/member-work-sync/main'; +import { HmacMemberWorkSyncReportTokenAdapter } from '@features/member-work-sync/main/infrastructure/HmacMemberWorkSyncReportTokenAdapter'; import { JsonMemberWorkSyncStore } from '@features/member-work-sync/main/infrastructure/JsonMemberWorkSyncStore'; import { MemberWorkSyncStorePaths } from '@features/member-work-sync/main/infrastructure/MemberWorkSyncStorePaths'; import { NodeHashAdapter } from '@features/member-work-sync/main/infrastructure/NodeHashAdapter'; @@ -1555,6 +1556,130 @@ describe('createMemberWorkSyncFeature composition', () => { } }); + it('keeps config provider when runtime member meta omits it before native stale recovery', async () => { + const claudeRoot = makeTempRoot(); + setClaudeBasePathOverride(claudeRoot); + const teamsBasePath = getTeamsBasePath(); + const teamName = 'team-native-stale-meta-provider'; + const memberName = 'nickname'; + const nudgeDeliveryWake = { + schedule: vi.fn(async () => undefined), + }; + const feature = createMemberWorkSyncFeature({ + teamsBasePath, + configReader: { + getConfig: vi.fn(async () => ({ + name: teamName, + members: [{ name: 'NickName', providerId: 'codex', model: 'gpt-5.5' }], + })), + } as never, + taskReader: { + getTasks: vi.fn(async () => [ + { + id: 'task-1', + displayId: '11111111', + subject: 'Review landing', + status: 'in_progress', + owner: 'NickName', + }, + ]), + } as never, + kanbanManager: { + getState: vi.fn(async () => ({ + teamName, + reviewers: [], + tasks: {}, + })), + } as never, + membersMetaStore: { + getMembers: vi.fn(async () => [ + { + name: 'NickName', + role: 'developer', + agentType: 'general-purpose', + color: 'blue', + }, + ]), + } as never, + isTeamActive: vi.fn(async () => true), + nudgeDeliveryWake, + queueQuietWindowMs: 1, + }); + + try { + feature.noteTeamChange({ type: 'task', teamName, taskId: 'task-1' } as never); + + let agendaFingerprint = ''; + await waitForAssertion(async () => { + const status = await feature.getStatus({ teamName, memberName }); + expect(status).toMatchObject({ + state: 'needs_sync', + providerId: 'codex', + diagnostics: expect.arrayContaining(['no_current_report']), + agenda: { + items: [ + expect.objectContaining({ + reason: 'owned_in_progress_task', + evidence: expect.objectContaining({ status: 'in_progress' }), + }), + ], + }, + }); + agendaFingerprint = status.agenda.fingerprint; + }); + expect(await readInboxMessages({ teamsBasePath, teamName, memberName })).toEqual([]); + + await seedNativeStaleInProgressBlockingMetrics({ + teamsBasePath, + teamName, + memberName, + agendaFingerprint, + }); + feature.noteTeamChange({ type: 'task', teamName, taskId: 'task-1' } as never); + + await waitForAssertion(async () => { + const nudges = (await readInboxMessages({ teamsBasePath, teamName, memberName })).filter( + (message) => message.messageKind === 'member_work_sync_nudge' + ); + expect(nudges).toHaveLength(1); + expect(nudges[0]?.text).toContain('Work sync check'); + expect(nudges[0]?.text).toContain('11111111'); + expect(nudgeDeliveryWake.schedule).toHaveBeenCalledWith({ + teamName, + memberName, + messageId: nudges[0]?.messageId, + providerId: 'codex', + reason: 'member_work_sync_nudge_inserted', + delayMs: 500, + }); + expect( + Object.values(await readMemberOutboxItems({ teamsBasePath, teamName, memberName })) + ).toEqual([ + expect.objectContaining({ + status: 'delivered', + deliveredMessageId: nudges[0]?.messageId, + }), + ]); + }); + + const journal = await fs.promises.readFile( + path.join( + teamsBasePath, + teamName, + 'members', + memberName, + '.member-work-sync', + 'journal.jsonl' + ), + 'utf8' + ); + expect(journal).toContain('"event":"nudge_delivered"'); + expect(journal).not.toContain('"reason":"blocking_metrics"'); + } finally { + await feature.dispose(); + } + }); + it('delivers native stale pending-work recovery nudges despite noisy global metrics', async () => { const claudeRoot = makeTempRoot(); setClaudeBasePathOverride(claudeRoot); @@ -3335,6 +3460,103 @@ describe('createMemberWorkSyncFeature composition', () => { } }); + it('keeps nudges retryable while configured controlUrl is unavailable and delivers after recovery', async () => { + const claudeRoot = makeTempRoot(); + setClaudeBasePathOverride(claudeRoot); + const teamsBasePath = getTeamsBasePath(); + const teamName = 'team-control-url-retry'; + const memberName = 'bob'; + let controlUrl: string | null = null; + const feature = createMemberWorkSyncFeature({ + teamsBasePath, + configReader: { + getConfig: vi.fn(async () => ({ + name: teamName, + members: [{ name: memberName, providerId: 'codex' }], + })), + } as never, + taskReader: { + getTasks: vi.fn(async () => [ + { + id: 'task-1', + displayId: '11111111', + subject: 'Ship sync after control URL recovery', + status: 'pending', + owner: memberName, + }, + ]), + } as never, + kanbanManager: { + getState: vi.fn(async () => ({ + teamName, + reviewers: [], + tasks: {}, + })), + } as never, + membersMetaStore: { + getMembers: vi.fn(async () => []), + } as never, + isTeamActive: vi.fn(async () => true), + queueQuietWindowMs: 1, + resolveControlUrl: vi.fn(async () => controlUrl), + }); + + try { + await seedShadowReadyMetrics({ teamsBasePath, teamName, memberName }); + feature.noteTeamChange({ type: 'task', teamName, taskId: 'task-1' } as never); + + await waitForAssertion(async () => { + expect(await readInboxMessages({ teamsBasePath, teamName, memberName })).toHaveLength(0); + expect( + Object.values(await readMemberOutboxItems({ teamsBasePath, teamName, memberName })) + ).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + status: 'failed_retryable', + lastError: expect.stringContaining('member work sync control URL unavailable'), + }), + ]) + ); + }); + await waitForQueueIdle(feature); + + controlUrl = 'http://127.0.0.1:43123'; + await forceRetryableOutboxDue({ + teamsBasePath, + teamName, + memberName, + nextAttemptAt: new Date(Date.now() - 1_000).toISOString(), + }); + + await expect(feature.dispatchDueNudges([teamName])).resolves.toEqual({ + claimed: 1, + delivered: 1, + superseded: 0, + retryable: 0, + terminal: 0, + }); + + const nudges = (await readInboxMessages({ teamsBasePath, teamName, memberName })).filter( + (message) => message.messageKind === 'member_work_sync_nudge' + ); + expect(nudges).toHaveLength(1); + expect(nudges[0]?.text).toContain('11111111'); + expect(nudges[0]?.text).toContain('controlUrl "http://127.0.0.1:43123"'); + expect( + Object.values(await readMemberOutboxItems({ teamsBasePath, teamName, memberName })) + ).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + status: 'delivered', + deliveredMessageId: expect.any(String), + }), + ]) + ); + } finally { + await feature.dispose(); + } + }); + it('respects watchdog cooldown and delivers after the retry window is due', async () => { const claudeRoot = makeTempRoot(); setClaudeBasePathOverride(claudeRoot); @@ -3978,6 +4200,252 @@ describe('createMemberWorkSyncFeature composition', () => { } }); + it('refreshes expired fallback pending report tokens through the real HMAC validator', async () => { + const claudeRoot = makeTempRoot(); + setClaudeBasePathOverride(claudeRoot); + const teamsBasePath = getTeamsBasePath(); + const teamName = 'team-expired-pending-report'; + const memberName = 'bob'; + const storePaths = new MemberWorkSyncStorePaths(teamsBasePath); + const feature = createMemberWorkSyncFeature({ + teamsBasePath, + configReader: { + getConfig: vi.fn(async () => ({ + name: teamName, + members: [{ name: memberName, providerId: 'codex' }], + })), + } as never, + taskReader: { + getTasks: vi.fn(async () => [ + { + id: 'task-1', + displayId: '11111111', + subject: 'Ship sync after expired fallback report', + status: 'pending', + owner: memberName, + }, + ]), + } as never, + kanbanManager: { + getState: vi.fn(async () => ({ + teamName, + reviewers: [], + tasks: {}, + })), + } as never, + membersMetaStore: { + getMembers: vi.fn(async () => []), + } as never, + isTeamActive: vi.fn(async () => true), + }); + + try { + const status = await feature.refreshStatus({ teamName, memberName }); + expect(status.reportToken).toBeTruthy(); + const expiredToken = await new HmacMemberWorkSyncReportTokenAdapter(storePaths).create({ + teamName, + memberName, + agendaFingerprint: status.agenda.fingerprint, + issuedAt: new Date(Date.now() - 60 * 60_000).toISOString(), + }); + const store = new JsonMemberWorkSyncStore(storePaths); + await store.appendPendingReport( + { + teamName, + memberName, + state: 'still_working', + agendaFingerprint: status.agenda.fingerprint, + reportToken: expiredToken.token, + taskIds: ['task-1'], + source: 'mcp', + }, + 'control_api_unavailable' + ); + + await expect(feature.replayPendingReports([teamName])).resolves.toEqual({ + processed: 1, + accepted: 1, + rejected: 0, + superseded: 0, + }); + + const finalStatus = await feature.getStatus({ teamName, memberName }); + expect(finalStatus).toMatchObject({ + state: 'still_working', + report: { + accepted: true, + state: 'still_working', + taskIds: ['task-1'], + source: 'mcp', + }, + }); + const memberReports = JSON.parse( + await fs.promises.readFile( + path.join( + teamsBasePath, + teamName, + 'members', + memberName, + '.member-work-sync', + 'reports.json' + ), + 'utf8' + ) + ) as { + intents?: Record< + string, + { status?: string; resultCode?: string; request?: { reportToken?: string } } + >; + }; + expect(Object.values(memberReports.intents ?? {})).toContainEqual( + expect.objectContaining({ + status: 'accepted', + resultCode: 'accepted', + request: expect.objectContaining({ reportToken: expiredToken.token }), + }) + ); + } finally { + await feature.dispose(); + } + }); + + it('returns a reportable status with a token when no stored status exists', async () => { + const claudeRoot = makeTempRoot(); + setClaudeBasePathOverride(claudeRoot); + const teamsBasePath = getTeamsBasePath(); + const teamName = 'team-a'; + const memberName = 'bob'; + const feature = createMemberWorkSyncFeature({ + teamsBasePath, + configReader: { + getConfig: vi.fn(async () => ({ + name: teamName, + members: [{ name: memberName, providerId: 'codex' }], + })), + } as never, + taskReader: { + getTasks: vi.fn(async () => [ + { + id: 'task-1', + displayId: '11111111', + subject: 'Wake from first status call', + status: 'pending', + owner: memberName, + }, + ]), + } as never, + kanbanManager: { + getState: vi.fn(async () => ({ + teamName, + reviewers: [], + tasks: {}, + })), + } as never, + membersMetaStore: { + getMembers: vi.fn(async () => []), + } as never, + isTeamActive: vi.fn(async () => true), + }); + + try { + const status = await feature.getStatus({ teamName, memberName }); + expect(status).toMatchObject({ + state: 'needs_sync', + shadow: { reconciledBy: 'request' }, + }); + expect(status.reportToken).toBeTruthy(); + + await expect( + feature.report({ + teamName, + memberName, + state: 'still_working', + agendaFingerprint: status.agenda.fingerprint, + reportToken: status.reportToken, + taskIds: ['task-1'], + source: 'test', + }) + ).resolves.toMatchObject({ + accepted: true, + status: { state: 'still_working', report: { accepted: true } }, + }); + } finally { + await feature.dispose(); + } + }); + + it('refreshes an expired stored report token before returning status to a teammate', async () => { + const claudeRoot = makeTempRoot(); + setClaudeBasePathOverride(claudeRoot); + const teamsBasePath = getTeamsBasePath(); + const teamName = 'team-a'; + const memberName = 'bob'; + const feature = createMemberWorkSyncFeature({ + teamsBasePath, + configReader: { + getConfig: vi.fn(async () => ({ + name: teamName, + members: [{ name: memberName, providerId: 'codex' }], + })), + } as never, + taskReader: { + getTasks: vi.fn(async () => [ + { + id: 'task-1', + displayId: '11111111', + subject: 'Wake with expired token', + status: 'pending', + owner: memberName, + }, + ]), + } as never, + kanbanManager: { + getState: vi.fn(async () => ({ + teamName, + reviewers: [], + tasks: {}, + })), + } as never, + membersMetaStore: { + getMembers: vi.fn(async () => []), + } as never, + isTeamActive: vi.fn(async () => true), + }); + + try { + const current = await feature.refreshStatus({ teamName, memberName }); + const store = new JsonMemberWorkSyncStore(new MemberWorkSyncStorePaths(teamsBasePath)); + const expiredToken = 'wrs:v1.expired-token-for-regression'; + await store.write({ + ...current, + reportToken: expiredToken, + reportTokenExpiresAt: new Date(Date.now() - 60_000).toISOString(), + }); + + const refreshed = await feature.getStatus({ teamName, memberName }); + expect(refreshed.reportToken).toBeTruthy(); + expect(refreshed.reportToken).not.toBe(expiredToken); + expect(Date.parse(refreshed.reportTokenExpiresAt ?? '')).toBeGreaterThan(Date.now()); + + await expect( + feature.report({ + teamName, + memberName, + state: 'still_working', + agendaFingerprint: refreshed.agenda.fingerprint, + reportToken: refreshed.reportToken, + taskIds: ['task-1'], + source: 'test', + }) + ).resolves.toMatchObject({ + accepted: true, + status: { state: 'still_working', report: { accepted: true } }, + }); + } finally { + await feature.dispose(); + } + }); + it('refreshes stale needs_sync into inactive after the whole team stops', async () => { const claudeRoot = makeTempRoot(); setClaudeBasePathOverride(claudeRoot); @@ -4030,15 +4498,9 @@ describe('createMemberWorkSyncFeature composition', () => { teamActive = false; await expect(feature.getStatus({ teamName, memberName })).resolves.toMatchObject({ - state: 'needs_sync', - diagnostics: expect.arrayContaining(['status_stale_refresh_enqueued']), - }); - await waitForQueueIdle(feature); - - await expect(store.read({ teamName, memberName })).resolves.toMatchObject({ state: 'inactive', diagnostics: expect.arrayContaining(['team_runtime_inactive']), - shadow: { reconciledBy: 'queue', triggerReasons: ['manual_refresh'] }, + shadow: { reconciledBy: 'request', triggerReasons: ['manual_refresh'] }, }); } finally { await feature.dispose(); diff --git a/test/main/services/team/MemberWorkSyncCodex.live.test.ts b/test/main/services/team/MemberWorkSyncCodex.live.test.ts index 3c507599..ef82cd3c 100644 --- a/test/main/services/team/MemberWorkSyncCodex.live.test.ts +++ b/test/main/services/team/MemberWorkSyncCodex.live.test.ts @@ -8,6 +8,14 @@ import { createMemberWorkSyncFeature, type MemberWorkSyncFeatureFacade, } from '../../../../src/features/member-work-sync/main'; +import { + buildCodexTrustedProjectConfigOverrides, + buildCodexWorkspaceTrustSettingsArgs, + type WorkspaceTrustArgsOnlyPlanRequest, + type WorkspaceTrustCoordinator, + type WorkspaceTrustLaunchArgPatch, + type WorkspaceTrustLaunchArgTargetSurface, +} from '../../../../src/features/workspace-trust/main'; import { getTeamsBasePath, setClaudeBasePathOverride, @@ -49,6 +57,13 @@ const liveDescribe = const DEFAULT_ORCHESTRATOR_CLI = '/Users/belief/dev/projects/claude/agent_teams_orchestrator/cli-source'; const DEFAULT_MODEL = 'gpt-5.4-mini'; const DEFAULT_EFFORT = 'low' as const; +const LIVE_CODEX_WORKSPACE_TRUST_TARGET_SURFACES: WorkspaceTrustLaunchArgTargetSurface[] = [ + 'primary_provider_args', + 'cross_provider_member_args', + 'provider_facts_probe', + 'default_model_probe', +]; +const VITEST_HOME_PREFIX = 'agent-teams-vitest-home-'; liveDescribe('Member work sync Codex live e2e', () => { let tempDir: string; @@ -593,6 +608,355 @@ liveDescribe('Member work sync Codex live e2e', () => { 420_000 ); + it( + 'wakes a real Codex teammate when runtime member meta omits provider metadata under noisy metrics', + async () => { + const orchestratorCli = process.env.CLAUDE_AGENT_TEAMS_ORCHESTRATOR_CLI_PATH?.trim(); + expect(orchestratorCli).toBeTruthy(); + await assertExecutable(orchestratorCli!); + + const model = process.env.MEMBER_WORK_SYNC_CODEX_MODEL?.trim() || DEFAULT_MODEL; + const effort = (process.env.MEMBER_WORK_SYNC_CODEX_EFFORT?.trim() || + DEFAULT_EFFORT) as 'low' | 'medium' | 'high' | 'xhigh'; + const requestedMemberName = 'NickName'; + const marker = `member-work-sync-codex-runtime-meta-${Date.now()}`; + teamName = `member-work-sync-codex-runtime-meta-${Date.now()}`; + const projectPath = path.join(tempDir, 'project'); + await fs.mkdir(projectPath, { recursive: true }); + await fs.writeFile( + path.join(projectPath, 'README.md'), + '# Member work sync Codex runtime meta live e2e\n\nKeep this project intentionally tiny.\n', + 'utf8' + ); + await trustProjectInTempClaudeGlobalConfig({ claudeRoot: tempClaudeRoot, projectPath }); + process.env.CLAUDE_CODE_CODEX_NATIVE_IGNORE_USER_CONFIG = 'false'; + if (ownsCodexHomeDir) { + await trustProjectInOwnedCodexHome({ codexHomeDir, projectPath }); + } + + const [ + { TeamProvisioningService }, + { TeamConfigReader }, + { TeamTaskReader }, + { TeamTaskWriter }, + { TeamKanbanManager }, + { TeamMembersMetaStore }, + { createCodexAccountFeature }, + { ProviderConnectionService }, + ] = await Promise.all([ + import('../../../../src/main/services/team/TeamProvisioningService'), + import('../../../../src/main/services/team/TeamConfigReader'), + import('../../../../src/main/services/team/TeamTaskReader'), + import('../../../../src/main/services/team/TeamTaskWriter'), + import('../../../../src/main/services/team/TeamKanbanManager'), + import('../../../../src/main/services/team/TeamMembersMetaStore'), + import('../../../../src/features/codex-account/main/composition/createCodexAccountFeature'), + import('../../../../src/main/services/runtime/ProviderConnectionService'), + ]); + + codexAccountFeature = createCodexAccountFeature({ + logger: { + info: () => undefined, + warn: () => undefined, + error: () => undefined, + }, + configManager: { + getConfig: () => ({ + providerConnections: { + codex: { + preferredAuthMode: hasLiveCodexApiKey() ? 'auto' : ('chatgpt' as const), + }, + }, + }), + }, + }); + providerConnectionService = ProviderConnectionService.getInstance(); + providerConnectionService.setCodexAccountFeature(codexAccountFeature); + + const provisioningService = new TeamProvisioningService(); + provisioningService.setWorkspaceTrustCoordinator(createCodexOnlyWorkspaceTrustCoordinator()); + svc = provisioningService; + const activeService = provisioningService; + const taskReader = new TeamTaskReader(); + const membersMetaStore = new TeamMembersMetaStore(); + feature = createMemberWorkSyncFeature({ + teamsBasePath: getTeamsBasePath(), + configReader: new TeamConfigReader(), + taskReader, + kanbanManager: new TeamKanbanManager(), + membersMetaStore, + isTeamActive: (name) => + activeService.isTeamAlive(name) || activeService.hasProvisioningRun(name), + listLifecycleActiveTeamNames: async () => [teamName!], + queueQuietWindowMs: 1, + resolveControlUrl: async () => controlServer?.baseUrl ?? null, + nudgeDeliveryWake: createLiveNudgeDeliveryWake(activeService), + }); + activeService.setTeamChangeEmitter((event: TeamChangeEvent) => + feature!.noteTeamChange(event) + ); + activeService.setRuntimeTurnSettledEnvironmentProvider((input) => + feature!.buildRuntimeTurnSettledEnvironment(input) + ); + controlServer = await startMemberWorkSyncControlServer(feature); + process.env.CLAUDE_TEAM_CONTROL_URL = controlServer.baseUrl; + activeService.setControlApiBaseUrlResolver(async () => controlServer?.baseUrl ?? null); + await fs.writeFile( + path.join(tempClaudeRoot, 'team-control-api.json'), + JSON.stringify({ baseUrl: controlServer.baseUrl }, null, 2), + 'utf8' + ); + + const progressEvents: TeamProvisioningProgress[] = []; + await activeService.createTeam( + { + teamName, + cwd: projectPath, + providerId: 'codex', + providerBackendId: 'codex-native', + model, + effort, + fastMode: 'off', + skipPermissions: true, + prompt: [ + 'Keep launch work minimal.', + 'If you receive a member_work_sync_nudge, do not complete the task.', + 'For a member_work_sync_nudge, call member_work_sync_status first.', + 'Then call member_work_sync_report with state "still_working", the returned agendaFingerprint/reportToken, and taskIds for the current agenda.', + `After member_work_sync_report is accepted, add one task comment containing exactly: ${marker}:still-working.`, + 'After that stop without a user-visible message.', + ].join(' '), + members: [ + { + name: requestedMemberName, + role: 'developer', + providerId: 'codex', + providerBackendId: 'codex-native', + model, + effort, + }, + ], + }, + (progress) => { + progressEvents.push(progress); + } + ); + + await waitUntil(async () => { + const last = progressEvents.at(-1); + if (last?.state === 'failed') { + throw new Error(formatProgressDump(progressEvents)); + } + return last?.state === 'ready'; + }, 240_000); + + const config = await new TeamConfigReader().getConfig(teamName); + const memberName = config?.members + ?.find((member) => sameMemberName(member.name, requestedMemberName)) + ?.name?.trim(); + expect(memberName).toBeTruthy(); + expect( + config?.members?.find((member) => sameMemberName(member.name, memberName!)) + ).toMatchObject({ + providerId: 'codex', + }); + + await stripMemberProviderMetadataFromMembersMeta({ + teamName, + memberName: memberName!, + fallbackRole: 'developer', + }); + expect( + (await membersMetaStore.getMembers(teamName)).find((member) => + sameMemberName(member.name, memberName!) + ) + ).toMatchObject({ + name: memberName, + providerId: undefined, + providerBackendId: undefined, + model: undefined, + effort: undefined, + }); + await waitUntil(async () => { + await feature!.drainRuntimeTurnSettledEvents(); + const diagnostics = feature!.getQueueDiagnostics(); + return diagnostics.queued === 0 && diagnostics.running === 0; + }, 60_000, 1_000, async () => + formatMemberWorkSyncDiagnostics({ + feature: feature!, + teamName: teamName!, + memberName: memberName!, + }) + ); + + const createdAt = new Date().toISOString(); + const taskId = `runtime-meta-${Date.now()}`; + const displayId = String(Date.now()).slice(-8); + await new TeamTaskWriter().createTask(teamName, { + id: taskId, + displayId, + subject: `Member work sync live runtime meta ${marker}`, + description: 'Verify native stale recovery when runtime member meta lacks provider fields.', + owner: memberName!, + createdBy: 'user', + status: 'in_progress', + projectPath, + createdAt, + updatedAt: createdAt, + }); + feature.noteTeamChange({ type: 'task', teamName, taskId }); + + let agendaFingerprint = ''; + await waitUntil(async () => { + const status = await feature!.refreshStatus({ teamName: teamName!, memberName: memberName! }); + if (!status.agenda.items.some((item) => item.taskId === taskId)) { + return false; + } + expect(status).toMatchObject({ + state: 'needs_sync', + providerId: 'codex', + diagnostics: expect.arrayContaining(['no_current_report']), + }); + expect(status.agenda.items).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + taskId, + reason: 'owned_in_progress_task', + evidence: expect.objectContaining({ status: 'in_progress' }), + }), + ]) + ); + agendaFingerprint = status.agenda.fingerprint; + return true; + }, 60_000, 500, async () => + formatMemberWorkSyncDiagnostics({ + feature: feature!, + teamName: teamName!, + memberName: memberName!, + taskId, + }) + ); + await waitUntil(async () => { + const diagnostics = feature!.getQueueDiagnostics(); + return diagnostics.queued === 0 && diagnostics.running === 0; + }, 30_000, 500, async () => + formatMemberWorkSyncDiagnostics({ + feature: feature!, + teamName: teamName!, + memberName: memberName!, + taskId, + }) + ); + const stableStatus = await feature.refreshStatus({ + teamName, + memberName: memberName!, + }); + expect(stableStatus.providerId).toBe('codex'); + expect(stableStatus.agenda.fingerprint).toBe(agendaFingerprint); + expect( + (await readInboxMessages(teamName, memberName!)).filter( + (message) => message.messageKind === 'member_work_sync_nudge' + ) + ).toHaveLength(0); + + await seedNativeStaleBlockingMetrics({ + teamName, + memberName: memberName!, + agendaFingerprint, + }); + feature.noteTeamChange({ type: 'task', teamName, taskId }); + + await waitUntil(async () => { + const diagnostics = feature!.getQueueDiagnostics(); + return diagnostics.queued === 0 && diagnostics.running === 0; + }, 30_000, 500, async () => + formatMemberWorkSyncDiagnostics({ + feature: feature!, + teamName: teamName!, + memberName: memberName!, + taskId, + }) + ); + expect((await feature.getStatus({ teamName, memberName: memberName! })).providerId).toBe( + 'codex' + ); + + await waitUntil(async () => { + const nudges = (await readInboxMessages(teamName!, memberName!)).filter( + (message) => message.messageKind === 'member_work_sync_nudge' + ); + return nudges.length === 1; + }, 60_000, 1_000, async () => + formatMemberWorkSyncDiagnostics({ + feature: feature!, + teamName: teamName!, + memberName: memberName!, + taskId, + }) + ); + + const metrics = await feature.getMetrics({ teamName }); + expect(metrics.phase2Readiness.reasons).toContain('would_nudge_rate_high'); + const journalPath = path.join( + getTeamsBasePath(), + teamName, + 'members', + memberName!, + '.member-work-sync', + 'journal.jsonl' + ); + const journal = await fs.readFile(journalPath, 'utf8'); + const nudgeOutcomes = journal + .trim() + .split('\n') + .map((line) => JSON.parse(line) as { event?: string; reason?: string }) + .filter((event) => event.event === 'nudge_skipped' || event.event === 'nudge_delivered'); + expect(nudgeOutcomes).toContainEqual(expect.objectContaining({ event: 'nudge_delivered' })); + expect(nudgeOutcomes.at(-1)).toMatchObject({ event: 'nudge_delivered' }); + + await relayInboxIfNotAlreadyConsumed(activeService, memberName!); + + await waitUntil(async () => { + const fatalRuntimeMessage = await readFatalRuntimeMessage(teamName!); + if (fatalRuntimeMessage) { + throw new FatalWaitError(fatalRuntimeMessage); + } + await feature!.replayPendingReports([teamName!]); + const status = await feature!.getStatus({ teamName: teamName!, memberName: memberName! }); + return status.report?.accepted === true && status.report.state === 'still_working'; + }, 240_000, 2_000, async () => + formatMemberWorkSyncDiagnostics({ + feature: feature!, + teamName: teamName!, + memberName: memberName!, + taskId, + }) + ); + + const finalStatus = await feature.getStatus({ teamName, memberName: memberName! }); + expect(finalStatus.state).toBe('still_working'); + expect(finalStatus.report).toMatchObject({ + accepted: true, + state: 'still_working', + }); + await waitUntil(async () => { + await feature!.drainRuntimeTurnSettledEvents(); + const metas = await readRuntimeTurnSettledProcessedMetas(getTeamsBasePath()); + return metas.some( + ({ meta }) => + (meta.event as { provider?: unknown; teamName?: unknown } | undefined)?.provider === + 'codex' && + (meta.event as { provider?: unknown; teamName?: unknown } | undefined)?.teamName === + teamName + ); + }, 60_000); + await expect(feature.dispatchDueNudges([teamName])).resolves.toMatchObject({ + delivered: 0, + }); + }, + 480_000 + ); + it( 'lets a real Codex teammate complete the task and report caught-up after the board clears', async () => { @@ -852,6 +1216,173 @@ function resolveConnectedCodexHome(previousCodexHome: string | undefined): strin return path.join(os.userInfo().homedir, '.codex'); } +async function trustProjectInOwnedCodexHome(input: { + codexHomeDir: string; + projectPath: string; +}): Promise { + const [override] = buildCodexTrustedProjectConfigOverrides([input.projectPath], { + maxOverrides: 1, + }); + if (!override) { + return; + } + await fs.mkdir(input.codexHomeDir, { recursive: true }); + await fs.appendFile(path.join(input.codexHomeDir, 'config.toml'), `\n${override}\n`, 'utf8'); +} + +async function trustProjectInTempClaudeGlobalConfig(input: { + claudeRoot: string; + projectPath: string; +}): Promise { + const projectRealPath = await fs.realpath(input.projectPath).catch(() => input.projectPath); + const projects = Object.fromEntries( + [...new Set([input.projectPath, projectRealPath])].map((projectPath) => [ + projectPath, + { + allowedTools: [], + mcpContextUris: [], + mcpServers: {}, + enabledMcpjsonServers: [], + disabledMcpjsonServers: [], + projectOnboardingSeenCount: 0, + hasClaudeMdExternalIncludesApproved: false, + hasClaudeMdExternalIncludesWarningShown: false, + hasTrustDialogAccepted: true, + }, + ]) + ); + const configPaths = [path.join(input.claudeRoot, '.claude.json')]; + const homeDir = process.env.HOME?.trim(); + if (homeDir && path.basename(homeDir).startsWith(VITEST_HOME_PREFIX)) { + configPaths.push(path.join(homeDir, '.claude.json')); + } + + for (const configPath of configPaths) { + await fs.mkdir(path.dirname(configPath), { recursive: true }); + await fs.writeFile(configPath, `${JSON.stringify({ projects }, null, 2)}\n`, 'utf8'); + } +} + +function createCodexOnlyWorkspaceTrustCoordinator(): WorkspaceTrustCoordinator { + return { + async planArgsOnly(request) { + return { launchArgPatches: buildLiveCodexWorkspaceTrustPatches(request) }; + }, + async planFull(request) { + return { + workspaces: request.workspaces, + launchArgPatches: buildLiveCodexWorkspaceTrustPatches(request), + }; + }, + async execute(plan) { + return { + id: 'member-work-sync-codex-live-workspace-trust', + provider: 'claude', + status: 'skipped', + workspaceIds: plan.workspaces.map((workspace) => workspace.id), + evidence: ['live test injects Codex native trusted-project settings'], + }; + }, + }; +} + +function buildLiveCodexWorkspaceTrustPatches( + request: WorkspaceTrustArgsOnlyPlanRequest +): WorkspaceTrustLaunchArgPatch[] { + if ( + !request.featureFlags.enabled || + !request.featureFlags.codexArgs || + !request.providers.includes('codex') + ) { + return []; + } + + const configKeys = request.workspaces.flatMap((workspace) => [ + workspace.configKeyCwd, + workspace.realCwd, + ...(workspace.gitRootConfigKey ? [workspace.gitRootConfigKey] : []), + ]); + const overrides = buildCodexTrustedProjectConfigOverrides(configKeys); + const args = buildCodexWorkspaceTrustSettingsArgs(overrides); + if (args.length === 0) { + return []; + } + + const workspaceIds = request.workspaces.map((workspace) => workspace.id); + return (request.targetSurfaces ?? LIVE_CODEX_WORKSPACE_TRUST_TARGET_SURFACES).map((surface) => ({ + id: `member-work-sync-codex-live-workspace-trust:${surface}`, + owner: 'workspace-trust', + targetProvider: 'codex', + targetSurface: surface, + dialect: 'claude-codex-runtime-settings', + args, + dedupeKey: `member-work-sync-codex-live-workspace-trust:${surface}:${overrides.join('|')}`, + sourceWorkspaceIds: workspaceIds, + reason: 'Trust the live e2e project for Codex native headless teammate startup.', + })); +} + +function sameMemberName(left: string | undefined, right: string | undefined): boolean { + return left?.trim().toLowerCase() === right?.trim().toLowerCase(); +} + +async function stripMemberProviderMetadataFromMembersMeta(input: { + teamName: string; + memberName: string; + fallbackRole: string; +}): Promise { + const metaPath = path.join(getTeamsBasePath(), input.teamName, 'members.meta.json'); + const raw = await fs.readFile(metaPath, 'utf8').catch(() => '{"version":1,"members":[]}'); + const parsed = JSON.parse(raw) as { providerBackendId?: unknown; members?: unknown }; + const sourceMembers = Array.isArray(parsed.members) ? parsed.members : []; + let found = false; + const members = sourceMembers.flatMap((member): Record[] => { + if (!member || typeof member !== 'object') { + return []; + } + const source = member as Record; + const name = typeof source.name === 'string' ? source.name.trim() : ''; + if (!name) { + return []; + } + if (!sameMemberName(name, input.memberName)) { + return [source]; + } + + found = true; + const stripped: Record = { name }; + for (const key of ['role', 'workflow', 'isolation', 'agentType', 'color', 'agentId', 'cwd']) { + if (typeof source[key] === 'string' && source[key].trim()) { + stripped[key] = source[key]; + } + } + for (const key of ['joinedAt', 'removedAt']) { + if (typeof source[key] === 'number') { + stripped[key] = source[key]; + } + } + return [stripped]; + }); + + if (!found) { + members.push({ + name: input.memberName, + role: input.fallbackRole, + agentType: 'general-purpose', + joinedAt: Date.now(), + }); + } + + const payload = { + version: 1, + ...(typeof parsed.providerBackendId === 'string' + ? { providerBackendId: parsed.providerBackendId } + : {}), + members, + }; + await fs.writeFile(metaPath, `${JSON.stringify(payload, null, 2)}\n`, 'utf8'); +} + async function seedShadowReadyMetrics(input: { teamName: string; memberName: string; @@ -899,6 +1430,68 @@ async function seedShadowReadyMetrics(input: { ); } +async function seedNativeStaleBlockingMetrics(input: { + teamName: string; + memberName: string; + agendaFingerprint: string; +}): Promise { + const metricsPath = path.join( + getTeamsBasePath(), + input.teamName, + '.member-work-sync', + 'indexes', + 'metrics.json' + ); + const nowMs = Date.now(); + const staleObservedAt = new Date(nowMs - 6 * 60_000 - 1_000).toISOString(); + await fs.mkdir(path.dirname(metricsPath), { recursive: true }); + await fs.writeFile( + metricsPath, + `${JSON.stringify( + { + schemaVersion: 2, + members: { + [input.memberName]: { + memberName: input.memberName, + state: 'needs_sync', + agendaFingerprint: input.agendaFingerprint, + actionableCount: 1, + evaluatedAt: staleObservedAt, + providerId: 'codex', + }, + }, + recentEvents: [ + { + id: 'native-stale-status', + teamName: input.teamName, + memberName: input.memberName, + kind: 'status_evaluated', + state: 'needs_sync', + agendaFingerprint: input.agendaFingerprint, + recordedAt: staleObservedAt, + actionableCount: 1, + providerId: 'codex', + }, + ...Array.from({ length: 12 }, (_, index) => ({ + id: `native-stale-would-nudge-${index}`, + teamName: input.teamName, + memberName: input.memberName, + kind: 'would_nudge', + state: 'needs_sync', + agendaFingerprint: input.agendaFingerprint, + recordedAt: new Date(nowMs - 5 * 60_000 + index * 5_000).toISOString(), + actionableCount: 1, + providerId: 'codex', + })), + ], + }, + null, + 2 + )}\n`, + 'utf8' + ); +} + async function readInboxMessages(teamName: string, memberName: string): Promise< Array<{ messageId?: string; diff --git a/test/main/services/team/OpenCodeAgendaSyncRecovery.safe-e2e.test.ts b/test/main/services/team/OpenCodeAgendaSyncRecovery.safe-e2e.test.ts index 608da7bf..70b1c3c0 100644 --- a/test/main/services/team/OpenCodeAgendaSyncRecovery.safe-e2e.test.ts +++ b/test/main/services/team/OpenCodeAgendaSyncRecovery.safe-e2e.test.ts @@ -13,7 +13,9 @@ import fs from 'fs'; import os from 'os'; import path from 'path'; import { afterEach, describe, expect, it, vi } from 'vitest'; +import type { Mock } from 'vitest'; +import type { MemberWorkSyncNudgeDeliveryWakePort } from '@features/member-work-sync/core/application/ports'; import type { InboxMessage, TaskRef } from '@shared/types/team'; const tempRoots: string[] = []; @@ -291,12 +293,22 @@ function buildProofMissingRecord(input: { }; } +type TestNudgeDeliveryWake = MemberWorkSyncNudgeDeliveryWakePort & { + schedule: Mock; +}; + +function createNudgeDeliveryWake(): TestNudgeDeliveryWake { + return { + schedule: vi.fn(async () => undefined), + }; +} + function createFeature(input: { teamsBasePath: string; teamName: string; memberName: string; service: TeamProvisioningService; - nudgeDeliveryWake: { schedule: ReturnType }; + nudgeDeliveryWake: TestNudgeDeliveryWake; providerId?: 'opencode' | 'codex'; }) { const providerId = input.providerId ?? 'opencode'; @@ -351,7 +363,7 @@ describe('OpenCode agenda-sync proof-missing recovery safe e2e', () => { const teamName = 'team-codex-agenda-sync-nudge'; const memberName = 'bob'; const service = new TeamProvisioningService(); - const nudgeDeliveryWake = { schedule: vi.fn(async () => undefined) }; + const nudgeDeliveryWake = createNudgeDeliveryWake(); const feature = createFeature({ teamsBasePath, teamName, @@ -415,7 +427,7 @@ describe('OpenCode agenda-sync proof-missing recovery safe e2e', () => { const taskRef: TaskRef = { teamName, taskId: 'task-1', displayId: '11111111' }; const foregroundMessageId = 'proof-missing-message-1'; const service = new TeamProvisioningService(); - const nudgeDeliveryWake = { schedule: vi.fn(async () => undefined) }; + const nudgeDeliveryWake = createNudgeDeliveryWake(); const feature = createFeature({ teamsBasePath, teamName, @@ -499,7 +511,7 @@ describe('OpenCode agenda-sync proof-missing recovery safe e2e', () => { const laneId = 'secondary:opencode:jack'; const taskRef: TaskRef = { teamName, taskId: 'task-1', displayId: '11111111' }; const service = new TeamProvisioningService(); - const nudgeDeliveryWake = { schedule: vi.fn(async () => undefined) }; + const nudgeDeliveryWake = createNudgeDeliveryWake(); const feature = createFeature({ teamsBasePath, teamName, diff --git a/test/main/services/team/TeamInboxWriter.test.ts b/test/main/services/team/TeamInboxWriter.test.ts index 2ef629a1..31b1fb7b 100644 --- a/test/main/services/team/TeamInboxWriter.test.ts +++ b/test/main/services/team/TeamInboxWriter.test.ts @@ -177,6 +177,55 @@ describe('TeamInboxWriter', () => { }); }); + it('updates an existing member-work-sync row text when message kind and payload hash match', async () => { + await writer.sendMessage('my-team', { + member: 'alice', + text: 'sync your work state', + source: 'system_notification', + messageId: 'work-sync-1', + messageKind: 'member_work_sync_nudge', + workSyncIntent: 'agenda_sync', + workSyncPayloadHash: 'sha256:work-sync', + }); + + const result = await writer.updateMessageText('my-team', { + member: 'alice', + messageId: 'work-sync-1', + text: 'sync your work state\nRequired control API: pass controlUrl "http://127.0.0.1:43123" in both member_work_sync_status and member_work_sync_report.', + expectedMessageKind: 'member_work_sync_nudge', + expectedWorkSyncPayloadHash: 'sha256:work-sync', + }); + + const persisted = JSON.parse(hoisted.files.get(inboxPath) ?? '[]') as Record[]; + expect(result).toEqual({ found: true, updated: true }); + expect(persisted[0]?.text).toContain('controlUrl "http://127.0.0.1:43123"'); + expect(persisted[0]?.workSyncPayloadHash).toBe('sha256:work-sync'); + }); + + it('does not update member-work-sync row text when payload hash mismatches', async () => { + await writer.sendMessage('my-team', { + member: 'alice', + text: 'sync your work state', + source: 'system_notification', + messageId: 'work-sync-1', + messageKind: 'member_work_sync_nudge', + workSyncIntent: 'agenda_sync', + workSyncPayloadHash: 'sha256:work-sync', + }); + + const result = await writer.updateMessageText('my-team', { + member: 'alice', + messageId: 'work-sync-1', + text: 'should not write', + expectedMessageKind: 'member_work_sync_nudge', + expectedWorkSyncPayloadHash: 'sha256:different', + }); + + const persisted = JSON.parse(hoisted.files.get(inboxPath) ?? '[]') as Record[]; + expect(result).toEqual({ found: true, updated: false }); + expect(persisted[0]?.text).toBe('sync your work state'); + }); + it('preserves provided message identity fields for dedup across live and persisted rows', async () => { const result = await writer.sendMessage('my-team', { member: 'alice',