fix(member-work-sync): repair nudges and stale report tokens

This commit is contained in:
777genius 2026-06-06 21:19:52 +03:00
parent c322031542
commit c8a3ad07ac
18 changed files with 2166 additions and 50 deletions

View file

@ -1,6 +1,9 @@
import { decideMemberWorkSyncStatus } from '../domain';
import { finalizeMemberWorkSyncAgenda } from './MemberWorkSyncReconciler';
import {
attachMemberWorkSyncReportToken,
finalizeMemberWorkSyncAgenda,
} from './MemberWorkSyncReconciler';
import { resolveMemberWorkSyncRuntimeActivity } from './MemberWorkSyncRuntimeActivity';
import type { MemberWorkSyncStatus, MemberWorkSyncStatusRequest } from '../../contracts';
@ -28,7 +31,7 @@ export class MemberWorkSyncDiagnosticsReader {
inactive: source.inactive || runtimeActivity.inactive,
});
return {
return attachMemberWorkSyncReportToken(this.deps, {
teamName: agenda.teamName,
memberName: agenda.memberName,
state: decision.state,
@ -46,6 +49,6 @@ export class MemberWorkSyncDiagnosticsReader {
'status_snapshot_not_persisted',
],
...(source.providerId ? { providerId: source.providerId } : {}),
};
});
}
}

View file

@ -155,6 +155,21 @@ function shouldPlanDeliveredStillStuckRecovery(input: {
);
}
function shouldRepairDeliveredAgendaSyncNudge(input: {
status: MemberWorkSyncStatus;
requestedInput: MemberWorkSyncOutboxEnsureInput;
existingItem: MemberWorkSyncOutboxItem;
}): boolean {
return (
input.status.state === 'needs_sync' &&
input.requestedInput.payload.workSyncIntent === 'agenda_sync' &&
input.existingItem.status === 'delivered' &&
input.existingItem.agendaFingerprint === input.requestedInput.agendaFingerprint &&
input.existingItem.payloadHash === input.requestedInput.payloadHash &&
!hasActiveAcceptedWorkLease(input.status)
);
}
function isOutboxItemAwaitingDelivery(item: MemberWorkSyncOutboxItem): boolean {
return item.status !== 'delivered' && item.status !== 'failed_terminal';
}
@ -296,6 +311,7 @@ export class MemberWorkSyncNudgeOutboxPlanner {
await this.appendPlanAudit(status, { planned: false, code: 'payload_conflict' });
return { planned: false, code: 'payload_conflict' };
}
await this.repairDeliveredAgendaSyncNudgeIfNeeded(status, recoveryInput, recoveryResult.item);
if (activationReason) {
const deliveredStillStuckRecovery = await this.planDeliveredStillStuckRecovery(
@ -371,6 +387,7 @@ export class MemberWorkSyncNudgeOutboxPlanner {
await this.appendPlanAudit(status, { planned: false, code: 'payload_conflict' });
return { planned: false, code: 'payload_conflict' };
}
await this.repairDeliveredAgendaSyncNudgeIfNeeded(status, recoveryInput, recoveryResult.item);
const recoveryPlanned = isOutboxItemAwaitingDelivery(recoveryResult.item);
const recoveryPlanResult = {
@ -491,6 +508,11 @@ export class MemberWorkSyncNudgeOutboxPlanner {
await this.appendPlanAudit(status, { planned: false, code: 'payload_conflict' });
return { planned: false, code: 'payload_conflict' };
}
await this.repairDeliveredAgendaSyncNudgeIfNeeded(
status,
recoveryInput,
recoveryResult.item
);
if (
shouldPlanStatusOnlyRecovery({
status,
@ -544,6 +566,7 @@ export class MemberWorkSyncNudgeOutboxPlanner {
await this.appendPlanAudit(status, { planned: false, code });
return { planned: false, code };
}
await this.repairDeliveredAgendaSyncNudgeIfNeeded(status, input, result.item);
if (
shouldPlanStatusOnlyRecovery({
status,
@ -580,6 +603,37 @@ export class MemberWorkSyncNudgeOutboxPlanner {
return planResult;
}
private async repairDeliveredAgendaSyncNudgeIfNeeded(
status: MemberWorkSyncStatus,
requestedInput: MemberWorkSyncOutboxEnsureInput,
existingItem: MemberWorkSyncOutboxItem
): Promise<void> {
const inboxNudge = this.deps.inboxNudge;
if (
!inboxNudge?.repairIfPresent ||
!shouldRepairDeliveredAgendaSyncNudge({ status, requestedInput, existingItem })
) {
return;
}
try {
await inboxNudge.repairIfPresent({
teamName: status.teamName,
memberName: status.memberName,
messageId: existingItem.deliveredMessageId ?? existingItem.id,
payloadHash: existingItem.payloadHash,
payload: existingItem.payload,
});
} catch (error) {
this.deps.logger?.warn('member work sync delivered nudge repair failed', {
teamName: status.teamName,
memberName: status.memberName,
outboxId: existingItem.id,
error: String(error),
});
}
}
private async appendReviewPickupEscalationAudit(
status: MemberWorkSyncStatus,
reason: string

View file

@ -1,6 +1,10 @@
import { MemberWorkSyncReporter } from './MemberWorkSyncReporter';
import type { MemberWorkSyncReportIntentStatus } from '../../contracts';
import type {
MemberWorkSyncReportIntent,
MemberWorkSyncReportIntentStatus,
MemberWorkSyncReportResult,
} from '../../contracts';
import type { MemberWorkSyncUseCaseDeps } from './ports';
export interface MemberWorkSyncPendingReportReplaySummary {
@ -52,10 +56,7 @@ export class MemberWorkSyncPendingReportIntentReplayer {
let status: MemberWorkSyncReportIntentStatus = 'rejected';
let resultCode = 'replay_failed';
try {
const result = await this.reporter.execute({
...intent.request,
source: intent.request.source ?? 'mcp',
});
const result = await this.executeReplay(intent);
status = statusForResult(result);
resultCode = result.code;
} catch (error) {
@ -83,4 +84,56 @@ export class MemberWorkSyncPendingReportIntentReplayer {
return summary;
}
private async executeReplay(
intent: MemberWorkSyncReportIntent
): Promise<MemberWorkSyncReportResult> {
const result = await this.reporter.execute({
...intent.request,
source: intent.request.source ?? 'mcp',
});
const freshToken = await this.getFreshTokenForExpiredFallbackReport(intent, result);
if (!freshToken) {
return result;
}
return this.reporter.execute({
...intent.request,
agendaFingerprint: freshToken.agendaFingerprint,
reportToken: freshToken.reportToken,
source: intent.request.source ?? 'mcp',
});
}
private async getFreshTokenForExpiredFallbackReport(
intent: MemberWorkSyncReportIntent,
result: MemberWorkSyncReportResult
): Promise<{ agendaFingerprint: string; reportToken: string } | null> {
if (
result.accepted ||
result.code !== 'invalid_report_token' ||
intent.reason !== 'control_api_unavailable' ||
!intent.request.reportToken ||
!result.status.reportToken ||
result.status.agenda.fingerprint !== intent.request.agendaFingerprint ||
!this.deps.reportToken
) {
return null;
}
const validation = await this.deps.reportToken.verify({
token: intent.request.reportToken,
teamName: result.status.teamName,
memberName: result.status.memberName,
agendaFingerprint: result.status.agenda.fingerprint,
nowIso: this.deps.clock.now().toISOString(),
});
if (validation.ok || validation.reason !== 'expired') {
return null;
}
return {
agendaFingerprint: result.status.agenda.fingerprint,
reportToken: result.status.reportToken,
};
}
}

View file

@ -190,6 +190,13 @@ export interface MemberWorkSyncInboxNudgePort {
payload: MemberWorkSyncOutboxItem['payload'];
timestamp: string;
}): Promise<{ inserted: boolean; messageId: string; conflict?: boolean }>;
repairIfPresent?(input: {
teamName: string;
memberName: string;
messageId: string;
payloadHash: string;
payload: MemberWorkSyncOutboxItem['payload'];
}): Promise<{ found: boolean; repaired: boolean; conflict?: boolean }>;
}
export interface MemberWorkSyncWatchdogCooldownPort {

View file

@ -3,24 +3,48 @@ import { TeamInboxWriter } from '@main/services/team/TeamInboxWriter';
import type { MemberWorkSyncInboxNudgePort } from '../../../core/application';
type TeamInboxMemberWorkSyncNudgeInput = Parameters<
MemberWorkSyncInboxNudgePort['insertIfAbsent']
>[0];
type TeamInboxMemberWorkSyncNudgeRepairInput = Parameters<
NonNullable<MemberWorkSyncInboxNudgePort['repairIfPresent']>
>[0];
type TeamInboxMemberWorkSyncNudgeWriter = Pick<TeamInboxWriter, 'sendMessage'> &
Partial<Pick<TeamInboxWriter, 'updateMessageText'>>;
function isStoredMemberWorkSyncNudge(
message: Awaited<ReturnType<TeamInboxReader['getMessagesFor']>>[number]
): boolean {
return message.messageKind === 'member_work_sync_nudge';
}
export class TeamInboxMemberWorkSyncNudgeSink implements MemberWorkSyncInboxNudgePort {
constructor(
private readonly inboxReader: Pick<TeamInboxReader, 'getMessagesFor'> = new TeamInboxReader(),
private readonly inboxWriter: Pick<TeamInboxWriter, 'sendMessage'> = new TeamInboxWriter(),
private readonly inboxWriter: TeamInboxMemberWorkSyncNudgeWriter = new TeamInboxWriter(),
private readonly controlUrlResolver?: () => Promise<string | null> | string | null
) {}
async insertIfAbsent(input: Parameters<MemberWorkSyncInboxNudgePort['insertIfAbsent']>[0]) {
async insertIfAbsent(input: TeamInboxMemberWorkSyncNudgeInput) {
const existing = await this.inboxReader.getMessagesFor(input.teamName, input.memberName);
const existingMessage = existing.find((message) => message.messageId === input.messageId);
if (existingMessage) {
if (existingMessage.workSyncPayloadHash !== input.payloadHash) {
if (
existingMessage.workSyncPayloadHash !== input.payloadHash ||
!isStoredMemberWorkSyncNudge(existingMessage)
) {
return { inserted: false, messageId: input.messageId, conflict: true };
}
await this.repairExistingControlUrlIfNeeded(input, existingMessage.text, {
required: Boolean(this.controlUrlResolver),
});
return { inserted: false, messageId: input.messageId };
}
const controlUrl = await this.resolveControlUrl();
const controlUrl = await this.resolveControlUrl({
required: Boolean(this.controlUrlResolver),
});
const text = controlUrl
? this.withControlUrl(input.payload.text, controlUrl)
: input.payload.text;
@ -48,27 +72,89 @@ export class TeamInboxMemberWorkSyncNudgeSink implements MemberWorkSyncInboxNudg
};
}
private async resolveControlUrl(): Promise<string | null> {
async repairIfPresent(input: TeamInboxMemberWorkSyncNudgeRepairInput) {
const existing = await this.inboxReader.getMessagesFor(input.teamName, input.memberName);
const existingMessage = existing.find((message) => message.messageId === input.messageId);
if (!existingMessage) {
return { found: false, repaired: false };
}
if (
existingMessage.workSyncPayloadHash !== input.payloadHash ||
!isStoredMemberWorkSyncNudge(existingMessage)
) {
return { found: true, repaired: false, conflict: true };
}
const repaired = await this.repairExistingControlUrlIfNeeded(input, existingMessage.text, {
required: Boolean(this.controlUrlResolver),
});
return { found: true, repaired };
}
private async repairExistingControlUrlIfNeeded(
input: TeamInboxMemberWorkSyncNudgeRepairInput,
existingText: string | undefined,
options: { required?: boolean } = {}
): Promise<boolean> {
const controlUrl = await this.resolveControlUrl(options);
if (!controlUrl) {
return false;
}
const currentText = existingText ?? input.payload.text;
const repairedText = this.withControlUrl(currentText, controlUrl);
if (repairedText === currentText) {
return false;
}
if (typeof this.inboxWriter.updateMessageText !== 'function') {
if (options.required) {
throw new Error('member work sync inbox text update unavailable');
}
return false;
}
const result = await this.inboxWriter.updateMessageText(input.teamName, {
member: input.memberName,
messageId: input.messageId,
text: repairedText,
expectedMessageKind: 'member_work_sync_nudge',
expectedWorkSyncPayloadHash: input.payloadHash,
});
return result.updated;
}
private async resolveControlUrl(options: { required?: boolean } = {}): Promise<string | null> {
if (!this.controlUrlResolver) {
return null;
}
let value: string | null | undefined;
try {
const value = await this.controlUrlResolver();
const trimmed = value?.trim();
return trimmed ? trimmed : null;
} catch {
value = await this.controlUrlResolver();
} catch (error) {
if (options.required) {
throw new Error(`member work sync control URL unavailable: ${String(error)}`);
}
return null;
}
const trimmed = value?.trim();
if (trimmed) {
return trimmed;
}
if (options.required) {
throw new Error('member work sync control URL unavailable');
}
return null;
}
private withControlUrl(text: string, controlUrl: string): string {
if (text.includes('controlUrl')) {
const controlLine = `Required control API: pass controlUrl "${controlUrl}" in both member_work_sync_status and member_work_sync_report.`;
const existingControlLine =
/^Required control API: pass controlUrl "[^"\n]+" in both member_work_sync_status and member_work_sync_report\.$/m;
if (existingControlLine.test(text)) {
return text.replace(existingControlLine, controlLine);
}
if (text.includes(`controlUrl "${controlUrl}"`)) {
return text;
}
return [
text,
`Required control API: pass controlUrl "${controlUrl}" in both member_work_sync_status and member_work_sync_report.`,
].join('\n');
return [text, controlLine].join('\n');
}
}

View file

@ -12,7 +12,10 @@ interface StallJournalEntry {
alertedAt?: string;
}
type WatchdogCooldownResult = { active: boolean; retryAfterIso?: string };
interface WatchdogCooldownResult {
active: boolean;
retryAfterIso?: string;
}
function parseTime(value: string | undefined): number | null {
if (!value) {

View file

@ -3,6 +3,7 @@ import { describe, expect, it } from 'vitest';
import {
hasUncertainWorkSyncRuntimeActivity,
hasWorkSyncActiveRuntime,
hasWorkSyncReachableRuntime,
isRuntimeEntryActiveForWorkSync,
isRuntimeMemberActiveForWorkSync,
isRuntimeMemberActivityUncertainForWorkSync,
@ -87,6 +88,60 @@ describe('member work sync team activity', () => {
).toBe(false);
});
it('does not treat lead process evidence as active for ordinary teammates', () => {
for (const livenessKind of [undefined, 'runtime_process', 'confirmed_bootstrap'] as const) {
const snapshot = createRuntimeSnapshot({
alice: createRuntimeEntry({
memberName: 'alice',
backendType: 'process',
livenessKind,
pidSource: 'lead_process',
}),
});
expect(isRuntimeEntryActiveForWorkSync(snapshot.members.alice)).toBe(false);
expect(hasWorkSyncActiveRuntime(snapshot)).toBe(false);
expect(hasWorkSyncReachableRuntime(snapshot)).toBe(false);
expect(isRuntimeMemberActiveForWorkSync(snapshot, 'alice')).toBe(false);
}
});
it('keeps active lead processes reachable for targeted lead work-sync', () => {
const snapshot = createRuntimeSnapshot({
'team-lead': createRuntimeEntry({
memberName: 'team-lead',
backendType: 'lead',
livenessKind: undefined,
pidSource: 'lead_process',
}),
alice: createRuntimeEntry({
memberName: 'alice',
alive: false,
livenessKind: 'stale_metadata',
}),
});
expect(hasWorkSyncActiveRuntime(snapshot)).toBe(false);
expect(hasWorkSyncReachableRuntime(snapshot)).toBe(true);
expect(isRuntimeMemberActiveForWorkSync(snapshot, 'team-lead')).toBe(true);
expect(isRuntimeMemberActiveForWorkSync(snapshot, 'alice')).toBe(false);
});
it('keeps ordinary teammates named lead active from normal agent process evidence', () => {
const snapshot = createRuntimeSnapshot({
lead: createRuntimeEntry({
memberName: 'lead',
backendType: 'process',
livenessKind: 'confirmed_bootstrap',
pidSource: 'agent_process_table',
}),
});
expect(hasWorkSyncActiveRuntime(snapshot)).toBe(true);
expect(hasWorkSyncReachableRuntime(snapshot)).toBe(true);
expect(isRuntimeMemberActiveForWorkSync(snapshot, 'lead')).toBe(true);
});
it('does not treat inactive liveness diagnostics as active by themselves', () => {
for (const livenessKind of [
'permission_blocked',

View file

@ -88,6 +88,22 @@ function getAcceptedWorkLeaseStaleness(
return reportExpiresAtMs <= nowMs ? 'expired' : null;
}
function getReportTokenStaleness(
status: MemberWorkSyncStatus,
nowMs: number
): 'missing' | 'expired' | null {
if (!status.reportToken?.trim()) {
return 'missing';
}
const tokenExpiresAtMs = Date.parse(status.reportTokenExpiresAt ?? '');
if (!Number.isFinite(tokenExpiresAtMs) || !Number.isFinite(nowMs)) {
return 'missing';
}
return tokenExpiresAtMs <= nowMs ? 'expired' : null;
}
function isEmptyAgendaStaleState(status: MemberWorkSyncStatus): boolean {
return (
status.agenda.items.length === 0 &&
@ -99,6 +115,10 @@ function isEmptyAgendaStaleState(status: MemberWorkSyncStatus): boolean {
}
function statusNeedsBackgroundRefresh(status: MemberWorkSyncStatus, nowMs: number): boolean {
if (getReportTokenStaleness(status, nowMs) !== null) {
return true;
}
if (isEmptyAgendaStaleState(status)) {
return true;
}
@ -125,6 +145,13 @@ function statusNeedsBackgroundRefresh(status: MemberWorkSyncStatus, nowMs: numbe
function getStatusStalenessDiagnostics(status: MemberWorkSyncStatus, nowMs: number): string[] {
const diagnostics: string[] = [];
const tokenStaleness = getReportTokenStaleness(status, nowMs);
if (tokenStaleness === 'missing') {
diagnostics.push('report_token_missing_refresh_enqueued');
} else if (tokenStaleness === 'expired') {
diagnostics.push('report_token_expired_refresh_enqueued');
}
const evaluatedAtMs = Date.parse(status.evaluatedAt);
if (!Number.isFinite(evaluatedAtMs)) {
diagnostics.push('status_evaluated_at_invalid');
@ -150,6 +177,12 @@ function getStatusStalenessDiagnostics(status: MemberWorkSyncStatus, nowMs: numb
return [...new Set(diagnostics)];
}
function shouldRefreshStatusSynchronously(stalenessDiagnostics: string[]): boolean {
return stalenessDiagnostics.some(
(diagnostic) => diagnostic !== 'caught_up_stale_refresh_enqueued'
);
}
export function buildMemberWorkSyncRuntimeTurnSettledEnvironment(input: {
teamsBasePath: string;
provider: RuntimeTurnSettledProvider;
@ -505,6 +538,21 @@ export function createMemberWorkSyncFeature(deps: {
if (stalenessDiagnostics.length === 0) {
return status;
}
if (shouldRefreshStatusSynchronously(stalenessDiagnostics)) {
try {
return await reconciler.execute(request, {
reconciledBy: 'request',
triggerReasons: ['manual_refresh'],
});
} catch (error) {
deps.logger?.warn('member work sync synchronous status refresh failed', {
teamName: status.teamName,
memberName: status.memberName,
diagnostics: stalenessDiagnostics,
error: String(error),
});
}
}
queue.enqueue({
teamName: status.teamName,
memberName: status.memberName,

View file

@ -26,11 +26,46 @@ const WORK_SYNC_BOOTSTRAP_ONLY_PID_SOURCES = new Set<TeamAgentRuntimePidSource>(
'persisted_metadata',
]);
const WORK_SYNC_CONFIRMED_BOOTSTRAP_ACTIVE_PID_SOURCES = new Set<TeamAgentRuntimePidSource>([
const WORK_SYNC_MEMBER_CONFIRMED_BOOTSTRAP_ACTIVE_PID_SOURCES = new Set<TeamAgentRuntimePidSource>([
'agent_process_table',
'opencode_bridge',
]);
const WORK_SYNC_LEAD_CONFIRMED_BOOTSTRAP_ACTIVE_PID_SOURCES = new Set<TeamAgentRuntimePidSource>([
'lead_process',
]);
function isWorkSyncLeadLikeMemberName(memberName: string): boolean {
const normalized = normalizeMemberName(memberName).replace(/[\s_]+/g, '-');
return (
normalized === 'lead' ||
normalized === 'team-lead' ||
normalized === 'teamlead' ||
normalized === 'team-leader'
);
}
function hasActiveWorkSyncProcessEvidence(
entry: Pick<TeamAgentRuntimeEntry, 'alive' | 'livenessKind' | 'pidSource'> | null | undefined,
confirmedBootstrapActivePidSources: ReadonlySet<TeamAgentRuntimePidSource>
): boolean {
if (entry?.alive !== true) {
return false;
}
if (
entry.livenessKind === 'confirmed_bootstrap' &&
(!entry.pidSource ||
WORK_SYNC_BOOTSTRAP_ONLY_PID_SOURCES.has(entry.pidSource) ||
!confirmedBootstrapActivePidSources.has(entry.pidSource))
) {
return false;
}
if (!entry.livenessKind) {
return true;
}
return !WORK_SYNC_INACTIVE_LIVENESS_KINDS.has(entry.livenessKind);
}
export function isRuntimeEntryActiveForWorkSync(
entry:
| Pick<
@ -40,7 +75,7 @@ export function isRuntimeEntryActiveForWorkSync(
| null
| undefined
): boolean {
if (entry?.alive !== true) {
if (!entry) {
return false;
}
if (
@ -50,17 +85,33 @@ export function isRuntimeEntryActiveForWorkSync(
return false;
}
if (
entry.livenessKind === 'confirmed_bootstrap' &&
(!entry.pidSource ||
WORK_SYNC_BOOTSTRAP_ONLY_PID_SOURCES.has(entry.pidSource) ||
!WORK_SYNC_CONFIRMED_BOOTSTRAP_ACTIVE_PID_SOURCES.has(entry.pidSource))
entry.pidSource &&
WORK_SYNC_LEAD_CONFIRMED_BOOTSTRAP_ACTIVE_PID_SOURCES.has(entry.pidSource)
) {
return false;
}
if (!entry.livenessKind) {
return true;
return hasActiveWorkSyncProcessEvidence(
entry,
WORK_SYNC_MEMBER_CONFIRMED_BOOTSTRAP_ACTIVE_PID_SOURCES
);
}
function isRuntimeLeadEntryActiveForWorkSync(
entry:
| Pick<
TeamAgentRuntimeEntry,
'alive' | 'backendType' | 'livenessKind' | 'memberName' | 'pidSource'
>
| null
| undefined
): boolean {
if (!entry || !isWorkSyncLeadLikeMemberName(entry.memberName)) {
return false;
}
return !WORK_SYNC_INACTIVE_LIVENESS_KINDS.has(entry.livenessKind);
return (
entry.backendType === 'lead' &&
hasActiveWorkSyncProcessEvidence(entry, WORK_SYNC_LEAD_CONFIRMED_BOOTSTRAP_ACTIVE_PID_SOURCES)
);
}
function isRuntimeEntryRelevantForWorkSync(
@ -95,6 +146,14 @@ export function hasWorkSyncActiveRuntime(
return Object.values(snapshot?.members ?? {}).some(isRuntimeEntryActiveForWorkSync);
}
export function hasWorkSyncReachableRuntime(
snapshot: Pick<TeamAgentRuntimeSnapshot, 'members'> | null | undefined
): boolean {
return Object.values(snapshot?.members ?? {}).some(
(entry) => isRuntimeEntryActiveForWorkSync(entry) || isRuntimeLeadEntryActiveForWorkSync(entry)
);
}
export function isRuntimeMemberActiveForWorkSync(
snapshot: Pick<TeamAgentRuntimeSnapshot, 'members'> | null | undefined,
memberName: string
@ -106,7 +165,9 @@ export function isRuntimeMemberActiveForWorkSync(
return Object.values(snapshot?.members ?? {}).some(
(entry) =>
normalizeMemberName(entry.memberName) === normalizedMemberName &&
isRuntimeEntryActiveForWorkSync(entry)
(isRuntimeEntryActiveForWorkSync(entry) ||
(isWorkSyncLeadLikeMemberName(normalizedMemberName) &&
isRuntimeLeadEntryActiveForWorkSync(entry)))
);
}

View file

@ -11,6 +11,7 @@ export {
export {
hasUncertainWorkSyncRuntimeActivity,
hasWorkSyncActiveRuntime,
hasWorkSyncReachableRuntime,
isRuntimeEntryActiveForWorkSync,
isRuntimeMemberActiveForWorkSync,
isRuntimeMemberActivityUncertainForWorkSync,

View file

@ -41,7 +41,7 @@ import {
buildMemberWorkSyncRuntimeTurnSettledEnvironment,
createMemberWorkSyncFeature,
hasUncertainWorkSyncRuntimeActivity,
hasWorkSyncActiveRuntime,
hasWorkSyncReachableRuntime,
isRuntimeMemberActivityUncertainForWorkSync,
isRuntimeMemberActiveForWorkSync,
type MemberWorkSyncFeatureFacade,
@ -1919,7 +1919,7 @@ async function initializeServices(): Promise<void> {
if (!snapshot) {
return null;
}
const active = hasWorkSyncActiveRuntime(snapshot);
const active = hasWorkSyncReachableRuntime(snapshot);
if (!active && hasUncertainWorkSyncRuntimeActivity(snapshot)) {
return null;
}
@ -2037,7 +2037,12 @@ async function initializeServices(): Promise<void> {
isBusy: (input) => teamProvisioningService.getOpenCodeMemberDeliveryBusyStatus(input),
},
],
resolveControlUrl: async () => getTeamControlApiBaseUrl(),
resolveControlUrl: async () => {
if (!httpServer.isRunning()) {
await startHttpServer(handleModeSwitch);
}
return getTeamControlApiBaseUrl();
},
proofMissingRecoveryGuard: {
shouldDispatch: async (input) => {
const isOpenCodeRecipient = await teamProvisioningService

View file

@ -6,9 +6,23 @@ import * as path from 'path';
import { atomicWriteAsync } from './atomicWrite';
import { withFileLock } from './fileLock';
import { withInboxLock } from './inboxLock';
import { getEffectiveInboxMessageId } from './inboxMessageIdentity';
import type { InboxMessage, SendMessageRequest, SendMessageResult, TaskRef } from '@shared/types';
export interface UpdateInboxMessageTextRequest {
member: string;
messageId: string;
text: string;
expectedMessageKind?: InboxMessage['messageKind'];
expectedWorkSyncPayloadHash?: string;
}
export interface UpdateInboxMessageTextResult {
found: boolean;
updated: boolean;
}
export interface MergeRuntimeDeliveryTaskRefsRequest {
inboxName: string;
messageId: string;
@ -137,6 +151,78 @@ export class TeamInboxWriter {
};
}
async updateMessageText(
teamName: string,
request: UpdateInboxMessageTextRequest
): Promise<UpdateInboxMessageTextResult> {
const messageId = request.messageId.trim();
if (!messageId) {
return { found: false, updated: false };
}
const inboxPath = path.join(getTeamsBasePath(), teamName, 'inboxes', `${request.member}.json`);
let result: UpdateInboxMessageTextResult = { found: false, updated: false };
await withFileLock(inboxPath, async () => {
await withInboxLock(inboxPath, async () => {
let raw: string;
try {
raw = await fs.promises.readFile(inboxPath, 'utf8');
} catch (error) {
if ((error as NodeJS.ErrnoException).code === 'ENOENT') {
return;
}
throw error;
}
let parsed: unknown;
try {
parsed = JSON.parse(raw) as unknown;
} catch {
return;
}
if (!Array.isArray(parsed)) {
return;
}
let changed = false;
for (const item of parsed) {
if (!item || typeof item !== 'object') {
continue;
}
const row = item as Record<string, unknown>;
const rowMessageId = getEffectiveInboxMessageId(row);
if (rowMessageId !== messageId) {
continue;
}
result = { found: true, updated: changed };
if (request.expectedMessageKind && row.messageKind !== request.expectedMessageKind) {
continue;
}
if (
request.expectedWorkSyncPayloadHash &&
row.workSyncPayloadHash !== request.expectedWorkSyncPayloadHash
) {
continue;
}
if (row.text === request.text) {
continue;
}
row.text = request.text;
changed = true;
result = { found: true, updated: true };
}
if (!changed) {
return;
}
await atomicWriteAsync(inboxPath, JSON.stringify(parsed, null, 2));
});
});
return result;
}
async mergeRuntimeDeliveryTaskRefs(
teamName: string,
request: MergeRuntimeDeliveryTaskRefsRequest

View file

@ -311,8 +311,13 @@ class InMemoryOutboxStore implements MemberWorkSyncOutboxStorePort {
class InMemoryInboxNudge implements MemberWorkSyncInboxNudgePort {
readonly inserted: Array<Parameters<MemberWorkSyncInboxNudgePort['insertIfAbsent']>[0]> = [];
readonly repaired: Array<
Parameters<NonNullable<MemberWorkSyncInboxNudgePort['repairIfPresent']>>[0]
> = [];
fail = false;
conflict = false;
repairFail = false;
repairConflict = false;
async insertIfAbsent(input: Parameters<MemberWorkSyncInboxNudgePort['insertIfAbsent']>[0]) {
if (this.fail) {
@ -324,6 +329,19 @@ class InMemoryInboxNudge implements MemberWorkSyncInboxNudgePort {
this.inserted.push(input);
return { inserted: true, messageId: input.messageId };
}
async repairIfPresent(
input: Parameters<NonNullable<MemberWorkSyncInboxNudgePort['repairIfPresent']>>[0]
) {
if (this.repairFail) {
throw new Error('inbox repair unavailable');
}
if (this.repairConflict) {
return { found: true, repaired: false, conflict: true };
}
this.repaired.push(input);
return { found: true, repaired: true };
}
}
function createDeps(options?: {
@ -1885,6 +1903,19 @@ describe('MemberWorkSync use cases', () => {
item.payload.workSyncIntentKey?.startsWith('agenda-sync-still-stuck:')
)
).toHaveLength(1);
expect(inbox.inserted).toHaveLength(2);
expect(inbox.repaired).toEqual(
expect.arrayContaining([
expect.objectContaining({
messageId: baseId,
payloadHash: outbox.items.get(baseId)?.payloadHash,
}),
expect.objectContaining({
messageId: recovery?.id,
payloadHash: recovery?.payloadHash,
}),
])
);
clock.set('2026-04-29T01:02:00.000Z');
store.metricsGeneratedAt = '2026-04-29T01:02:00.000Z';
@ -2007,6 +2038,33 @@ describe('MemberWorkSync use cases', () => {
expect(summary).toMatchObject({ claimed: 1, delivered: 1, retryable: 0 });
expect(inbox.inserted).toHaveLength(2);
expect(inbox.inserted[1]?.messageId).toContain('agenda-sync-still-stuck');
clock.set('2026-04-29T01:02:00.000Z');
store.phase2ReadinessState = 'shadow_ready';
store.phase2ReadinessReasons = [];
store.metricsGeneratedAt = '2026-04-29T01:02:00.000Z';
await reconciler.execute(
{
teamName: 'team-a',
memberName: 'bob',
},
{ reconciledBy: 'queue', triggerReasons: ['config_changed', 'task_changed'] }
);
const recoveryItems = [...outbox.items.values()].filter((item) =>
item.payload.workSyncIntentKey?.startsWith('agenda-sync-still-stuck:')
);
expect(recoveryItems).toHaveLength(2);
expect(new Set(recoveryItems.map((item) => item.id)).size).toBe(2);
const secondSummary = await new MemberWorkSyncNudgeDispatcher(deps).dispatchDue({
teamNames: ['team-a'],
claimedBy: 'test-dispatcher',
});
expect(secondSummary).toMatchObject({ claimed: 1, delivered: 1, retryable: 0 });
expect(inbox.inserted).toHaveLength(3);
expect(inbox.inserted[2]?.messageId).toContain('agenda-sync-still-stuck');
});
it('creates a delivered-still-stuck recovery for mixed review pickup and native work under noisy metrics', async () => {
@ -2130,6 +2188,15 @@ describe('MemberWorkSync use cases', () => {
item.payload.workSyncIntentKey?.startsWith('agenda-sync-still-stuck:')
)
).toHaveLength(0);
expect(inbox.inserted).toHaveLength(1);
expect(inbox.repaired).toEqual([
expect.objectContaining({
teamName: 'team-a',
memberName: 'bob',
messageId: baseId,
payloadHash: outbox.items.get(baseId)?.payloadHash,
}),
]);
expect(auditEvents).toEqual(
expect.arrayContaining([
expect.objectContaining({
@ -2945,6 +3012,85 @@ describe('MemberWorkSync use cases', () => {
expect(store.writes.at(-1)?.state).toBe('still_working');
});
it('refreshes expired fallback pending report tokens during replay', async () => {
const { deps, store } = createDeps();
const reader = new MemberWorkSyncReconciler(deps);
const current = await reader.execute({ teamName: 'team-a', memberName: 'bob' });
const baseReportToken = deps.reportToken!;
deps.reportToken = {
create: baseReportToken.create,
verify: async (input) =>
input.token === 'expired-token'
? { ok: false, reason: 'expired' }
: baseReportToken.verify(input),
};
store.pendingIntents.set('intent-1', {
id: 'intent-1',
teamName: 'team-a',
memberName: 'bob',
status: 'pending',
reason: 'control_api_unavailable',
recordedAt: '2026-04-29T00:16:00.000Z',
request: {
teamName: 'team-a',
memberName: 'bob',
state: 'still_working',
agendaFingerprint: current.agenda.fingerprint,
reportToken: 'expired-token',
leaseTtlMs: 120_000,
source: 'mcp',
},
});
const summary = await new MemberWorkSyncPendingReportIntentReplayer(deps).replayTeam('team-a');
expect(summary).toEqual({ processed: 1, accepted: 1, rejected: 0, superseded: 0 });
expect(store.pendingIntents.get('intent-1')).toMatchObject({
status: 'accepted',
resultCode: 'accepted',
});
expect(store.writes.at(-1)?.report).toMatchObject({
accepted: true,
source: 'mcp',
state: 'still_working',
});
});
it('rejects invalid fallback pending report tokens without refreshing identity', async () => {
const { deps, store } = createDeps();
const reader = new MemberWorkSyncReconciler(deps);
const current = await reader.execute({ teamName: 'team-a', memberName: 'bob' });
store.pendingIntents.set('intent-1', {
id: 'intent-1',
teamName: 'team-a',
memberName: 'bob',
status: 'pending',
reason: 'control_api_unavailable',
recordedAt: '2026-04-29T00:00:01.000Z',
request: {
teamName: 'team-a',
memberName: 'bob',
state: 'still_working',
agendaFingerprint: current.agenda.fingerprint,
reportToken: 'invalid-token',
leaseTtlMs: 120_000,
source: 'mcp',
},
});
const summary = await new MemberWorkSyncPendingReportIntentReplayer(deps).replayTeam('team-a');
expect(summary).toEqual({ processed: 1, accepted: 0, rejected: 1, superseded: 0 });
expect(store.pendingIntents.get('intent-1')).toMatchObject({
status: 'rejected',
resultCode: 'invalid_report_token',
});
expect(store.writes.at(-1)?.report).toMatchObject({
accepted: false,
rejectionCode: 'invalid_report_token',
});
});
it('supersedes pending controller intents when the member runtime is inactive', async () => {
const { deps, store } = createDeps();
const reader = new MemberWorkSyncReconciler(deps);

View file

@ -1,6 +1,5 @@
import { describe, expect, it, vi } from 'vitest';
import { TeamInboxMemberWorkSyncNudgeSink } from '@features/member-work-sync/main/adapters/output/TeamInboxMemberWorkSyncNudgeSink';
import { describe, expect, it, vi } from 'vitest';
import type { MemberWorkSyncInboxNudgePort } from '@features/member-work-sync/core/application';
@ -32,7 +31,11 @@ describe('TeamInboxMemberWorkSyncNudgeSink', () => {
const input = makeInput();
const inboxReader = {
getMessagesFor: vi.fn(async () => [
{ messageId: input.messageId, workSyncPayloadHash: input.payloadHash },
{
messageId: input.messageId,
messageKind: 'member_work_sync_nudge',
workSyncPayloadHash: input.payloadHash,
},
]),
};
const inboxWriter = {
@ -49,6 +52,309 @@ describe('TeamInboxMemberWorkSyncNudgeSink', () => {
expect(inboxWriter.sendMessage).not.toHaveBeenCalled();
});
it('repairs an existing idempotent nudge row that is missing the current controlUrl', async () => {
const input = makeInput();
const inboxReader = {
getMessagesFor: vi.fn(async () => [
{
messageId: input.messageId,
text: input.payload.text,
messageKind: 'member_work_sync_nudge',
workSyncPayloadHash: input.payloadHash,
},
]),
};
const inboxWriter = {
sendMessage: vi.fn(),
updateMessageText: vi.fn(async () => ({ found: true, updated: true })),
};
const sink = new TeamInboxMemberWorkSyncNudgeSink(
inboxReader as never,
inboxWriter as never,
() => 'http://127.0.0.1:43123'
);
await expect(sink.insertIfAbsent(input)).resolves.toEqual({
inserted: false,
messageId: input.messageId,
});
expect(inboxWriter.sendMessage).not.toHaveBeenCalled();
expect(inboxWriter.updateMessageText).toHaveBeenCalledWith('team-a', {
member: 'bob',
messageId: input.messageId,
text: `${input.payload.text}\nRequired control API: pass controlUrl "http://127.0.0.1:43123" in both member_work_sync_status and member_work_sync_report.`,
expectedMessageKind: 'member_work_sync_nudge',
expectedWorkSyncPayloadHash: input.payloadHash,
});
});
it('refreshes a stale controlUrl on an existing idempotent nudge row', async () => {
const input = makeInput();
const existingText = `${input.payload.text}\nRequired control API: pass controlUrl "http://127.0.0.1:11111" in both member_work_sync_status and member_work_sync_report.`;
const inboxReader = {
getMessagesFor: vi.fn(async () => [
{
messageId: input.messageId,
text: existingText,
messageKind: 'member_work_sync_nudge',
workSyncPayloadHash: input.payloadHash,
},
]),
};
const inboxWriter = {
sendMessage: vi.fn(),
updateMessageText: vi.fn(async () => ({ found: true, updated: true })),
};
const sink = new TeamInboxMemberWorkSyncNudgeSink(
inboxReader as never,
inboxWriter as never,
() => 'http://127.0.0.1:43123'
);
await expect(sink.insertIfAbsent(input)).resolves.toEqual({
inserted: false,
messageId: input.messageId,
});
expect(inboxWriter.updateMessageText).toHaveBeenCalledWith(
'team-a',
expect.objectContaining({
text: `${input.payload.text}\nRequired control API: pass controlUrl "http://127.0.0.1:43123" in both member_work_sync_status and member_work_sync_report.`,
})
);
});
it('fails closed when an existing idempotent nudge needs controlUrl repair but resolver is unavailable', async () => {
const input = makeInput();
const inboxReader = {
getMessagesFor: vi.fn(async () => [
{
messageId: input.messageId,
text: input.payload.text,
messageKind: 'member_work_sync_nudge',
workSyncPayloadHash: input.payloadHash,
},
]),
};
const inboxWriter = {
sendMessage: vi.fn(),
updateMessageText: vi.fn(),
};
const sink = new TeamInboxMemberWorkSyncNudgeSink(
inboxReader as never,
inboxWriter as never,
() => null
);
await expect(sink.insertIfAbsent(input)).rejects.toThrow(
'member work sync control URL unavailable'
);
expect(inboxWriter.sendMessage).not.toHaveBeenCalled();
expect(inboxWriter.updateMessageText).not.toHaveBeenCalled();
});
it('fails closed when an existing idempotent nudge needs controlUrl repair but writer cannot update text', async () => {
const input = makeInput();
const inboxReader = {
getMessagesFor: vi.fn(async () => [
{
messageId: input.messageId,
text: input.payload.text,
messageKind: 'member_work_sync_nudge',
workSyncPayloadHash: input.payloadHash,
},
]),
};
const inboxWriter = {
sendMessage: vi.fn(),
};
const sink = new TeamInboxMemberWorkSyncNudgeSink(
inboxReader as never,
inboxWriter as never,
() => 'http://127.0.0.1:43123'
);
await expect(sink.insertIfAbsent(input)).rejects.toThrow(
'member work sync inbox text update unavailable'
);
expect(inboxWriter.sendMessage).not.toHaveBeenCalled();
});
it('repairs a delivered nudge row by stable messageId without inserting a duplicate', async () => {
const input = makeInput();
const inboxReader = {
getMessagesFor: vi.fn(async () => [
{
messageId: input.messageId,
text: input.payload.text,
messageKind: 'member_work_sync_nudge',
workSyncPayloadHash: input.payloadHash,
},
]),
};
const inboxWriter = {
sendMessage: vi.fn(),
updateMessageText: vi.fn(async () => ({ found: true, updated: true })),
};
const sink = new TeamInboxMemberWorkSyncNudgeSink(
inboxReader as never,
inboxWriter as never,
() => 'http://127.0.0.1:43123'
);
await expect(
sink.repairIfPresent({
teamName: input.teamName,
memberName: input.memberName,
messageId: input.messageId,
payloadHash: input.payloadHash,
payload: input.payload,
})
).resolves.toEqual({ found: true, repaired: true });
expect(inboxWriter.sendMessage).not.toHaveBeenCalled();
expect(inboxWriter.updateMessageText).toHaveBeenCalledWith(
'team-a',
expect.objectContaining({
messageId: input.messageId,
expectedWorkSyncPayloadHash: input.payloadHash,
})
);
});
it('reports direct repair as unrepaired when the guarded writer refuses the update', async () => {
const input = makeInput();
const inboxReader = {
getMessagesFor: vi.fn(async () => [
{
messageId: input.messageId,
text: input.payload.text,
messageKind: 'member_work_sync_nudge',
workSyncPayloadHash: input.payloadHash,
},
]),
};
const inboxWriter = {
sendMessage: vi.fn(),
updateMessageText: vi.fn(async () => ({ found: true, updated: false })),
};
const sink = new TeamInboxMemberWorkSyncNudgeSink(
inboxReader as never,
inboxWriter as never,
() => 'http://127.0.0.1:43123'
);
await expect(
sink.repairIfPresent({
teamName: input.teamName,
memberName: input.memberName,
messageId: input.messageId,
payloadHash: input.payloadHash,
payload: input.payload,
})
).resolves.toEqual({ found: true, repaired: false });
expect(inboxWriter.sendMessage).not.toHaveBeenCalled();
expect(inboxWriter.updateMessageText).toHaveBeenCalledWith(
'team-a',
expect.objectContaining({
messageId: input.messageId,
expectedWorkSyncPayloadHash: input.payloadHash,
})
);
});
it('reports missing delivered rows during direct repair without inserting', async () => {
const input = makeInput();
const inboxReader = {
getMessagesFor: vi.fn(async () => []),
};
const inboxWriter = {
sendMessage: vi.fn(),
updateMessageText: vi.fn(),
};
const sink = new TeamInboxMemberWorkSyncNudgeSink(inboxReader as never, inboxWriter as never);
await expect(
sink.repairIfPresent({
teamName: input.teamName,
memberName: input.memberName,
messageId: input.messageId,
payloadHash: input.payloadHash,
payload: input.payload,
})
).resolves.toEqual({ found: false, repaired: false });
expect(inboxWriter.sendMessage).not.toHaveBeenCalled();
expect(inboxWriter.updateMessageText).not.toHaveBeenCalled();
});
it('fails closed when direct repair finds a different payload hash', async () => {
const input = makeInput();
const inboxReader = {
getMessagesFor: vi.fn(async () => [
{
messageId: input.messageId,
text: input.payload.text,
messageKind: 'member_work_sync_nudge',
workSyncPayloadHash: 'different-payload-hash',
},
]),
};
const inboxWriter = {
sendMessage: vi.fn(),
updateMessageText: vi.fn(),
};
const sink = new TeamInboxMemberWorkSyncNudgeSink(inboxReader as never, inboxWriter as never);
await expect(
sink.repairIfPresent({
teamName: input.teamName,
memberName: input.memberName,
messageId: input.messageId,
payloadHash: input.payloadHash,
payload: input.payload,
})
).resolves.toEqual({ found: true, repaired: false, conflict: true });
expect(inboxWriter.sendMessage).not.toHaveBeenCalled();
expect(inboxWriter.updateMessageText).not.toHaveBeenCalled();
});
it('does not rewrite an existing idempotent nudge row with the current controlUrl', async () => {
const input = makeInput();
const existingText = `${input.payload.text}\nRequired control API: pass controlUrl "http://127.0.0.1:43123" in both member_work_sync_status and member_work_sync_report.`;
const inboxReader = {
getMessagesFor: vi.fn(async () => [
{
messageId: input.messageId,
text: existingText,
messageKind: 'member_work_sync_nudge',
workSyncPayloadHash: input.payloadHash,
},
]),
};
const inboxWriter = {
sendMessage: vi.fn(),
updateMessageText: vi.fn(async () => ({ found: true, updated: true })),
};
const sink = new TeamInboxMemberWorkSyncNudgeSink(
inboxReader as never,
inboxWriter as never,
() => 'http://127.0.0.1:43123'
);
await expect(sink.insertIfAbsent(input)).resolves.toEqual({
inserted: false,
messageId: input.messageId,
});
expect(inboxWriter.updateMessageText).not.toHaveBeenCalled();
});
it('fails closed when the existing stable messageId has a different payload hash', async () => {
const input = makeInput();
const inboxReader = {
@ -70,6 +376,48 @@ describe('TeamInboxMemberWorkSyncNudgeSink', () => {
expect(inboxWriter.sendMessage).not.toHaveBeenCalled();
});
it('fails closed when the existing stable messageId is not a work-sync nudge row', async () => {
const input = makeInput();
const inboxReader = {
getMessagesFor: vi.fn(async () => [
{
messageId: input.messageId,
messageKind: 'task_comment_notification',
workSyncPayloadHash: input.payloadHash,
text: input.payload.text,
},
]),
};
const inboxWriter = {
sendMessage: vi.fn(),
updateMessageText: vi.fn(),
};
const sink = new TeamInboxMemberWorkSyncNudgeSink(
inboxReader as never,
inboxWriter as never,
() => 'http://127.0.0.1:43123'
);
await expect(sink.insertIfAbsent(input)).resolves.toEqual({
inserted: false,
messageId: input.messageId,
conflict: true,
});
await expect(
sink.repairIfPresent({
teamName: input.teamName,
memberName: input.memberName,
messageId: input.messageId,
payloadHash: input.payloadHash,
payload: input.payload,
})
).resolves.toEqual({ found: true, repaired: false, conflict: true });
expect(inboxWriter.sendMessage).not.toHaveBeenCalled();
expect(inboxWriter.updateMessageText).not.toHaveBeenCalled();
});
it('treats legacy work-sync rows without payload hash as conflicts', async () => {
const input = makeInput();
const inboxReader = {
@ -129,6 +477,50 @@ describe('TeamInboxMemberWorkSyncNudgeSink', () => {
});
});
it('does not insert a new nudge when a configured controlUrl resolver returns null', async () => {
const input = makeInput();
const inboxReader = {
getMessagesFor: vi.fn(async () => []),
};
const inboxWriter = {
sendMessage: vi.fn(),
};
const sink = new TeamInboxMemberWorkSyncNudgeSink(
inboxReader as never,
inboxWriter as never,
() => null
);
await expect(sink.insertIfAbsent(input)).rejects.toThrow(
'member work sync control URL unavailable'
);
expect(inboxWriter.sendMessage).not.toHaveBeenCalled();
});
it('does not insert a new nudge when a configured controlUrl resolver fails', async () => {
const input = makeInput();
const inboxReader = {
getMessagesFor: vi.fn(async () => []),
};
const inboxWriter = {
sendMessage: vi.fn(),
};
const sink = new TeamInboxMemberWorkSyncNudgeSink(
inboxReader as never,
inboxWriter as never,
() => {
throw new Error('sidecar failed');
}
);
await expect(sink.insertIfAbsent(input)).rejects.toThrow(
'member work sync control URL unavailable'
);
expect(inboxWriter.sendMessage).not.toHaveBeenCalled();
});
it('propagates reader failures so dispatch can classify the attempt', async () => {
const input = makeInput();
const inboxReader = {

View file

@ -3,6 +3,7 @@ import {
buildMemberWorkSyncRuntimeTurnSettledEnvironment,
createMemberWorkSyncFeature,
} from '@features/member-work-sync/main';
import { HmacMemberWorkSyncReportTokenAdapter } from '@features/member-work-sync/main/infrastructure/HmacMemberWorkSyncReportTokenAdapter';
import { JsonMemberWorkSyncStore } from '@features/member-work-sync/main/infrastructure/JsonMemberWorkSyncStore';
import { MemberWorkSyncStorePaths } from '@features/member-work-sync/main/infrastructure/MemberWorkSyncStorePaths';
import { NodeHashAdapter } from '@features/member-work-sync/main/infrastructure/NodeHashAdapter';
@ -1555,6 +1556,130 @@ describe('createMemberWorkSyncFeature composition', () => {
}
});
it('keeps config provider when runtime member meta omits it before native stale recovery', async () => {
const claudeRoot = makeTempRoot();
setClaudeBasePathOverride(claudeRoot);
const teamsBasePath = getTeamsBasePath();
const teamName = 'team-native-stale-meta-provider';
const memberName = 'nickname';
const nudgeDeliveryWake = {
schedule: vi.fn(async () => undefined),
};
const feature = createMemberWorkSyncFeature({
teamsBasePath,
configReader: {
getConfig: vi.fn(async () => ({
name: teamName,
members: [{ name: 'NickName', providerId: 'codex', model: 'gpt-5.5' }],
})),
} as never,
taskReader: {
getTasks: vi.fn(async () => [
{
id: 'task-1',
displayId: '11111111',
subject: 'Review landing',
status: 'in_progress',
owner: 'NickName',
},
]),
} as never,
kanbanManager: {
getState: vi.fn(async () => ({
teamName,
reviewers: [],
tasks: {},
})),
} as never,
membersMetaStore: {
getMembers: vi.fn(async () => [
{
name: 'NickName',
role: 'developer',
agentType: 'general-purpose',
color: 'blue',
},
]),
} as never,
isTeamActive: vi.fn(async () => true),
nudgeDeliveryWake,
queueQuietWindowMs: 1,
});
try {
feature.noteTeamChange({ type: 'task', teamName, taskId: 'task-1' } as never);
let agendaFingerprint = '';
await waitForAssertion(async () => {
const status = await feature.getStatus({ teamName, memberName });
expect(status).toMatchObject({
state: 'needs_sync',
providerId: 'codex',
diagnostics: expect.arrayContaining(['no_current_report']),
agenda: {
items: [
expect.objectContaining({
reason: 'owned_in_progress_task',
evidence: expect.objectContaining({ status: 'in_progress' }),
}),
],
},
});
agendaFingerprint = status.agenda.fingerprint;
});
expect(await readInboxMessages({ teamsBasePath, teamName, memberName })).toEqual([]);
await seedNativeStaleInProgressBlockingMetrics({
teamsBasePath,
teamName,
memberName,
agendaFingerprint,
});
feature.noteTeamChange({ type: 'task', teamName, taskId: 'task-1' } as never);
await waitForAssertion(async () => {
const nudges = (await readInboxMessages({ teamsBasePath, teamName, memberName })).filter(
(message) => message.messageKind === 'member_work_sync_nudge'
);
expect(nudges).toHaveLength(1);
expect(nudges[0]?.text).toContain('Work sync check');
expect(nudges[0]?.text).toContain('11111111');
expect(nudgeDeliveryWake.schedule).toHaveBeenCalledWith({
teamName,
memberName,
messageId: nudges[0]?.messageId,
providerId: 'codex',
reason: 'member_work_sync_nudge_inserted',
delayMs: 500,
});
expect(
Object.values(await readMemberOutboxItems({ teamsBasePath, teamName, memberName }))
).toEqual([
expect.objectContaining({
status: 'delivered',
deliveredMessageId: nudges[0]?.messageId,
}),
]);
});
const journal = await fs.promises.readFile(
path.join(
teamsBasePath,
teamName,
'members',
memberName,
'.member-work-sync',
'journal.jsonl'
),
'utf8'
);
expect(journal).toContain('"event":"nudge_delivered"');
expect(journal).not.toContain('"reason":"blocking_metrics"');
} finally {
await feature.dispose();
}
});
it('delivers native stale pending-work recovery nudges despite noisy global metrics', async () => {
const claudeRoot = makeTempRoot();
setClaudeBasePathOverride(claudeRoot);
@ -3335,6 +3460,103 @@ describe('createMemberWorkSyncFeature composition', () => {
}
});
it('keeps nudges retryable while configured controlUrl is unavailable and delivers after recovery', async () => {
const claudeRoot = makeTempRoot();
setClaudeBasePathOverride(claudeRoot);
const teamsBasePath = getTeamsBasePath();
const teamName = 'team-control-url-retry';
const memberName = 'bob';
let controlUrl: string | null = null;
const feature = createMemberWorkSyncFeature({
teamsBasePath,
configReader: {
getConfig: vi.fn(async () => ({
name: teamName,
members: [{ name: memberName, providerId: 'codex' }],
})),
} as never,
taskReader: {
getTasks: vi.fn(async () => [
{
id: 'task-1',
displayId: '11111111',
subject: 'Ship sync after control URL recovery',
status: 'pending',
owner: memberName,
},
]),
} as never,
kanbanManager: {
getState: vi.fn(async () => ({
teamName,
reviewers: [],
tasks: {},
})),
} as never,
membersMetaStore: {
getMembers: vi.fn(async () => []),
} as never,
isTeamActive: vi.fn(async () => true),
queueQuietWindowMs: 1,
resolveControlUrl: vi.fn(async () => controlUrl),
});
try {
await seedShadowReadyMetrics({ teamsBasePath, teamName, memberName });
feature.noteTeamChange({ type: 'task', teamName, taskId: 'task-1' } as never);
await waitForAssertion(async () => {
expect(await readInboxMessages({ teamsBasePath, teamName, memberName })).toHaveLength(0);
expect(
Object.values(await readMemberOutboxItems({ teamsBasePath, teamName, memberName }))
).toEqual(
expect.arrayContaining([
expect.objectContaining({
status: 'failed_retryable',
lastError: expect.stringContaining('member work sync control URL unavailable'),
}),
])
);
});
await waitForQueueIdle(feature);
controlUrl = 'http://127.0.0.1:43123';
await forceRetryableOutboxDue({
teamsBasePath,
teamName,
memberName,
nextAttemptAt: new Date(Date.now() - 1_000).toISOString(),
});
await expect(feature.dispatchDueNudges([teamName])).resolves.toEqual({
claimed: 1,
delivered: 1,
superseded: 0,
retryable: 0,
terminal: 0,
});
const nudges = (await readInboxMessages({ teamsBasePath, teamName, memberName })).filter(
(message) => message.messageKind === 'member_work_sync_nudge'
);
expect(nudges).toHaveLength(1);
expect(nudges[0]?.text).toContain('11111111');
expect(nudges[0]?.text).toContain('controlUrl "http://127.0.0.1:43123"');
expect(
Object.values(await readMemberOutboxItems({ teamsBasePath, teamName, memberName }))
).toEqual(
expect.arrayContaining([
expect.objectContaining({
status: 'delivered',
deliveredMessageId: expect.any(String),
}),
])
);
} finally {
await feature.dispose();
}
});
it('respects watchdog cooldown and delivers after the retry window is due', async () => {
const claudeRoot = makeTempRoot();
setClaudeBasePathOverride(claudeRoot);
@ -3978,6 +4200,252 @@ describe('createMemberWorkSyncFeature composition', () => {
}
});
it('refreshes expired fallback pending report tokens through the real HMAC validator', async () => {
const claudeRoot = makeTempRoot();
setClaudeBasePathOverride(claudeRoot);
const teamsBasePath = getTeamsBasePath();
const teamName = 'team-expired-pending-report';
const memberName = 'bob';
const storePaths = new MemberWorkSyncStorePaths(teamsBasePath);
const feature = createMemberWorkSyncFeature({
teamsBasePath,
configReader: {
getConfig: vi.fn(async () => ({
name: teamName,
members: [{ name: memberName, providerId: 'codex' }],
})),
} as never,
taskReader: {
getTasks: vi.fn(async () => [
{
id: 'task-1',
displayId: '11111111',
subject: 'Ship sync after expired fallback report',
status: 'pending',
owner: memberName,
},
]),
} as never,
kanbanManager: {
getState: vi.fn(async () => ({
teamName,
reviewers: [],
tasks: {},
})),
} as never,
membersMetaStore: {
getMembers: vi.fn(async () => []),
} as never,
isTeamActive: vi.fn(async () => true),
});
try {
const status = await feature.refreshStatus({ teamName, memberName });
expect(status.reportToken).toBeTruthy();
const expiredToken = await new HmacMemberWorkSyncReportTokenAdapter(storePaths).create({
teamName,
memberName,
agendaFingerprint: status.agenda.fingerprint,
issuedAt: new Date(Date.now() - 60 * 60_000).toISOString(),
});
const store = new JsonMemberWorkSyncStore(storePaths);
await store.appendPendingReport(
{
teamName,
memberName,
state: 'still_working',
agendaFingerprint: status.agenda.fingerprint,
reportToken: expiredToken.token,
taskIds: ['task-1'],
source: 'mcp',
},
'control_api_unavailable'
);
await expect(feature.replayPendingReports([teamName])).resolves.toEqual({
processed: 1,
accepted: 1,
rejected: 0,
superseded: 0,
});
const finalStatus = await feature.getStatus({ teamName, memberName });
expect(finalStatus).toMatchObject({
state: 'still_working',
report: {
accepted: true,
state: 'still_working',
taskIds: ['task-1'],
source: 'mcp',
},
});
const memberReports = JSON.parse(
await fs.promises.readFile(
path.join(
teamsBasePath,
teamName,
'members',
memberName,
'.member-work-sync',
'reports.json'
),
'utf8'
)
) as {
intents?: Record<
string,
{ status?: string; resultCode?: string; request?: { reportToken?: string } }
>;
};
expect(Object.values(memberReports.intents ?? {})).toContainEqual(
expect.objectContaining({
status: 'accepted',
resultCode: 'accepted',
request: expect.objectContaining({ reportToken: expiredToken.token }),
})
);
} finally {
await feature.dispose();
}
});
it('returns a reportable status with a token when no stored status exists', async () => {
const claudeRoot = makeTempRoot();
setClaudeBasePathOverride(claudeRoot);
const teamsBasePath = getTeamsBasePath();
const teamName = 'team-a';
const memberName = 'bob';
const feature = createMemberWorkSyncFeature({
teamsBasePath,
configReader: {
getConfig: vi.fn(async () => ({
name: teamName,
members: [{ name: memberName, providerId: 'codex' }],
})),
} as never,
taskReader: {
getTasks: vi.fn(async () => [
{
id: 'task-1',
displayId: '11111111',
subject: 'Wake from first status call',
status: 'pending',
owner: memberName,
},
]),
} as never,
kanbanManager: {
getState: vi.fn(async () => ({
teamName,
reviewers: [],
tasks: {},
})),
} as never,
membersMetaStore: {
getMembers: vi.fn(async () => []),
} as never,
isTeamActive: vi.fn(async () => true),
});
try {
const status = await feature.getStatus({ teamName, memberName });
expect(status).toMatchObject({
state: 'needs_sync',
shadow: { reconciledBy: 'request' },
});
expect(status.reportToken).toBeTruthy();
await expect(
feature.report({
teamName,
memberName,
state: 'still_working',
agendaFingerprint: status.agenda.fingerprint,
reportToken: status.reportToken,
taskIds: ['task-1'],
source: 'test',
})
).resolves.toMatchObject({
accepted: true,
status: { state: 'still_working', report: { accepted: true } },
});
} finally {
await feature.dispose();
}
});
it('refreshes an expired stored report token before returning status to a teammate', async () => {
const claudeRoot = makeTempRoot();
setClaudeBasePathOverride(claudeRoot);
const teamsBasePath = getTeamsBasePath();
const teamName = 'team-a';
const memberName = 'bob';
const feature = createMemberWorkSyncFeature({
teamsBasePath,
configReader: {
getConfig: vi.fn(async () => ({
name: teamName,
members: [{ name: memberName, providerId: 'codex' }],
})),
} as never,
taskReader: {
getTasks: vi.fn(async () => [
{
id: 'task-1',
displayId: '11111111',
subject: 'Wake with expired token',
status: 'pending',
owner: memberName,
},
]),
} as never,
kanbanManager: {
getState: vi.fn(async () => ({
teamName,
reviewers: [],
tasks: {},
})),
} as never,
membersMetaStore: {
getMembers: vi.fn(async () => []),
} as never,
isTeamActive: vi.fn(async () => true),
});
try {
const current = await feature.refreshStatus({ teamName, memberName });
const store = new JsonMemberWorkSyncStore(new MemberWorkSyncStorePaths(teamsBasePath));
const expiredToken = 'wrs:v1.expired-token-for-regression';
await store.write({
...current,
reportToken: expiredToken,
reportTokenExpiresAt: new Date(Date.now() - 60_000).toISOString(),
});
const refreshed = await feature.getStatus({ teamName, memberName });
expect(refreshed.reportToken).toBeTruthy();
expect(refreshed.reportToken).not.toBe(expiredToken);
expect(Date.parse(refreshed.reportTokenExpiresAt ?? '')).toBeGreaterThan(Date.now());
await expect(
feature.report({
teamName,
memberName,
state: 'still_working',
agendaFingerprint: refreshed.agenda.fingerprint,
reportToken: refreshed.reportToken,
taskIds: ['task-1'],
source: 'test',
})
).resolves.toMatchObject({
accepted: true,
status: { state: 'still_working', report: { accepted: true } },
});
} finally {
await feature.dispose();
}
});
it('refreshes stale needs_sync into inactive after the whole team stops', async () => {
const claudeRoot = makeTempRoot();
setClaudeBasePathOverride(claudeRoot);
@ -4030,15 +4498,9 @@ describe('createMemberWorkSyncFeature composition', () => {
teamActive = false;
await expect(feature.getStatus({ teamName, memberName })).resolves.toMatchObject({
state: 'needs_sync',
diagnostics: expect.arrayContaining(['status_stale_refresh_enqueued']),
});
await waitForQueueIdle(feature);
await expect(store.read({ teamName, memberName })).resolves.toMatchObject({
state: 'inactive',
diagnostics: expect.arrayContaining(['team_runtime_inactive']),
shadow: { reconciledBy: 'queue', triggerReasons: ['manual_refresh'] },
shadow: { reconciledBy: 'request', triggerReasons: ['manual_refresh'] },
});
} finally {
await feature.dispose();

View file

@ -8,6 +8,14 @@ import {
createMemberWorkSyncFeature,
type MemberWorkSyncFeatureFacade,
} from '../../../../src/features/member-work-sync/main';
import {
buildCodexTrustedProjectConfigOverrides,
buildCodexWorkspaceTrustSettingsArgs,
type WorkspaceTrustArgsOnlyPlanRequest,
type WorkspaceTrustCoordinator,
type WorkspaceTrustLaunchArgPatch,
type WorkspaceTrustLaunchArgTargetSurface,
} from '../../../../src/features/workspace-trust/main';
import {
getTeamsBasePath,
setClaudeBasePathOverride,
@ -49,6 +57,13 @@ const liveDescribe =
const DEFAULT_ORCHESTRATOR_CLI = '/Users/belief/dev/projects/claude/agent_teams_orchestrator/cli-source';
const DEFAULT_MODEL = 'gpt-5.4-mini';
const DEFAULT_EFFORT = 'low' as const;
const LIVE_CODEX_WORKSPACE_TRUST_TARGET_SURFACES: WorkspaceTrustLaunchArgTargetSurface[] = [
'primary_provider_args',
'cross_provider_member_args',
'provider_facts_probe',
'default_model_probe',
];
const VITEST_HOME_PREFIX = 'agent-teams-vitest-home-';
liveDescribe('Member work sync Codex live e2e', () => {
let tempDir: string;
@ -593,6 +608,355 @@ liveDescribe('Member work sync Codex live e2e', () => {
420_000
);
it(
'wakes a real Codex teammate when runtime member meta omits provider metadata under noisy metrics',
async () => {
const orchestratorCli = process.env.CLAUDE_AGENT_TEAMS_ORCHESTRATOR_CLI_PATH?.trim();
expect(orchestratorCli).toBeTruthy();
await assertExecutable(orchestratorCli!);
const model = process.env.MEMBER_WORK_SYNC_CODEX_MODEL?.trim() || DEFAULT_MODEL;
const effort = (process.env.MEMBER_WORK_SYNC_CODEX_EFFORT?.trim() ||
DEFAULT_EFFORT) as 'low' | 'medium' | 'high' | 'xhigh';
const requestedMemberName = 'NickName';
const marker = `member-work-sync-codex-runtime-meta-${Date.now()}`;
teamName = `member-work-sync-codex-runtime-meta-${Date.now()}`;
const projectPath = path.join(tempDir, 'project');
await fs.mkdir(projectPath, { recursive: true });
await fs.writeFile(
path.join(projectPath, 'README.md'),
'# Member work sync Codex runtime meta live e2e\n\nKeep this project intentionally tiny.\n',
'utf8'
);
await trustProjectInTempClaudeGlobalConfig({ claudeRoot: tempClaudeRoot, projectPath });
process.env.CLAUDE_CODE_CODEX_NATIVE_IGNORE_USER_CONFIG = 'false';
if (ownsCodexHomeDir) {
await trustProjectInOwnedCodexHome({ codexHomeDir, projectPath });
}
const [
{ TeamProvisioningService },
{ TeamConfigReader },
{ TeamTaskReader },
{ TeamTaskWriter },
{ TeamKanbanManager },
{ TeamMembersMetaStore },
{ createCodexAccountFeature },
{ ProviderConnectionService },
] = await Promise.all([
import('../../../../src/main/services/team/TeamProvisioningService'),
import('../../../../src/main/services/team/TeamConfigReader'),
import('../../../../src/main/services/team/TeamTaskReader'),
import('../../../../src/main/services/team/TeamTaskWriter'),
import('../../../../src/main/services/team/TeamKanbanManager'),
import('../../../../src/main/services/team/TeamMembersMetaStore'),
import('../../../../src/features/codex-account/main/composition/createCodexAccountFeature'),
import('../../../../src/main/services/runtime/ProviderConnectionService'),
]);
codexAccountFeature = createCodexAccountFeature({
logger: {
info: () => undefined,
warn: () => undefined,
error: () => undefined,
},
configManager: {
getConfig: () => ({
providerConnections: {
codex: {
preferredAuthMode: hasLiveCodexApiKey() ? 'auto' : ('chatgpt' as const),
},
},
}),
},
});
providerConnectionService = ProviderConnectionService.getInstance();
providerConnectionService.setCodexAccountFeature(codexAccountFeature);
const provisioningService = new TeamProvisioningService();
provisioningService.setWorkspaceTrustCoordinator(createCodexOnlyWorkspaceTrustCoordinator());
svc = provisioningService;
const activeService = provisioningService;
const taskReader = new TeamTaskReader();
const membersMetaStore = new TeamMembersMetaStore();
feature = createMemberWorkSyncFeature({
teamsBasePath: getTeamsBasePath(),
configReader: new TeamConfigReader(),
taskReader,
kanbanManager: new TeamKanbanManager(),
membersMetaStore,
isTeamActive: (name) =>
activeService.isTeamAlive(name) || activeService.hasProvisioningRun(name),
listLifecycleActiveTeamNames: async () => [teamName!],
queueQuietWindowMs: 1,
resolveControlUrl: async () => controlServer?.baseUrl ?? null,
nudgeDeliveryWake: createLiveNudgeDeliveryWake(activeService),
});
activeService.setTeamChangeEmitter((event: TeamChangeEvent) =>
feature!.noteTeamChange(event)
);
activeService.setRuntimeTurnSettledEnvironmentProvider((input) =>
feature!.buildRuntimeTurnSettledEnvironment(input)
);
controlServer = await startMemberWorkSyncControlServer(feature);
process.env.CLAUDE_TEAM_CONTROL_URL = controlServer.baseUrl;
activeService.setControlApiBaseUrlResolver(async () => controlServer?.baseUrl ?? null);
await fs.writeFile(
path.join(tempClaudeRoot, 'team-control-api.json'),
JSON.stringify({ baseUrl: controlServer.baseUrl }, null, 2),
'utf8'
);
const progressEvents: TeamProvisioningProgress[] = [];
await activeService.createTeam(
{
teamName,
cwd: projectPath,
providerId: 'codex',
providerBackendId: 'codex-native',
model,
effort,
fastMode: 'off',
skipPermissions: true,
prompt: [
'Keep launch work minimal.',
'If you receive a member_work_sync_nudge, do not complete the task.',
'For a member_work_sync_nudge, call member_work_sync_status first.',
'Then call member_work_sync_report with state "still_working", the returned agendaFingerprint/reportToken, and taskIds for the current agenda.',
`After member_work_sync_report is accepted, add one task comment containing exactly: ${marker}:still-working.`,
'After that stop without a user-visible message.',
].join(' '),
members: [
{
name: requestedMemberName,
role: 'developer',
providerId: 'codex',
providerBackendId: 'codex-native',
model,
effort,
},
],
},
(progress) => {
progressEvents.push(progress);
}
);
await waitUntil(async () => {
const last = progressEvents.at(-1);
if (last?.state === 'failed') {
throw new Error(formatProgressDump(progressEvents));
}
return last?.state === 'ready';
}, 240_000);
const config = await new TeamConfigReader().getConfig(teamName);
const memberName = config?.members
?.find((member) => sameMemberName(member.name, requestedMemberName))
?.name?.trim();
expect(memberName).toBeTruthy();
expect(
config?.members?.find((member) => sameMemberName(member.name, memberName!))
).toMatchObject({
providerId: 'codex',
});
await stripMemberProviderMetadataFromMembersMeta({
teamName,
memberName: memberName!,
fallbackRole: 'developer',
});
expect(
(await membersMetaStore.getMembers(teamName)).find((member) =>
sameMemberName(member.name, memberName!)
)
).toMatchObject({
name: memberName,
providerId: undefined,
providerBackendId: undefined,
model: undefined,
effort: undefined,
});
await waitUntil(async () => {
await feature!.drainRuntimeTurnSettledEvents();
const diagnostics = feature!.getQueueDiagnostics();
return diagnostics.queued === 0 && diagnostics.running === 0;
}, 60_000, 1_000, async () =>
formatMemberWorkSyncDiagnostics({
feature: feature!,
teamName: teamName!,
memberName: memberName!,
})
);
const createdAt = new Date().toISOString();
const taskId = `runtime-meta-${Date.now()}`;
const displayId = String(Date.now()).slice(-8);
await new TeamTaskWriter().createTask(teamName, {
id: taskId,
displayId,
subject: `Member work sync live runtime meta ${marker}`,
description: 'Verify native stale recovery when runtime member meta lacks provider fields.',
owner: memberName!,
createdBy: 'user',
status: 'in_progress',
projectPath,
createdAt,
updatedAt: createdAt,
});
feature.noteTeamChange({ type: 'task', teamName, taskId });
let agendaFingerprint = '';
await waitUntil(async () => {
const status = await feature!.refreshStatus({ teamName: teamName!, memberName: memberName! });
if (!status.agenda.items.some((item) => item.taskId === taskId)) {
return false;
}
expect(status).toMatchObject({
state: 'needs_sync',
providerId: 'codex',
diagnostics: expect.arrayContaining(['no_current_report']),
});
expect(status.agenda.items).toEqual(
expect.arrayContaining([
expect.objectContaining({
taskId,
reason: 'owned_in_progress_task',
evidence: expect.objectContaining({ status: 'in_progress' }),
}),
])
);
agendaFingerprint = status.agenda.fingerprint;
return true;
}, 60_000, 500, async () =>
formatMemberWorkSyncDiagnostics({
feature: feature!,
teamName: teamName!,
memberName: memberName!,
taskId,
})
);
await waitUntil(async () => {
const diagnostics = feature!.getQueueDiagnostics();
return diagnostics.queued === 0 && diagnostics.running === 0;
}, 30_000, 500, async () =>
formatMemberWorkSyncDiagnostics({
feature: feature!,
teamName: teamName!,
memberName: memberName!,
taskId,
})
);
const stableStatus = await feature.refreshStatus({
teamName,
memberName: memberName!,
});
expect(stableStatus.providerId).toBe('codex');
expect(stableStatus.agenda.fingerprint).toBe(agendaFingerprint);
expect(
(await readInboxMessages(teamName, memberName!)).filter(
(message) => message.messageKind === 'member_work_sync_nudge'
)
).toHaveLength(0);
await seedNativeStaleBlockingMetrics({
teamName,
memberName: memberName!,
agendaFingerprint,
});
feature.noteTeamChange({ type: 'task', teamName, taskId });
await waitUntil(async () => {
const diagnostics = feature!.getQueueDiagnostics();
return diagnostics.queued === 0 && diagnostics.running === 0;
}, 30_000, 500, async () =>
formatMemberWorkSyncDiagnostics({
feature: feature!,
teamName: teamName!,
memberName: memberName!,
taskId,
})
);
expect((await feature.getStatus({ teamName, memberName: memberName! })).providerId).toBe(
'codex'
);
await waitUntil(async () => {
const nudges = (await readInboxMessages(teamName!, memberName!)).filter(
(message) => message.messageKind === 'member_work_sync_nudge'
);
return nudges.length === 1;
}, 60_000, 1_000, async () =>
formatMemberWorkSyncDiagnostics({
feature: feature!,
teamName: teamName!,
memberName: memberName!,
taskId,
})
);
const metrics = await feature.getMetrics({ teamName });
expect(metrics.phase2Readiness.reasons).toContain('would_nudge_rate_high');
const journalPath = path.join(
getTeamsBasePath(),
teamName,
'members',
memberName!,
'.member-work-sync',
'journal.jsonl'
);
const journal = await fs.readFile(journalPath, 'utf8');
const nudgeOutcomes = journal
.trim()
.split('\n')
.map((line) => JSON.parse(line) as { event?: string; reason?: string })
.filter((event) => event.event === 'nudge_skipped' || event.event === 'nudge_delivered');
expect(nudgeOutcomes).toContainEqual(expect.objectContaining({ event: 'nudge_delivered' }));
expect(nudgeOutcomes.at(-1)).toMatchObject({ event: 'nudge_delivered' });
await relayInboxIfNotAlreadyConsumed(activeService, memberName!);
await waitUntil(async () => {
const fatalRuntimeMessage = await readFatalRuntimeMessage(teamName!);
if (fatalRuntimeMessage) {
throw new FatalWaitError(fatalRuntimeMessage);
}
await feature!.replayPendingReports([teamName!]);
const status = await feature!.getStatus({ teamName: teamName!, memberName: memberName! });
return status.report?.accepted === true && status.report.state === 'still_working';
}, 240_000, 2_000, async () =>
formatMemberWorkSyncDiagnostics({
feature: feature!,
teamName: teamName!,
memberName: memberName!,
taskId,
})
);
const finalStatus = await feature.getStatus({ teamName, memberName: memberName! });
expect(finalStatus.state).toBe('still_working');
expect(finalStatus.report).toMatchObject({
accepted: true,
state: 'still_working',
});
await waitUntil(async () => {
await feature!.drainRuntimeTurnSettledEvents();
const metas = await readRuntimeTurnSettledProcessedMetas(getTeamsBasePath());
return metas.some(
({ meta }) =>
(meta.event as { provider?: unknown; teamName?: unknown } | undefined)?.provider ===
'codex' &&
(meta.event as { provider?: unknown; teamName?: unknown } | undefined)?.teamName ===
teamName
);
}, 60_000);
await expect(feature.dispatchDueNudges([teamName])).resolves.toMatchObject({
delivered: 0,
});
},
480_000
);
it(
'lets a real Codex teammate complete the task and report caught-up after the board clears',
async () => {
@ -852,6 +1216,173 @@ function resolveConnectedCodexHome(previousCodexHome: string | undefined): strin
return path.join(os.userInfo().homedir, '.codex');
}
async function trustProjectInOwnedCodexHome(input: {
codexHomeDir: string;
projectPath: string;
}): Promise<void> {
const [override] = buildCodexTrustedProjectConfigOverrides([input.projectPath], {
maxOverrides: 1,
});
if (!override) {
return;
}
await fs.mkdir(input.codexHomeDir, { recursive: true });
await fs.appendFile(path.join(input.codexHomeDir, 'config.toml'), `\n${override}\n`, 'utf8');
}
async function trustProjectInTempClaudeGlobalConfig(input: {
claudeRoot: string;
projectPath: string;
}): Promise<void> {
const projectRealPath = await fs.realpath(input.projectPath).catch(() => input.projectPath);
const projects = Object.fromEntries(
[...new Set([input.projectPath, projectRealPath])].map((projectPath) => [
projectPath,
{
allowedTools: [],
mcpContextUris: [],
mcpServers: {},
enabledMcpjsonServers: [],
disabledMcpjsonServers: [],
projectOnboardingSeenCount: 0,
hasClaudeMdExternalIncludesApproved: false,
hasClaudeMdExternalIncludesWarningShown: false,
hasTrustDialogAccepted: true,
},
])
);
const configPaths = [path.join(input.claudeRoot, '.claude.json')];
const homeDir = process.env.HOME?.trim();
if (homeDir && path.basename(homeDir).startsWith(VITEST_HOME_PREFIX)) {
configPaths.push(path.join(homeDir, '.claude.json'));
}
for (const configPath of configPaths) {
await fs.mkdir(path.dirname(configPath), { recursive: true });
await fs.writeFile(configPath, `${JSON.stringify({ projects }, null, 2)}\n`, 'utf8');
}
}
function createCodexOnlyWorkspaceTrustCoordinator(): WorkspaceTrustCoordinator {
return {
async planArgsOnly(request) {
return { launchArgPatches: buildLiveCodexWorkspaceTrustPatches(request) };
},
async planFull(request) {
return {
workspaces: request.workspaces,
launchArgPatches: buildLiveCodexWorkspaceTrustPatches(request),
};
},
async execute(plan) {
return {
id: 'member-work-sync-codex-live-workspace-trust',
provider: 'claude',
status: 'skipped',
workspaceIds: plan.workspaces.map((workspace) => workspace.id),
evidence: ['live test injects Codex native trusted-project settings'],
};
},
};
}
function buildLiveCodexWorkspaceTrustPatches(
request: WorkspaceTrustArgsOnlyPlanRequest
): WorkspaceTrustLaunchArgPatch[] {
if (
!request.featureFlags.enabled ||
!request.featureFlags.codexArgs ||
!request.providers.includes('codex')
) {
return [];
}
const configKeys = request.workspaces.flatMap((workspace) => [
workspace.configKeyCwd,
workspace.realCwd,
...(workspace.gitRootConfigKey ? [workspace.gitRootConfigKey] : []),
]);
const overrides = buildCodexTrustedProjectConfigOverrides(configKeys);
const args = buildCodexWorkspaceTrustSettingsArgs(overrides);
if (args.length === 0) {
return [];
}
const workspaceIds = request.workspaces.map((workspace) => workspace.id);
return (request.targetSurfaces ?? LIVE_CODEX_WORKSPACE_TRUST_TARGET_SURFACES).map((surface) => ({
id: `member-work-sync-codex-live-workspace-trust:${surface}`,
owner: 'workspace-trust',
targetProvider: 'codex',
targetSurface: surface,
dialect: 'claude-codex-runtime-settings',
args,
dedupeKey: `member-work-sync-codex-live-workspace-trust:${surface}:${overrides.join('|')}`,
sourceWorkspaceIds: workspaceIds,
reason: 'Trust the live e2e project for Codex native headless teammate startup.',
}));
}
function sameMemberName(left: string | undefined, right: string | undefined): boolean {
return left?.trim().toLowerCase() === right?.trim().toLowerCase();
}
async function stripMemberProviderMetadataFromMembersMeta(input: {
teamName: string;
memberName: string;
fallbackRole: string;
}): Promise<void> {
const metaPath = path.join(getTeamsBasePath(), input.teamName, 'members.meta.json');
const raw = await fs.readFile(metaPath, 'utf8').catch(() => '{"version":1,"members":[]}');
const parsed = JSON.parse(raw) as { providerBackendId?: unknown; members?: unknown };
const sourceMembers = Array.isArray(parsed.members) ? parsed.members : [];
let found = false;
const members = sourceMembers.flatMap((member): Record<string, unknown>[] => {
if (!member || typeof member !== 'object') {
return [];
}
const source = member as Record<string, unknown>;
const name = typeof source.name === 'string' ? source.name.trim() : '';
if (!name) {
return [];
}
if (!sameMemberName(name, input.memberName)) {
return [source];
}
found = true;
const stripped: Record<string, unknown> = { name };
for (const key of ['role', 'workflow', 'isolation', 'agentType', 'color', 'agentId', 'cwd']) {
if (typeof source[key] === 'string' && source[key].trim()) {
stripped[key] = source[key];
}
}
for (const key of ['joinedAt', 'removedAt']) {
if (typeof source[key] === 'number') {
stripped[key] = source[key];
}
}
return [stripped];
});
if (!found) {
members.push({
name: input.memberName,
role: input.fallbackRole,
agentType: 'general-purpose',
joinedAt: Date.now(),
});
}
const payload = {
version: 1,
...(typeof parsed.providerBackendId === 'string'
? { providerBackendId: parsed.providerBackendId }
: {}),
members,
};
await fs.writeFile(metaPath, `${JSON.stringify(payload, null, 2)}\n`, 'utf8');
}
async function seedShadowReadyMetrics(input: {
teamName: string;
memberName: string;
@ -899,6 +1430,68 @@ async function seedShadowReadyMetrics(input: {
);
}
async function seedNativeStaleBlockingMetrics(input: {
teamName: string;
memberName: string;
agendaFingerprint: string;
}): Promise<void> {
const metricsPath = path.join(
getTeamsBasePath(),
input.teamName,
'.member-work-sync',
'indexes',
'metrics.json'
);
const nowMs = Date.now();
const staleObservedAt = new Date(nowMs - 6 * 60_000 - 1_000).toISOString();
await fs.mkdir(path.dirname(metricsPath), { recursive: true });
await fs.writeFile(
metricsPath,
`${JSON.stringify(
{
schemaVersion: 2,
members: {
[input.memberName]: {
memberName: input.memberName,
state: 'needs_sync',
agendaFingerprint: input.agendaFingerprint,
actionableCount: 1,
evaluatedAt: staleObservedAt,
providerId: 'codex',
},
},
recentEvents: [
{
id: 'native-stale-status',
teamName: input.teamName,
memberName: input.memberName,
kind: 'status_evaluated',
state: 'needs_sync',
agendaFingerprint: input.agendaFingerprint,
recordedAt: staleObservedAt,
actionableCount: 1,
providerId: 'codex',
},
...Array.from({ length: 12 }, (_, index) => ({
id: `native-stale-would-nudge-${index}`,
teamName: input.teamName,
memberName: input.memberName,
kind: 'would_nudge',
state: 'needs_sync',
agendaFingerprint: input.agendaFingerprint,
recordedAt: new Date(nowMs - 5 * 60_000 + index * 5_000).toISOString(),
actionableCount: 1,
providerId: 'codex',
})),
],
},
null,
2
)}\n`,
'utf8'
);
}
async function readInboxMessages(teamName: string, memberName: string): Promise<
Array<{
messageId?: string;

View file

@ -13,7 +13,9 @@ import fs from 'fs';
import os from 'os';
import path from 'path';
import { afterEach, describe, expect, it, vi } from 'vitest';
import type { Mock } from 'vitest';
import type { MemberWorkSyncNudgeDeliveryWakePort } from '@features/member-work-sync/core/application/ports';
import type { InboxMessage, TaskRef } from '@shared/types/team';
const tempRoots: string[] = [];
@ -291,12 +293,22 @@ function buildProofMissingRecord(input: {
};
}
type TestNudgeDeliveryWake = MemberWorkSyncNudgeDeliveryWakePort & {
schedule: Mock<MemberWorkSyncNudgeDeliveryWakePort['schedule']>;
};
function createNudgeDeliveryWake(): TestNudgeDeliveryWake {
return {
schedule: vi.fn<MemberWorkSyncNudgeDeliveryWakePort['schedule']>(async () => undefined),
};
}
function createFeature(input: {
teamsBasePath: string;
teamName: string;
memberName: string;
service: TeamProvisioningService;
nudgeDeliveryWake: { schedule: ReturnType<typeof vi.fn> };
nudgeDeliveryWake: TestNudgeDeliveryWake;
providerId?: 'opencode' | 'codex';
}) {
const providerId = input.providerId ?? 'opencode';
@ -351,7 +363,7 @@ describe('OpenCode agenda-sync proof-missing recovery safe e2e', () => {
const teamName = 'team-codex-agenda-sync-nudge';
const memberName = 'bob';
const service = new TeamProvisioningService();
const nudgeDeliveryWake = { schedule: vi.fn(async () => undefined) };
const nudgeDeliveryWake = createNudgeDeliveryWake();
const feature = createFeature({
teamsBasePath,
teamName,
@ -415,7 +427,7 @@ describe('OpenCode agenda-sync proof-missing recovery safe e2e', () => {
const taskRef: TaskRef = { teamName, taskId: 'task-1', displayId: '11111111' };
const foregroundMessageId = 'proof-missing-message-1';
const service = new TeamProvisioningService();
const nudgeDeliveryWake = { schedule: vi.fn(async () => undefined) };
const nudgeDeliveryWake = createNudgeDeliveryWake();
const feature = createFeature({
teamsBasePath,
teamName,
@ -499,7 +511,7 @@ describe('OpenCode agenda-sync proof-missing recovery safe e2e', () => {
const laneId = 'secondary:opencode:jack';
const taskRef: TaskRef = { teamName, taskId: 'task-1', displayId: '11111111' };
const service = new TeamProvisioningService();
const nudgeDeliveryWake = { schedule: vi.fn(async () => undefined) };
const nudgeDeliveryWake = createNudgeDeliveryWake();
const feature = createFeature({
teamsBasePath,
teamName,

View file

@ -177,6 +177,55 @@ describe('TeamInboxWriter', () => {
});
});
it('updates an existing member-work-sync row text when message kind and payload hash match', async () => {
await writer.sendMessage('my-team', {
member: 'alice',
text: 'sync your work state',
source: 'system_notification',
messageId: 'work-sync-1',
messageKind: 'member_work_sync_nudge',
workSyncIntent: 'agenda_sync',
workSyncPayloadHash: 'sha256:work-sync',
});
const result = await writer.updateMessageText('my-team', {
member: 'alice',
messageId: 'work-sync-1',
text: 'sync your work state\nRequired control API: pass controlUrl "http://127.0.0.1:43123" in both member_work_sync_status and member_work_sync_report.',
expectedMessageKind: 'member_work_sync_nudge',
expectedWorkSyncPayloadHash: 'sha256:work-sync',
});
const persisted = JSON.parse(hoisted.files.get(inboxPath) ?? '[]') as Record<string, unknown>[];
expect(result).toEqual({ found: true, updated: true });
expect(persisted[0]?.text).toContain('controlUrl "http://127.0.0.1:43123"');
expect(persisted[0]?.workSyncPayloadHash).toBe('sha256:work-sync');
});
it('does not update member-work-sync row text when payload hash mismatches', async () => {
await writer.sendMessage('my-team', {
member: 'alice',
text: 'sync your work state',
source: 'system_notification',
messageId: 'work-sync-1',
messageKind: 'member_work_sync_nudge',
workSyncIntent: 'agenda_sync',
workSyncPayloadHash: 'sha256:work-sync',
});
const result = await writer.updateMessageText('my-team', {
member: 'alice',
messageId: 'work-sync-1',
text: 'should not write',
expectedMessageKind: 'member_work_sync_nudge',
expectedWorkSyncPayloadHash: 'sha256:different',
});
const persisted = JSON.parse(hoisted.files.get(inboxPath) ?? '[]') as Record<string, unknown>[];
expect(result).toEqual({ found: true, updated: false });
expect(persisted[0]?.text).toBe('sync your work state');
});
it('preserves provided message identity fields for dedup across live and persisted rows', async () => {
const result = await writer.sendMessage('my-team', {
member: 'alice',