fix(member-work-sync): recover stale work leases
This commit is contained in:
parent
5b1fccdbed
commit
dd47dfb8a2
19 changed files with 1126 additions and 87 deletions
|
|
@ -15,6 +15,7 @@ export type MemberWorkSyncNudgeActivationReason =
|
|||
| MemberWorkSyncTargetedRecoveryReason
|
||||
| 'review_pickup_required'
|
||||
| 'native_stale_in_progress'
|
||||
| 'native_stale_assigned_work'
|
||||
| 'status_not_nudgeable'
|
||||
| 'blocking_metrics'
|
||||
| 'phase2_not_ready';
|
||||
|
|
@ -77,6 +78,7 @@ function hasActiveAcceptedWorkLease(status: MemberWorkSyncStatus): boolean {
|
|||
function hasNoCurrentAcceptedWorkProof(status: MemberWorkSyncStatus): boolean {
|
||||
return (
|
||||
status.diagnostics.includes('no_current_report') ||
|
||||
status.diagnostics.includes('report_lease_missing') ||
|
||||
status.diagnostics.includes('report_lease_expired') ||
|
||||
status.diagnostics.includes('report_fingerprint_stale')
|
||||
);
|
||||
|
|
@ -147,10 +149,24 @@ function getCurrentFingerprintStableSinceMs(
|
|||
return currentNeedsSyncEventTimes.length > 0 ? Math.min(...currentNeedsSyncEventTimes) : null;
|
||||
}
|
||||
|
||||
function isNativeStaleInProgressCandidate(input: {
|
||||
function isNativeStaleWorkItem(status: MemberWorkSyncStatus['agenda']['items'][number]): boolean {
|
||||
return (
|
||||
status.kind === 'work' &&
|
||||
((status.reason === 'owned_in_progress_task' && status.evidence.status === 'in_progress') ||
|
||||
(status.reason === 'owned_pending_task' && status.evidence.status === 'pending'))
|
||||
);
|
||||
}
|
||||
|
||||
function isNativeStaleEligibleItem(
|
||||
status: MemberWorkSyncStatus['agenda']['items'][number]
|
||||
): boolean {
|
||||
return isNativeStaleWorkItem(status) || isStrictReviewPickupItem(status);
|
||||
}
|
||||
|
||||
function getNativeStaleWorkRecoveryReason(input: {
|
||||
status: MemberWorkSyncStatus;
|
||||
metrics: MemberWorkSyncTeamMetrics;
|
||||
}): boolean {
|
||||
}): 'native_stale_in_progress' | 'native_stale_assigned_work' | null {
|
||||
const { status, metrics } = input;
|
||||
if (
|
||||
status.state !== 'needs_sync' ||
|
||||
|
|
@ -159,27 +175,36 @@ function isNativeStaleInProgressCandidate(input: {
|
|||
!status.providerId ||
|
||||
!NATIVE_STALE_IN_PROGRESS_PROVIDERS.has(status.providerId) ||
|
||||
isLeadLikeMemberName(status.memberName) ||
|
||||
status.agenda.items.length !== 1 ||
|
||||
status.agenda.items.length === 0 ||
|
||||
hasActiveAcceptedWorkLease(status)
|
||||
) {
|
||||
return false;
|
||||
return null;
|
||||
}
|
||||
|
||||
const [item] = status.agenda.items;
|
||||
if (
|
||||
item.kind !== 'work' ||
|
||||
item.reason !== 'owned_in_progress_task' ||
|
||||
item.evidence.status !== 'in_progress'
|
||||
) {
|
||||
return false;
|
||||
if (!status.agenda.items.every(isNativeStaleEligibleItem)) {
|
||||
return null;
|
||||
}
|
||||
if (!status.agenda.items.some(isNativeStaleWorkItem)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const nowMs = parseTime(metrics.generatedAt) ?? parseTime(status.evaluatedAt);
|
||||
if (nowMs == null) {
|
||||
return false;
|
||||
return null;
|
||||
}
|
||||
const stableSinceMs = getCurrentFingerprintStableSinceMs(status, metrics, nowMs);
|
||||
return stableSinceMs != null && nowMs - stableSinceMs >= NATIVE_STALE_IN_PROGRESS_MIN_AGE_MS;
|
||||
if (stableSinceMs == null || nowMs - stableSinceMs < NATIVE_STALE_IN_PROGRESS_MIN_AGE_MS) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return status.agenda.items.every(
|
||||
(item) =>
|
||||
item.kind === 'work' &&
|
||||
item.reason === 'owned_in_progress_task' &&
|
||||
item.evidence.status === 'in_progress'
|
||||
)
|
||||
? 'native_stale_in_progress'
|
||||
: 'native_stale_assigned_work';
|
||||
}
|
||||
|
||||
function isReviewPickupRequiredCandidate(status: MemberWorkSyncStatus): boolean {
|
||||
|
|
@ -206,8 +231,9 @@ export function decideMemberWorkSyncNudgeActivation(input: {
|
|||
return { active: true, reason: 'review_pickup_required' };
|
||||
}
|
||||
|
||||
if (isNativeStaleInProgressCandidate(input)) {
|
||||
return { active: true, reason: 'native_stale_in_progress' };
|
||||
const nativeStaleWorkReason = getNativeStaleWorkRecoveryReason(input);
|
||||
if (nativeStaleWorkReason) {
|
||||
return { active: true, reason: nativeStaleWorkReason };
|
||||
}
|
||||
|
||||
const targetedRecovery = decideMemberWorkSyncTargetedRecovery(input.status);
|
||||
|
|
|
|||
|
|
@ -153,6 +153,14 @@ export function validateMemberWorkSyncReport(input: {
|
|||
};
|
||||
}
|
||||
|
||||
if (input.request.state === 'still_working' && input.agenda.items.length === 0) {
|
||||
return {
|
||||
ok: false,
|
||||
code: 'still_working_rejected_agenda_empty',
|
||||
message: 'Cannot report still_working when no actionable work remains.',
|
||||
};
|
||||
}
|
||||
|
||||
if (
|
||||
input.request.state === 'blocked' &&
|
||||
!agendaHasBlockedEvidence(input.agenda, input.request.taskIds)
|
||||
|
|
|
|||
|
|
@ -10,6 +10,18 @@ export interface SyncDecision {
|
|||
diagnostics: string[];
|
||||
}
|
||||
|
||||
function getWorkLeaseDiagnostic(
|
||||
report: MemberWorkSyncReport,
|
||||
nowIso: string
|
||||
): 'report_lease_missing' | 'report_lease_expired' | null {
|
||||
const expiresAtMs = Date.parse(report.expiresAt ?? '');
|
||||
const nowMs = Date.parse(nowIso);
|
||||
if (!Number.isFinite(expiresAtMs) || !Number.isFinite(nowMs)) {
|
||||
return 'report_lease_missing';
|
||||
}
|
||||
return expiresAtMs <= nowMs ? 'report_lease_expired' : null;
|
||||
}
|
||||
|
||||
export function decideMemberWorkSyncStatus(input: {
|
||||
agenda: MemberWorkSyncAgenda;
|
||||
latestAcceptedReport?: MemberWorkSyncReport | null;
|
||||
|
|
@ -25,7 +37,8 @@ export function decideMemberWorkSyncStatus(input: {
|
|||
state: 'caught_up',
|
||||
diagnostics: ['agenda_empty'],
|
||||
acceptedReport:
|
||||
input.latestAcceptedReport?.agendaFingerprint === input.agenda.fingerprint
|
||||
input.latestAcceptedReport?.state === 'caught_up' &&
|
||||
input.latestAcceptedReport.agendaFingerprint === input.agenda.fingerprint
|
||||
? input.latestAcceptedReport
|
||||
: undefined,
|
||||
};
|
||||
|
|
@ -38,13 +51,18 @@ export function decideMemberWorkSyncStatus(input: {
|
|||
if (report.agendaFingerprint !== input.agenda.fingerprint) {
|
||||
return { state: 'needs_sync', diagnostics: ['report_fingerprint_stale'] };
|
||||
}
|
||||
if (report.expiresAt && Date.parse(report.expiresAt) <= Date.parse(input.nowIso)) {
|
||||
return { state: 'needs_sync', diagnostics: ['report_lease_expired'] };
|
||||
}
|
||||
if (report.state === 'still_working') {
|
||||
const leaseDiagnostic = getWorkLeaseDiagnostic(report, input.nowIso);
|
||||
if (leaseDiagnostic) {
|
||||
return { state: 'needs_sync', diagnostics: [leaseDiagnostic] };
|
||||
}
|
||||
return { state: 'still_working', acceptedReport: report, diagnostics: ['lease_still_working'] };
|
||||
}
|
||||
if (report.state === 'blocked') {
|
||||
const leaseDiagnostic = getWorkLeaseDiagnostic(report, input.nowIso);
|
||||
if (leaseDiagnostic) {
|
||||
return { state: 'needs_sync', diagnostics: [leaseDiagnostic] };
|
||||
}
|
||||
return { state: 'blocked', acceptedReport: report, diagnostics: ['lease_blocked'] };
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -43,7 +43,7 @@ export class TeamTaskStallJournalWorkSyncCooldown implements MemberWorkSyncWatch
|
|||
);
|
||||
const parsed = JSON.parse(raw) as unknown;
|
||||
if (!Array.isArray(parsed)) {
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
const now = parseTime(input.nowIso) ?? Date.now();
|
||||
return parsed.some((entry): boolean => {
|
||||
|
|
@ -54,8 +54,8 @@ export class TeamTaskStallJournalWorkSyncCooldown implements MemberWorkSyncWatch
|
|||
const alertedAt = parseTime(row.alertedAt);
|
||||
return alertedAt != null && now - alertedAt <= this.cooldownMs;
|
||||
});
|
||||
} catch (error) {
|
||||
return (error as NodeJS.ErrnoException).code !== 'ENOENT';
|
||||
} catch {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -63,11 +63,44 @@ import type { TeamTaskReader } from '@main/services/team/TeamTaskReader';
|
|||
import type { TeamChangeEvent } from '@shared/types';
|
||||
|
||||
const STALE_STATUS_MAX_AGE_MS = 2 * 60_000;
|
||||
const CAUGHT_UP_STATUS_MAX_AGE_MS = 5 * 60_000;
|
||||
const PROOF_MISSING_RECOVERY_RECENT_WINDOW_MS = 10 * 60_000;
|
||||
|
||||
function isAcceptedWorkLeaseStatus(status: MemberWorkSyncStatus): boolean {
|
||||
return (
|
||||
status.report?.accepted === true &&
|
||||
(status.state === 'still_working' || status.state === 'blocked')
|
||||
);
|
||||
}
|
||||
|
||||
function getAcceptedWorkLeaseStaleness(
|
||||
status: MemberWorkSyncStatus,
|
||||
nowMs: number
|
||||
): 'missing' | 'expired' | null {
|
||||
if (!isAcceptedWorkLeaseStatus(status)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const reportExpiresAtMs = Date.parse(status.report?.expiresAt ?? '');
|
||||
if (!Number.isFinite(reportExpiresAtMs) || !Number.isFinite(nowMs)) {
|
||||
return 'missing';
|
||||
}
|
||||
return reportExpiresAtMs <= nowMs ? 'expired' : null;
|
||||
}
|
||||
|
||||
function isEmptyAgendaStaleState(status: MemberWorkSyncStatus): boolean {
|
||||
return (
|
||||
status.agenda.items.length === 0 &&
|
||||
(status.state === 'needs_sync' ||
|
||||
status.state === 'still_working' ||
|
||||
status.state === 'blocked' ||
|
||||
status.state === 'unknown')
|
||||
);
|
||||
}
|
||||
|
||||
function statusNeedsBackgroundRefresh(status: MemberWorkSyncStatus, nowMs: number): boolean {
|
||||
if (status.agenda.items.length === 0) {
|
||||
return false;
|
||||
if (isEmptyAgendaStaleState(status)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
const evaluatedAtMs = Date.parse(status.evaluatedAt);
|
||||
|
|
@ -75,17 +108,19 @@ function statusNeedsBackgroundRefresh(status: MemberWorkSyncStatus, nowMs: numbe
|
|||
return true;
|
||||
}
|
||||
|
||||
if (status.state === 'caught_up' && nowMs - evaluatedAtMs > CAUGHT_UP_STATUS_MAX_AGE_MS) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (status.agenda.items.length === 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (status.state === 'needs_sync' && nowMs - evaluatedAtMs > STALE_STATUS_MAX_AGE_MS) {
|
||||
return true;
|
||||
}
|
||||
|
||||
const reportExpiresAtMs = Date.parse(status.report?.expiresAt ?? '');
|
||||
return (
|
||||
status.report?.accepted === true &&
|
||||
Number.isFinite(reportExpiresAtMs) &&
|
||||
reportExpiresAtMs <= nowMs &&
|
||||
(status.state === 'still_working' || status.state === 'blocked')
|
||||
);
|
||||
return getAcceptedWorkLeaseStaleness(status, nowMs) !== null;
|
||||
}
|
||||
|
||||
function getStatusStalenessDiagnostics(status: MemberWorkSyncStatus, nowMs: number): string[] {
|
||||
|
|
@ -93,6 +128,10 @@ function getStatusStalenessDiagnostics(status: MemberWorkSyncStatus, nowMs: numb
|
|||
const evaluatedAtMs = Date.parse(status.evaluatedAt);
|
||||
if (!Number.isFinite(evaluatedAtMs)) {
|
||||
diagnostics.push('status_evaluated_at_invalid');
|
||||
} else if (isEmptyAgendaStaleState(status)) {
|
||||
diagnostics.push('empty_agenda_state_refresh_enqueued');
|
||||
} else if (status.state === 'caught_up' && nowMs - evaluatedAtMs > CAUGHT_UP_STATUS_MAX_AGE_MS) {
|
||||
diagnostics.push('caught_up_stale_refresh_enqueued');
|
||||
} else if (
|
||||
status.agenda.items.length > 0 &&
|
||||
['needs_sync', 'still_working', 'blocked'].includes(status.state) &&
|
||||
|
|
@ -101,13 +140,10 @@ function getStatusStalenessDiagnostics(status: MemberWorkSyncStatus, nowMs: numb
|
|||
diagnostics.push('status_stale_refresh_enqueued');
|
||||
}
|
||||
|
||||
const reportExpiresAtMs = Date.parse(status.report?.expiresAt ?? '');
|
||||
if (
|
||||
status.report?.accepted &&
|
||||
Number.isFinite(reportExpiresAtMs) &&
|
||||
reportExpiresAtMs <= nowMs &&
|
||||
(status.state === 'still_working' || status.state === 'blocked')
|
||||
) {
|
||||
const leaseStaleness = getAcceptedWorkLeaseStaleness(status, nowMs);
|
||||
if (leaseStaleness === 'missing') {
|
||||
diagnostics.push('accepted_report_lease_missing_refresh_enqueued');
|
||||
} else if (leaseStaleness === 'expired') {
|
||||
diagnostics.push('accepted_report_lease_expired_refresh_enqueued');
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -22,14 +22,6 @@ export class CompositeMemberWorkSyncBusySignal implements MemberWorkSyncBusySign
|
|||
memberName: input.memberName,
|
||||
error: String(error),
|
||||
});
|
||||
const nowMs = Date.parse(input.nowIso);
|
||||
return {
|
||||
busy: true,
|
||||
reason: 'busy_signal_error',
|
||||
retryAfterIso: new Date(
|
||||
(Number.isFinite(nowMs) ? nowMs : Date.now()) + 60_000
|
||||
).toISOString(),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -242,6 +242,17 @@ function canReviveOutboxItem(status: MemberWorkSyncOutboxItem['status']): boolea
|
|||
return status === 'superseded' || (!isOutboxTerminal(status) && status !== 'pending');
|
||||
}
|
||||
|
||||
function applyOptionalNextAttemptAt(
|
||||
item: MemberWorkSyncOutboxItem,
|
||||
nextAttemptAt: string | undefined
|
||||
): void {
|
||||
if (nextAttemptAt) {
|
||||
item.nextAttemptAt = nextAttemptAt;
|
||||
return;
|
||||
}
|
||||
delete item.nextAttemptAt;
|
||||
}
|
||||
|
||||
function canClaimOutboxItem(item: MemberWorkSyncOutboxItem, nowIso: string): boolean {
|
||||
if (item.status !== 'pending' && item.status !== 'failed_retryable') {
|
||||
return false;
|
||||
|
|
@ -730,12 +741,7 @@ export class JsonMemberWorkSyncStore
|
|||
status: 'pending',
|
||||
updatedAt: input.nowIso,
|
||||
};
|
||||
const nextAttemptAt = input.nextAttemptAt ?? current.nextAttemptAt;
|
||||
if (nextAttemptAt) {
|
||||
next.nextAttemptAt = nextAttemptAt;
|
||||
} else {
|
||||
delete next.nextAttemptAt;
|
||||
}
|
||||
applyOptionalNextAttemptAt(next, input.nextAttemptAt);
|
||||
delete next.claimedBy;
|
||||
delete next.claimedAt;
|
||||
delete next.lastError;
|
||||
|
|
@ -761,12 +767,7 @@ export class JsonMemberWorkSyncStore
|
|||
status: 'pending',
|
||||
updatedAt: input.nowIso,
|
||||
};
|
||||
const nextAttemptAt = input.nextAttemptAt ?? current.nextAttemptAt;
|
||||
if (nextAttemptAt) {
|
||||
next.nextAttemptAt = nextAttemptAt;
|
||||
} else {
|
||||
delete next.nextAttemptAt;
|
||||
}
|
||||
applyOptionalNextAttemptAt(next, input.nextAttemptAt);
|
||||
delete next.claimedBy;
|
||||
delete next.claimedAt;
|
||||
delete next.lastError;
|
||||
|
|
|
|||
|
|
@ -2,9 +2,10 @@ import type { MemberWorkSyncBusySignalPort } from '../../core/application';
|
|||
import type { TeamChangeEvent, ToolActivityEventPayload } from '@shared/types';
|
||||
|
||||
const DEFAULT_TOOL_ACTIVITY_BUSY_GRACE_MS = 90_000;
|
||||
const DEFAULT_ACTIVE_TOOL_STALE_MS = 10 * 60_000;
|
||||
|
||||
interface MemberActivityState {
|
||||
activeToolIds: Set<string>;
|
||||
activeToolStartedAtByToolId: Map<string, string>;
|
||||
recentBusyUntilByToolId: Map<string, string>;
|
||||
}
|
||||
|
||||
|
|
@ -46,9 +47,14 @@ function maxIso(values: Iterable<string>): string | null {
|
|||
export class MemberWorkSyncToolActivityBusySignal implements MemberWorkSyncBusySignalPort {
|
||||
private readonly activityByMember = new Map<string, MemberActivityState>();
|
||||
private readonly busyGraceMs: number;
|
||||
private readonly activeToolStaleMs: number;
|
||||
|
||||
constructor(options: { busyGraceMs?: number } = {}) {
|
||||
constructor(options: { busyGraceMs?: number; activeToolStaleMs?: number } = {}) {
|
||||
this.busyGraceMs = Math.max(0, options.busyGraceMs ?? DEFAULT_TOOL_ACTIVITY_BUSY_GRACE_MS);
|
||||
this.activeToolStaleMs = Math.max(
|
||||
this.busyGraceMs,
|
||||
options.activeToolStaleMs ?? DEFAULT_ACTIVE_TOOL_STALE_MS
|
||||
);
|
||||
}
|
||||
|
||||
noteTeamChange(event: TeamChangeEvent): void {
|
||||
|
|
@ -67,7 +73,12 @@ export class MemberWorkSyncToolActivityBusySignal implements MemberWorkSyncBusyS
|
|||
}
|
||||
|
||||
if (payload.action === 'start' && payload.activity) {
|
||||
this.noteStart(event.teamName, payload.activity.memberName, payload.activity.toolUseId);
|
||||
this.noteStart(
|
||||
event.teamName,
|
||||
payload.activity.memberName,
|
||||
payload.activity.toolUseId,
|
||||
payload.activity.startedAt
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
@ -94,7 +105,7 @@ export class MemberWorkSyncToolActivityBusySignal implements MemberWorkSyncBusyS
|
|||
|
||||
this.pruneState(key, state, input.nowIso);
|
||||
|
||||
if (state.activeToolIds.size > 0) {
|
||||
if (state.activeToolStartedAtByToolId.size > 0) {
|
||||
return {
|
||||
busy: true,
|
||||
reason: 'active_tool_activity',
|
||||
|
|
@ -114,13 +125,19 @@ export class MemberWorkSyncToolActivityBusySignal implements MemberWorkSyncBusyS
|
|||
return { busy: false };
|
||||
}
|
||||
|
||||
private noteStart(teamName: string, memberName: string, toolUseId: string): void {
|
||||
private noteStart(
|
||||
teamName: string,
|
||||
memberName: string,
|
||||
toolUseId: string,
|
||||
startedAt: string | undefined
|
||||
): void {
|
||||
const normalizedToolUseId = toolUseId.trim();
|
||||
if (!memberName.trim() || !normalizedToolUseId) {
|
||||
return;
|
||||
}
|
||||
const state = this.getOrCreateState(teamName, memberName);
|
||||
state.activeToolIds.add(normalizedToolUseId);
|
||||
const startedAtMs = parseIsoMs(startedAt, Date.now());
|
||||
state.activeToolStartedAtByToolId.set(normalizedToolUseId, new Date(startedAtMs).toISOString());
|
||||
state.recentBusyUntilByToolId.delete(normalizedToolUseId);
|
||||
}
|
||||
|
||||
|
|
@ -137,7 +154,7 @@ export class MemberWorkSyncToolActivityBusySignal implements MemberWorkSyncBusyS
|
|||
const finishedAtMs = parseIsoMs(finishedAt, Date.now());
|
||||
const busyUntilIso = new Date(finishedAtMs + this.busyGraceMs).toISOString();
|
||||
const state = this.getOrCreateState(teamName, memberName);
|
||||
state.activeToolIds.delete(normalizedToolUseId);
|
||||
state.activeToolStartedAtByToolId.delete(normalizedToolUseId);
|
||||
state.recentBusyUntilByToolId.set(normalizedToolUseId, busyUntilIso);
|
||||
}
|
||||
|
||||
|
|
@ -163,10 +180,10 @@ export class MemberWorkSyncToolActivityBusySignal implements MemberWorkSyncBusyS
|
|||
}
|
||||
|
||||
for (const toolUseId of normalizedToolUseIds) {
|
||||
state.activeToolIds.delete(toolUseId);
|
||||
state.activeToolStartedAtByToolId.delete(toolUseId);
|
||||
state.recentBusyUntilByToolId.delete(toolUseId);
|
||||
}
|
||||
if (state.activeToolIds.size === 0 && state.recentBusyUntilByToolId.size === 0) {
|
||||
if (state.activeToolStartedAtByToolId.size === 0 && state.recentBusyUntilByToolId.size === 0) {
|
||||
this.activityByMember.delete(key);
|
||||
}
|
||||
}
|
||||
|
|
@ -178,7 +195,7 @@ export class MemberWorkSyncToolActivityBusySignal implements MemberWorkSyncBusyS
|
|||
return existing;
|
||||
}
|
||||
const created: MemberActivityState = {
|
||||
activeToolIds: new Set(),
|
||||
activeToolStartedAtByToolId: new Map(),
|
||||
recentBusyUntilByToolId: new Map(),
|
||||
};
|
||||
this.activityByMember.set(key, created);
|
||||
|
|
@ -187,12 +204,20 @@ export class MemberWorkSyncToolActivityBusySignal implements MemberWorkSyncBusyS
|
|||
|
||||
private pruneState(key: string, state: MemberActivityState, nowIso: string): void {
|
||||
const nowMs = Date.parse(nowIso);
|
||||
if (Number.isFinite(nowMs)) {
|
||||
for (const [toolUseId, startedAtIso] of state.activeToolStartedAtByToolId) {
|
||||
const startedAtMs = Date.parse(startedAtIso);
|
||||
if (!Number.isFinite(startedAtMs) || nowMs - startedAtMs >= this.activeToolStaleMs) {
|
||||
state.activeToolStartedAtByToolId.delete(toolUseId);
|
||||
}
|
||||
}
|
||||
}
|
||||
for (const [toolUseId, busyUntilIso] of state.recentBusyUntilByToolId) {
|
||||
if (Date.parse(busyUntilIso) <= nowMs) {
|
||||
state.recentBusyUntilByToolId.delete(toolUseId);
|
||||
}
|
||||
}
|
||||
if (state.activeToolIds.size === 0 && state.recentBusyUntilByToolId.size === 0) {
|
||||
if (state.activeToolStartedAtByToolId.size === 0 && state.recentBusyUntilByToolId.size === 0) {
|
||||
this.activityByMember.delete(key);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -144,6 +144,9 @@ export class TeamTaskStallJournal {
|
|||
if ((error as NodeJS.ErrnoException).code === 'ENOENT') {
|
||||
return [];
|
||||
}
|
||||
if (error instanceof SyntaxError) {
|
||||
return [];
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,9 +1,8 @@
|
|||
import { describe, expect, it } from 'vitest';
|
||||
|
||||
import {
|
||||
buildActionableWorkAgenda,
|
||||
validateMemberWorkSyncReport,
|
||||
} from '@features/member-work-sync/core/domain';
|
||||
import { describe, expect, it } from 'vitest';
|
||||
|
||||
const nowIso = '2026-04-29T00:00:00.000Z';
|
||||
const hash = (value: string) => `h${value.length}`;
|
||||
|
|
@ -28,6 +27,17 @@ function agendaWithWork() {
|
|||
});
|
||||
}
|
||||
|
||||
function emptyAgenda() {
|
||||
return buildActionableWorkAgenda({
|
||||
teamName: 'team-a',
|
||||
memberName: 'bob',
|
||||
generatedAt: nowIso,
|
||||
members: [{ name: 'bob' }],
|
||||
tasks: [],
|
||||
hash,
|
||||
});
|
||||
}
|
||||
|
||||
function leadAgendaWithBrokenDependency() {
|
||||
return buildActionableWorkAgenda({
|
||||
teamName: 'team-a',
|
||||
|
|
@ -122,6 +132,27 @@ describe('validateMemberWorkSyncReport', () => {
|
|||
});
|
||||
});
|
||||
|
||||
it('rejects still_working when the agenda is empty', () => {
|
||||
const agenda = emptyAgenda();
|
||||
const result = validateMemberWorkSyncReport({
|
||||
request: {
|
||||
teamName: 'team-a',
|
||||
memberName: 'bob',
|
||||
state: 'still_working',
|
||||
agendaFingerprint: agenda.fingerprint,
|
||||
},
|
||||
agenda,
|
||||
nowIso,
|
||||
activeMemberNames: ['bob'],
|
||||
tokenValidation: validToken,
|
||||
});
|
||||
|
||||
expect(result).toMatchObject({
|
||||
ok: false,
|
||||
code: 'still_working_rejected_agenda_empty',
|
||||
});
|
||||
});
|
||||
|
||||
it('rejects blocked without current blocker evidence', () => {
|
||||
const agenda = agendaWithWork();
|
||||
const result = validateMemberWorkSyncReport({
|
||||
|
|
|
|||
|
|
@ -524,6 +524,33 @@ describe('MemberWorkSync use cases', () => {
|
|||
expect(result.status.state).toBe('caught_up');
|
||||
});
|
||||
|
||||
it('rejects still_working on an empty agenda without recording a working status', async () => {
|
||||
const { deps, store } = createDeps({ items: [] });
|
||||
const reader = new MemberWorkSyncReconciler(deps);
|
||||
const reporter = new MemberWorkSyncReporter(deps);
|
||||
const current = await reader.execute({ teamName: 'team-a', memberName: 'bob' });
|
||||
|
||||
const result = await reporter.execute({
|
||||
teamName: 'team-a',
|
||||
memberName: 'bob',
|
||||
state: 'still_working',
|
||||
agendaFingerprint: current.agenda.fingerprint,
|
||||
reportToken: current.reportToken,
|
||||
source: 'test',
|
||||
});
|
||||
|
||||
expect(result.accepted).toBe(false);
|
||||
expect(result.code).toBe('still_working_rejected_agenda_empty');
|
||||
expect(result.status.state).toBe('caught_up');
|
||||
expect(store.writes.at(-1)).toMatchObject({
|
||||
state: 'caught_up',
|
||||
report: {
|
||||
accepted: false,
|
||||
rejectionCode: 'still_working_rejected_agenda_empty',
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
it('marks status inactive when the team runtime is not active', async () => {
|
||||
const { deps } = createDeps({ teamActive: false });
|
||||
const status = await new MemberWorkSyncReconciler(deps).execute({
|
||||
|
|
|
|||
|
|
@ -1,6 +1,5 @@
|
|||
import { describe, expect, it } from 'vitest';
|
||||
|
||||
import { decideMemberWorkSyncStatus } from '@features/member-work-sync/core/domain';
|
||||
import { describe, expect, it } from 'vitest';
|
||||
|
||||
import type { MemberWorkSyncAgenda, MemberWorkSyncReport } from '@features/member-work-sync/contracts';
|
||||
|
||||
|
|
@ -33,4 +32,123 @@ describe('decideMemberWorkSyncStatus', () => {
|
|||
expect(decision.acceptedReport).toBeUndefined();
|
||||
expect(decision.diagnostics).toContain('agenda_empty');
|
||||
});
|
||||
|
||||
it('does not carry stale work reports into an empty caught_up agenda', () => {
|
||||
const agenda: MemberWorkSyncAgenda = {
|
||||
teamName: 'forge-labs',
|
||||
memberName: 'jack',
|
||||
generatedAt: '2026-05-06T19:06:07.257Z',
|
||||
fingerprint: 'agenda-empty',
|
||||
items: [],
|
||||
diagnostics: [],
|
||||
};
|
||||
const legacyReport: MemberWorkSyncReport = {
|
||||
teamName: 'forge-labs',
|
||||
memberName: 'jack',
|
||||
state: 'still_working',
|
||||
agendaFingerprint: agenda.fingerprint,
|
||||
reportedAt: '2026-05-06T19:00:26.089Z',
|
||||
expiresAt: '2026-05-06T19:15:26.089Z',
|
||||
accepted: true,
|
||||
};
|
||||
|
||||
const decision = decideMemberWorkSyncStatus({
|
||||
agenda,
|
||||
latestAcceptedReport: legacyReport,
|
||||
nowIso: '2026-05-06T19:06:07.257Z',
|
||||
});
|
||||
|
||||
expect(decision).toEqual({
|
||||
state: 'caught_up',
|
||||
diagnostics: ['agenda_empty'],
|
||||
});
|
||||
});
|
||||
|
||||
it('treats accepted work reports without a lease as needs_sync', () => {
|
||||
const agenda: MemberWorkSyncAgenda = {
|
||||
teamName: 'forge-labs',
|
||||
memberName: 'jack',
|
||||
generatedAt: '2026-05-06T19:06:07.257Z',
|
||||
fingerprint: 'agenda-active',
|
||||
items: [
|
||||
{
|
||||
taskId: 'task-1',
|
||||
displayId: '11111111',
|
||||
subject: 'Ship it',
|
||||
kind: 'work',
|
||||
assignee: 'jack',
|
||||
priority: 'normal',
|
||||
reason: 'owned_in_progress_task',
|
||||
evidence: {
|
||||
status: 'in_progress',
|
||||
owner: 'jack',
|
||||
},
|
||||
},
|
||||
],
|
||||
diagnostics: [],
|
||||
};
|
||||
const unboundedReport: MemberWorkSyncReport = {
|
||||
teamName: 'forge-labs',
|
||||
memberName: 'jack',
|
||||
state: 'still_working',
|
||||
agendaFingerprint: agenda.fingerprint,
|
||||
reportedAt: '2026-05-06T19:00:26.089Z',
|
||||
accepted: true,
|
||||
};
|
||||
|
||||
const decision = decideMemberWorkSyncStatus({
|
||||
agenda,
|
||||
latestAcceptedReport: unboundedReport,
|
||||
nowIso: '2026-05-06T19:06:07.257Z',
|
||||
});
|
||||
|
||||
expect(decision).toEqual({
|
||||
state: 'needs_sync',
|
||||
diagnostics: ['report_lease_missing'],
|
||||
});
|
||||
});
|
||||
|
||||
it('treats work reports as needs_sync when the current time is invalid', () => {
|
||||
const agenda: MemberWorkSyncAgenda = {
|
||||
teamName: 'forge-labs',
|
||||
memberName: 'jack',
|
||||
generatedAt: '2026-05-06T19:06:07.257Z',
|
||||
fingerprint: 'agenda-active',
|
||||
items: [
|
||||
{
|
||||
taskId: 'task-1',
|
||||
subject: 'Ship it',
|
||||
kind: 'work',
|
||||
assignee: 'jack',
|
||||
priority: 'normal',
|
||||
reason: 'owned_in_progress_task',
|
||||
evidence: {
|
||||
status: 'in_progress',
|
||||
owner: 'jack',
|
||||
},
|
||||
},
|
||||
],
|
||||
diagnostics: [],
|
||||
};
|
||||
const report: MemberWorkSyncReport = {
|
||||
teamName: 'forge-labs',
|
||||
memberName: 'jack',
|
||||
state: 'blocked',
|
||||
agendaFingerprint: agenda.fingerprint,
|
||||
reportedAt: '2026-05-06T19:00:26.089Z',
|
||||
expiresAt: '2026-05-06T20:00:26.089Z',
|
||||
accepted: true,
|
||||
};
|
||||
|
||||
const decision = decideMemberWorkSyncStatus({
|
||||
agenda,
|
||||
latestAcceptedReport: report,
|
||||
nowIso: 'not-a-date',
|
||||
});
|
||||
|
||||
expect(decision).toEqual({
|
||||
state: 'needs_sync',
|
||||
diagnostics: ['report_lease_missing'],
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -529,6 +529,17 @@ describe('MemberWorkSyncNudgeActivationPolicy', () => {
|
|||
).toEqual({ active: true, reason: 'native_stale_in_progress' });
|
||||
});
|
||||
|
||||
it('activates stale native in-progress recovery when a legacy accepted report has no lease', () => {
|
||||
expect(
|
||||
decideMemberWorkSyncNudgeActivation({
|
||||
status: nativeStaleInProgressStatus({
|
||||
diagnostics: ['report_lease_missing'],
|
||||
}),
|
||||
metrics: staleMetrics(),
|
||||
})
|
||||
).toEqual({ active: true, reason: 'native_stale_in_progress' });
|
||||
});
|
||||
|
||||
it('does not activate stale native in-progress recovery when the accepted report state is still current', () => {
|
||||
expect(
|
||||
decideMemberWorkSyncNudgeActivation({
|
||||
|
|
@ -683,7 +694,7 @@ describe('MemberWorkSyncNudgeActivationPolicy', () => {
|
|||
).toEqual({ active: true, reason: 'lead_targeted_shadow_collecting' });
|
||||
});
|
||||
|
||||
it('does not activate stale native in-progress recovery for multiple or non-in-progress work items', () => {
|
||||
it('activates stale native assigned-work recovery for pending and multi-item safe agendas', () => {
|
||||
const baseItem = nativeStaleInProgressStatus().agenda.items[0]!;
|
||||
|
||||
expect(
|
||||
|
|
@ -702,7 +713,7 @@ describe('MemberWorkSyncNudgeActivationPolicy', () => {
|
|||
}),
|
||||
metrics: staleMetrics(),
|
||||
})
|
||||
).toEqual({ active: false, reason: 'blocking_metrics' });
|
||||
).toEqual({ active: true, reason: 'native_stale_in_progress' });
|
||||
|
||||
expect(
|
||||
decideMemberWorkSyncNudgeActivation({
|
||||
|
|
@ -723,7 +734,41 @@ describe('MemberWorkSyncNudgeActivationPolicy', () => {
|
|||
}),
|
||||
metrics: staleMetrics(),
|
||||
})
|
||||
).toEqual({ active: false, reason: 'blocking_metrics' });
|
||||
).toEqual({ active: true, reason: 'native_stale_assigned_work' });
|
||||
|
||||
expect(
|
||||
decideMemberWorkSyncNudgeActivation({
|
||||
status: nativeStaleInProgressStatus({
|
||||
agenda: {
|
||||
...nativeStaleInProgressStatus().agenda,
|
||||
items: [
|
||||
baseItem,
|
||||
{
|
||||
taskId: 'task-review',
|
||||
displayId: '#2',
|
||||
subject: 'Review current request',
|
||||
kind: 'review',
|
||||
assignee: 'alice',
|
||||
priority: 'review_requested',
|
||||
reason: 'current_cycle_review_assigned',
|
||||
evidence: {
|
||||
status: 'completed',
|
||||
owner: 'bob',
|
||||
reviewer: 'alice',
|
||||
reviewState: 'review',
|
||||
reviewCycleId: 'evt-review-request',
|
||||
reviewRequestEventId: 'evt-review-request',
|
||||
reviewObligation: 'review_pickup_required',
|
||||
canBypassPhase2: true,
|
||||
historyEventIds: ['evt-review-request'],
|
||||
},
|
||||
},
|
||||
],
|
||||
},
|
||||
}),
|
||||
metrics: staleMetrics(),
|
||||
})
|
||||
).toEqual({ active: true, reason: 'native_stale_assigned_work' });
|
||||
});
|
||||
|
||||
it('does not activate stale native in-progress recovery for needsFix, review, blocked dependency, or clarification agenda items', () => {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,68 @@
|
|||
import { CompositeMemberWorkSyncBusySignal } from '@features/member-work-sync/main/infrastructure/CompositeMemberWorkSyncBusySignal';
|
||||
import { describe, expect, it, vi } from 'vitest';
|
||||
|
||||
describe('CompositeMemberWorkSyncBusySignal', () => {
|
||||
it('does not block nudges forever when one busy signal fails', async () => {
|
||||
const logger = { warn: vi.fn() };
|
||||
const signal = new CompositeMemberWorkSyncBusySignal(
|
||||
[
|
||||
{
|
||||
isBusy: vi.fn(async () => {
|
||||
throw new Error('delivery status unavailable');
|
||||
}),
|
||||
},
|
||||
{
|
||||
isBusy: vi.fn(async () => ({ busy: false })),
|
||||
},
|
||||
],
|
||||
logger
|
||||
);
|
||||
|
||||
await expect(
|
||||
signal.isBusy({
|
||||
teamName: 'team-a',
|
||||
memberName: 'bob',
|
||||
nowIso: '2026-04-29T00:00:00.000Z',
|
||||
workSyncIntent: 'agenda_sync',
|
||||
taskRefs: [{ teamName: 'team-a', taskId: 'task-1' }],
|
||||
})
|
||||
).resolves.toEqual({ busy: false });
|
||||
expect(logger.warn).toHaveBeenCalledWith(
|
||||
'member work sync busy signal failed',
|
||||
expect.objectContaining({
|
||||
teamName: 'team-a',
|
||||
memberName: 'bob',
|
||||
error: 'Error: delivery status unavailable',
|
||||
})
|
||||
);
|
||||
});
|
||||
|
||||
it('still stops at the first positive busy signal', async () => {
|
||||
const secondSignal = vi.fn(async () => ({ busy: false }));
|
||||
const signal = new CompositeMemberWorkSyncBusySignal([
|
||||
{
|
||||
isBusy: vi.fn(async () => ({
|
||||
busy: true,
|
||||
reason: 'active_tool_activity',
|
||||
retryAfterIso: '2026-04-29T00:01:00.000Z',
|
||||
})),
|
||||
},
|
||||
{ isBusy: secondSignal },
|
||||
]);
|
||||
|
||||
await expect(
|
||||
signal.isBusy({
|
||||
teamName: 'team-a',
|
||||
memberName: 'bob',
|
||||
nowIso: '2026-04-29T00:00:00.000Z',
|
||||
workSyncIntent: 'agenda_sync',
|
||||
taskRefs: [{ teamName: 'team-a', taskId: 'task-1' }],
|
||||
})
|
||||
).resolves.toEqual({
|
||||
busy: true,
|
||||
reason: 'active_tool_activity',
|
||||
retryAfterIso: '2026-04-29T00:01:00.000Z',
|
||||
});
|
||||
expect(secondSignal).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
|
@ -1,10 +1,10 @@
|
|||
import { mkdir, mkdtemp, readFile, readdir, rm, writeFile } from 'fs/promises';
|
||||
import { join } from 'path';
|
||||
import { tmpdir } from 'os';
|
||||
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
|
||||
|
||||
import { JsonMemberWorkSyncStore } from '@features/member-work-sync/main/infrastructure/JsonMemberWorkSyncStore';
|
||||
import { MemberWorkSyncStorePaths } from '@features/member-work-sync/main/infrastructure/MemberWorkSyncStorePaths';
|
||||
import { mkdir, mkdtemp, readdir, readFile, rm, writeFile } from 'fs/promises';
|
||||
import { tmpdir } from 'os';
|
||||
import { join } from 'path';
|
||||
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
|
||||
|
||||
import type {
|
||||
MemberWorkSyncNudgePayload,
|
||||
MemberWorkSyncStatus,
|
||||
|
|
@ -430,6 +430,101 @@ describe('JsonMemberWorkSyncStore', () => {
|
|||
});
|
||||
});
|
||||
|
||||
it('clears stale retry delay when a fresh reconcile revives the same outbox item', async () => {
|
||||
const input = {
|
||||
id: 'member-work-sync:team-a:bob:agenda:v1:abc',
|
||||
teamName: 'team-a',
|
||||
memberName: 'bob',
|
||||
agendaFingerprint: 'agenda:v1:abc',
|
||||
payloadHash: 'hash-a',
|
||||
payload: makeNudgePayload(),
|
||||
nowIso: '2026-04-29T00:00:00.000Z',
|
||||
};
|
||||
|
||||
await store.ensurePending(input);
|
||||
const [claimed] = await store.claimDue({
|
||||
teamName: 'team-a',
|
||||
claimedBy: 'dispatcher-a',
|
||||
nowIso: '2026-04-29T00:01:00.000Z',
|
||||
limit: 1,
|
||||
});
|
||||
await store.markFailed({
|
||||
teamName: 'team-a',
|
||||
id: input.id,
|
||||
attemptGeneration: claimed.attemptGeneration,
|
||||
retryable: true,
|
||||
error: 'member_busy:active_tool_activity',
|
||||
nextAttemptAt: '2026-04-29T00:30:00.000Z',
|
||||
nowIso: '2026-04-29T00:02:00.000Z',
|
||||
});
|
||||
|
||||
const revived = await store.ensurePending({ ...input, nowIso: '2026-04-29T00:03:00.000Z' });
|
||||
|
||||
expect(revived).toMatchObject({
|
||||
ok: true,
|
||||
outcome: 'existing',
|
||||
item: { status: 'pending', attemptGeneration: 1 },
|
||||
});
|
||||
expect(revived.item).not.toHaveProperty('nextAttemptAt');
|
||||
expect(revived.item).not.toHaveProperty('lastError');
|
||||
|
||||
const [reclaimed] = await store.claimDue({
|
||||
teamName: 'team-a',
|
||||
claimedBy: 'dispatcher-b',
|
||||
nowIso: '2026-04-29T00:04:00.000Z',
|
||||
limit: 1,
|
||||
});
|
||||
expect(reclaimed).toMatchObject({ id: input.id, attemptGeneration: 2 });
|
||||
});
|
||||
|
||||
it('keeps an explicitly requested retry delay when reviving an outbox item', async () => {
|
||||
const input = {
|
||||
id: 'member-work-sync:team-a:bob:agenda:v1:abc',
|
||||
teamName: 'team-a',
|
||||
memberName: 'bob',
|
||||
agendaFingerprint: 'agenda:v1:abc',
|
||||
payloadHash: 'hash-a',
|
||||
payload: makeNudgePayload(),
|
||||
nowIso: '2026-04-29T00:00:00.000Z',
|
||||
};
|
||||
|
||||
await store.ensurePending(input);
|
||||
const [claimed] = await store.claimDue({
|
||||
teamName: 'team-a',
|
||||
claimedBy: 'dispatcher-a',
|
||||
nowIso: '2026-04-29T00:01:00.000Z',
|
||||
limit: 1,
|
||||
});
|
||||
await store.markFailed({
|
||||
teamName: 'team-a',
|
||||
id: input.id,
|
||||
attemptGeneration: claimed.attemptGeneration,
|
||||
retryable: true,
|
||||
error: 'member_busy:active_tool_activity',
|
||||
nextAttemptAt: '2026-04-29T00:30:00.000Z',
|
||||
nowIso: '2026-04-29T00:02:00.000Z',
|
||||
});
|
||||
|
||||
const revived = await store.ensurePending({
|
||||
...input,
|
||||
nextAttemptAt: '2026-04-29T00:10:00.000Z',
|
||||
nowIso: '2026-04-29T00:03:00.000Z',
|
||||
});
|
||||
|
||||
expect(revived.item).toMatchObject({
|
||||
status: 'pending',
|
||||
nextAttemptAt: '2026-04-29T00:10:00.000Z',
|
||||
});
|
||||
await expect(
|
||||
store.claimDue({
|
||||
teamName: 'team-a',
|
||||
claimedBy: 'dispatcher-b',
|
||||
nowIso: '2026-04-29T00:04:00.000Z',
|
||||
limit: 1,
|
||||
})
|
||||
).resolves.toEqual([]);
|
||||
});
|
||||
|
||||
it('finds recent recovery outbox rows by logical intent key', async () => {
|
||||
const olderInput = {
|
||||
id: 'member-work-sync:team-a:bob:agenda:v1:older',
|
||||
|
|
|
|||
|
|
@ -1,6 +1,5 @@
|
|||
import { describe, expect, it } from 'vitest';
|
||||
|
||||
import { MemberWorkSyncToolActivityBusySignal } from '@features/member-work-sync/main/infrastructure/MemberWorkSyncToolActivityBusySignal';
|
||||
import { describe, expect, it } from 'vitest';
|
||||
|
||||
import type { TeamChangeEvent, ToolActivityEventPayload } from '@shared/types';
|
||||
|
||||
|
|
@ -142,4 +141,43 @@ describe('MemberWorkSyncToolActivityBusySignal', () => {
|
|||
})
|
||||
).resolves.toEqual({ busy: false });
|
||||
});
|
||||
|
||||
it('expires stale active tools when the finish event is missing', async () => {
|
||||
const signal = new MemberWorkSyncToolActivityBusySignal({
|
||||
busyGraceMs: 90_000,
|
||||
activeToolStaleMs: 10 * 60_000,
|
||||
});
|
||||
|
||||
signal.noteTeamChange(
|
||||
toolEvent('team-a', {
|
||||
action: 'start',
|
||||
activity: {
|
||||
memberName: 'bob',
|
||||
toolUseId: 'tool-1',
|
||||
toolName: 'bash',
|
||||
startedAt: '2026-04-29T00:00:00.000Z',
|
||||
source: 'runtime',
|
||||
},
|
||||
})
|
||||
);
|
||||
|
||||
await expect(
|
||||
signal.isBusy({
|
||||
teamName: 'team-a',
|
||||
memberName: 'bob',
|
||||
nowIso: '2026-04-29T00:09:59.000Z',
|
||||
})
|
||||
).resolves.toMatchObject({
|
||||
busy: true,
|
||||
reason: 'active_tool_activity',
|
||||
});
|
||||
|
||||
await expect(
|
||||
signal.isBusy({
|
||||
teamName: 'team-a',
|
||||
memberName: 'bob',
|
||||
nowIso: '2026-04-29T00:10:00.000Z',
|
||||
})
|
||||
).resolves.toEqual({ busy: false });
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -1,10 +1,9 @@
|
|||
import { TeamTaskStallJournalWorkSyncCooldown } from '@features/member-work-sync/main/adapters/output/TeamTaskStallJournalWorkSyncCooldown';
|
||||
import { mkdir, mkdtemp, rm, writeFile } from 'fs/promises';
|
||||
import { tmpdir } from 'os';
|
||||
import { join } from 'path';
|
||||
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
|
||||
|
||||
import { TeamTaskStallJournalWorkSyncCooldown } from '@features/member-work-sync/main/adapters/output/TeamTaskStallJournalWorkSyncCooldown';
|
||||
|
||||
describe('TeamTaskStallJournalWorkSyncCooldown', () => {
|
||||
let root: string;
|
||||
|
||||
|
|
@ -75,4 +74,34 @@ describe('TeamTaskStallJournalWorkSyncCooldown', () => {
|
|||
})
|
||||
).resolves.toBe(false);
|
||||
});
|
||||
|
||||
it('fails open when the watchdog journal is invalid', async () => {
|
||||
await mkdir(join(root, 'team-a'), { recursive: true });
|
||||
await writeFile(join(root, 'team-a', 'stall-monitor-journal.json'), '{bad json', 'utf8');
|
||||
|
||||
const cooldown = new TeamTaskStallJournalWorkSyncCooldown(root, 10 * 60_000);
|
||||
|
||||
await expect(
|
||||
cooldown.hasRecentNudge({
|
||||
teamName: 'team-a',
|
||||
memberName: 'bob',
|
||||
taskIds: ['task-1'],
|
||||
nowIso: '2026-04-29T00:10:00.000Z',
|
||||
})
|
||||
).resolves.toBe(false);
|
||||
|
||||
await writeFile(
|
||||
join(root, 'team-a', 'stall-monitor-journal.json'),
|
||||
JSON.stringify({ taskId: 'task-1', state: 'alerted' }),
|
||||
'utf8'
|
||||
);
|
||||
await expect(
|
||||
cooldown.hasRecentNudge({
|
||||
teamName: 'team-a',
|
||||
memberName: 'bob',
|
||||
taskIds: ['task-1'],
|
||||
nowIso: '2026-04-29T00:10:00.000Z',
|
||||
})
|
||||
).resolves.toBe(false);
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -1423,6 +1423,114 @@ describe('createMemberWorkSyncFeature composition', () => {
|
|||
}
|
||||
});
|
||||
|
||||
it('delivers native stale pending-work recovery nudges despite noisy global metrics', async () => {
|
||||
const claudeRoot = makeTempRoot();
|
||||
setClaudeBasePathOverride(claudeRoot);
|
||||
const teamsBasePath = getTeamsBasePath();
|
||||
const teamName = 'team-native-stale-pending';
|
||||
const memberName = 'alice';
|
||||
const nudgeDeliveryWake = {
|
||||
schedule: vi.fn(async () => undefined),
|
||||
};
|
||||
const feature = createMemberWorkSyncFeature({
|
||||
teamsBasePath,
|
||||
configReader: {
|
||||
getConfig: vi.fn(async () => ({
|
||||
name: teamName,
|
||||
members: [{ name: memberName, providerId: 'codex' }],
|
||||
})),
|
||||
} as never,
|
||||
taskReader: {
|
||||
getTasks: vi.fn(async () => [
|
||||
{
|
||||
id: 'task-1',
|
||||
displayId: '11111111',
|
||||
subject: 'Start assigned pending work',
|
||||
status: 'pending',
|
||||
owner: memberName,
|
||||
},
|
||||
]),
|
||||
} as never,
|
||||
kanbanManager: {
|
||||
getState: vi.fn(async () => ({
|
||||
teamName,
|
||||
reviewers: [],
|
||||
tasks: {},
|
||||
})),
|
||||
} as never,
|
||||
membersMetaStore: {
|
||||
getMembers: vi.fn(async () => []),
|
||||
} as never,
|
||||
isTeamActive: vi.fn(async () => true),
|
||||
nudgeDeliveryWake,
|
||||
queueQuietWindowMs: 1,
|
||||
});
|
||||
|
||||
try {
|
||||
feature.noteTeamChange({ type: 'task', teamName, taskId: 'task-1' } as never);
|
||||
|
||||
let agendaFingerprint = '';
|
||||
await waitForAssertion(async () => {
|
||||
const status = await feature.getStatus({ teamName, memberName });
|
||||
expect(status).toMatchObject({
|
||||
state: 'needs_sync',
|
||||
providerId: 'codex',
|
||||
diagnostics: expect.arrayContaining(['no_current_report']),
|
||||
agenda: {
|
||||
items: [
|
||||
expect.objectContaining({
|
||||
reason: 'owned_pending_task',
|
||||
evidence: expect.objectContaining({ status: 'pending' }),
|
||||
}),
|
||||
],
|
||||
},
|
||||
});
|
||||
agendaFingerprint = status.agenda.fingerprint;
|
||||
});
|
||||
expect(await readInboxMessages({ teamsBasePath, teamName, memberName })).toEqual([]);
|
||||
expect(nudgeDeliveryWake.schedule).not.toHaveBeenCalled();
|
||||
|
||||
await seedNativeStaleInProgressBlockingMetrics({
|
||||
teamsBasePath,
|
||||
teamName,
|
||||
memberName,
|
||||
agendaFingerprint,
|
||||
});
|
||||
feature.noteTeamChange({ type: 'task', teamName, taskId: 'task-1' } as never);
|
||||
|
||||
await waitForAssertion(async () => {
|
||||
const nudges = (await readInboxMessages({ teamsBasePath, teamName, memberName })).filter(
|
||||
(message) => message.messageKind === 'member_work_sync_nudge'
|
||||
);
|
||||
expect(nudges).toHaveLength(1);
|
||||
expect(nudges[0]?.text).toContain('Work sync check');
|
||||
expect(nudges[0]?.text).toContain('11111111');
|
||||
expect(nudgeDeliveryWake.schedule).toHaveBeenCalledTimes(1);
|
||||
await expect(feature.getMetrics({ teamName })).resolves.toMatchObject({
|
||||
phase2Readiness: {
|
||||
reasons: expect.arrayContaining(['would_nudge_rate_high']),
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
const journal = await fs.promises.readFile(
|
||||
path.join(
|
||||
teamsBasePath,
|
||||
teamName,
|
||||
'members',
|
||||
memberName,
|
||||
'.member-work-sync',
|
||||
'journal.jsonl'
|
||||
),
|
||||
'utf8'
|
||||
);
|
||||
expect(journal).toContain('"event":"nudge_delivered"');
|
||||
expect(journal).not.toContain('"reason":"blocking_metrics"');
|
||||
} finally {
|
||||
await feature.dispose();
|
||||
}
|
||||
});
|
||||
|
||||
it('delivers targeted OpenCode nudges even when global phase2 metrics are noisy', async () => {
|
||||
const claudeRoot = makeTempRoot();
|
||||
setClaudeBasePathOverride(claudeRoot);
|
||||
|
|
@ -2557,7 +2665,7 @@ describe('createMemberWorkSyncFeature composition', () => {
|
|||
memberName,
|
||||
toolUseId: 'tool-1',
|
||||
toolName: 'bash',
|
||||
startedAt: '2026-05-05T12:00:00.000Z',
|
||||
startedAt: new Date(Date.now()).toISOString(),
|
||||
source: 'runtime',
|
||||
},
|
||||
}),
|
||||
|
|
@ -2641,6 +2749,109 @@ describe('createMemberWorkSyncFeature composition', () => {
|
|||
}
|
||||
});
|
||||
|
||||
it('clears stale retry delay and recovers when tool activity finishes without agenda changes', async () => {
|
||||
const claudeRoot = makeTempRoot();
|
||||
setClaudeBasePathOverride(claudeRoot);
|
||||
const teamsBasePath = getTeamsBasePath();
|
||||
const teamName = 'team-a';
|
||||
const memberName = 'bob';
|
||||
const tasks = [
|
||||
{
|
||||
id: 'task-1',
|
||||
displayId: '11111111',
|
||||
subject: 'Ship sync after tool finish',
|
||||
status: 'pending',
|
||||
owner: memberName,
|
||||
},
|
||||
];
|
||||
const feature = createMemberWorkSyncFeature({
|
||||
teamsBasePath,
|
||||
configReader: {
|
||||
getConfig: vi.fn(async () => ({
|
||||
name: teamName,
|
||||
members: [{ name: memberName, providerId: 'codex' }],
|
||||
})),
|
||||
} as never,
|
||||
taskReader: {
|
||||
getTasks: vi.fn(async () => tasks),
|
||||
} as never,
|
||||
kanbanManager: {
|
||||
getState: vi.fn(async () => ({
|
||||
teamName,
|
||||
reviewers: [],
|
||||
tasks: {},
|
||||
})),
|
||||
} as never,
|
||||
membersMetaStore: {
|
||||
getMembers: vi.fn(async () => []),
|
||||
} as never,
|
||||
isTeamActive: vi.fn(async () => true),
|
||||
queueQuietWindowMs: 1,
|
||||
});
|
||||
|
||||
try {
|
||||
await seedShadowReadyMetrics({ teamsBasePath, teamName, memberName });
|
||||
feature.noteTeamChange({
|
||||
type: 'tool-activity',
|
||||
teamName,
|
||||
detail: JSON.stringify({
|
||||
action: 'start',
|
||||
activity: {
|
||||
memberName,
|
||||
toolUseId: 'tool-1',
|
||||
toolName: 'bash',
|
||||
startedAt: new Date(Date.now()).toISOString(),
|
||||
source: 'runtime',
|
||||
},
|
||||
}),
|
||||
} as never);
|
||||
feature.noteTeamChange({ type: 'task', teamName, taskId: 'task-1' } as never);
|
||||
|
||||
await waitForAssertion(async () => {
|
||||
expect(await readInboxMessages({ teamsBasePath, teamName, memberName })).toEqual([]);
|
||||
const outboxItems = Object.values(
|
||||
await readMemberOutboxItems({ teamsBasePath, teamName, memberName })
|
||||
);
|
||||
expect(outboxItems).toEqual([
|
||||
expect.objectContaining({
|
||||
status: 'failed_retryable',
|
||||
lastError: 'member_busy:active_tool_activity',
|
||||
nextAttemptAt: expect.any(String),
|
||||
}),
|
||||
]);
|
||||
});
|
||||
|
||||
feature.noteTeamChange({
|
||||
type: 'tool-activity',
|
||||
teamName,
|
||||
detail: JSON.stringify({
|
||||
action: 'finish',
|
||||
memberName,
|
||||
toolUseId: 'tool-1',
|
||||
finishedAt: new Date(Date.now() - 120_000).toISOString(),
|
||||
}),
|
||||
} as never);
|
||||
|
||||
await waitForAssertion(async () => {
|
||||
const nudges = (await readInboxMessages({ teamsBasePath, teamName, memberName })).filter(
|
||||
(message) => message.messageKind === 'member_work_sync_nudge'
|
||||
);
|
||||
expect(nudges).toHaveLength(1);
|
||||
expect(nudges[0]?.text).toContain('11111111');
|
||||
const outboxItems = Object.values(
|
||||
await readMemberOutboxItems({ teamsBasePath, teamName, memberName })
|
||||
);
|
||||
expect(outboxItems).toEqual([
|
||||
expect.objectContaining({
|
||||
status: 'delivered',
|
||||
}),
|
||||
]);
|
||||
});
|
||||
} finally {
|
||||
await feature.dispose();
|
||||
}
|
||||
});
|
||||
|
||||
it('rate-limits the active loop after two delivered nudges per member per hour', async () => {
|
||||
const claudeRoot = makeTempRoot();
|
||||
setClaudeBasePathOverride(claudeRoot);
|
||||
|
|
@ -3241,6 +3452,235 @@ describe('createMemberWorkSyncFeature composition', () => {
|
|||
}
|
||||
});
|
||||
|
||||
it('refreshes a legacy still_working report without a lease during nudge dispatch', async () => {
|
||||
const claudeRoot = makeTempRoot();
|
||||
setClaudeBasePathOverride(claudeRoot);
|
||||
const teamsBasePath = getTeamsBasePath();
|
||||
const teamName = 'team-a';
|
||||
const memberName = 'bob';
|
||||
const feature = createMemberWorkSyncFeature({
|
||||
teamsBasePath,
|
||||
configReader: {
|
||||
getConfig: vi.fn(async () => ({
|
||||
name: teamName,
|
||||
members: [{ name: memberName, providerId: 'codex' }],
|
||||
})),
|
||||
} as never,
|
||||
taskReader: {
|
||||
getTasks: vi.fn(async () => [
|
||||
{
|
||||
id: 'task-1',
|
||||
displayId: '11111111',
|
||||
subject: 'Wake after missing lease',
|
||||
status: 'in_progress',
|
||||
owner: memberName,
|
||||
},
|
||||
]),
|
||||
} as never,
|
||||
kanbanManager: {
|
||||
getState: vi.fn(async () => ({
|
||||
teamName,
|
||||
reviewers: [],
|
||||
tasks: {},
|
||||
})),
|
||||
} as never,
|
||||
membersMetaStore: {
|
||||
getMembers: vi.fn(async () => []),
|
||||
} as never,
|
||||
isTeamActive: vi.fn(async () => true),
|
||||
canDispatchNudges: vi.fn(async () => true),
|
||||
});
|
||||
|
||||
try {
|
||||
await seedBlockingShadowCollectingMetrics({ teamsBasePath, teamName, memberName });
|
||||
const current = await feature.refreshStatus({ teamName, memberName });
|
||||
await expect(
|
||||
feature.report({
|
||||
teamName,
|
||||
memberName,
|
||||
state: 'still_working',
|
||||
agendaFingerprint: current.agenda.fingerprint,
|
||||
reportToken: current.reportToken,
|
||||
taskIds: ['task-1'],
|
||||
source: 'test',
|
||||
})
|
||||
).resolves.toMatchObject({
|
||||
accepted: true,
|
||||
status: { state: 'still_working', report: { accepted: true } },
|
||||
});
|
||||
|
||||
const store = new JsonMemberWorkSyncStore(new MemberWorkSyncStorePaths(teamsBasePath));
|
||||
const acceptedStatus = await store.read({ teamName, memberName });
|
||||
const legacyReport = { ...acceptedStatus!.report! };
|
||||
delete legacyReport.expiresAt;
|
||||
await store.write({
|
||||
...acceptedStatus!,
|
||||
evaluatedAt: new Date(Date.now() - 7 * 60_000).toISOString(),
|
||||
report: legacyReport,
|
||||
});
|
||||
await seedShadowReadyMetrics({ teamsBasePath, teamName, memberName });
|
||||
|
||||
await expect(feature.dispatchDueNudges([teamName])).resolves.toEqual({
|
||||
claimed: 1,
|
||||
delivered: 1,
|
||||
superseded: 0,
|
||||
retryable: 0,
|
||||
terminal: 0,
|
||||
});
|
||||
expect(await readInboxMessages({ teamsBasePath, teamName, memberName })).toHaveLength(1);
|
||||
} finally {
|
||||
await feature.dispose();
|
||||
}
|
||||
});
|
||||
|
||||
it('repairs a legacy working status when the stored agenda is empty', async () => {
|
||||
const claudeRoot = makeTempRoot();
|
||||
setClaudeBasePathOverride(claudeRoot);
|
||||
const teamsBasePath = getTeamsBasePath();
|
||||
const teamName = 'team-a';
|
||||
const memberName = 'bob';
|
||||
const feature = createMemberWorkSyncFeature({
|
||||
teamsBasePath,
|
||||
configReader: {
|
||||
getConfig: vi.fn(async () => ({
|
||||
name: teamName,
|
||||
members: [{ name: memberName, providerId: 'codex' }],
|
||||
})),
|
||||
} as never,
|
||||
taskReader: {
|
||||
getTasks: vi.fn(async () => []),
|
||||
} as never,
|
||||
kanbanManager: {
|
||||
getState: vi.fn(async () => ({
|
||||
teamName,
|
||||
reviewers: [],
|
||||
tasks: {},
|
||||
})),
|
||||
} as never,
|
||||
membersMetaStore: {
|
||||
getMembers: vi.fn(async () => []),
|
||||
} as never,
|
||||
isTeamActive: vi.fn(async () => true),
|
||||
canDispatchNudges: vi.fn(async () => true),
|
||||
});
|
||||
|
||||
try {
|
||||
const current = await feature.refreshStatus({ teamName, memberName });
|
||||
expect(current.state).toBe('caught_up');
|
||||
const store = new JsonMemberWorkSyncStore(new MemberWorkSyncStorePaths(teamsBasePath));
|
||||
await store.write({
|
||||
...current,
|
||||
state: 'still_working',
|
||||
report: {
|
||||
teamName,
|
||||
memberName,
|
||||
state: 'still_working',
|
||||
agendaFingerprint: current.agenda.fingerprint,
|
||||
reportedAt: current.evaluatedAt,
|
||||
expiresAt: new Date(Date.now() + 60 * 60_000).toISOString(),
|
||||
accepted: true,
|
||||
},
|
||||
});
|
||||
|
||||
await expect(feature.dispatchDueNudges([teamName])).resolves.toEqual({
|
||||
claimed: 0,
|
||||
delivered: 0,
|
||||
superseded: 0,
|
||||
retryable: 0,
|
||||
terminal: 0,
|
||||
});
|
||||
const repaired = await store.read({ teamName, memberName });
|
||||
expect(repaired).toMatchObject({
|
||||
state: 'caught_up',
|
||||
diagnostics: expect.arrayContaining(['agenda_empty']),
|
||||
});
|
||||
expect(repaired?.report).toBeUndefined();
|
||||
expect(await readInboxMessages({ teamsBasePath, teamName, memberName })).toHaveLength(0);
|
||||
} finally {
|
||||
await feature.dispose();
|
||||
}
|
||||
});
|
||||
|
||||
it('refreshes stale caught_up status during nudge dispatch when new work appears', async () => {
|
||||
const claudeRoot = makeTempRoot();
|
||||
setClaudeBasePathOverride(claudeRoot);
|
||||
const teamsBasePath = getTeamsBasePath();
|
||||
const teamName = 'team-a';
|
||||
const memberName = 'bob';
|
||||
let tasks: Array<{
|
||||
id: string;
|
||||
displayId: string;
|
||||
subject: string;
|
||||
status: 'pending';
|
||||
owner: string;
|
||||
}> = [];
|
||||
const feature = createMemberWorkSyncFeature({
|
||||
teamsBasePath,
|
||||
configReader: {
|
||||
getConfig: vi.fn(async () => ({
|
||||
name: teamName,
|
||||
members: [{ name: memberName, providerId: 'codex' }],
|
||||
})),
|
||||
} as never,
|
||||
taskReader: {
|
||||
getTasks: vi.fn(async () => tasks),
|
||||
} as never,
|
||||
kanbanManager: {
|
||||
getState: vi.fn(async () => ({
|
||||
teamName,
|
||||
reviewers: [],
|
||||
tasks: {},
|
||||
})),
|
||||
} as never,
|
||||
membersMetaStore: {
|
||||
getMembers: vi.fn(async () => []),
|
||||
} as never,
|
||||
isTeamActive: vi.fn(async () => true),
|
||||
canDispatchNudges: vi.fn(async () => true),
|
||||
});
|
||||
|
||||
try {
|
||||
await seedShadowReadyMetrics({ teamsBasePath, teamName, memberName });
|
||||
const current = await feature.refreshStatus({ teamName, memberName });
|
||||
expect(current.state).toBe('caught_up');
|
||||
const store = new JsonMemberWorkSyncStore(new MemberWorkSyncStorePaths(teamsBasePath));
|
||||
await store.write({
|
||||
...current,
|
||||
evaluatedAt: new Date(Date.now() - 7 * 60_000).toISOString(),
|
||||
});
|
||||
tasks = [
|
||||
{
|
||||
id: 'task-1',
|
||||
displayId: '11111111',
|
||||
subject: 'Wake after missed task event',
|
||||
status: 'pending',
|
||||
owner: memberName,
|
||||
},
|
||||
];
|
||||
|
||||
await expect(feature.dispatchDueNudges([teamName])).resolves.toEqual({
|
||||
claimed: 1,
|
||||
delivered: 1,
|
||||
superseded: 0,
|
||||
retryable: 0,
|
||||
terminal: 0,
|
||||
});
|
||||
const nudges = (await readInboxMessages({ teamsBasePath, teamName, memberName })).filter(
|
||||
(message) => message.messageKind === 'member_work_sync_nudge'
|
||||
);
|
||||
expect(nudges).toHaveLength(1);
|
||||
expect(nudges[0]?.text).toContain('11111111');
|
||||
await expect(store.read({ teamName, memberName })).resolves.toMatchObject({
|
||||
state: 'needs_sync',
|
||||
agenda: {
|
||||
items: [expect.objectContaining({ taskId: 'task-1' })],
|
||||
},
|
||||
});
|
||||
} finally {
|
||||
await feature.dispose();
|
||||
}
|
||||
});
|
||||
|
||||
it('materializes a missing active-member status during nudge dispatch', async () => {
|
||||
const claudeRoot = makeTempRoot();
|
||||
setClaudeBasePathOverride(claudeRoot);
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
import * as fs from 'fs/promises';
|
||||
import * as os from 'os';
|
||||
import * as path from 'path';
|
||||
import { afterEach, describe, expect, it } from 'vitest';
|
||||
import * as fs from 'fs/promises';
|
||||
|
||||
import { TeamTaskStallJournal } from '../../../../../src/main/services/team/stallMonitor/TeamTaskStallJournal';
|
||||
import { setClaudeBasePathOverride } from '../../../../../src/main/utils/pathDecoder';
|
||||
|
|
@ -101,4 +101,43 @@ describe('TeamTaskStallJournal', () => {
|
|||
}>;
|
||||
expect(saved.map((entry) => entry.epochKey)).toEqual(['task-codex:epoch-1']);
|
||||
});
|
||||
|
||||
it('recovers from an invalid journal file on the next scan', async () => {
|
||||
tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), 'stall-journal-'));
|
||||
setClaudeBasePathOverride(tmpDir);
|
||||
const teamDir = path.join(tmpDir, 'teams', 'demo');
|
||||
await fs.mkdir(teamDir, { recursive: true });
|
||||
const journalPath = path.join(teamDir, 'stall-monitor-journal.json');
|
||||
await fs.writeFile(journalPath, '{bad json', 'utf8');
|
||||
|
||||
const journal = new TeamTaskStallJournal();
|
||||
const evaluation = {
|
||||
status: 'alert',
|
||||
taskId: 'task-a',
|
||||
branch: 'work',
|
||||
signal: 'turn_ended_after_touch',
|
||||
epochKey: 'task-a:epoch-1',
|
||||
reason: 'Potential work stall',
|
||||
} as const;
|
||||
|
||||
await expect(
|
||||
journal.reconcileScan({
|
||||
teamName: 'demo',
|
||||
evaluations: [evaluation],
|
||||
activeTaskIds: ['task-a'],
|
||||
now: '2026-04-19T12:10:00.000Z',
|
||||
})
|
||||
).resolves.toEqual([]);
|
||||
|
||||
const saved = JSON.parse(await fs.readFile(journalPath, 'utf8')) as Array<{
|
||||
epochKey: string;
|
||||
state: string;
|
||||
}>;
|
||||
expect(saved).toEqual([
|
||||
expect.objectContaining({
|
||||
epochKey: 'task-a:epoch-1',
|
||||
state: 'suspected',
|
||||
}),
|
||||
]);
|
||||
});
|
||||
});
|
||||
|
|
|
|||
Loading…
Reference in a new issue