diff --git a/src/features/member-work-sync/core/application/MemberWorkSyncDiagnosticsReader.ts b/src/features/member-work-sync/core/application/MemberWorkSyncDiagnosticsReader.ts index aaa4915d..c329abe1 100644 --- a/src/features/member-work-sync/core/application/MemberWorkSyncDiagnosticsReader.ts +++ b/src/features/member-work-sync/core/application/MemberWorkSyncDiagnosticsReader.ts @@ -1,6 +1,7 @@ import { decideMemberWorkSyncStatus } from '../domain'; import { finalizeMemberWorkSyncAgenda } from './MemberWorkSyncReconciler'; +import { resolveMemberWorkSyncRuntimeActivity } from './MemberWorkSyncRuntimeActivity'; import type { MemberWorkSyncStatus, MemberWorkSyncStatusRequest } from '../../contracts'; import type { MemberWorkSyncUseCaseDeps } from './ports'; @@ -17,13 +18,14 @@ export class MemberWorkSyncDiagnosticsReader { const source = await this.deps.agendaSource.loadAgenda(request); const agenda = finalizeMemberWorkSyncAgenda(this.deps, source); const nowIso = this.deps.clock.now().toISOString(); - const teamActive = this.deps.lifecycle - ? await this.deps.lifecycle.isTeamActive(agenda.teamName) - : true; + const runtimeActivity = await resolveMemberWorkSyncRuntimeActivity(this.deps, { + teamName: agenda.teamName, + memberName: agenda.memberName, + }); const decision = decideMemberWorkSyncStatus({ agenda, nowIso, - inactive: source.inactive || !teamActive, + inactive: source.inactive || runtimeActivity.inactive, }); return { @@ -39,7 +41,7 @@ export class MemberWorkSyncDiagnosticsReader { evaluatedAt: nowIso, diagnostics: [ ...agenda.diagnostics, - ...(!teamActive ? ['team_runtime_inactive'] : []), + ...runtimeActivity.diagnostics, ...decision.diagnostics, 'status_snapshot_not_persisted', ], diff --git a/src/features/member-work-sync/core/application/MemberWorkSyncNudgeDispatcher.ts b/src/features/member-work-sync/core/application/MemberWorkSyncNudgeDispatcher.ts index dcbc4f3e..038ae15e 100644 --- a/src/features/member-work-sync/core/application/MemberWorkSyncNudgeDispatcher.ts +++ b/src/features/member-work-sync/core/application/MemberWorkSyncNudgeDispatcher.ts @@ -3,6 +3,7 @@ import { decideMemberWorkSyncStatus } from '../domain'; import { appendMemberWorkSyncAudit, reasonToAuditEvent } from './MemberWorkSyncAudit'; import { decideMemberWorkSyncNudgeActivation } from './MemberWorkSyncNudgeActivationPolicy'; import { finalizeMemberWorkSyncAgenda } from './MemberWorkSyncReconciler'; +import { resolveMemberWorkSyncRuntimeActivity } from './MemberWorkSyncRuntimeActivity'; import type { MemberWorkSyncAgenda, @@ -14,6 +15,9 @@ import type { MemberWorkSyncAuditEventName, MemberWorkSyncUseCaseDeps } from './ const MEMBER_WORK_SYNC_MAX_NUDGES_PER_MEMBER_PER_HOUR = 2; const MEMBER_WORK_SYNC_RETRY_BASE_MINUTES = 10; const MEMBER_WORK_SYNC_RETRY_MAX_MINUTES = 60; +const MEMBER_WORK_SYNC_NUDGE_DISPATCH_ITEM_TIMEOUT_MS = 2 * 60_000; +const MEMBER_WORK_SYNC_NUDGE_DISPATCH_TEAM_TIMEOUT_MS = 2 * 60_000; +const MEMBER_WORK_SYNC_NUDGE_CLAIM_TIMEOUT_MS = 30_000; const AGENDA_SYNC_STILL_STUCK_RECOVERY_INTENT_PREFIX = 'agenda-sync-still-stuck:'; export interface MemberWorkSyncNudgeDispatchSummary { @@ -28,12 +32,32 @@ export interface MemberWorkSyncNudgeDispatchOptions { claimedBy: string; teamNames: string[]; limit?: number; + itemTimeoutMs?: number; + teamTimeoutMs?: number; + claimTimeoutMs?: number; } function emptySummary(): MemberWorkSyncNudgeDispatchSummary { return { claimed: 0, delivered: 0, superseded: 0, retryable: 0, terminal: 0 }; } +function addSummary( + left: MemberWorkSyncNudgeDispatchSummary, + right: MemberWorkSyncNudgeDispatchSummary +): MemberWorkSyncNudgeDispatchSummary { + return { + claimed: left.claimed + right.claimed, + delivered: left.delivered + right.delivered, + superseded: left.superseded + right.superseded, + retryable: left.retryable + right.retryable, + terminal: left.terminal + right.terminal, + }; +} + +function unrefTimer(timer: ReturnType): void { + timer.unref?.(); +} + function addMinutes(iso: string, minutes: number): string { return new Date(Date.parse(iso) + minutes * 60_000).toISOString(); } @@ -116,6 +140,22 @@ function reviewPickupRequestIdsStillMatch( return payloadIds.length > 0 && payloadIds.every((id) => agendaIds.includes(id)); } +interface MemberWorkSyncNudgeDispatchRun { + cancelled: boolean; + parent?: MemberWorkSyncNudgeDispatchRun; +} + +function isDispatchRunCancelled(run?: MemberWorkSyncNudgeDispatchRun): boolean { + let current: MemberWorkSyncNudgeDispatchRun | undefined = run; + while (current) { + if (current.cancelled) { + return true; + } + current = current.parent; + } + return false; +} + export class MemberWorkSyncNudgeDispatcher { constructor(private readonly deps: MemberWorkSyncUseCaseDeps) {} @@ -129,28 +169,275 @@ export class MemberWorkSyncNudgeDispatcher { } const nowIso = this.deps.clock.now().toISOString(); - const summary = emptySummary(); - for (const teamName of [ - ...new Set(options.teamNames.map((name) => name.trim()).filter(Boolean)), - ]) { - const claimed = await outbox.claimDue({ - teamName, - claimedBy: options.claimedBy, - nowIso, - limit: options.limit ?? 10, - }); - summary.claimed += claimed.length; - for (const item of claimed) { - const result = await this.dispatchItem(item, nowIso); - summary[result] += 1; + const itemTimeoutMs = Math.max( + 1, + options.itemTimeoutMs ?? MEMBER_WORK_SYNC_NUDGE_DISPATCH_ITEM_TIMEOUT_MS + ); + const teamTimeoutMs = Math.max( + 1, + options.teamTimeoutMs ?? MEMBER_WORK_SYNC_NUDGE_DISPATCH_TEAM_TIMEOUT_MS + ); + const claimTimeoutMs = Math.max( + 1, + options.claimTimeoutMs ?? MEMBER_WORK_SYNC_NUDGE_CLAIM_TIMEOUT_MS + ); + const teamNames = [...new Set(options.teamNames.map((name) => name.trim()).filter(Boolean))]; + const summaries = await Promise.allSettled( + teamNames.map((teamName) => + this.dispatchTeamWithTimeout(teamName, options, nowIso, { + itemTimeoutMs, + teamTimeoutMs, + claimTimeoutMs, + }) + ) + ); + + let summary = emptySummary(); + for (const [index, result] of summaries.entries()) { + if (result.status === 'fulfilled') { + summary = addSummary(summary, result.value); + } else { + this.deps.logger?.warn('member work sync team nudge dispatch failed', { + teamName: teamNames[index], + error: String(result.reason), + }); } } return summary; } + private async dispatchTeamWithTimeout( + teamName: string, + options: MemberWorkSyncNudgeDispatchOptions, + nowIso: string, + timeouts: { itemTimeoutMs: number; teamTimeoutMs: number; claimTimeoutMs: number } + ): Promise { + let timeout: ReturnType | null = null; + const run: MemberWorkSyncNudgeDispatchRun = { cancelled: false }; + const work = this.dispatchTeam(teamName, options, nowIso, timeouts, run); + void work.catch(() => undefined); + + try { + const result = await Promise.race([ + work, + new Promise<'timeout'>((resolve) => { + timeout = setTimeout(() => { + run.cancelled = true; + resolve('timeout'); + }, timeouts.teamTimeoutMs); + unrefTimer(timeout); + }), + ]); + if (result !== 'timeout') { + return result; + } + this.deps.logger?.warn('member work sync team nudge dispatch timed out', { + teamName, + timeoutMs: timeouts.teamTimeoutMs, + }); + return emptySummary(); + } finally { + run.cancelled = true; + if (timeout) { + clearTimeout(timeout); + } + } + } + + private async dispatchTeam( + teamName: string, + options: MemberWorkSyncNudgeDispatchOptions, + nowIso: string, + timeouts: { itemTimeoutMs: number; claimTimeoutMs: number }, + run: MemberWorkSyncNudgeDispatchRun + ): Promise { + const summary = emptySummary(); + const claimed = await this.claimDueWithTimeout(teamName, options, nowIso, timeouts, run); + if (!claimed || isDispatchRunCancelled(run)) { + return summary; + } + + summary.claimed += claimed.length; + for (const item of claimed) { + if (isDispatchRunCancelled(run)) { + break; + } + const result = await this.dispatchItemWithTimeout(item, nowIso, timeouts.itemTimeoutMs, run); + summary[result] += 1; + } + return summary; + } + + private async claimDueWithTimeout( + teamName: string, + options: MemberWorkSyncNudgeDispatchOptions, + nowIso: string, + timeouts: { claimTimeoutMs: number }, + run: MemberWorkSyncNudgeDispatchRun + ): Promise { + const outbox = this.deps.outboxStore; + if (!outbox) { + return null; + } + + let timeout: ReturnType | null = null; + const work = outbox.claimDue({ + teamName, + claimedBy: options.claimedBy, + nowIso, + limit: options.limit ?? 10, + }); + void work.catch(() => undefined); + + try { + const result = await Promise.race([ + work, + new Promise<'timeout'>((resolve) => { + timeout = setTimeout(() => resolve('timeout'), timeouts.claimTimeoutMs); + unrefTimer(timeout); + }), + ]); + if (result !== 'timeout') { + return isDispatchRunCancelled(run) ? null : result; + } + this.deps.logger?.warn('member work sync nudge claim timed out', { + teamName, + timeoutMs: timeouts.claimTimeoutMs, + }); + return null; + } catch (error) { + this.deps.logger?.warn('member work sync nudge claim failed', { + teamName, + error: String(error), + }); + return null; + } finally { + if (timeout) { + clearTimeout(timeout); + } + } + } + + private async dispatchItemWithTimeout( + item: MemberWorkSyncOutboxItem, + nowIso: string, + timeoutMs: number, + run: MemberWorkSyncNudgeDispatchRun + ): Promise> { + let timeout: ReturnType | null = null; + const itemRun: MemberWorkSyncNudgeDispatchRun = { cancelled: false, parent: run }; + const work = this.dispatchItem(item, nowIso, itemRun); + void work.catch(() => undefined); + + try { + const result = await Promise.race< + keyof Omit | 'timeout' + >([ + work, + new Promise<'timeout'>((resolve) => { + timeout = setTimeout(() => { + itemRun.cancelled = true; + resolve('timeout'); + }, timeoutMs); + unrefTimer(timeout); + }), + ]); + if (result !== 'timeout') { + return result; + } + await this.tryMarkDispatchItemRetryable( + item, + nowIso, + `nudge dispatch item timed out after ${timeoutMs}ms`, + timeoutMs, + run + ); + return 'retryable'; + } catch (error) { + await this.tryMarkDispatchItemRetryable(item, nowIso, String(error), timeoutMs, run); + return 'retryable'; + } finally { + itemRun.cancelled = true; + if (timeout) { + clearTimeout(timeout); + } + } + } + + private async tryMarkDispatchItemRetryable( + item: MemberWorkSyncOutboxItem, + nowIso: string, + error: string, + timeoutMs: number, + run?: MemberWorkSyncNudgeDispatchRun + ): Promise { + if (isDispatchRunCancelled(run)) { + return; + } + let timeout: ReturnType | null = null; + const markTimeoutMs = Math.min(Math.max(1, timeoutMs), 5_000); + const work = this.markDispatchItemRetryable(item, nowIso, error, run); + void work.catch(() => undefined); + + try { + const result = await Promise.race([ + work.then(() => 'marked' as const), + new Promise<'timeout'>((resolve) => { + timeout = setTimeout(() => resolve('timeout'), markTimeoutMs); + unrefTimer(timeout); + }), + ]); + if (result === 'timeout') { + this.deps.logger?.warn('member work sync nudge retry mark timed out', { + teamName: item.teamName, + memberName: item.memberName, + outboxId: item.id, + timeoutMs: markTimeoutMs, + error, + }); + } + } catch (markError) { + this.deps.logger?.warn('member work sync nudge retry mark failed', { + teamName: item.teamName, + memberName: item.memberName, + outboxId: item.id, + error: String(markError), + }); + } finally { + if (timeout) { + clearTimeout(timeout); + } + } + } + + private async markDispatchItemRetryable( + item: MemberWorkSyncOutboxItem, + nowIso: string, + error: string, + run?: MemberWorkSyncNudgeDispatchRun + ): Promise { + if (isDispatchRunCancelled(run)) { + return; + } + await this.deps.outboxStore?.markFailed({ + teamName: item.teamName, + id: item.id, + attemptGeneration: item.attemptGeneration, + error, + retryable: true, + nowIso, + nextAttemptAt: nextRetryAt(item, nowIso), + }); + if (isDispatchRunCancelled(run)) { + return; + } + await this.appendDispatchAudit(item, 'nudge_retryable', error); + } + private async dispatchItem( item: MemberWorkSyncOutboxItem, - nowIso: string + nowIso: string, + run: MemberWorkSyncNudgeDispatchRun ): Promise> { const outbox = this.deps.outboxStore; const inbox = this.deps.inboxNudge; @@ -158,7 +445,13 @@ export class MemberWorkSyncNudgeDispatcher { return 'terminal'; } + if (isDispatchRunCancelled(run)) { + return 'retryable'; + } const revalidation = await this.revalidate(item, nowIso); + if (isDispatchRunCancelled(run)) { + return 'retryable'; + } if (!revalidation.ok) { if (revalidation.retryable) { await outbox.markFailed({ @@ -170,6 +463,9 @@ export class MemberWorkSyncNudgeDispatcher { nowIso, nextAttemptAt: revalidation.nextAttemptAt ?? nextRetryAt(item, nowIso), }); + if (isDispatchRunCancelled(run)) { + return 'retryable'; + } await this.appendDispatchAudit( item, reasonToAuditEvent(revalidation.reason), @@ -178,8 +474,8 @@ export class MemberWorkSyncNudgeDispatcher { return 'retryable'; } if (revalidation.reason.startsWith('review_pickup_delivery_unavailable:')) { - await this.markReviewPickupDeliveryUnavailable(item, nowIso, revalidation.reason); - return 'superseded'; + await this.markReviewPickupDeliveryUnavailable(item, nowIso, revalidation.reason, run); + return isDispatchRunCancelled(run) ? 'retryable' : 'superseded'; } await outbox.markSuperseded({ teamName: item.teamName, @@ -187,11 +483,17 @@ export class MemberWorkSyncNudgeDispatcher { reason: revalidation.reason, nowIso, }); + if (isDispatchRunCancelled(run)) { + return 'retryable'; + } await this.appendDispatchAudit(item, 'nudge_superseded', revalidation.reason); return 'superseded'; } try { + if (isDispatchRunCancelled(run)) { + return 'retryable'; + } const inserted = await inbox.insertIfAbsent({ teamName: item.teamName, memberName: item.memberName, @@ -200,6 +502,9 @@ export class MemberWorkSyncNudgeDispatcher { payload: item.payload, timestamp: nowIso, }); + if (isDispatchRunCancelled(run)) { + return 'retryable'; + } if (inserted.conflict) { await outbox.markFailed({ teamName: item.teamName, @@ -209,6 +514,9 @@ export class MemberWorkSyncNudgeDispatcher { retryable: false, nowIso, }); + if (isDispatchRunCancelled(run)) { + return 'retryable'; + } await this.appendDispatchAudit(item, 'nudge_skipped', 'inbox_payload_conflict'); return 'terminal'; } @@ -218,7 +526,8 @@ export class MemberWorkSyncNudgeDispatcher { inserted.messageId, inserted.inserted, revalidation.providerId, - nowIso + nowIso, + run ); } await outbox.markDelivered({ @@ -228,15 +537,25 @@ export class MemberWorkSyncNudgeDispatcher { deliveredMessageId: inserted.messageId, nowIso, }); + if (isDispatchRunCancelled(run)) { + return 'retryable'; + } await this.appendDispatchAudit(item, 'nudge_delivered', 'inbox_inserted'); + if (isDispatchRunCancelled(run)) { + return 'retryable'; + } await this.scheduleDeliveryWake( item, inserted.messageId, inserted.inserted, - revalidation.providerId + revalidation.providerId, + run ); - return 'delivered'; + return isDispatchRunCancelled(run) ? 'retryable' : 'delivered'; } catch (error) { + if (isDispatchRunCancelled(run)) { + return 'retryable'; + } await outbox.markFailed({ teamName: item.teamName, id: item.id, @@ -246,6 +565,9 @@ export class MemberWorkSyncNudgeDispatcher { nowIso, nextAttemptAt: nextRetryAt(item, nowIso), }); + if (isDispatchRunCancelled(run)) { + return 'retryable'; + } await this.appendDispatchAudit(item, 'nudge_retryable', String(error)); return 'retryable'; } @@ -256,7 +578,8 @@ export class MemberWorkSyncNudgeDispatcher { messageId: string, inserted: boolean, providerId: MemberWorkSyncStatus['providerId'] | undefined, - nowIso: string + nowIso: string, + run: MemberWorkSyncNudgeDispatchRun ): Promise> { const outbox = this.deps.outboxStore; const delivery = this.deps.reviewPickupDelivery; @@ -264,11 +587,15 @@ export class MemberWorkSyncNudgeDispatcher { await this.markReviewPickupDeliveryUnavailable( item, nowIso, - 'review_pickup_delivery_port_unavailable' + 'review_pickup_delivery_port_unavailable', + run ); - return 'superseded'; + return isDispatchRunCancelled(run) ? 'retryable' : 'superseded'; } + if (isDispatchRunCancelled(run)) { + return 'retryable'; + } const outcome = await delivery.deliver({ teamName: item.teamName, memberName: item.memberName, @@ -278,6 +605,9 @@ export class MemberWorkSyncNudgeDispatcher { inserted, nowIso, }); + if (isDispatchRunCancelled(run)) { + return 'retryable'; + } if (outcome.ok) { await outbox.markDelivered({ @@ -289,7 +619,13 @@ export class MemberWorkSyncNudgeDispatcher { deliveryDiagnostics: outcome.diagnostics, nowIso, }); + if (isDispatchRunCancelled(run)) { + return 'retryable'; + } await this.appendDispatchAudit(item, 'review_pickup_member_nudge_delivered', outcome.state); + if (isDispatchRunCancelled(run)) { + return 'retryable'; + } await this.appendDispatchAudit(item, 'nudge_delivered', `review_pickup:${outcome.state}`); return 'delivered'; } @@ -304,13 +640,16 @@ export class MemberWorkSyncNudgeDispatcher { nowIso, nextAttemptAt: outcome.retryAfterIso ?? nextRetryAt(item, nowIso), }); + if (isDispatchRunCancelled(run)) { + return 'retryable'; + } await this.appendDispatchAudit(item, 'review_pickup_wake_failed_retryable', outcome.message); return 'retryable'; } if (outcome.reason === 'capability_absent') { - await this.markReviewPickupDeliveryUnavailable(item, nowIso, outcome.message); - return 'superseded'; + await this.markReviewPickupDeliveryUnavailable(item, nowIso, outcome.message, run); + return isDispatchRunCancelled(run) ? 'retryable' : 'superseded'; } await outbox.markFailed({ @@ -321,6 +660,9 @@ export class MemberWorkSyncNudgeDispatcher { retryable: false, nowIso, }); + if (isDispatchRunCancelled(run)) { + return 'retryable'; + } await this.appendDispatchAudit(item, 'nudge_skipped', outcome.message); return 'terminal'; } @@ -328,26 +670,40 @@ export class MemberWorkSyncNudgeDispatcher { private async markReviewPickupDeliveryUnavailable( item: MemberWorkSyncOutboxItem, nowIso: string, - reason: string + reason: string, + run?: MemberWorkSyncNudgeDispatchRun ): Promise { + if (isDispatchRunCancelled(run)) { + return; + } await this.deps.outboxStore?.markSuperseded({ teamName: item.teamName, id: item.id, reason, nowIso, }); + if (isDispatchRunCancelled(run)) { + return; + } await this.appendDispatchAudit(item, 'review_pickup_delivery_unavailable', reason); + if (isDispatchRunCancelled(run)) { + return; + } await this.appendDispatchAudit(item, 'review_pickup_escalated', reason); - await this.notifyReviewPickupEscalation(item, nowIso, reason); + if (isDispatchRunCancelled(run)) { + return; + } + await this.notifyReviewPickupEscalation(item, nowIso, reason, run); } private async notifyReviewPickupEscalation( item: MemberWorkSyncOutboxItem, nowIso: string, - reason: string + reason: string, + run?: MemberWorkSyncNudgeDispatchRun ): Promise { const escalation = this.deps.reviewPickupEscalation; - if (!escalation) { + if (!escalation || isDispatchRunCancelled(run)) { return; } @@ -395,12 +751,16 @@ export class MemberWorkSyncNudgeDispatcher { | { ok: true; providerId?: MemberWorkSyncStatus['providerId'] } | { ok: false; reason: string; retryable: boolean; nextAttemptAt?: string } > { - const teamActive = this.deps.lifecycle - ? await this.deps.lifecycle.isTeamActive(item.teamName) - : true; - if (!teamActive) { + const runtimeActivity = await resolveMemberWorkSyncRuntimeActivity(this.deps, { + teamName: item.teamName, + memberName: item.memberName, + }); + if (!runtimeActivity.teamActive) { return { ok: false, reason: 'team_inactive', retryable: false }; } + if (!runtimeActivity.memberActive) { + return { ok: false, reason: 'member_runtime_inactive', retryable: false }; + } const previous = await this.deps.statusStore.read({ teamName: item.teamName, @@ -424,7 +784,7 @@ export class MemberWorkSyncNudgeDispatcher { agenda, latestAcceptedReport: previous.report?.accepted ? previous.report : null, nowIso, - inactive: source.inactive || !teamActive, + inactive: source.inactive || runtimeActivity.inactive, }); const providerId = source.providerId ?? previous.providerId; const { report: _previousReport, ...previousWithoutReport } = previous; @@ -533,21 +893,46 @@ export class MemberWorkSyncNudgeDispatcher { } const taskIds = item.payload.taskRefs.map((taskRef) => taskRef.taskId); - if ( - this.deps.watchdogCooldown && - (await this.deps.watchdogCooldown.hasRecentNudge({ - teamName: item.teamName, - memberName: item.memberName, - taskIds, - nowIso, - })) - ) { - return { ok: false, reason: 'watchdog_cooldown_active', retryable: true }; + const watchdogCooldown = await this.resolveWatchdogCooldown(item, taskIds, nowIso); + if (watchdogCooldown.active) { + return { + ok: false, + reason: 'watchdog_cooldown_active', + retryable: true, + ...(watchdogCooldown.retryAfterIso + ? { nextAttemptAt: watchdogCooldown.retryAfterIso } + : {}), + }; } return { ok: true, ...(providerId ? { providerId } : {}) }; } + private async resolveWatchdogCooldown( + item: MemberWorkSyncOutboxItem, + taskIds: string[], + nowIso: string + ): Promise<{ active: boolean; retryAfterIso?: string }> { + const watchdogCooldown = this.deps.watchdogCooldown; + if (!watchdogCooldown) { + return { active: false }; + } + const input = { + teamName: item.teamName, + memberName: item.memberName, + taskIds, + nowIso, + }; + if (watchdogCooldown.getRecentNudgeCooldown) { + const result = await watchdogCooldown.getRecentNudgeCooldown(input); + return { + active: result.active, + ...(result.retryAfterIso ? { retryAfterIso: result.retryAfterIso } : {}), + }; + } + return { active: await watchdogCooldown.hasRecentNudge(input) }; + } + private async revalidateProofMissingRecovery( item: MemberWorkSyncOutboxItem, nowIso: string @@ -578,9 +963,10 @@ export class MemberWorkSyncNudgeDispatcher { item: MemberWorkSyncOutboxItem, messageId: string, inserted: boolean, - providerId?: MemberWorkSyncStatus['providerId'] + providerId?: MemberWorkSyncStatus['providerId'], + run?: MemberWorkSyncNudgeDispatchRun ): Promise { - if (!this.deps.nudgeDeliveryWake) { + if (!this.deps.nudgeDeliveryWake || isDispatchRunCancelled(run)) { return; } diff --git a/src/features/member-work-sync/core/application/MemberWorkSyncPendingReportIntentReplayer.ts b/src/features/member-work-sync/core/application/MemberWorkSyncPendingReportIntentReplayer.ts index 8c6bce96..a878e463 100644 --- a/src/features/member-work-sync/core/application/MemberWorkSyncPendingReportIntentReplayer.ts +++ b/src/features/member-work-sync/core/application/MemberWorkSyncPendingReportIntentReplayer.ts @@ -17,7 +17,11 @@ function statusForResult(input: { if (input.accepted) { return 'accepted'; } - if (input.code === 'member_inactive' || input.code === 'team_runtime_inactive') { + if ( + input.code === 'member_inactive' || + input.code === 'team_runtime_inactive' || + input.code === 'member_runtime_inactive' + ) { return 'superseded'; } return 'rejected'; diff --git a/src/features/member-work-sync/core/application/MemberWorkSyncReconciler.ts b/src/features/member-work-sync/core/application/MemberWorkSyncReconciler.ts index 06a33246..8c737920 100644 --- a/src/features/member-work-sync/core/application/MemberWorkSyncReconciler.ts +++ b/src/features/member-work-sync/core/application/MemberWorkSyncReconciler.ts @@ -7,6 +7,7 @@ import { import { appendMemberWorkSyncAudit } from './MemberWorkSyncAudit'; import { MemberWorkSyncNudgeOutboxPlanner } from './MemberWorkSyncNudgeOutboxPlanner'; +import { resolveMemberWorkSyncRuntimeActivity } from './MemberWorkSyncRuntimeActivity'; import type { MemberWorkSyncStatus, MemberWorkSyncStatusRequest } from '../../contracts'; import type { MemberWorkSyncAgendaSourceResult, MemberWorkSyncUseCaseDeps } from './ports'; @@ -14,6 +15,7 @@ import type { MemberWorkSyncAgendaSourceResult, MemberWorkSyncUseCaseDeps } from export interface MemberWorkSyncReconcileContext { reconciledBy?: 'request' | 'queue'; triggerReasons?: string[]; + isCancelled?: () => boolean; recovery?: { kind: 'proof_missing'; intentKey: string; @@ -22,6 +24,19 @@ export interface MemberWorkSyncReconcileContext { }; } +export class MemberWorkSyncReconcileCancelledError extends Error { + constructor() { + super('member work sync reconcile cancelled'); + this.name = 'MemberWorkSyncReconcileCancelledError'; + } +} + +function assertReconcileNotCancelled(context: MemberWorkSyncReconcileContext): void { + if (context.isCancelled?.()) { + throw new MemberWorkSyncReconcileCancelledError(); + } +} + export function finalizeMemberWorkSyncAgenda( deps: MemberWorkSyncUseCaseDeps, source: MemberWorkSyncAgendaSourceResult @@ -61,6 +76,7 @@ export class MemberWorkSyncReconciler { ...(context.triggerReasons?.length ? { triggerReasons: context.triggerReasons } : {}), }); const source = await this.deps.agendaSource.loadAgenda(request); + assertReconcileNotCancelled(context); const agenda = finalizeMemberWorkSyncAgenda(this.deps, source); await appendMemberWorkSyncAudit(this.deps, { teamName: agenda.teamName, @@ -72,21 +88,24 @@ export class MemberWorkSyncReconciler { ...(source.providerId ? { providerId: source.providerId } : {}), diagnostics: agenda.diagnostics, }); + assertReconcileNotCancelled(context); const previous = await this.deps.statusStore.read(request); const nowIso = this.deps.clock.now().toISOString(); - const teamActive = this.deps.lifecycle - ? await this.deps.lifecycle.isTeamActive(agenda.teamName) - : true; + const runtimeActivity = await resolveMemberWorkSyncRuntimeActivity(this.deps, { + teamName: agenda.teamName, + memberName: agenda.memberName, + }); + assertReconcileNotCancelled(context); const decision = decideMemberWorkSyncStatus({ agenda, latestAcceptedReport: previous?.report?.accepted ? previous.report : null, nowIso, - inactive: source.inactive || !teamActive, + inactive: source.inactive || runtimeActivity.inactive, }); await appendMemberWorkSyncAudit(this.deps, { teamName: agenda.teamName, memberName: agenda.memberName, - event: source.inactive || !teamActive ? 'team_inactive' : 'decision_made', + event: source.inactive || runtimeActivity.inactive ? 'team_inactive' : 'decision_made', source: 'reconciler', agendaFingerprint: agenda.fingerprint, state: decision.state, @@ -95,6 +114,7 @@ export class MemberWorkSyncReconciler { diagnostics: decision.diagnostics, }); + assertReconcileNotCancelled(context); const status = await attachMemberWorkSyncReportToken(this.deps, { teamName: agenda.teamName, memberName: agenda.memberName, @@ -125,15 +145,13 @@ export class MemberWorkSyncReconciler { : {}), }, evaluatedAt: nowIso, - diagnostics: [ - ...agenda.diagnostics, - ...(!teamActive ? ['team_runtime_inactive'] : []), - ...decision.diagnostics, - ], + diagnostics: [...agenda.diagnostics, ...runtimeActivity.diagnostics, ...decision.diagnostics], ...(source.providerId ? { providerId: source.providerId } : {}), }); + assertReconcileNotCancelled(context); await this.deps.statusStore.write(status); + assertReconcileNotCancelled(context); await this.planNudgeOutbox(status); return status; } diff --git a/src/features/member-work-sync/core/application/MemberWorkSyncReporter.ts b/src/features/member-work-sync/core/application/MemberWorkSyncReporter.ts index 8899468f..fb23c682 100644 --- a/src/features/member-work-sync/core/application/MemberWorkSyncReporter.ts +++ b/src/features/member-work-sync/core/application/MemberWorkSyncReporter.ts @@ -6,6 +6,7 @@ import { finalizeMemberWorkSyncAgenda, MemberWorkSyncReconciler, } from './MemberWorkSyncReconciler'; +import { resolveMemberWorkSyncRuntimeActivity } from './MemberWorkSyncRuntimeActivity'; import type { MemberWorkSyncReport, @@ -42,10 +43,11 @@ export class MemberWorkSyncReporter { const source = await this.deps.agendaSource.loadAgenda(request); const agenda = finalizeMemberWorkSyncAgenda(this.deps, source); const nowIso = this.deps.clock.now().toISOString(); - const teamActive = this.deps.lifecycle - ? await this.deps.lifecycle.isTeamActive(agenda.teamName) - : true; - if (!teamActive) { + const runtimeActivity = await resolveMemberWorkSyncRuntimeActivity(this.deps, { + teamName: agenda.teamName, + memberName: agenda.memberName, + }); + if (!runtimeActivity.teamActive) { const status = await this.reconciler.execute(request); const rejectedStatus = await this.recordRejectedReport( status, @@ -59,6 +61,21 @@ export class MemberWorkSyncReporter { status: rejectedStatus, }; } + if (!runtimeActivity.memberActive) { + const status = await this.reconciler.execute(request); + const rejectedStatus = await this.recordRejectedReport( + status, + request, + 'member_runtime_inactive' + ); + return { + accepted: false, + code: 'member_runtime_inactive', + message: + 'Member runtime is not active. Restart this teammate before reporting work sync state.', + status: rejectedStatus, + }; + } const tokenValidation = this.deps.reportToken ? await this.deps.reportToken.verify({ token: request.reportToken, diff --git a/src/features/member-work-sync/core/application/MemberWorkSyncRuntimeActivity.ts b/src/features/member-work-sync/core/application/MemberWorkSyncRuntimeActivity.ts new file mode 100644 index 00000000..91ee0e9f --- /dev/null +++ b/src/features/member-work-sync/core/application/MemberWorkSyncRuntimeActivity.ts @@ -0,0 +1,41 @@ +import type { MemberWorkSyncUseCaseDeps } from './ports'; + +export interface MemberWorkSyncRuntimeActivity { + teamActive: boolean; + memberActive: boolean; + inactive: boolean; + diagnostics: string[]; +} + +export async function resolveMemberWorkSyncRuntimeActivity( + deps: Pick, + input: { teamName: string; memberName: string } +): Promise { + if (!deps.lifecycle) { + return { teamActive: true, memberActive: true, inactive: false, diagnostics: [] }; + } + + const teamActive = await deps.lifecycle.isTeamActive(input.teamName); + if (!teamActive) { + return { + teamActive: false, + memberActive: false, + inactive: true, + diagnostics: ['team_runtime_inactive'], + }; + } + + const memberActive = deps.lifecycle.isMemberActive + ? await deps.lifecycle.isMemberActive(input) + : true; + if (!memberActive) { + return { + teamActive: true, + memberActive: false, + inactive: true, + diagnostics: ['member_runtime_inactive'], + }; + } + + return { teamActive: true, memberActive: true, inactive: false, diagnostics: [] }; +} diff --git a/src/features/member-work-sync/core/application/index.ts b/src/features/member-work-sync/core/application/index.ts index 8232db8a..1704cb76 100644 --- a/src/features/member-work-sync/core/application/index.ts +++ b/src/features/member-work-sync/core/application/index.ts @@ -8,6 +8,7 @@ export * from './MemberWorkSyncNudgeOutboxPlanner'; export * from './MemberWorkSyncPendingReportIntentReplayer'; export * from './MemberWorkSyncReconciler'; export * from './MemberWorkSyncReporter'; +export * from './MemberWorkSyncRuntimeActivity'; export * from './MemberWorkSyncTargetedRecoveryPolicy'; export type * from './ports'; export * from './RuntimeTurnSettledIngestor'; diff --git a/src/features/member-work-sync/core/application/ports.ts b/src/features/member-work-sync/core/application/ports.ts index e779e92c..58400bce 100644 --- a/src/features/member-work-sync/core/application/ports.ts +++ b/src/features/member-work-sync/core/application/ports.ts @@ -56,6 +56,7 @@ export interface MemberWorkSyncReportTokenPort { export interface MemberWorkSyncLifecyclePort { isTeamActive(teamName: string): Promise | boolean; + isMemberActive?(input: { teamName: string; memberName: string }): Promise | boolean; } export interface MemberWorkSyncLoggerPort { @@ -198,6 +199,12 @@ export interface MemberWorkSyncWatchdogCooldownPort { taskIds: string[]; nowIso: string; }): Promise; + getRecentNudgeCooldown?(input: { + teamName: string; + memberName: string; + taskIds: string[]; + nowIso: string; + }): Promise<{ active: boolean; retryAfterIso?: string }>; } export interface MemberWorkSyncBusySignalPort { diff --git a/src/features/member-work-sync/main/adapters/input/MemberWorkSyncTeamChangeRouter.ts b/src/features/member-work-sync/main/adapters/input/MemberWorkSyncTeamChangeRouter.ts index d934a14b..641220bf 100644 --- a/src/features/member-work-sync/main/adapters/input/MemberWorkSyncTeamChangeRouter.ts +++ b/src/features/member-work-sync/main/adapters/input/MemberWorkSyncTeamChangeRouter.ts @@ -77,6 +77,7 @@ export class MemberWorkSyncTeamChangeRouter { noteTeamChange(event: TeamChangeEvent): void { if (event.type === 'lead-activity' && event.detail === 'offline') { this.queue.dropTeam(event.teamName); + void this.enqueueTeam(event.teamName, 'runtime_activity', 0).catch(() => undefined); return; } diff --git a/src/features/member-work-sync/main/adapters/output/TeamTaskStallJournalWorkSyncCooldown.ts b/src/features/member-work-sync/main/adapters/output/TeamTaskStallJournalWorkSyncCooldown.ts index 1873ee06..c5dd44b4 100644 --- a/src/features/member-work-sync/main/adapters/output/TeamTaskStallJournalWorkSyncCooldown.ts +++ b/src/features/member-work-sync/main/adapters/output/TeamTaskStallJournalWorkSyncCooldown.ts @@ -7,10 +7,13 @@ const DEFAULT_WATCHDOG_COOLDOWN_MS = 10 * 60_000; interface StallJournalEntry { taskId: string; + memberName?: string; state: string; alertedAt?: string; } +type WatchdogCooldownResult = { active: boolean; retryAfterIso?: string }; + function parseTime(value: string | undefined): number | null { if (!value) { return null; @@ -19,6 +22,10 @@ function parseTime(value: string | undefined): number | null { return Number.isFinite(time) ? time : null; } +function normalizeMemberName(value: string | undefined): string { + return value?.trim().toLowerCase() ?? ''; +} + export class TeamTaskStallJournalWorkSyncCooldown implements MemberWorkSyncWatchdogCooldownPort { constructor( private readonly teamsBasePath: string, @@ -31,9 +38,18 @@ export class TeamTaskStallJournalWorkSyncCooldown implements MemberWorkSyncWatch taskIds: string[]; nowIso: string; }): Promise { + return (await this.getRecentNudgeCooldown(input)).active; + } + + async getRecentNudgeCooldown(input: { + teamName: string; + memberName: string; + taskIds: string[]; + nowIso: string; + }): Promise { const taskIds = new Set(input.taskIds); if (taskIds.size === 0) { - return false; + return { active: false }; } try { @@ -43,19 +59,34 @@ export class TeamTaskStallJournalWorkSyncCooldown implements MemberWorkSyncWatch ); const parsed = JSON.parse(raw) as unknown; if (!Array.isArray(parsed)) { - return false; + return { active: false }; } const now = parseTime(input.nowIso) ?? Date.now(); - return parsed.some((entry): boolean => { + const expectedMemberName = normalizeMemberName(input.memberName); + let retryAfterMs: number | null = null; + for (const entry of parsed) { const row = entry as Partial; if (row.state !== 'alerted' || !row.taskId || !taskIds.has(row.taskId)) { - return false; + continue; + } + const rowMemberName = normalizeMemberName(row.memberName); + if (rowMemberName && rowMemberName !== expectedMemberName) { + continue; } const alertedAt = parseTime(row.alertedAt); - return alertedAt != null && now - alertedAt <= this.cooldownMs; - }); + if (alertedAt == null || alertedAt > now || now - alertedAt >= this.cooldownMs) { + continue; + } + const entryRetryAfterMs = alertedAt + this.cooldownMs; + retryAfterMs = + retryAfterMs == null ? entryRetryAfterMs : Math.max(retryAfterMs, entryRetryAfterMs); + } + if (retryAfterMs == null) { + return { active: false }; + } + return { active: true, retryAfterIso: new Date(retryAfterMs).toISOString() }; } catch { - return false; + return { active: false }; } } } diff --git a/src/features/member-work-sync/main/composition/__tests__/memberWorkSyncTeamActivity.test.ts b/src/features/member-work-sync/main/composition/__tests__/memberWorkSyncTeamActivity.test.ts index d352c19f..7079c5e2 100644 --- a/src/features/member-work-sync/main/composition/__tests__/memberWorkSyncTeamActivity.test.ts +++ b/src/features/member-work-sync/main/composition/__tests__/memberWorkSyncTeamActivity.test.ts @@ -1,8 +1,11 @@ import { describe, expect, it } from 'vitest'; import { + hasUncertainWorkSyncRuntimeActivity, hasWorkSyncActiveRuntime, isRuntimeEntryActiveForWorkSync, + isRuntimeMemberActiveForWorkSync, + isRuntimeMemberActivityUncertainForWorkSync, } from '../memberWorkSyncTeamActivity'; import type { TeamAgentRuntimeEntry, TeamAgentRuntimeSnapshot } from '@shared/types'; @@ -39,14 +42,49 @@ describe('member work sync team activity', () => { }); it('treats a confirmed bootstrap runtime entry as active', () => { + for (const pidSource of ['agent_process_table', 'opencode_bridge'] as const) { + expect( + isRuntimeEntryActiveForWorkSync( + createRuntimeEntry({ + livenessKind: 'confirmed_bootstrap', + pidSource, + runtimeLastSeenAt: '2026-05-18T19:44:47.000Z', + }) + ) + ).toBe(true); + } + }); + + it('does not treat bootstrap-only confirmation as active runtime evidence', () => { + for (const pidSource of [ + undefined, + 'runtime_bootstrap', + 'persisted_metadata', + 'tmux_child', + 'tmux_pane', + ] as const) { + expect( + isRuntimeEntryActiveForWorkSync( + createRuntimeEntry({ + livenessKind: 'confirmed_bootstrap', + ...(pidSource ? { pidSource } : {}), + }) + ) + ).toBe(false); + } + }); + + it('does not count lead runtime entries as work-sync active teammates', () => { expect( isRuntimeEntryActiveForWorkSync( createRuntimeEntry({ - livenessKind: 'confirmed_bootstrap', - runtimeLastSeenAt: '2026-05-18T19:44:47.000Z', + memberName: 'team-lead', + backendType: 'lead', + livenessKind: undefined, + pidSource: 'lead_process', }) ) - ).toBe(true); + ).toBe(false); }); it('does not treat inactive liveness diagnostics as active by themselves', () => { @@ -77,6 +115,12 @@ describe('member work sync team activity', () => { expect( hasWorkSyncActiveRuntime( createRuntimeSnapshot({ + 'team-lead': createRuntimeEntry({ + memberName: 'team-lead', + backendType: 'lead', + livenessKind: undefined, + pidSource: 'lead_process', + }), alice: createRuntimeEntry({ alive: false, livenessKind: 'stale_metadata' }), bob: createRuntimeEntry({ memberName: 'bob', livenessKind: 'runtime_process' }), }) @@ -88,6 +132,12 @@ describe('member work sync team activity', () => { expect( hasWorkSyncActiveRuntime( createRuntimeSnapshot({ + 'team-lead': createRuntimeEntry({ + memberName: 'team-lead', + backendType: 'lead', + livenessKind: undefined, + pidSource: 'lead_process', + }), alice: createRuntimeEntry({ alive: false, livenessKind: 'stale_metadata' }), bob: createRuntimeEntry({ memberName: 'bob', @@ -99,6 +149,50 @@ describe('member work sync team activity', () => { ).toBe(false); }); + it('checks active runtime evidence for a specific teammate', () => { + const snapshot = createRuntimeSnapshot({ + alice: createRuntimeEntry({ memberName: 'alice', livenessKind: 'runtime_process' }), + bob: createRuntimeEntry({ memberName: 'bob', alive: false, livenessKind: 'stale_metadata' }), + }); + + expect(isRuntimeMemberActiveForWorkSync(snapshot, 'ALICE')).toBe(true); + expect(isRuntimeMemberActiveForWorkSync(snapshot, 'bob')).toBe(false); + expect(isRuntimeMemberActiveForWorkSync(snapshot, 'team-lead')).toBe(false); + }); + + it('treats process table unavailability as uncertain runtime activity', () => { + const snapshot = createRuntimeSnapshot({ + alice: createRuntimeEntry({ + memberName: 'alice', + alive: false, + livenessKind: 'registered_only', + runtimeDiagnostic: 'runtime pid could not be verified because process table unavailable', + }), + bob: createRuntimeEntry({ memberName: 'bob', alive: false, livenessKind: 'stale_metadata' }), + }); + + expect(hasWorkSyncActiveRuntime(snapshot)).toBe(false); + expect(hasUncertainWorkSyncRuntimeActivity(snapshot)).toBe(true); + expect(isRuntimeMemberActivityUncertainForWorkSync(snapshot, 'alice')).toBe(true); + expect(isRuntimeMemberActivityUncertainForWorkSync(snapshot, 'bob')).toBe(false); + }); + + it('recognizes process table is unavailable diagnostics as uncertain runtime activity', () => { + const snapshot = createRuntimeSnapshot({ + alice: createRuntimeEntry({ + memberName: 'alice', + alive: false, + livenessKind: 'confirmed_bootstrap', + pidSource: 'runtime_bootstrap', + runtimeDiagnostic: 'runtime pid could not be verified because process table is unavailable', + }), + }); + + expect(hasWorkSyncActiveRuntime(snapshot)).toBe(false); + expect(hasUncertainWorkSyncRuntimeActivity(snapshot)).toBe(true); + expect(isRuntimeMemberActivityUncertainForWorkSync(snapshot, 'alice')).toBe(true); + }); + it('handles missing snapshots as inactive', () => { expect(hasWorkSyncActiveRuntime(null)).toBe(false); expect(hasWorkSyncActiveRuntime(undefined)).toBe(false); diff --git a/src/features/member-work-sync/main/composition/createMemberWorkSyncFeature.ts b/src/features/member-work-sync/main/composition/createMemberWorkSyncFeature.ts index ecd2e932..c617e653 100644 --- a/src/features/member-work-sync/main/composition/createMemberWorkSyncFeature.ts +++ b/src/features/member-work-sync/main/composition/createMemberWorkSyncFeature.ts @@ -228,6 +228,7 @@ export function createMemberWorkSyncFeature(deps: { kanbanManager: TeamKanbanManager; membersMetaStore: TeamMembersMetaStore; isTeamActive?: (teamName: string) => Promise | boolean; + isMemberActive?: (input: { teamName: string; memberName: string }) => Promise | boolean; canDispatchNudges?: (teamName: string) => Promise | boolean; listLifecycleActiveTeamNames?: () => Promise; queueQuietWindowMs?: number; @@ -312,7 +313,14 @@ export function createMemberWorkSyncFeature(deps: { ...(deps.reviewPickupEscalation ? { reviewPickupEscalation: deps.reviewPickupEscalation } : {}), reportToken, auditJournal, - ...(deps.isTeamActive ? { lifecycle: { isTeamActive: deps.isTeamActive } } : {}), + ...(deps.isTeamActive + ? { + lifecycle: { + isTeamActive: deps.isTeamActive, + ...(deps.isMemberActive ? { isMemberActive: deps.isMemberActive } : {}), + }, + } + : {}), logger: deps.logger, }; const diagnosticsReader = new MemberWorkSyncDiagnosticsReader(useCaseDeps); @@ -328,6 +336,16 @@ export function createMemberWorkSyncFeature(deps: { retryable: 0, terminal: 0, }); + const addNudgeDispatchSummaries = ( + left: MemberWorkSyncNudgeDispatchSummary, + right: MemberWorkSyncNudgeDispatchSummary + ): MemberWorkSyncNudgeDispatchSummary => ({ + claimed: left.claimed + right.claimed, + delivered: left.delivered + right.delivered, + superseded: left.superseded + right.superseded, + retryable: left.retryable + right.retryable, + terminal: left.terminal + right.terminal, + }); const filterNudgeDispatchReadyTeamNames = async (teamNames: string[]): Promise => { const uniqueTeamNames = [...new Set(teamNames.map((name) => name.trim()).filter(Boolean))]; if (!deps.canDispatchNudges) { @@ -401,22 +419,30 @@ export function createMemberWorkSyncFeature(deps: { if (readyTeamNames.length === 0) { return emptyNudgeDispatchSummary(); } + const dispatchReadyNudges = () => + nudgeDispatcher.dispatchDue({ + teamNames: readyTeamNames, + claimedBy, + }); + const initialSummary = await dispatchReadyNudges(); if (options.refreshBackgroundStaleStatuses !== false) { await refreshBackgroundStaleStatuses(readyTeamNames); + return addNudgeDispatchSummaries(initialSummary, await dispatchReadyNudges()); } - return nudgeDispatcher.dispatchDue({ - teamNames: readyTeamNames, - claimedBy, - }); + return initialSummary; }; const queue = new MemberWorkSyncEventQueue({ reconcile: async (request, context: MemberWorkSyncReconcileContext) => { await reconciler.execute(request, context); + if (context.isCancelled?.()) { + return; + } await dispatchNudgesForReadyTeams([request.teamName], `member-work-sync:${process.pid}`, { refreshBackgroundStaleStatuses: false, }); }, isTeamActive: deps.isTeamActive ?? (() => true), + reconcileInactiveTeams: true, ...(deps.queueQuietWindowMs != null ? { quietWindowMs: deps.queueQuietWindowMs } : {}), auditJournal, logger: deps.logger, diff --git a/src/features/member-work-sync/main/composition/memberWorkSyncTeamActivity.ts b/src/features/member-work-sync/main/composition/memberWorkSyncTeamActivity.ts index a6a475a2..a739f201 100644 --- a/src/features/member-work-sync/main/composition/memberWorkSyncTeamActivity.ts +++ b/src/features/member-work-sync/main/composition/memberWorkSyncTeamActivity.ts @@ -1,7 +1,17 @@ -import type { TeamAgentRuntimeEntry, TeamAgentRuntimeSnapshot } from '@shared/types'; +import { mentionsProcessTableUnavailable } from '@shared/utils/teamLaunchFailureReason'; + +import { normalizeMemberName } from '../../core/domain'; + +import type { + TeamAgentRuntimeEntry, + TeamAgentRuntimePidSource, + TeamAgentRuntimeSnapshot, +} from '@shared/types'; type RuntimeLivenessKind = NonNullable; +const WORK_SYNC_RESERVED_MEMBER_NAMES = new Set(['team-lead', 'user']); + const WORK_SYNC_INACTIVE_LIVENESS_KINDS = new Set([ 'permission_blocked', 'runtime_process_candidate', @@ -11,20 +21,107 @@ const WORK_SYNC_INACTIVE_LIVENESS_KINDS = new Set([ 'not_found', ]); +const WORK_SYNC_BOOTSTRAP_ONLY_PID_SOURCES = new Set([ + 'runtime_bootstrap', + 'persisted_metadata', +]); + +const WORK_SYNC_CONFIRMED_BOOTSTRAP_ACTIVE_PID_SOURCES = new Set([ + 'agent_process_table', + 'opencode_bridge', +]); + export function isRuntimeEntryActiveForWorkSync( - entry: Pick | null | undefined + entry: + | Pick< + TeamAgentRuntimeEntry, + 'alive' | 'backendType' | 'livenessKind' | 'memberName' | 'pidSource' + > + | null + | undefined ): boolean { if (entry?.alive !== true) { return false; } + if ( + entry.backendType === 'lead' || + WORK_SYNC_RESERVED_MEMBER_NAMES.has(entry.memberName.trim().toLowerCase()) + ) { + return false; + } + if ( + entry.livenessKind === 'confirmed_bootstrap' && + (!entry.pidSource || + WORK_SYNC_BOOTSTRAP_ONLY_PID_SOURCES.has(entry.pidSource) || + !WORK_SYNC_CONFIRMED_BOOTSTRAP_ACTIVE_PID_SOURCES.has(entry.pidSource)) + ) { + return false; + } if (!entry.livenessKind) { return true; } return !WORK_SYNC_INACTIVE_LIVENESS_KINDS.has(entry.livenessKind); } +function isRuntimeEntryRelevantForWorkSync( + entry: Pick +): boolean { + return ( + entry.backendType !== 'lead' && + !WORK_SYNC_RESERVED_MEMBER_NAMES.has(entry.memberName.trim().toLowerCase()) + ); +} + +function runtimeEntryMentionsProcessTableUnavailable( + entry: Pick +): boolean { + return [entry.runtimeDiagnostic, ...(entry.diagnostics ?? [])].some((message) => + mentionsProcessTableUnavailable(message) + ); +} + +export function hasUncertainWorkSyncRuntimeActivity( + snapshot: Pick | null | undefined +): boolean { + return Object.values(snapshot?.members ?? {}).some( + (entry) => + isRuntimeEntryRelevantForWorkSync(entry) && runtimeEntryMentionsProcessTableUnavailable(entry) + ); +} + export function hasWorkSyncActiveRuntime( snapshot: Pick | null | undefined ): boolean { return Object.values(snapshot?.members ?? {}).some(isRuntimeEntryActiveForWorkSync); } + +export function isRuntimeMemberActiveForWorkSync( + snapshot: Pick | null | undefined, + memberName: string +): boolean { + const normalizedMemberName = normalizeMemberName(memberName); + if (!normalizedMemberName) { + return false; + } + return Object.values(snapshot?.members ?? {}).some( + (entry) => + normalizeMemberName(entry.memberName) === normalizedMemberName && + isRuntimeEntryActiveForWorkSync(entry) + ); +} + +export function isRuntimeMemberActivityUncertainForWorkSync( + snapshot: Pick | null | undefined, + memberName: string +): boolean { + const normalizedMemberName = normalizeMemberName(memberName); + if (!normalizedMemberName) { + return false; + } + return Object.values(snapshot?.members ?? {}).some( + (entry) => + normalizeMemberName(entry.memberName) === normalizedMemberName && + isRuntimeEntryRelevantForWorkSync(entry) && + runtimeEntryMentionsProcessTableUnavailable(entry) + ); +} diff --git a/src/features/member-work-sync/main/index.ts b/src/features/member-work-sync/main/index.ts index 6f5f1fa8..f451680f 100644 --- a/src/features/member-work-sync/main/index.ts +++ b/src/features/member-work-sync/main/index.ts @@ -9,6 +9,9 @@ export { createMemberWorkSyncFeature, } from './composition/createMemberWorkSyncFeature'; export { + hasUncertainWorkSyncRuntimeActivity, hasWorkSyncActiveRuntime, isRuntimeEntryActiveForWorkSync, + isRuntimeMemberActiveForWorkSync, + isRuntimeMemberActivityUncertainForWorkSync, } from './composition/memberWorkSyncTeamActivity'; diff --git a/src/features/member-work-sync/main/infrastructure/JsonMemberWorkSyncStore.ts b/src/features/member-work-sync/main/infrastructure/JsonMemberWorkSyncStore.ts index c0017130..c1477ed0 100644 --- a/src/features/member-work-sync/main/infrastructure/JsonMemberWorkSyncStore.ts +++ b/src/features/member-work-sync/main/infrastructure/JsonMemberWorkSyncStore.ts @@ -283,7 +283,7 @@ function isStaleClaim(claimedAt: string | undefined, nowIso: string): boolean { return ( claimedAtMs != null && nowMs != null && - nowMs - claimedAtMs >= MEMBER_WORK_SYNC_OUTBOX_CLAIM_STALE_MS + (claimedAtMs > nowMs || nowMs - claimedAtMs >= MEMBER_WORK_SYNC_OUTBOX_CLAIM_STALE_MS) ); } @@ -298,6 +298,18 @@ function applyOptionalNextAttemptAt( delete item.nextAttemptAt; } +function isNextAttemptDue(nextAttemptAt: string | undefined, nowIso: string): boolean { + if (!nextAttemptAt) { + return true; + } + const nextAttemptAtMs = parseIsoMs(nextAttemptAt); + if (nextAttemptAtMs == null) { + return true; + } + const nowMs = parseIsoMs(nowIso); + return nowMs != null && nextAttemptAtMs <= nowMs; +} + function canClaimOutboxItem(item: MemberWorkSyncOutboxItem, nowIso: string): boolean { if (item.status === 'claimed') { return isStaleClaim(item.claimedAt ?? item.updatedAt, nowIso); @@ -305,10 +317,7 @@ function canClaimOutboxItem(item: MemberWorkSyncOutboxItem, nowIso: string): boo if (item.status !== 'pending' && item.status !== 'failed_retryable') { return false; } - if (!item.nextAttemptAt) { - return true; - } - return item.nextAttemptAt <= nowIso; + return isNextAttemptDue(item.nextAttemptAt, nowIso); } function canClaimOutboxRoute(route: OutboxIndexRoute, nowIso: string): boolean { @@ -317,7 +326,7 @@ function canClaimOutboxRoute(route: OutboxIndexRoute, nowIso: string): boolean { } return ( (route.status === 'pending' || route.status === 'failed_retryable') && - (!route.nextAttemptAt || route.nextAttemptAt <= nowIso) + isNextAttemptDue(route.nextAttemptAt, nowIso) ); } @@ -958,7 +967,7 @@ export class JsonMemberWorkSyncStore async markDelivered(input: MemberWorkSyncOutboxMarkDeliveredInput): Promise { await this.updateOutboxItem(input.teamName, input.id, (current) => { - if (current?.attemptGeneration !== input.attemptGeneration) { + if (current?.attemptGeneration !== input.attemptGeneration || current.status !== 'claimed') { return current; } const next: MemberWorkSyncOutboxItem = { @@ -993,7 +1002,10 @@ export class JsonMemberWorkSyncStore async markFailed(input: MemberWorkSyncOutboxMarkFailedInput): Promise { await this.updateOutboxItem(input.teamName, input.id, (current) => { - if (current?.attemptGeneration !== input.attemptGeneration) { + if ( + current?.attemptGeneration !== input.attemptGeneration || + isOutboxTerminal(current.status) + ) { return current; } const next: MemberWorkSyncOutboxItem = { diff --git a/src/features/member-work-sync/main/infrastructure/MemberWorkSyncEventQueue.ts b/src/features/member-work-sync/main/infrastructure/MemberWorkSyncEventQueue.ts index 446bb726..705bfe0b 100644 --- a/src/features/member-work-sync/main/infrastructure/MemberWorkSyncEventQueue.ts +++ b/src/features/member-work-sync/main/infrastructure/MemberWorkSyncEventQueue.ts @@ -85,10 +85,12 @@ export interface MemberWorkSyncEventQueueDeps { context: MemberWorkSyncReconcileContext ): Promise; isTeamActive(teamName: string): Promise | boolean; + reconcileInactiveTeams?: boolean; quietWindowMs?: number; triggerTiming?: Partial>>; concurrency?: number; retryDelayMs?: number; + reconcileTimeoutMs?: number; maxRetryAttempts?: number; now?: () => number; nowIso?: () => string; @@ -104,6 +106,8 @@ function unrefTimer(timer: ReturnType): void { timer.unref?.(); } +const DEFAULT_RECONCILE_TIMEOUT_MS = 2 * 60_000; + export class MemberWorkSyncEventQueue { private readonly items = new Map(); private readonly running = new Map(); @@ -111,6 +115,7 @@ export class MemberWorkSyncEventQueue { private readonly quietWindowMs: number; private readonly concurrency: number; private readonly retryDelayMs: number; + private readonly reconcileTimeoutMs: number; private readonly maxRetryAttempts: number; private readonly now: () => number; private readonly nowIso: () => string; @@ -128,6 +133,7 @@ export class MemberWorkSyncEventQueue { this.quietWindowMs = deps.quietWindowMs ?? 90_000; this.concurrency = Math.max(1, deps.concurrency ?? 2); this.retryDelayMs = Math.max(0, deps.retryDelayMs ?? 30_000); + this.reconcileTimeoutMs = Math.max(1, deps.reconcileTimeoutMs ?? DEFAULT_RECONCILE_TIMEOUT_MS); this.maxRetryAttempts = Math.max(0, deps.maxRetryAttempts ?? 3); this.now = deps.now ?? Date.now; this.nowIso = deps.nowIso ?? (() => new Date().toISOString()); @@ -475,7 +481,7 @@ export class MemberWorkSyncEventQueue { } private async executeItem(_key: string, item: QueueItem, running: RunningItem): Promise { - if (!(await this.deps.isTeamActive(item.teamName))) { + if (!this.deps.reconcileInactiveTeams && !(await this.deps.isTeamActive(item.teamName))) { this.counters.dropped += 1; this.appendAudit({ teamName: item.teamName, @@ -488,7 +494,7 @@ export class MemberWorkSyncEventQueue { } const recovery = running.recovery ?? item.recovery; - await this.deps.reconcile( + await this.runReconcileWithTimeout( { teamName: item.teamName, memberName: item.memberName }, { reconciledBy: 'queue', @@ -506,6 +512,39 @@ export class MemberWorkSyncEventQueue { }); } + private async runReconcileWithTimeout( + input: { teamName: string; memberName: string }, + context: MemberWorkSyncReconcileContext + ): Promise { + let timeout: ReturnType | null = null; + let timedOut = false; + const reconcilePromise = this.deps.reconcile(input, { + ...context, + isCancelled: () => timedOut || context.isCancelled?.() === true, + }); + void reconcilePromise.catch(() => undefined); + try { + await Promise.race([ + reconcilePromise, + new Promise((_, reject) => { + timeout = setTimeout(() => { + timedOut = true; + reject( + new Error( + `member work sync queue reconcile timed out after ${this.reconcileTimeoutMs}ms` + ) + ); + }, this.reconcileTimeoutMs); + unrefTimer(timeout); + }), + ]); + } finally { + if (timeout) { + clearTimeout(timeout); + } + } + } + private appendAudit(input: Omit): void { if (!this.deps.auditJournal) { return; diff --git a/src/features/member-work-sync/main/infrastructure/MemberWorkSyncNudgeDispatchScheduler.ts b/src/features/member-work-sync/main/infrastructure/MemberWorkSyncNudgeDispatchScheduler.ts index 276b04a1..aacf3f7b 100644 --- a/src/features/member-work-sync/main/infrastructure/MemberWorkSyncNudgeDispatchScheduler.ts +++ b/src/features/member-work-sync/main/infrastructure/MemberWorkSyncNudgeDispatchScheduler.ts @@ -4,6 +4,7 @@ import type { } from '../../core/application'; const DEFAULT_NUDGE_DISPATCH_INTERVAL_MS = 60_000; +const DEFAULT_NUDGE_DISPATCH_TIMEOUT_MS = 2 * 60_000; function uniqueNonEmpty(values: string[]): string[] { return [...new Set(values.map((value) => value.trim()).filter(Boolean))]; @@ -17,17 +18,23 @@ export interface MemberWorkSyncNudgeDispatchSchedulerDeps { listLifecycleActiveTeamNames(): Promise; dispatchDue(teamNames: string[]): Promise; intervalMs?: number; + dispatchTimeoutMs?: number; logger?: MemberWorkSyncLoggerPort; } export class MemberWorkSyncNudgeDispatchScheduler { private readonly intervalMs: number; + private readonly dispatchTimeoutMs: number; private timer: ReturnType | null = null; private running: Promise | null = null; private stopped = false; constructor(private readonly deps: MemberWorkSyncNudgeDispatchSchedulerDeps) { this.intervalMs = Math.max(10_000, deps.intervalMs ?? DEFAULT_NUDGE_DISPATCH_INTERVAL_MS); + this.dispatchTimeoutMs = Math.max( + 1, + deps.dispatchTimeoutMs ?? DEFAULT_NUDGE_DISPATCH_TIMEOUT_MS + ); } start(): void { @@ -84,11 +91,11 @@ export class MemberWorkSyncNudgeDispatchScheduler { private async dispatchOnce(): Promise { try { - const teamNames = uniqueNonEmpty(await this.deps.listLifecycleActiveTeamNames()); + const teamNames = uniqueNonEmpty(await this.listLifecycleActiveTeamNamesWithTimeout()); if (teamNames.length === 0) { return; } - const summary = await this.deps.dispatchDue(teamNames); + const summary = await this.runDispatchDueWithTimeout(teamNames); if (summary.claimed > 0 || summary.delivered > 0 || summary.retryable > 0) { this.deps.logger?.debug('member work sync scheduled nudge dispatch completed', { teamCount: teamNames.length, @@ -101,4 +108,56 @@ export class MemberWorkSyncNudgeDispatchScheduler { }); } } + + private async runDispatchDueWithTimeout( + teamNames: string[] + ): Promise { + let timeout: ReturnType | null = null; + const work = this.deps.dispatchDue(teamNames); + void work.catch(() => undefined); + try { + return await Promise.race([ + work, + new Promise((_, reject) => { + timeout = setTimeout(() => { + reject( + new Error( + `member work sync scheduled nudge dispatch timed out after ${this.dispatchTimeoutMs}ms` + ) + ); + }, this.dispatchTimeoutMs); + unrefTimer(timeout); + }), + ]); + } finally { + if (timeout) { + clearTimeout(timeout); + } + } + } + + private async listLifecycleActiveTeamNamesWithTimeout(): Promise { + let timeout: ReturnType | null = null; + const work = this.deps.listLifecycleActiveTeamNames(); + void work.catch(() => undefined); + try { + return await Promise.race([ + work, + new Promise((_, reject) => { + timeout = setTimeout(() => { + reject( + new Error( + `member work sync scheduled nudge team listing timed out after ${this.dispatchTimeoutMs}ms` + ) + ); + }, this.dispatchTimeoutMs); + unrefTimer(timeout); + }), + ]); + } finally { + if (timeout) { + clearTimeout(timeout); + } + } + } } diff --git a/src/features/member-work-sync/main/infrastructure/MemberWorkSyncToolActivityBusySignal.ts b/src/features/member-work-sync/main/infrastructure/MemberWorkSyncToolActivityBusySignal.ts index af25b61b..7cb33637 100644 --- a/src/features/member-work-sync/main/infrastructure/MemberWorkSyncToolActivityBusySignal.ts +++ b/src/features/member-work-sync/main/infrastructure/MemberWorkSyncToolActivityBusySignal.ts @@ -30,6 +30,10 @@ function parseIsoMs(value: string | undefined, fallbackMs: number): number { return Number.isFinite(parsed) ? parsed : fallbackMs; } +function parseEventIsoMs(value: string | undefined, nowMs: number): number { + return Math.min(parseIsoMs(value, nowMs), nowMs); +} + function addMsIso(baseIso: string, ms: number): string { return new Date(Date.parse(baseIso) + ms).toISOString(); } @@ -136,7 +140,7 @@ export class MemberWorkSyncToolActivityBusySignal implements MemberWorkSyncBusyS return; } const state = this.getOrCreateState(teamName, memberName); - const startedAtMs = parseIsoMs(startedAt, Date.now()); + const startedAtMs = parseEventIsoMs(startedAt, Date.now()); state.activeToolStartedAtByToolId.set(normalizedToolUseId, new Date(startedAtMs).toISOString()); state.recentBusyUntilByToolId.delete(normalizedToolUseId); } @@ -151,7 +155,7 @@ export class MemberWorkSyncToolActivityBusySignal implements MemberWorkSyncBusyS if (!memberName.trim() || !normalizedToolUseId) { return; } - const finishedAtMs = parseIsoMs(finishedAt, Date.now()); + const finishedAtMs = parseEventIsoMs(finishedAt, Date.now()); const busyUntilIso = new Date(finishedAtMs + this.busyGraceMs).toISOString(); const state = this.getOrCreateState(teamName, memberName); state.activeToolStartedAtByToolId.delete(normalizedToolUseId); diff --git a/src/features/member-work-sync/main/infrastructure/RuntimeTurnSettledDrainScheduler.ts b/src/features/member-work-sync/main/infrastructure/RuntimeTurnSettledDrainScheduler.ts index 1e82dc55..29cc2513 100644 --- a/src/features/member-work-sync/main/infrastructure/RuntimeTurnSettledDrainScheduler.ts +++ b/src/features/member-work-sync/main/infrastructure/RuntimeTurnSettledDrainScheduler.ts @@ -6,6 +6,7 @@ import type { export interface RuntimeTurnSettledDrainSchedulerDeps { drain(): Promise; intervalMs?: number; + drainTimeoutMs?: number; logger?: MemberWorkSyncLoggerPort; } @@ -13,14 +14,21 @@ function unrefTimer(timer: ReturnType): void { timer.unref?.(); } +const DEFAULT_RUNTIME_TURN_SETTLED_DRAIN_TIMEOUT_MS = 2 * 60_000; + export class RuntimeTurnSettledDrainScheduler { private readonly intervalMs: number; + private readonly drainTimeoutMs: number; private timer: ReturnType | null = null; private running = false; private disposed = false; constructor(private readonly deps: RuntimeTurnSettledDrainSchedulerDeps) { this.intervalMs = Math.max(1_000, deps.intervalMs ?? 15_000); + this.drainTimeoutMs = Math.max( + 1, + deps.drainTimeoutMs ?? DEFAULT_RUNTIME_TURN_SETTLED_DRAIN_TIMEOUT_MS + ); } start(): void { @@ -37,7 +45,7 @@ export class RuntimeTurnSettledDrainScheduler { this.running = true; try { - return await this.deps.drain(); + return await this.runDrainWithTimeout(); } catch (error) { this.deps.logger?.warn('runtime turn settled scheduled drain failed', { error: String(error), @@ -66,4 +74,25 @@ export class RuntimeTurnSettledDrainScheduler { }, delayMs); unrefTimer(this.timer); } + + private async runDrainWithTimeout(): Promise { + let timeout: ReturnType | null = null; + try { + return await Promise.race([ + this.deps.drain(), + new Promise((_, reject) => { + timeout = setTimeout(() => { + reject( + new Error(`runtime turn settled drain timed out after ${this.drainTimeoutMs}ms`) + ); + }, this.drainTimeoutMs); + unrefTimer(timeout); + }), + ]); + } finally { + if (timeout) { + clearTimeout(timeout); + } + } + } } diff --git a/src/main/index.ts b/src/main/index.ts index 91ae9cc8..46d16e8d 100644 --- a/src/main/index.ts +++ b/src/main/index.ts @@ -40,7 +40,10 @@ import { import { buildMemberWorkSyncRuntimeTurnSettledEnvironment, createMemberWorkSyncFeature, + hasUncertainWorkSyncRuntimeActivity, hasWorkSyncActiveRuntime, + isRuntimeMemberActivityUncertainForWorkSync, + isRuntimeMemberActiveForWorkSync, type MemberWorkSyncFeatureFacade, registerMemberWorkSyncIpc, removeMemberWorkSyncIpc, @@ -1820,7 +1823,10 @@ async function initializeServices(): Promise { teammateToolTracker = new TeammateToolTracker( teamMemberLogsFinder, teamLogSourceTracker, - forwardTeamChange + (event) => { + forwardTeamChange(event); + memberWorkSyncFeature?.noteTeamChange(event); + } ); // Allow TeamProvisioningService to trigger team refresh events (e.g. live lead replies). const teamChangeEmitter = (event: TeamChangeEvent): void => { @@ -1878,41 +1884,140 @@ async function initializeServices(): Promise { }); runtimeProviderManagementFeature = createRuntimeProviderManagementFeature(); const memberWorkSyncLogger = createLogger('Feature:MemberWorkSync'); - const hasMemberWorkSyncRuntimeActivity = async (teamName: string): Promise => { + const getMemberWorkSyncRuntimeSnapshot = async (input: { + teamName: string; + memberName?: string; + }) => { + const timeoutMs = 15_000; + let timer: ReturnType | null = null; + const snapshot = teamProvisioningService.getTeamAgentRuntimeSnapshot(input.teamName); + void snapshot.catch(() => undefined); try { - const snapshot = await teamProvisioningService.getTeamAgentRuntimeSnapshot(teamName); - return hasWorkSyncActiveRuntime(snapshot); + return await Promise.race([ + snapshot, + new Promise((resolve) => { + timer = setTimeout(() => { + memberWorkSyncLogger.warn('member work sync runtime snapshot timed out', { + teamName: input.teamName, + ...(input.memberName ? { memberName: input.memberName } : {}), + timeoutMs, + }); + resolve(null); + }, timeoutMs); + timer.unref?.(); + }), + ]); + } finally { + if (timer) { + clearTimeout(timer); + } + } + }; + const getMemberWorkSyncRuntimeActivity = async (teamName: string): Promise => { + try { + const snapshot = await getMemberWorkSyncRuntimeSnapshot({ teamName }); + if (!snapshot) { + return null; + } + const active = hasWorkSyncActiveRuntime(snapshot); + if (!active && hasUncertainWorkSyncRuntimeActivity(snapshot)) { + return null; + } + return active; } catch (error) { memberWorkSyncLogger.warn('member work sync runtime activity check failed', { teamName, error: String(error), }); - return false; + return null; + } + }; + const getMemberWorkSyncMemberRuntimeActivity = async (input: { + teamName: string; + memberName: string; + }): Promise => { + try { + const snapshot = await getMemberWorkSyncRuntimeSnapshot(input); + if (!snapshot) { + return null; + } + const active = isRuntimeMemberActiveForWorkSync(snapshot, input.memberName); + if (!active && isRuntimeMemberActivityUncertainForWorkSync(snapshot, input.memberName)) { + return null; + } + return active; + } catch (error) { + memberWorkSyncLogger.warn('member work sync member runtime activity check failed', { + teamName: input.teamName, + memberName: input.memberName, + error: String(error), + }); + return null; } }; const isTeamActiveForMemberWorkSync = async (teamName: string): Promise => { - if ( + const runtimeActive = await getMemberWorkSyncRuntimeActivity(teamName); + if (runtimeActive != null) { + return runtimeActive; + } + return ( teamProvisioningService.isTeamAlive(teamName) || teamProvisioningService.hasProvisioningRun(teamName) - ) { - return true; - } - return hasMemberWorkSyncRuntimeActivity(teamName); + ); }; const canDispatchMemberWorkSyncNudges = async (teamName: string): Promise => { - if (teamProvisioningService.isTeamAlive(teamName)) { - return true; + const runtimeActive = await getMemberWorkSyncRuntimeActivity(teamName); + if (runtimeActive != null) { + return runtimeActive; } - return hasMemberWorkSyncRuntimeActivity(teamName); + return teamProvisioningService.isTeamAlive(teamName); + }; + const isMemberActiveForMemberWorkSync = async (input: { + teamName: string; + memberName: string; + }): Promise => { + const runtimeActive = await getMemberWorkSyncMemberRuntimeActivity(input); + if (runtimeActive != null) { + return runtimeActive; + } + return ( + teamProvisioningService.isTeamAlive(input.teamName) || + teamProvisioningService.hasProvisioningRun(input.teamName) + ); }; const listMemberWorkSyncLifecycleActiveTeamNames = async (): Promise => { + const teams = (await teamDataService.listTeams()).filter((team) => !team.deletedAt); + const activeChecks = await Promise.allSettled( + teams.map(async (team) => { + try { + return { + teamName: team.teamName, + active: await isTeamActiveForMemberWorkSync(team.teamName), + }; + } catch (error) { + memberWorkSyncLogger.warn('member work sync lifecycle team activity check failed', { + teamName: team.teamName, + error: String(error), + }); + return { + teamName: team.teamName, + active: + teamProvisioningService.isTeamAlive(team.teamName) || + teamProvisioningService.hasProvisioningRun(team.teamName), + }; + } + }) + ); const activeTeamNames: string[] = []; - for (const team of await teamDataService.listTeams()) { - if (team.deletedAt) { + for (const check of activeChecks) { + if (check.status === 'rejected') { + memberWorkSyncLogger.warn('member work sync lifecycle team activity check failed', { + error: String(check.reason), + }); continue; } - if (await isTeamActiveForMemberWorkSync(team.teamName)) { - activeTeamNames.push(team.teamName); + if (check.value.active) { + activeTeamNames.push(check.value.teamName); } } return activeTeamNames; @@ -1924,6 +2029,7 @@ async function initializeServices(): Promise { kanbanManager: new TeamKanbanManager(), membersMetaStore: new TeamMembersMetaStore(), isTeamActive: isTeamActiveForMemberWorkSync, + isMemberActive: isMemberActiveForMemberWorkSync, canDispatchNudges: canDispatchMemberWorkSyncNudges, listLifecycleActiveTeamNames: listMemberWorkSyncLifecycleActiveTeamNames, extraBusySignals: [ diff --git a/test/features/member-work-sync/core/MemberWorkSyncUseCases.test.ts b/test/features/member-work-sync/core/MemberWorkSyncUseCases.test.ts index 25f747de..af3d67cf 100644 --- a/test/features/member-work-sync/core/MemberWorkSyncUseCases.test.ts +++ b/test/features/member-work-sync/core/MemberWorkSyncUseCases.test.ts @@ -6,6 +6,7 @@ import { MemberWorkSyncNudgeDispatcher, type MemberWorkSyncOutboxStorePort, MemberWorkSyncPendingReportIntentReplayer, + MemberWorkSyncReconcileCancelledError, MemberWorkSyncReconciler, MemberWorkSyncReporter, type MemberWorkSyncReviewPickupDeliveryPort, @@ -13,7 +14,7 @@ import { type MemberWorkSyncStatusStorePort, type MemberWorkSyncUseCaseDeps, } from '@features/member-work-sync/core/application'; -import { describe, expect, it } from 'vitest'; +import { describe, expect, it, vi } from 'vitest'; import type { MemberWorkSyncActionableWorkItem, @@ -79,6 +80,10 @@ const secondReviewPickupItem: MemberWorkSyncActionableWorkItem = { }, }; +function isTerminalOutboxStatus(status: MemberWorkSyncOutboxItem['status']): boolean { + return status === 'delivered' || status === 'superseded' || status === 'failed_terminal'; +} + class MutableClock { private current = new Date('2026-04-29T00:00:00.000Z'); @@ -231,7 +236,7 @@ class InMemoryOutboxStore implements MemberWorkSyncOutboxStorePort { async markDelivered(input: MemberWorkSyncOutboxMarkDeliveredInput): Promise { const current = this.items.get(input.id); - if (current?.attemptGeneration === input.attemptGeneration) { + if (current?.attemptGeneration === input.attemptGeneration && current.status === 'claimed') { const next = { ...current, status: 'delivered' as const, @@ -254,7 +259,10 @@ class InMemoryOutboxStore implements MemberWorkSyncOutboxStorePort { async markFailed(input: MemberWorkSyncOutboxMarkFailedInput): Promise { const current = this.items.get(input.id); - if (current?.attemptGeneration === input.attemptGeneration) { + if ( + current?.attemptGeneration === input.attemptGeneration && + !isTerminalOutboxStatus(current.status) + ) { this.items.set(input.id, { ...current, status: input.retryable ? 'failed_retryable' : 'failed_terminal', @@ -324,10 +332,12 @@ function createDeps(options?: { activeMemberNames?: string[]; inactive?: boolean; teamActive?: boolean; + memberActive?: boolean; providerId?: 'opencode' | 'codex'; outboxStore?: MemberWorkSyncOutboxStorePort; inboxNudge?: MemberWorkSyncInboxNudgePort; busySignal?: MemberWorkSyncUseCaseDeps['busySignal']; + watchdogCooldown?: MemberWorkSyncUseCaseDeps['watchdogCooldown']; reviewPickupDelivery?: MemberWorkSyncReviewPickupDeliveryPort; reviewPickupEscalation?: MemberWorkSyncReviewPickupEscalationPort; }) { @@ -361,6 +371,7 @@ function createDeps(options?: { ...(options?.outboxStore ? { outboxStore: options.outboxStore } : {}), ...(options?.inboxNudge ? { inboxNudge: options.inboxNudge } : {}), ...(options?.busySignal ? { busySignal: options.busySignal } : {}), + ...(options?.watchdogCooldown ? { watchdogCooldown: options.watchdogCooldown } : {}), ...(options?.reviewPickupDelivery ? { reviewPickupDelivery: options.reviewPickupDelivery } : {}), @@ -379,6 +390,7 @@ function createDeps(options?: { }, lifecycle: { isTeamActive: () => options?.teamActive ?? true, + isMemberActive: () => options?.memberActive ?? true, }, auditJournal: { append: async (event) => { @@ -414,6 +426,71 @@ describe('MemberWorkSync use cases', () => { ]); }); + it('does not write status or plan nudges after a queued reconcile is cancelled', async () => { + const outbox = new InMemoryOutboxStore(); + const { auditEvents, deps, store } = createDeps({ outboxStore: outbox }); + + await expect( + new MemberWorkSyncReconciler(deps).execute( + { teamName: 'team-a', memberName: 'bob' }, + { + reconciledBy: 'queue', + triggerReasons: ['turn_settled'], + isCancelled: () => true, + } + ) + ).rejects.toBeInstanceOf(MemberWorkSyncReconcileCancelledError); + + expect(store.writes).toHaveLength(0); + expect(outbox.ensures).toHaveLength(0); + expect(auditEvents.map((event) => event.event)).toEqual(['reconcile_started']); + }); + + it('does not create a report token when a queued reconcile is cancelled after decision audit', async () => { + const outbox = new InMemoryOutboxStore(); + const { auditEvents, deps, store } = createDeps({ outboxStore: outbox }); + let cancelled = false; + let tokenCreates = 0; + deps.auditJournal = { + append: async (event) => { + auditEvents.push(event); + if (event.event === 'decision_made') { + cancelled = true; + } + }, + }; + deps.reportToken = { + create: async (input) => { + tokenCreates += 1; + return { + token: `token:${input.teamName}:${input.memberName}:${input.agendaFingerprint}`, + expiresAt: '2026-04-29T00:15:00.000Z', + }; + }, + verify: async () => ({ ok: false, reason: 'missing' }), + }; + + await expect( + new MemberWorkSyncReconciler(deps).execute( + { teamName: 'team-a', memberName: 'bob' }, + { + reconciledBy: 'queue', + triggerReasons: ['turn_settled'], + isCancelled: () => cancelled, + } + ) + ).rejects.toBeInstanceOf(MemberWorkSyncReconcileCancelledError); + + expect(tokenCreates).toBe(0); + expect(store.writes).toHaveLength(0); + expect(outbox.ensures).toHaveLength(0); + expect(auditEvents.map((event) => event.event)).toEqual([ + 'reconcile_started', + 'agenda_loaded', + 'decision_made', + ]); + }); + it('accepts still_working as a bounded lease for the current fingerprint', async () => { const { auditEvents, clock, deps } = createDeps(); const reader = new MemberWorkSyncReconciler(deps); @@ -447,6 +524,36 @@ describe('MemberWorkSync use cases', () => { expect(auditEvents.map((event) => event.event)).toContain('report_accepted'); }); + it('rejects reports when this member runtime is no longer active', async () => { + const { deps } = createDeps(); + const reader = new MemberWorkSyncReconciler(deps); + const current = await reader.execute({ teamName: 'team-a', memberName: 'bob' }); + const reporter = new MemberWorkSyncReporter({ + ...deps, + lifecycle: { + isTeamActive: () => true, + isMemberActive: () => false, + }, + }); + + const result = await reporter.execute({ + teamName: 'team-a', + memberName: 'bob', + state: 'still_working', + agendaFingerprint: current.agenda.fingerprint, + reportToken: current.reportToken, + source: 'test', + }); + + expect(result.accepted).toBe(false); + expect(result.code).toBe('member_runtime_inactive'); + expect(result.status.state).toBe('inactive'); + expect(result.status.report).toMatchObject({ + accepted: false, + rejectionCode: 'member_runtime_inactive', + }); + }); + it('uses app clock instead of model supplied reportedAt for lease timing', async () => { const { deps } = createDeps(); const reader = new MemberWorkSyncReconciler(deps); @@ -577,6 +684,18 @@ describe('MemberWorkSync use cases', () => { expect(status.shadow?.wouldNudge).toBe(false); }); + it('marks status inactive when this member runtime is not active', async () => { + const { deps } = createDeps({ memberActive: false }); + const status = await new MemberWorkSyncReconciler(deps).execute({ + teamName: 'team-a', + memberName: 'bob', + }); + + expect(status.state).toBe('inactive'); + expect(status.diagnostics).toContain('member_runtime_inactive'); + expect(status.shadow?.wouldNudge).toBe(false); + }); + it('records fingerprint transitions without treating them as progress proof', async () => { const { deps, source } = createDeps(); const reader = new MemberWorkSyncReconciler(deps); @@ -892,6 +1011,379 @@ describe('MemberWorkSync use cases', () => { }); }); + it('supersedes due nudges for inactive member runtimes without inbox delivery', async () => { + const outbox = new InMemoryOutboxStore(); + const inbox = new InMemoryInboxNudge(); + const { deps, store } = createDeps({ outboxStore: outbox, inboxNudge: inbox }); + store.phase2ReadinessState = 'shadow_ready'; + + const status = await new MemberWorkSyncReconciler(deps).execute( + { + teamName: 'team-a', + memberName: 'bob', + }, + { reconciledBy: 'queue', triggerReasons: ['task_changed'] } + ); + const dispatcher = new MemberWorkSyncNudgeDispatcher({ + ...deps, + lifecycle: { + isTeamActive: () => true, + isMemberActive: () => false, + }, + }); + + const summary = await dispatcher.dispatchDue({ + teamNames: ['team-a'], + claimedBy: 'test-dispatcher', + }); + + expect(summary).toMatchObject({ claimed: 1, delivered: 0, superseded: 1 }); + expect(inbox.inserted).toEqual([]); + expect( + outbox.items.get(`member-work-sync:team-a:bob:${status.agenda.fingerprint}`) + ).toMatchObject({ + status: 'superseded', + lastError: 'member_runtime_inactive', + }); + }); + + it('continues dispatching later claimed nudges when one item times out', async () => { + const outbox = new InMemoryOutboxStore(); + const { deps, store } = createDeps({ outboxStore: outbox }); + store.phase2ReadinessState = 'shadow_ready'; + + const status = await new MemberWorkSyncReconciler(deps).execute( + { + teamName: 'team-a', + memberName: 'bob', + }, + { reconciledBy: 'queue', triggerReasons: ['task_changed'] } + ); + const firstItem = [...outbox.items.values()][0]; + expect(firstItem).toBeDefined(); + await outbox.ensurePending({ + id: `${firstItem!.id}:second`, + teamName: firstItem!.teamName, + memberName: firstItem!.memberName, + agendaFingerprint: firstItem!.agendaFingerprint, + payloadHash: `${firstItem!.payloadHash}:second`, + payload: { + ...firstItem!.payload, + workSyncIntentKey: 'test-second', + }, + nowIso: status.evaluatedAt, + }); + + const inserted: Array[0]> = []; + const inbox: MemberWorkSyncInboxNudgePort = { + insertIfAbsent: async (input) => { + if (input.messageId === firstItem!.id) { + return new Promise(() => undefined); + } + inserted.push(input); + return { inserted: true, messageId: input.messageId }; + }, + }; + const dispatcher = new MemberWorkSyncNudgeDispatcher({ + ...deps, + inboxNudge: inbox, + }); + + await expect( + dispatcher.dispatchDue({ + teamNames: ['team-a'], + claimedBy: 'test-dispatcher', + itemTimeoutMs: 1, + }) + ).resolves.toMatchObject({ + claimed: 2, + delivered: 1, + retryable: 1, + }); + + expect(outbox.items.get(firstItem!.id)).toMatchObject({ + status: 'failed_retryable', + lastError: 'nudge dispatch item timed out after 1ms', + }); + expect(inserted).toHaveLength(1); + expect(inserted[0]?.messageId).toBe(`${firstItem!.id}:second`); + expect(outbox.items.get(`${firstItem!.id}:second`)).toMatchObject({ + status: 'delivered', + }); + }); + + it('does not late-deliver an item after item dispatch timeout resolves', async () => { + vi.useFakeTimers(); + try { + const outbox = new InMemoryOutboxStore(); + const { deps, store } = createDeps({ outboxStore: outbox }); + store.phase2ReadinessState = 'shadow_ready'; + + const status = await new MemberWorkSyncReconciler(deps).execute( + { + teamName: 'team-a', + memberName: 'bob', + }, + { reconciledBy: 'queue', triggerReasons: ['task_changed'] } + ); + const firstItem = [...outbox.items.values()][0]; + expect(firstItem).toBeDefined(); + + let resolveInsertStarted!: () => void; + const insertStarted = new Promise((resolve) => { + resolveInsertStarted = resolve; + }); + let resolveInsert!: (value: { inserted: boolean; messageId: string }) => void; + const insertResult = new Promise<{ inserted: boolean; messageId: string }>((resolve) => { + resolveInsert = resolve; + }); + const inbox: MemberWorkSyncInboxNudgePort = { + insertIfAbsent: async () => { + resolveInsertStarted(); + return insertResult; + }, + }; + + const dispatch = new MemberWorkSyncNudgeDispatcher({ + ...deps, + inboxNudge: inbox, + }).dispatchDue({ + teamNames: ['team-a'], + claimedBy: 'test-dispatcher', + itemTimeoutMs: 5, + teamTimeoutMs: 100, + }); + await insertStarted; + await vi.advanceTimersByTimeAsync(5); + + await expect(dispatch).resolves.toMatchObject({ + claimed: 1, + delivered: 0, + retryable: 1, + }); + expect(outbox.items.get(firstItem!.id)).toMatchObject({ + status: 'failed_retryable', + lastError: 'nudge dispatch item timed out after 5ms', + }); + + resolveInsert({ inserted: true, messageId: firstItem!.id }); + await Promise.resolve(); + await vi.advanceTimersByTimeAsync(100); + + expect(outbox.items.get(`member-work-sync:team-a:bob:${status.agenda.fingerprint}`)) + .toMatchObject({ + status: 'failed_retryable', + lastError: 'nudge dispatch item timed out after 5ms', + }); + } finally { + vi.useRealTimers(); + } + }); + + it('continues dispatching later claimed nudges when retry marking also hangs', async () => { + const outbox = new InMemoryOutboxStore(); + const { deps, store } = createDeps({ outboxStore: outbox }); + store.phase2ReadinessState = 'shadow_ready'; + + const status = await new MemberWorkSyncReconciler(deps).execute( + { + teamName: 'team-a', + memberName: 'bob', + }, + { reconciledBy: 'queue', triggerReasons: ['task_changed'] } + ); + const firstItem = [...outbox.items.values()][0]; + expect(firstItem).toBeDefined(); + await outbox.ensurePending({ + id: `${firstItem!.id}:second`, + teamName: firstItem!.teamName, + memberName: firstItem!.memberName, + agendaFingerprint: firstItem!.agendaFingerprint, + payloadHash: `${firstItem!.payloadHash}:second`, + payload: { + ...firstItem!.payload, + workSyncIntentKey: 'test-second', + }, + nowIso: status.evaluatedAt, + }); + + const originalMarkFailed = outbox.markFailed.bind(outbox); + outbox.markFailed = async (input) => { + if (input.id === firstItem!.id) { + return new Promise(() => undefined); + } + return originalMarkFailed(input); + }; + const inserted: Array[0]> = []; + const inbox: MemberWorkSyncInboxNudgePort = { + insertIfAbsent: async (input) => { + if (input.messageId === firstItem!.id) { + return new Promise(() => undefined); + } + inserted.push(input); + return { inserted: true, messageId: input.messageId }; + }, + }; + const dispatcher = new MemberWorkSyncNudgeDispatcher({ + ...deps, + inboxNudge: inbox, + }); + + await expect( + dispatcher.dispatchDue({ + teamNames: ['team-a'], + claimedBy: 'test-dispatcher', + itemTimeoutMs: 1, + }) + ).resolves.toMatchObject({ + claimed: 2, + delivered: 1, + retryable: 1, + }); + + expect(inserted).toHaveLength(1); + expect(inserted[0]?.messageId).toBe(`${firstItem!.id}:second`); + expect(outbox.items.get(`${firstItem!.id}:second`)).toMatchObject({ + status: 'delivered', + }); + }); + + it('continues checking other teams when one team outbox claim hangs', async () => { + vi.useFakeTimers(); + try { + const warn = vi.fn(); + const claimDue = vi.fn( + async (input: Parameters[0]) => { + if (input.teamName === 'stuck') { + await new Promise(() => undefined); + } + return []; + } + ); + const inbox = new InMemoryInboxNudge(); + const { deps } = createDeps({ + outboxStore: { claimDue } as unknown as MemberWorkSyncOutboxStorePort, + inboxNudge: inbox, + }); + deps.logger = { + debug: vi.fn(), + warn, + error: vi.fn(), + }; + + const dispatch = new MemberWorkSyncNudgeDispatcher(deps).dispatchDue({ + teamNames: ['stuck', 'healthy'], + claimedBy: 'test-dispatcher', + claimTimeoutMs: 10, + teamTimeoutMs: 50, + }); + await vi.advanceTimersByTimeAsync(10); + + await expect(dispatch).resolves.toEqual({ + claimed: 0, + delivered: 0, + superseded: 0, + retryable: 0, + terminal: 0, + }); + expect(claimDue).toHaveBeenCalledWith( + expect.objectContaining({ + teamName: 'healthy', + }) + ); + expect(warn).toHaveBeenCalledWith( + 'member work sync nudge claim timed out', + expect.objectContaining({ + teamName: 'stuck', + timeoutMs: 10, + }) + ); + } finally { + vi.useRealTimers(); + } + }); + + it('does not mutate timed-out team items after team dispatch returns', async () => { + vi.useFakeTimers(); + try { + const warn = vi.fn(); + const outbox = new InMemoryOutboxStore(); + const { deps, store } = createDeps({ outboxStore: outbox }); + store.phase2ReadinessState = 'shadow_ready'; + + const status = await new MemberWorkSyncReconciler(deps).execute( + { + teamName: 'team-a', + memberName: 'bob', + }, + { reconciledBy: 'queue', triggerReasons: ['task_changed'] } + ); + const firstItem = [...outbox.items.values()][0]; + expect(firstItem).toBeDefined(); + + let resolveInsertStarted!: () => void; + const insertStarted = new Promise((resolve) => { + resolveInsertStarted = resolve; + }); + let resolveInsert!: (value: { inserted: boolean; messageId: string }) => void; + const insertResult = new Promise<{ inserted: boolean; messageId: string }>((resolve) => { + resolveInsert = resolve; + }); + const inbox: MemberWorkSyncInboxNudgePort = { + insertIfAbsent: async () => { + resolveInsertStarted(); + return insertResult; + }, + }; + deps.logger = { + debug: vi.fn(), + warn, + error: vi.fn(), + }; + + const dispatch = new MemberWorkSyncNudgeDispatcher({ + ...deps, + inboxNudge: inbox, + }).dispatchDue({ + teamNames: ['team-a'], + claimedBy: 'test-dispatcher', + itemTimeoutMs: 100, + teamTimeoutMs: 5, + }); + await insertStarted; + await vi.advanceTimersByTimeAsync(5); + + await expect(dispatch).resolves.toEqual({ + claimed: 0, + delivered: 0, + superseded: 0, + retryable: 0, + terminal: 0, + }); + expect(outbox.items.get(firstItem!.id)).toMatchObject({ + status: 'claimed', + }); + expect(warn).toHaveBeenCalledWith( + 'member work sync team nudge dispatch timed out', + expect.objectContaining({ + teamName: 'team-a', + timeoutMs: 5, + }) + ); + + resolveInsert({ inserted: true, messageId: firstItem!.id }); + await Promise.resolve(); + await vi.advanceTimersByTimeAsync(100); + + expect(outbox.items.get(`member-work-sync:team-a:bob:${status.agenda.fingerprint}`)) + .toMatchObject({ + status: 'claimed', + }); + } finally { + vi.useRealTimers(); + } + }); + it('creates a status-only recovery nudge after a delivered nudge turn settles without a report', async () => { const outbox = new InMemoryOutboxStore(); const inbox = new InMemoryInboxNudge(); @@ -1712,6 +2204,33 @@ describe('MemberWorkSync use cases', () => { expect(recoverySummary).toMatchObject({ claimed: 1, delivered: 1, retryable: 0 }); expect(inbox.inserted).toHaveLength(2); expect(inbox.inserted[1]?.messageId).toContain('agenda-sync-still-stuck'); + + clock.set('2026-04-29T01:02:00.000Z'); + store.phase2ReadinessState = 'shadow_ready'; + store.phase2ReadinessReasons = []; + store.metricsGeneratedAt = '2026-04-29T01:02:00.000Z'; + await reconciler.execute( + { + teamName: 'team-a', + memberName: 'team-lead', + }, + { reconciledBy: 'queue', triggerReasons: ['manual_refresh'] } + ); + + const recoveryItems = [...outbox.items.values()].filter((item) => + item.payload.workSyncIntentKey?.startsWith('agenda-sync-still-stuck:') + ); + expect(recoveryItems).toHaveLength(2); + expect(new Set(recoveryItems.map((item) => item.id)).size).toBe(2); + + const secondRecoverySummary = await new MemberWorkSyncNudgeDispatcher(deps).dispatchDue({ + teamNames: ['team-a'], + claimedBy: 'test-dispatcher', + }); + + expect(secondRecoverySummary).toMatchObject({ claimed: 1, delivered: 1, retryable: 0 }); + expect(inbox.inserted).toHaveLength(3); + expect(inbox.inserted[2]?.messageId).toContain('agenda-sync-still-stuck'); }); it('creates a still-stuck recovery when a terminal inbox conflict blocks an agenda nudge', async () => { @@ -2297,6 +2816,45 @@ describe('MemberWorkSync use cases', () => { ); }); + it('uses the watchdog cooldown retry deadline instead of exponential retry backoff', async () => { + const outbox = new InMemoryOutboxStore(); + const inbox = new InMemoryInboxNudge(); + const { deps, store } = createDeps({ + outboxStore: outbox, + inboxNudge: inbox, + watchdogCooldown: { + hasRecentNudge: async () => true, + getRecentNudgeCooldown: async () => ({ + active: true, + retryAfterIso: '2026-04-29T00:10:00.000Z', + }), + }, + }); + store.phase2ReadinessState = 'shadow_ready'; + + const current = await new MemberWorkSyncReconciler(deps).execute( + { + teamName: 'team-a', + memberName: 'bob', + }, + { reconciledBy: 'queue', triggerReasons: ['task_changed'] } + ); + const summary = await new MemberWorkSyncNudgeDispatcher(deps).dispatchDue({ + teamNames: ['team-a'], + claimedBy: 'test-dispatcher', + }); + + expect(summary).toMatchObject({ claimed: 1, delivered: 0, retryable: 1 }); + expect(inbox.inserted).toEqual([]); + expect( + outbox.items.get(`member-work-sync:team-a:bob:${current.agenda.fingerprint}`) + ).toMatchObject({ + status: 'failed_retryable', + lastError: 'watchdog_cooldown_active', + nextAttemptAt: '2026-04-29T00:10:00.000Z', + }); + }); + it('uses bounded retry backoff when inbox delivery fails', async () => { const outbox = new InMemoryOutboxStore(); const inbox = new InMemoryInboxNudge(); @@ -2386,4 +2944,41 @@ describe('MemberWorkSync use cases', () => { }); expect(store.writes.at(-1)?.state).toBe('still_working'); }); + + it('supersedes pending controller intents when the member runtime is inactive', async () => { + const { deps, store } = createDeps(); + const reader = new MemberWorkSyncReconciler(deps); + const current = await reader.execute({ teamName: 'team-a', memberName: 'bob' }); + store.pendingIntents.set('intent-1', { + id: 'intent-1', + teamName: 'team-a', + memberName: 'bob', + status: 'pending', + reason: 'control_api_unavailable', + recordedAt: '2026-04-29T00:00:01.000Z', + request: { + teamName: 'team-a', + memberName: 'bob', + state: 'still_working', + agendaFingerprint: current.agenda.fingerprint, + reportToken: current.reportToken, + leaseTtlMs: 120_000, + source: 'mcp', + }, + }); + + const summary = await new MemberWorkSyncPendingReportIntentReplayer({ + ...deps, + lifecycle: { + isTeamActive: () => true, + isMemberActive: () => false, + }, + }).replayTeam('team-a'); + + expect(summary).toEqual({ processed: 1, accepted: 0, rejected: 0, superseded: 1 }); + expect(store.pendingIntents.get('intent-1')).toMatchObject({ + status: 'superseded', + resultCode: 'member_runtime_inactive', + }); + }); }); diff --git a/test/features/member-work-sync/main/JsonMemberWorkSyncStore.test.ts b/test/features/member-work-sync/main/JsonMemberWorkSyncStore.test.ts index 74dc094b..b3ab6f7b 100644 --- a/test/features/member-work-sync/main/JsonMemberWorkSyncStore.test.ts +++ b/test/features/member-work-sync/main/JsonMemberWorkSyncStore.test.ts @@ -712,6 +712,60 @@ describe('JsonMemberWorkSyncStore', () => { ).resolves.toEqual([]); }); + it('treats invalid retry delay timestamps as due so retryable items cannot sleep forever', async () => { + const input = { + id: 'member-work-sync:team-a:bob:agenda:v1:abc', + teamName: 'team-a', + memberName: 'bob', + agendaFingerprint: 'agenda:v1:abc', + payloadHash: 'hash-a', + payload: makeNudgePayload(), + nowIso: '2026-04-29T00:00:00.000Z', + }; + + await store.ensurePending(input); + const [claimed] = await store.claimDue({ + teamName: 'team-a', + claimedBy: 'dispatcher-a', + nowIso: '2026-04-29T00:01:00.000Z', + limit: 1, + }); + await store.markFailed({ + teamName: 'team-a', + id: input.id, + attemptGeneration: claimed.attemptGeneration, + retryable: true, + error: 'member_busy:active_tool_activity', + nextAttemptAt: '2026-04-29T00:30:00.000Z', + nowIso: '2026-04-29T00:02:00.000Z', + }); + + const memberOutboxPath = join(memberWorkSyncDir(root, 'team-a', 'bob'), 'outbox.json'); + const memberOutbox = JSON.parse(await readFile(memberOutboxPath, 'utf8')); + memberOutbox.items[input.id].nextAttemptAt = 'not-a-date'; + await writeFile(memberOutboxPath, JSON.stringify(memberOutbox), 'utf8'); + + const indexPath = join(root, 'team-a', '.member-work-sync', 'indexes', 'outbox-index.json'); + const index = JSON.parse(await readFile(indexPath, 'utf8')); + index.items[input.id].nextAttemptAt = 'not-a-date'; + await writeFile(indexPath, JSON.stringify(index), 'utf8'); + + await expect( + store.claimDue({ + teamName: 'team-a', + claimedBy: 'dispatcher-b', + nowIso: '2026-04-29T00:04:00.000Z', + limit: 1, + }) + ).resolves.toEqual([ + expect.objectContaining({ + id: input.id, + status: 'claimed', + attemptGeneration: claimed.attemptGeneration + 1, + }), + ]); + }); + it('clears retry delay when a retryable outbox item is delivered', async () => { const input = { id: 'member-work-sync:team-a:bob:agenda:v1:abc', @@ -773,6 +827,185 @@ describe('JsonMemberWorkSyncStore', () => { expect(index.items[input.id]).not.toHaveProperty('nextAttemptAt'); }); + it('keeps delivered outbox items delivered when a late retry mark races after delivery', async () => { + const input = { + id: 'member-work-sync:team-a:bob:agenda:v1:abc', + teamName: 'team-a', + memberName: 'bob', + agendaFingerprint: 'agenda:v1:abc', + payloadHash: 'hash-a', + payload: makeNudgePayload(), + nowIso: '2026-04-29T00:00:00.000Z', + }; + + await store.ensurePending(input); + const [claimed] = await store.claimDue({ + teamName: 'team-a', + claimedBy: 'dispatcher-a', + nowIso: '2026-04-29T00:01:00.000Z', + limit: 1, + }); + await store.markDelivered({ + teamName: 'team-a', + id: input.id, + attemptGeneration: claimed.attemptGeneration, + deliveredMessageId: 'message-1', + nowIso: '2026-04-29T00:01:30.000Z', + }); + await store.markFailed({ + teamName: 'team-a', + id: input.id, + attemptGeneration: claimed.attemptGeneration, + retryable: true, + error: 'nudge dispatch item timed out after 1ms', + nextAttemptAt: '2026-04-29T00:03:00.000Z', + nowIso: '2026-04-29T00:02:00.000Z', + }); + + const memberOutbox = JSON.parse( + await readFile( + join(root, 'team-a', 'members', 'bob', '.member-work-sync', 'outbox.json'), + 'utf8' + ) + ); + expect(memberOutbox.items[input.id]).toMatchObject({ + status: 'delivered', + deliveredMessageId: 'message-1', + }); + expect(memberOutbox.items[input.id]).not.toHaveProperty('lastError'); + expect(memberOutbox.items[input.id]).not.toHaveProperty('nextAttemptAt'); + + const index = JSON.parse( + await readFile( + join(root, 'team-a', '.member-work-sync', 'indexes', 'outbox-index.json'), + 'utf8' + ) + ); + expect(index.items[input.id]).toMatchObject({ + status: 'delivered', + }); + expect(index.items[input.id]).not.toHaveProperty('nextAttemptAt'); + }); + + it('keeps retryable outbox items retryable when a late delivery races after timeout', async () => { + const input = { + id: 'member-work-sync:team-a:bob:agenda:v1:retry-race', + teamName: 'team-a', + memberName: 'bob', + agendaFingerprint: 'agenda:v1:retry-race', + payloadHash: 'hash-a', + payload: makeNudgePayload(), + nowIso: '2026-04-29T00:00:00.000Z', + }; + + await store.ensurePending(input); + const [claimed] = await store.claimDue({ + teamName: 'team-a', + claimedBy: 'dispatcher-a', + nowIso: '2026-04-29T00:01:00.000Z', + limit: 1, + }); + await store.markFailed({ + teamName: 'team-a', + id: input.id, + attemptGeneration: claimed.attemptGeneration, + retryable: true, + error: 'nudge dispatch item timed out after 1ms', + nextAttemptAt: '2026-04-29T00:03:00.000Z', + nowIso: '2026-04-29T00:02:00.000Z', + }); + await store.markDelivered({ + teamName: 'team-a', + id: input.id, + attemptGeneration: claimed.attemptGeneration, + deliveredMessageId: 'late-message', + nowIso: '2026-04-29T00:02:30.000Z', + }); + + const memberOutbox = JSON.parse( + await readFile( + join(root, 'team-a', 'members', 'bob', '.member-work-sync', 'outbox.json'), + 'utf8' + ) + ); + expect(memberOutbox.items[input.id]).toMatchObject({ + status: 'failed_retryable', + lastError: 'nudge dispatch item timed out after 1ms', + nextAttemptAt: '2026-04-29T00:03:00.000Z', + }); + expect(memberOutbox.items[input.id]).not.toHaveProperty('deliveredMessageId'); + + const index = JSON.parse( + await readFile( + join(root, 'team-a', '.member-work-sync', 'indexes', 'outbox-index.json'), + 'utf8' + ) + ); + expect(index.items[input.id]).toMatchObject({ + status: 'failed_retryable', + nextAttemptAt: '2026-04-29T00:03:00.000Z', + }); + expect(index.items[input.id]).not.toHaveProperty('deliveredMessageId'); + }); + + it('keeps terminal outbox items terminal when a late delivery races after failure', async () => { + const input = { + id: 'member-work-sync:team-a:bob:agenda:v1:terminal-race', + teamName: 'team-a', + memberName: 'bob', + agendaFingerprint: 'agenda:v1:terminal-race', + payloadHash: 'hash-a', + payload: makeNudgePayload(), + nowIso: '2026-04-29T00:00:00.000Z', + }; + + await store.ensurePending(input); + const [claimed] = await store.claimDue({ + teamName: 'team-a', + claimedBy: 'dispatcher-a', + nowIso: '2026-04-29T00:01:00.000Z', + limit: 1, + }); + await store.markFailed({ + teamName: 'team-a', + id: input.id, + attemptGeneration: claimed.attemptGeneration, + retryable: false, + error: 'inbox_payload_conflict', + nowIso: '2026-04-29T00:01:30.000Z', + }); + await store.markDelivered({ + teamName: 'team-a', + id: input.id, + attemptGeneration: claimed.attemptGeneration, + deliveredMessageId: 'late-message', + nowIso: '2026-04-29T00:02:00.000Z', + }); + + const memberOutbox = JSON.parse( + await readFile( + join(root, 'team-a', 'members', 'bob', '.member-work-sync', 'outbox.json'), + 'utf8' + ) + ); + expect(memberOutbox.items[input.id]).toMatchObject({ + status: 'failed_terminal', + lastError: 'inbox_payload_conflict', + }); + expect(memberOutbox.items[input.id]).not.toHaveProperty('deliveredMessageId'); + + const index = JSON.parse( + await readFile( + join(root, 'team-a', '.member-work-sync', 'indexes', 'outbox-index.json'), + 'utf8' + ) + ); + expect(index.items[input.id]).toMatchObject({ + status: 'failed_terminal', + }); + expect(index.items[input.id]).not.toHaveProperty('deliveredMessageId'); + }); + it('finds recent recovery outbox rows by logical intent key', async () => { const olderInput = { id: 'member-work-sync:team-a:bob:agenda:v1:older', @@ -1017,6 +1250,48 @@ describe('JsonMemberWorkSyncStore', () => { }); }); + it('treats future claimedAt outbox items as stale', async () => { + const input = { + id: 'member-work-sync:team-a:bob:agenda:v1:future-claim', + teamName: 'team-a', + memberName: 'bob', + agendaFingerprint: 'agenda:v1:future-claim', + payloadHash: 'hash-a', + payload: makeNudgePayload(), + nowIso: '2026-04-29T00:00:00.000Z', + }; + await store.ensurePending(input); + + const [claimed] = await store.claimDue({ + teamName: 'team-a', + claimedBy: 'dispatcher-a', + nowIso: '2026-04-29T00:10:00.000Z', + limit: 1, + }); + expect(claimed).toMatchObject({ + id: input.id, + status: 'claimed', + attemptGeneration: 1, + claimedBy: 'dispatcher-a', + claimedAt: '2026-04-29T00:10:00.000Z', + }); + + const [reclaimed] = await store.claimDue({ + teamName: 'team-a', + claimedBy: 'dispatcher-b', + nowIso: '2026-04-29T00:01:00.000Z', + limit: 1, + }); + + expect(reclaimed).toMatchObject({ + id: input.id, + status: 'claimed', + attemptGeneration: 2, + claimedBy: 'dispatcher-b', + claimedAt: '2026-04-29T00:01:00.000Z', + }); + }); + it('claims due outbox items from the index without scanning unrelated member outboxes', async () => { const bobInput = { id: 'member-work-sync:team-a:bob:agenda:v1:abc', diff --git a/test/features/member-work-sync/main/MemberWorkSyncEventQueue.test.ts b/test/features/member-work-sync/main/MemberWorkSyncEventQueue.test.ts index 99b56878..50407e41 100644 --- a/test/features/member-work-sync/main/MemberWorkSyncEventQueue.test.ts +++ b/test/features/member-work-sync/main/MemberWorkSyncEventQueue.test.ts @@ -229,6 +229,29 @@ describe('MemberWorkSyncEventQueue', () => { await queue.stop(); }); + it('can reconcile inactive teams when the caller needs inactive statuses refreshed', async () => { + const reconcile = vi.fn(async () => undefined); + const queue = new MemberWorkSyncEventQueue({ + quietWindowMs: 1, + reconcile, + isTeamActive: () => false, + reconcileInactiveTeams: true, + }); + + queue.enqueue({ teamName: 'team-a', memberName: 'bob', triggerReason: 'manual_refresh' }); + await vi.advanceTimersByTimeAsync(1); + + expect(reconcile).toHaveBeenCalledWith( + { teamName: 'team-a', memberName: 'bob' }, + expect.objectContaining({ + reconciledBy: 'queue', + triggerReasons: ['manual_refresh'], + }) + ); + await queue.stop(); + expect(queue.getDiagnostics()).toMatchObject({ dropped: 0, reconciled: 1 }); + }); + it('runs one follow-up pass when events arrive during an active reconcile', async () => { let release: () => void = () => { throw new Error('reconcile did not start'); @@ -408,6 +431,116 @@ describe('MemberWorkSyncEventQueue', () => { await queue.stop(); }); + it('times out a hung reconcile and retries so the member cannot stay running forever', async () => { + let reconcileCalls = 0; + const auditEvents: string[] = []; + const queue = new MemberWorkSyncEventQueue({ + quietWindowMs: 1, + retryDelayMs: 10, + reconcileTimeoutMs: 20, + maxRetryAttempts: 1, + reconcile: async () => { + reconcileCalls += 1; + if (reconcileCalls === 1) { + await new Promise(() => undefined); + } + }, + isTeamActive: () => true, + auditJournal: { + append: async (event) => { + auditEvents.push(event.event); + }, + }, + }); + + queue.enqueue({ teamName: 'team-a', memberName: 'bob', triggerReason: 'turn_settled' }); + await vi.advanceTimersByTimeAsync(1); + + expect(reconcileCalls).toBe(1); + expect(queue.getDiagnostics()).toMatchObject({ running: 1, queued: 0, failed: 0 }); + + await vi.advanceTimersByTimeAsync(20); + + expect(queue.getDiagnostics()).toMatchObject({ + running: 0, + queued: 1, + failed: 1, + reconciled: 0, + }); + expect(auditEvents).toEqual(['queue_enqueued', 'queue_retry_scheduled']); + + await vi.advanceTimersByTimeAsync(9); + expect(reconcileCalls).toBe(1); + + await vi.advanceTimersByTimeAsync(1); + + expect(reconcileCalls).toBe(2); + expect(queue.getDiagnostics()).toMatchObject({ + running: 0, + queued: 0, + failed: 1, + reconciled: 1, + }); + expect(auditEvents).toEqual([ + 'queue_enqueued', + 'queue_retry_scheduled', + 'queue_reconciled', + ]); + + await queue.stop(); + }); + + it('marks a timed-out reconcile context as cancelled for late continuations', async () => { + let releaseFirst: () => void = () => { + throw new Error('first reconcile did not start'); + }; + let reconcileCalls = 0; + const lateSideEffects: string[] = []; + const cancellationChecks: boolean[] = []; + const queue = new MemberWorkSyncEventQueue({ + quietWindowMs: 1, + retryDelayMs: 10, + reconcileTimeoutMs: 20, + maxRetryAttempts: 1, + reconcile: async (_request, context) => { + reconcileCalls += 1; + if (reconcileCalls === 1) { + await new Promise((resolve) => { + releaseFirst = resolve; + }); + const cancelled = context.isCancelled?.() === true; + cancellationChecks.push(cancelled); + if (!cancelled) { + lateSideEffects.push('first'); + } + return; + } + + const cancelled = context.isCancelled?.() === true; + cancellationChecks.push(cancelled); + if (!cancelled) { + lateSideEffects.push('retry'); + } + }, + isTeamActive: () => true, + }); + + queue.enqueue({ teamName: 'team-a', memberName: 'bob', triggerReason: 'turn_settled' }); + await vi.advanceTimersByTimeAsync(1); + await vi.advanceTimersByTimeAsync(20); + await vi.advanceTimersByTimeAsync(10); + + expect(lateSideEffects).toEqual(['retry']); + + releaseFirst(); + await vi.advanceTimersByTimeAsync(1); + + expect(cancellationChecks).toEqual([false, true]); + expect(lateSideEffects).toEqual(['retry']); + + await queue.stop(); + }); + it('drops a failed reconcile after the retry budget is exhausted', async () => { const reconcile = vi.fn(async () => { throw new Error('still failing'); diff --git a/test/features/member-work-sync/main/MemberWorkSyncNudgeDispatchScheduler.test.ts b/test/features/member-work-sync/main/MemberWorkSyncNudgeDispatchScheduler.test.ts index 729ece23..692dcb7b 100644 --- a/test/features/member-work-sync/main/MemberWorkSyncNudgeDispatchScheduler.test.ts +++ b/test/features/member-work-sync/main/MemberWorkSyncNudgeDispatchScheduler.test.ts @@ -18,7 +18,9 @@ describe('MemberWorkSyncNudgeDispatchScheduler', () => { const first = scheduler.runOnce(); const second = scheduler.runOnce(); - await Promise.resolve(); + await new Promise((resolve) => { + setTimeout(resolve, 0); + }); expect(dispatchDue).toHaveBeenCalledTimes(1); release(); @@ -60,4 +62,99 @@ describe('MemberWorkSyncNudgeDispatchScheduler', () => { expect.objectContaining({ error: 'Error: list failed' }) ); }); + + it('times out a hung dispatch so later scheduled runs can continue', async () => { + vi.useFakeTimers(); + try { + let dispatchCalls = 0; + const warn = vi.fn(); + const dispatchDue = vi.fn(async () => { + dispatchCalls += 1; + if (dispatchCalls === 1) { + await new Promise(() => undefined); + } + return { claimed: 0, delivered: 0, superseded: 0, retryable: 0, terminal: 0 }; + }); + const scheduler = new MemberWorkSyncNudgeDispatchScheduler({ + listLifecycleActiveTeamNames: async () => ['team-a'], + dispatchDue, + dispatchTimeoutMs: 20, + logger: { + debug: vi.fn(), + warn, + error: vi.fn(), + }, + }); + + const first = scheduler.runOnce(); + await vi.advanceTimersByTimeAsync(0); + + expect(dispatchDue).toHaveBeenCalledTimes(1); + + await vi.advanceTimersByTimeAsync(20); + await first; + + expect(warn).toHaveBeenCalledWith( + 'member work sync scheduled nudge dispatch failed', + expect.objectContaining({ + error: 'Error: member work sync scheduled nudge dispatch timed out after 20ms', + }) + ); + + await scheduler.runOnce(); + + expect(dispatchDue).toHaveBeenCalledTimes(2); + } finally { + vi.useRealTimers(); + } + }); + + it('times out hung active team listing so later scheduled runs can continue', async () => { + vi.useFakeTimers(); + try { + let listCalls = 0; + const warn = vi.fn(); + const dispatchDue = vi.fn(async () => ({ + claimed: 0, + delivered: 0, + superseded: 0, + retryable: 0, + terminal: 0, + })); + const scheduler = new MemberWorkSyncNudgeDispatchScheduler({ + listLifecycleActiveTeamNames: async () => { + listCalls += 1; + if (listCalls === 1) { + await new Promise(() => undefined); + } + return ['team-a']; + }, + dispatchDue, + dispatchTimeoutMs: 20, + logger: { + debug: vi.fn(), + warn, + error: vi.fn(), + }, + }); + + const first = scheduler.runOnce(); + await vi.advanceTimersByTimeAsync(20); + await first; + + expect(warn).toHaveBeenCalledWith( + 'member work sync scheduled nudge dispatch failed', + expect.objectContaining({ + error: 'Error: member work sync scheduled nudge team listing timed out after 20ms', + }) + ); + expect(dispatchDue).not.toHaveBeenCalled(); + + await scheduler.runOnce(); + + expect(dispatchDue).toHaveBeenCalledWith(['team-a']); + } finally { + vi.useRealTimers(); + } + }); }); diff --git a/test/features/member-work-sync/main/MemberWorkSyncTeamChangeRouter.test.ts b/test/features/member-work-sync/main/MemberWorkSyncTeamChangeRouter.test.ts index 59f0038d..d4625eec 100644 --- a/test/features/member-work-sync/main/MemberWorkSyncTeamChangeRouter.test.ts +++ b/test/features/member-work-sync/main/MemberWorkSyncTeamChangeRouter.test.ts @@ -1,6 +1,5 @@ -import { describe, expect, it, vi } from 'vitest'; - import { MemberWorkSyncTeamChangeRouter } from '@features/member-work-sync/main/adapters/input/MemberWorkSyncTeamChangeRouter'; +import { describe, expect, it, vi } from 'vitest'; function createRouter(activeMembers: string[] = ['alice', 'bob']) { const queue = { @@ -96,13 +95,25 @@ describe('MemberWorkSyncTeamChangeRouter', () => { }); }); - it('drops queued work when the team goes offline', () => { + it('refreshes member runtime state when the team goes offline', async () => { const { queue, router } = createRouter(); router.noteTeamChange({ type: 'lead-activity', teamName: 'team-a', detail: 'offline' }); + await Promise.resolve(); expect(queue.dropTeam).toHaveBeenCalledWith('team-a'); - expect(queue.enqueue).not.toHaveBeenCalled(); + expect(queue.enqueue).toHaveBeenCalledWith({ + teamName: 'team-a', + memberName: 'alice', + triggerReason: 'runtime_activity', + runAfterMs: 0, + }); + expect(queue.enqueue).toHaveBeenCalledWith({ + teamName: 'team-a', + memberName: 'bob', + triggerReason: 'runtime_activity', + runAfterMs: 0, + }); }); it('routes member-turn-settled events to one member reconcile', () => { diff --git a/test/features/member-work-sync/main/MemberWorkSyncToolActivityBusySignal.test.ts b/test/features/member-work-sync/main/MemberWorkSyncToolActivityBusySignal.test.ts index 9c736020..8be733e9 100644 --- a/test/features/member-work-sync/main/MemberWorkSyncToolActivityBusySignal.test.ts +++ b/test/features/member-work-sync/main/MemberWorkSyncToolActivityBusySignal.test.ts @@ -1,5 +1,5 @@ import { MemberWorkSyncToolActivityBusySignal } from '@features/member-work-sync/main/infrastructure/MemberWorkSyncToolActivityBusySignal'; -import { describe, expect, it } from 'vitest'; +import { describe, expect, it, vi } from 'vitest'; import type { TeamChangeEvent, ToolActivityEventPayload } from '@shared/types'; @@ -180,4 +180,80 @@ describe('MemberWorkSyncToolActivityBusySignal', () => { }) ).resolves.toEqual({ busy: false }); }); + + it('bounds future tool timestamps so busy state cannot sleep nudges for too long', async () => { + vi.useFakeTimers(); + try { + vi.setSystemTime(new Date('2026-04-29T00:00:00.000Z')); + + const activeSignal = new MemberWorkSyncToolActivityBusySignal({ + busyGraceMs: 90_000, + activeToolStaleMs: 10 * 60_000, + }); + + activeSignal.noteTeamChange( + toolEvent('team-a', { + action: 'start', + activity: { + memberName: 'bob', + toolUseId: 'tool-1', + toolName: 'bash', + startedAt: '2026-04-29T01:00:00.000Z', + source: 'runtime', + }, + }) + ); + + await expect( + activeSignal.isBusy({ + teamName: 'team-a', + memberName: 'bob', + nowIso: '2026-04-29T00:09:59.000Z', + }) + ).resolves.toMatchObject({ + busy: true, + reason: 'active_tool_activity', + }); + + await expect( + activeSignal.isBusy({ + teamName: 'team-a', + memberName: 'bob', + nowIso: '2026-04-29T00:10:00.000Z', + }) + ).resolves.toEqual({ busy: false }); + + const finishSignal = new MemberWorkSyncToolActivityBusySignal({ busyGraceMs: 90_000 }); + finishSignal.noteTeamChange( + toolEvent('team-a', { + action: 'finish', + memberName: 'bob', + toolUseId: 'tool-2', + finishedAt: '2026-04-29T01:00:00.000Z', + }) + ); + + await expect( + finishSignal.isBusy({ + teamName: 'team-a', + memberName: 'bob', + nowIso: '2026-04-29T00:01:29.000Z', + }) + ).resolves.toMatchObject({ + busy: true, + reason: 'recent_tool_activity', + retryAfterIso: '2026-04-29T00:01:30.000Z', + }); + + await expect( + finishSignal.isBusy({ + teamName: 'team-a', + memberName: 'bob', + nowIso: '2026-04-29T00:01:30.000Z', + }) + ).resolves.toEqual({ busy: false }); + } finally { + vi.useRealTimers(); + } + }); }); diff --git a/test/features/member-work-sync/main/RuntimeTurnSettledDrainScheduler.test.ts b/test/features/member-work-sync/main/RuntimeTurnSettledDrainScheduler.test.ts new file mode 100644 index 00000000..9c0fcc96 --- /dev/null +++ b/test/features/member-work-sync/main/RuntimeTurnSettledDrainScheduler.test.ts @@ -0,0 +1,75 @@ +import { RuntimeTurnSettledDrainScheduler } from '@features/member-work-sync/main/infrastructure/RuntimeTurnSettledDrainScheduler'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; + +describe('RuntimeTurnSettledDrainScheduler', () => { + beforeEach(() => { + vi.useFakeTimers(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it('does not overlap active drains', async () => { + let release!: () => void; + const firstDrain = new Promise((resolve) => { + release = resolve; + }); + const drain = vi.fn(async () => { + await firstDrain; + return { claimed: 1, enqueued: 1, unresolved: 0, ignored: 0, invalid: 0, failed: 0 }; + }); + const scheduler = new RuntimeTurnSettledDrainScheduler({ drain }); + + const first = scheduler.drainNow(); + await vi.advanceTimersByTimeAsync(0); + + await expect(scheduler.drainNow()).resolves.toBeNull(); + expect(drain).toHaveBeenCalledTimes(1); + + release(); + await first; + }); + + it('times out a hung drain so later turn-settled drains can continue', async () => { + let drainCalls = 0; + const warn = vi.fn(); + const drain = vi.fn(async () => { + drainCalls += 1; + if (drainCalls === 1) { + await new Promise(() => undefined); + } + return { claimed: 0, enqueued: 0, unresolved: 0, ignored: 0, invalid: 0, failed: 0 }; + }); + const scheduler = new RuntimeTurnSettledDrainScheduler({ + drain, + drainTimeoutMs: 20, + logger: { + debug: vi.fn(), + warn, + error: vi.fn(), + }, + }); + + const first = scheduler.drainNow(); + await vi.advanceTimersByTimeAsync(0); + + expect(drain).toHaveBeenCalledTimes(1); + + await vi.advanceTimersByTimeAsync(20); + await expect(first).resolves.toBeNull(); + + expect(warn).toHaveBeenCalledWith( + 'runtime turn settled scheduled drain failed', + expect.objectContaining({ + error: 'Error: runtime turn settled drain timed out after 20ms', + }) + ); + + await expect(scheduler.drainNow()).resolves.toMatchObject({ + claimed: 0, + enqueued: 0, + }); + expect(drain).toHaveBeenCalledTimes(2); + }); +}); diff --git a/test/features/member-work-sync/main/TeamTaskStallJournalWorkSyncCooldown.test.ts b/test/features/member-work-sync/main/TeamTaskStallJournalWorkSyncCooldown.test.ts index 6dba26db..1e264f30 100644 --- a/test/features/member-work-sync/main/TeamTaskStallJournalWorkSyncCooldown.test.ts +++ b/test/features/member-work-sync/main/TeamTaskStallJournalWorkSyncCooldown.test.ts @@ -41,6 +41,71 @@ describe('TeamTaskStallJournalWorkSyncCooldown', () => { ).resolves.toBe(true); }); + it('returns the exact retry deadline for an active watchdog cooldown', async () => { + await mkdir(join(root, 'team-a'), { recursive: true }); + await writeFile( + join(root, 'team-a', 'stall-monitor-journal.json'), + JSON.stringify([ + { + taskId: 'task-1', + memberName: 'bob', + state: 'alerted', + alertedAt: '2026-04-29T00:05:00.000Z', + }, + ]), + 'utf8' + ); + + const cooldown = new TeamTaskStallJournalWorkSyncCooldown(root, 10 * 60_000); + + await expect( + cooldown.getRecentNudgeCooldown({ + teamName: 'team-a', + memberName: 'bob', + taskIds: ['task-1'], + nowIso: '2026-04-29T00:10:00.000Z', + }) + ).resolves.toEqual({ + active: true, + retryAfterIso: '2026-04-29T00:15:00.000Z', + }); + }); + + it('does not suppress a reassigned task for a different member', async () => { + await mkdir(join(root, 'team-a'), { recursive: true }); + await writeFile( + join(root, 'team-a', 'stall-monitor-journal.json'), + JSON.stringify([ + { + taskId: 'task-1', + memberName: 'alice', + state: 'alerted', + alertedAt: '2026-04-29T00:05:00.000Z', + }, + ]), + 'utf8' + ); + + const cooldown = new TeamTaskStallJournalWorkSyncCooldown(root, 10 * 60_000); + + await expect( + cooldown.hasRecentNudge({ + teamName: 'team-a', + memberName: 'bob', + taskIds: ['task-1'], + nowIso: '2026-04-29T00:10:00.000Z', + }) + ).resolves.toBe(false); + await expect( + cooldown.hasRecentNudge({ + teamName: 'team-a', + memberName: 'alice', + taskIds: ['task-1'], + nowIso: '2026-04-29T00:10:00.000Z', + }) + ).resolves.toBe(true); + }); + it('ignores old watchdog alerts and missing journals', async () => { await mkdir(join(root, 'team-a'), { recursive: true }); await writeFile( @@ -75,6 +140,58 @@ describe('TeamTaskStallJournalWorkSyncCooldown', () => { ).resolves.toBe(false); }); + it('does not suppress exactly at the watchdog cooldown boundary', async () => { + await mkdir(join(root, 'team-a'), { recursive: true }); + await writeFile( + join(root, 'team-a', 'stall-monitor-journal.json'), + JSON.stringify([ + { + taskId: 'task-1', + state: 'alerted', + alertedAt: '2026-04-29T00:00:00.000Z', + }, + ]), + 'utf8' + ); + + const cooldown = new TeamTaskStallJournalWorkSyncCooldown(root, 10 * 60_000); + + await expect( + cooldown.getRecentNudgeCooldown({ + teamName: 'team-a', + memberName: 'bob', + taskIds: ['task-1'], + nowIso: '2026-04-29T00:10:00.000Z', + }) + ).resolves.toEqual({ active: false }); + }); + + it('ignores future watchdog alert timestamps', async () => { + await mkdir(join(root, 'team-a'), { recursive: true }); + await writeFile( + join(root, 'team-a', 'stall-monitor-journal.json'), + JSON.stringify([ + { + taskId: 'task-1', + state: 'alerted', + alertedAt: '2026-04-29T01:00:00.000Z', + }, + ]), + 'utf8' + ); + + const cooldown = new TeamTaskStallJournalWorkSyncCooldown(root, 10 * 60_000); + + await expect( + cooldown.hasRecentNudge({ + teamName: 'team-a', + memberName: 'bob', + taskIds: ['task-1'], + nowIso: '2026-04-29T00:10:00.000Z', + }) + ).resolves.toBe(false); + }); + it('fails open when the watchdog journal is invalid', async () => { await mkdir(join(root, 'team-a'), { recursive: true }); await writeFile(join(root, 'team-a', 'stall-monitor-journal.json'), '{bad json', 'utf8'); diff --git a/test/features/member-work-sync/main/createMemberWorkSyncFeature.test.ts b/test/features/member-work-sync/main/createMemberWorkSyncFeature.test.ts index 3f3b80aa..45a4d82c 100644 --- a/test/features/member-work-sync/main/createMemberWorkSyncFeature.test.ts +++ b/test/features/member-work-sync/main/createMemberWorkSyncFeature.test.ts @@ -651,6 +651,95 @@ describe('createMemberWorkSyncFeature composition', () => { } }); + it('dispatches existing due nudges before background stale refresh work', async () => { + const claudeRoot = makeTempRoot(); + setClaudeBasePathOverride(claudeRoot); + const teamsBasePath = getTeamsBasePath(); + const teamName = 'team-a'; + const memberName = 'bob'; + let postSeedGetConfigCalls = 0; + let refreshBlocked = false; + let releaseRefresh: () => void = () => undefined; + const refreshBlocker = new Promise((resolve) => { + releaseRefresh = resolve; + }); + const getConfig = vi.fn(async () => { + postSeedGetConfigCalls += 1; + if (postSeedGetConfigCalls === 2) { + refreshBlocked = true; + await refreshBlocker; + } + return { + name: teamName, + members: [{ name: memberName }], + }; + }); + const feature = createMemberWorkSyncFeature({ + teamsBasePath, + configReader: { + getConfig, + } as never, + taskReader: { + getTasks: vi.fn(async () => [ + { + id: 'task-1', + displayId: '11111111', + subject: 'Ship sync', + status: 'pending', + owner: memberName, + }, + ]), + } as never, + kanbanManager: { + getState: vi.fn(async () => ({ + teamName, + reviewers: [], + tasks: {}, + })), + } as never, + membersMetaStore: { + getMembers: vi.fn(async () => []), + } as never, + }); + let dispatchPromise: Promise | null = null; + + try { + await seedShadowReadyMetrics({ teamsBasePath, teamName, memberName }); + const status = await feature.refreshStatus({ teamName, memberName }); + const outboxInput = buildMemberWorkSyncOutboxEnsureInput({ + status, + hash: new NodeHashAdapter(), + nowIso: status.evaluatedAt, + }); + expect(outboxInput).not.toBeNull(); + const store = new JsonMemberWorkSyncStore(new MemberWorkSyncStorePaths(teamsBasePath)); + await expect(store.ensurePending(outboxInput!)).resolves.toMatchObject({ + ok: true, + outcome: 'existing', + }); + + postSeedGetConfigCalls = 0; + dispatchPromise = feature.dispatchDueNudges([teamName]); + await waitForAssertion(() => { + expect(refreshBlocked).toBe(true); + }); + + await expect(readInboxMessages({ teamsBasePath, teamName, memberName })).resolves.toEqual( + expect.arrayContaining([expect.objectContaining({ messageId: outboxInput!.id })]) + ); + + releaseRefresh(); + await expect(dispatchPromise).resolves.toMatchObject({ + claimed: 1, + delivered: 1, + }); + } finally { + releaseRefresh(); + await dispatchPromise?.catch(() => undefined); + await feature.dispose(); + } + }); + it('suppresses queued proof-missing recovery when the original delivery is no longer proof-missing', async () => { const claudeRoot = makeTempRoot(); setClaudeBasePathOverride(claudeRoot); @@ -1115,8 +1204,6 @@ describe('createMemberWorkSyncFeature composition', () => { ); await expect(feature.drainRuntimeTurnSettledEvents()).resolves.toMatchObject({ - claimed: 1, - enqueued: 1, invalid: 0, unresolved: 0, }); @@ -3891,6 +3978,73 @@ describe('createMemberWorkSyncFeature composition', () => { } }); + it('refreshes stale needs_sync into inactive after the whole team stops', async () => { + const claudeRoot = makeTempRoot(); + setClaudeBasePathOverride(claudeRoot); + const teamsBasePath = getTeamsBasePath(); + const teamName = 'team-stopped'; + const memberName = 'bob'; + let teamActive = true; + const feature = createMemberWorkSyncFeature({ + teamsBasePath, + configReader: { + getConfig: vi.fn(async () => ({ + name: teamName, + members: [{ name: memberName, providerId: 'codex' }], + })), + } as never, + taskReader: { + getTasks: vi.fn(async () => [ + { + id: 'task-1', + displayId: '11111111', + subject: 'Finish work after sleep', + status: 'pending', + owner: memberName, + }, + ]), + } as never, + kanbanManager: { + getState: vi.fn(async () => ({ + teamName, + reviewers: [], + tasks: {}, + })), + } as never, + membersMetaStore: { + getMembers: vi.fn(async () => []), + } as never, + isTeamActive: vi.fn(async () => teamActive), + canDispatchNudges: vi.fn(async () => teamActive), + }); + + try { + const current = await feature.refreshStatus({ teamName, memberName }); + expect(current.state).toBe('needs_sync'); + + const store = new JsonMemberWorkSyncStore(new MemberWorkSyncStorePaths(teamsBasePath)); + await store.write({ + ...current, + evaluatedAt: new Date(Date.now() - 3 * 60_000).toISOString(), + }); + teamActive = false; + + await expect(feature.getStatus({ teamName, memberName })).resolves.toMatchObject({ + state: 'needs_sync', + diagnostics: expect.arrayContaining(['status_stale_refresh_enqueued']), + }); + await waitForQueueIdle(feature); + + await expect(store.read({ teamName, memberName })).resolves.toMatchObject({ + state: 'inactive', + diagnostics: expect.arrayContaining(['team_runtime_inactive']), + shadow: { reconciledBy: 'queue', triggerReasons: ['manual_refresh'] }, + }); + } finally { + await feature.dispose(); + } + }); + it('uses snapshot config reads for startup roster materialization', async () => { const getConfig = vi.fn(async () => ({ members: [] })); const getConfigSnapshot = vi.fn(async () => ({