fix(member-work-sync): harden recovery delivery

This commit is contained in:
777genius 2026-06-02 14:10:53 +03:00
parent cf1f525486
commit 3b9b6a5469
12 changed files with 1446 additions and 150 deletions

View file

@ -56,6 +56,29 @@ function isTurnSettledReconcile(status: MemberWorkSyncStatus): boolean {
return status.shadow?.triggerReasons?.includes('turn_settled') === true;
}
function parseTime(value: string | undefined): number | null {
if (!value) {
return null;
}
const time = Date.parse(value);
return Number.isFinite(time) ? time : null;
}
function hasActiveAcceptedWorkLease(status: MemberWorkSyncStatus): boolean {
const report = status.report;
if (
report?.accepted !== true ||
report.agendaFingerprint !== status.agenda.fingerprint ||
(report.state !== 'still_working' && report.state !== 'blocked')
) {
return false;
}
const evaluatedAtMs = parseTime(status.evaluatedAt);
const expiresAtMs = parseTime(report.expiresAt);
return evaluatedAtMs != null && expiresAtMs != null && expiresAtMs > evaluatedAtMs;
}
function shouldPlanStatusOnlyRecovery(input: {
status: MemberWorkSyncStatus;
baseInput: MemberWorkSyncOutboxEnsureInput;
@ -68,7 +91,7 @@ function shouldPlanStatusOnlyRecovery(input: {
input.baseInput.payload.workSyncIntent === 'agenda_sync' &&
input.baseInput.payload.workSyncIntentKey === undefined &&
input.existingItemStatus === 'delivered' &&
input.status.report?.accepted !== true
!hasActiveAcceptedWorkLease(input.status)
);
}
@ -84,18 +107,10 @@ function shouldPlanAgendaSyncRefreshRecovery(input: {
input.baseInput.payload.workSyncIntentKey === undefined &&
input.existingItem.status === 'delivered' &&
input.existingItem.agendaFingerprint === input.baseInput.agendaFingerprint &&
input.status.report?.accepted !== true
!hasActiveAcceptedWorkLease(input.status)
);
}
function parseTime(value: string | undefined): number | null {
if (!value) {
return null;
}
const time = Date.parse(value);
return Number.isFinite(time) ? time : null;
}
function isDeliveredStillStuckRecoveryReason(reason: MemberWorkSyncNudgeActivationReason): boolean {
return (
reason === 'shadow_ready' ||
@ -125,7 +140,7 @@ function shouldPlanDeliveredStillStuckRecovery(input: {
input.baseInput.payload.workSyncIntentKey !== undefined ||
!recoverableExistingItem ||
input.existingItem.agendaFingerprint !== input.baseInput.agendaFingerprint ||
input.status.report?.accepted === true ||
hasActiveAcceptedWorkLease(input.status) ||
!isDeliveredStillStuckRecoveryReason(input.activationReason)
) {
return false;

View file

@ -71,6 +71,7 @@ export type MemberWorkSyncAuditEventName =
| 'turn_settled_ignored'
| 'queue_enqueued'
| 'queue_coalesced'
| 'queue_retry_scheduled'
| 'queue_reconciled'
| 'queue_dropped'
| 'reconcile_started'

View file

@ -110,6 +110,7 @@ interface OutboxIndexFile {
type OutboxIndexRoute = OutboxIndexFile['items'][string];
type OutboxDueRoute = [string, OutboxIndexRoute];
const MEMBER_WORK_SYNC_OUTBOX_CLAIM_STALE_MS = 5 * 60 * 1000;
export interface JsonMemberWorkSyncStoreDeps {
auditJournal?: MemberWorkSyncAuditJournalPort;
@ -117,8 +118,12 @@ export interface JsonMemberWorkSyncStoreDeps {
now?: () => Date;
}
function normalizeMemberKey(memberName: string): string {
return memberName.trim().toLowerCase();
function normalizeMemberKey(memberName: unknown): string {
return typeof memberName === 'string' ? memberName.trim().toLowerCase() : '';
}
function normalizeTeamKey(teamName: unknown): string {
return typeof teamName === 'string' ? teamName.trim().toLowerCase() : '';
}
function emptyMetricsIndex(): MetricsIndexFile {
@ -242,6 +247,46 @@ function canReviveOutboxItem(status: MemberWorkSyncOutboxItem['status']): boolea
return status === 'superseded' || (!isOutboxTerminal(status) && status !== 'pending');
}
function isReportIntentOwnedBy(
teamName: string,
memberName: string,
intent: MemberWorkSyncReportIntent
): boolean {
return (
normalizeTeamKey(intent.teamName) === normalizeTeamKey(teamName) &&
normalizeMemberKey(intent.memberName) === normalizeMemberKey(memberName)
);
}
function isOutboxItemOwnedBy(
teamName: string,
memberName: string,
item: MemberWorkSyncOutboxItem
): boolean {
return (
normalizeTeamKey(item.teamName) === normalizeTeamKey(teamName) &&
normalizeMemberKey(item.memberName) === normalizeMemberKey(memberName)
);
}
function parseIsoMs(value: string | undefined): number | null {
if (!value) {
return null;
}
const ms = Date.parse(value);
return Number.isFinite(ms) ? ms : null;
}
function isStaleClaim(claimedAt: string | undefined, nowIso: string): boolean {
const claimedAtMs = parseIsoMs(claimedAt);
const nowMs = parseIsoMs(nowIso);
return (
claimedAtMs != null &&
nowMs != null &&
nowMs - claimedAtMs >= MEMBER_WORK_SYNC_OUTBOX_CLAIM_STALE_MS
);
}
function applyOptionalNextAttemptAt(
item: MemberWorkSyncOutboxItem,
nextAttemptAt: string | undefined
@ -254,6 +299,9 @@ function applyOptionalNextAttemptAt(
}
function canClaimOutboxItem(item: MemberWorkSyncOutboxItem, nowIso: string): boolean {
if (item.status === 'claimed') {
return isStaleClaim(item.claimedAt ?? item.updatedAt, nowIso);
}
if (item.status !== 'pending' && item.status !== 'failed_retryable') {
return false;
}
@ -263,14 +311,23 @@ function canClaimOutboxItem(item: MemberWorkSyncOutboxItem, nowIso: string): boo
return item.nextAttemptAt <= nowIso;
}
function canClaimOutboxRoute(route: OutboxIndexRoute, nowIso: string): boolean {
if (route.status === 'claimed') {
return isStaleClaim(route.updatedAt, nowIso);
}
return (
(route.status === 'pending' || route.status === 'failed_retryable') &&
(!route.nextAttemptAt || route.nextAttemptAt <= nowIso)
);
}
function getDueOutboxRoutes(
index: OutboxIndexFile,
nowIso: string,
limit: number
): OutboxDueRoute[] {
return Object.entries(index.items)
.filter(([, route]) => route.status === 'pending' || route.status === 'failed_retryable')
.filter(([, route]) => !route.nextAttemptAt || route.nextAttemptAt <= nowIso)
.filter(([, route]) => canClaimOutboxRoute(route, nowIso))
.sort((left, right) => {
const leftTime = left[1].nextAttemptAt ?? left[1].updatedAt;
const rightTime = right[1].nextAttemptAt ?? right[1].updatedAt;
@ -623,10 +680,10 @@ export class JsonMemberWorkSyncStore
staleIndex = true;
}
}
const missingIndexedPending = staleIndex
const unindexedOrStaleIndexedPending = staleIndex
? false
: await this.hasMissingIndexedPendingReport(teamName, index);
if (staleIndex || missingIndexedPending) {
: await this.hasUnindexedOrStaleIndexedPendingReport(teamName, index);
if (staleIndex || unindexedOrStaleIndexedPending) {
await this.enqueue(teamName, async () => {
await withFileLock(this.paths.getPendingReportsIndexPath(teamName), async () => {
index = await this.repairPendingReportsIndex(teamName);
@ -666,29 +723,58 @@ export class JsonMemberWorkSyncStore
if (!route) {
return;
}
await withFileLock(
this.paths.getMemberReportsPath(teamName, route.memberName),
async () => {
const reports = await this.readMemberReportsFile(teamName, route.memberName);
const current = reports.intents[id];
if (current?.status !== 'pending') {
return;
const updateRoute = async (
targetRoute: PendingReportsIndexFile['items'][string]
): Promise<boolean> => {
let staleRoute = false;
await withFileLock(
this.paths.getMemberReportsPath(teamName, targetRoute.memberName),
async () => {
const reports = await this.readMemberReportsFile(teamName, targetRoute.memberName);
const current = reports.intents[id];
if (!current) {
delete index.items[id];
staleRoute = true;
return;
}
if (!isReportIntentOwnedBy(teamName, targetRoute.memberName, current)) {
delete index.items[id];
staleRoute = true;
return;
}
if (current.status !== 'pending') {
return;
}
const next: MemberWorkSyncReportIntent = {
...current,
status: result.status,
resultCode: result.resultCode,
processedAt: result.processedAt,
};
reports.intents[id] = next;
await this.writeMemberReportsFile(teamName, targetRoute.memberName, reports);
index.items[id] = toPendingReportIndexItem(
next,
this.paths.getMemberKey(next.memberName)
);
await this.writePendingReportsIndexFile(teamName, index);
}
reports.intents[id] = {
...current,
status: result.status,
resultCode: result.resultCode,
processedAt: result.processedAt,
};
await this.writeMemberReportsFile(teamName, route.memberName, reports);
index.items[id] = {
...route,
status: result.status,
processedAt: result.processedAt,
};
await this.writePendingReportsIndexFile(teamName, index);
);
return staleRoute;
};
let staleRoute = await updateRoute(route);
if (staleRoute) {
index = await this.repairPendingReportsIndex(teamName);
const repairedRoute = index.items[id];
if (!repairedRoute) {
return;
}
);
staleRoute = await updateRoute(repairedRoute);
if (staleRoute) {
await this.repairPendingReportsIndex(teamName);
}
}
});
});
}
@ -801,45 +887,67 @@ export class JsonMemberWorkSyncStore
}
let dueRoutes = getDueOutboxRoutes(index, input.nowIso, input.limit);
if (
dueRoutes.length > 0 &&
dueRoutes.length < Math.max(0, input.limit) &&
(await this.hasMissingIndexedDueOutboxItem(input.teamName, index, input.nowIso))
(await this.hasUnindexedOrStaleIndexedDueOutboxItem(input.teamName, index, input.nowIso))
) {
index = await this.repairOutboxIndex(input.teamName);
dueRoutes = getDueOutboxRoutes(index, input.nowIso, input.limit);
}
let staleIndex = false;
for (const [id, route] of dueRoutes) {
await withFileLock(
this.paths.getMemberOutboxPath(input.teamName, route.memberName),
async () => {
const outbox = await this.readMemberOutboxFile(input.teamName, route.memberName);
const item = outbox.items[id];
if (!item || !canClaimOutboxItem(item, input.nowIso)) {
delete index.items[id];
staleIndex = true;
return;
}
const next: MemberWorkSyncOutboxItem = {
...item,
status: 'claimed',
attemptGeneration: item.attemptGeneration + 1,
claimedBy: input.claimedBy,
claimedAt: input.nowIso,
updatedAt: input.nowIso,
};
delete next.lastError;
outbox.items[id] = next;
await this.writeMemberOutboxFile(input.teamName, route.memberName, outbox);
index.items[id] = toOutboxIndexItem(next, route.memberKey);
claimed.push(next);
const claimRoutes = async (routes: OutboxDueRoute[]): Promise<boolean> => {
let staleIndex = false;
for (const [id, route] of routes) {
if (claimed.length >= Math.max(0, input.limit)) {
break;
}
);
}
await withFileLock(
this.paths.getMemberOutboxPath(input.teamName, route.memberName),
async () => {
const outbox = await this.readMemberOutboxFile(input.teamName, route.memberName);
const item = outbox.items[id];
if (!item || !canClaimOutboxItem(item, input.nowIso)) {
delete index.items[id];
staleIndex = true;
return;
}
const memberKey = this.paths.getMemberKey(item.memberName);
if (!isOutboxItemOwnedBy(input.teamName, route.memberName, item)) {
delete index.items[id];
staleIndex = true;
return;
}
const next: MemberWorkSyncOutboxItem = {
...item,
status: 'claimed',
attemptGeneration: item.attemptGeneration + 1,
claimedBy: input.claimedBy,
claimedAt: input.nowIso,
updatedAt: input.nowIso,
};
delete next.nextAttemptAt;
delete next.lastError;
outbox.items[id] = next;
await this.writeMemberOutboxFile(input.teamName, route.memberName, outbox);
index.items[id] = toOutboxIndexItem(next, memberKey);
claimed.push(next);
}
);
}
return staleIndex;
};
let staleIndex = await claimRoutes(dueRoutes);
if (staleIndex) {
index = await this.repairOutboxIndex(input.teamName);
const remainingLimit = Math.max(0, input.limit) - claimed.length;
dueRoutes =
remainingLimit > 0 ? getDueOutboxRoutes(index, input.nowIso, remainingLimit) : [];
staleIndex = dueRoutes.length > 0 ? await claimRoutes(dueRoutes) : false;
if (staleIndex) {
await this.repairOutboxIndex(input.teamName);
} else if (dueRoutes.length > 0) {
await this.writeOutboxIndexFile(input.teamName, index);
}
} else if (dueRoutes.length > 0) {
await this.writeOutboxIndexFile(input.teamName, index);
}
@ -996,7 +1104,8 @@ export class JsonMemberWorkSyncStore
(item) =>
item.payload.workSyncIntentKey === intentKey &&
item.updatedAt >= input.sinceIso &&
item.status !== 'failed_terminal'
item.status !== 'failed_terminal' &&
item.status !== 'superseded'
)
.sort((left, right) => right.updatedAt.localeCompare(left.updatedAt));
const latest = matches[0];
@ -1171,17 +1280,48 @@ export class JsonMemberWorkSyncStore
if (!route) {
return;
}
await withFileLock(this.paths.getMemberOutboxPath(teamName, route.memberName), async () => {
const outbox = await this.readMemberOutboxFile(teamName, route.memberName);
const next = updater(outbox.items[id]);
if (!next) {
const updateRoute = async (targetRoute: OutboxIndexRoute): Promise<boolean> => {
let staleRoute = false;
await withFileLock(
this.paths.getMemberOutboxPath(teamName, targetRoute.memberName),
async () => {
const outbox = await this.readMemberOutboxFile(teamName, targetRoute.memberName);
const current = outbox.items[id];
if (!current) {
delete index.items[id];
staleRoute = true;
return;
}
if (!isOutboxItemOwnedBy(teamName, targetRoute.memberName, current)) {
delete index.items[id];
staleRoute = true;
return;
}
const next = updater(current);
if (!next) {
return;
}
outbox.items[id] = next;
await this.writeMemberOutboxFile(teamName, targetRoute.memberName, outbox);
index.items[id] = toOutboxIndexItem(next, this.paths.getMemberKey(next.memberName));
await this.writeOutboxIndexFile(teamName, index);
}
);
return staleRoute;
};
let staleRoute = await updateRoute(route);
if (staleRoute) {
index = await this.repairOutboxIndex(teamName);
const repairedRoute = index.items[id];
if (!repairedRoute) {
return;
}
outbox.items[id] = next;
await this.writeMemberOutboxFile(teamName, route.memberName, outbox);
index.items[id] = toOutboxIndexItem(next, route.memberKey);
await this.writeOutboxIndexFile(teamName, index);
});
staleRoute = await updateRoute(repairedRoute);
if (staleRoute) {
await this.repairOutboxIndex(teamName);
}
}
});
});
}
@ -1251,11 +1391,17 @@ export class JsonMemberWorkSyncStore
for (const { memberName, reports } of await this.scanMemberReports(teamName)) {
const memberKey = this.paths.getMemberKey(memberName);
for (const intent of Object.values(reports.intents)) {
if (!isReportIntentOwnedBy(teamName, memberName, intent)) {
continue;
}
index.items[intent.id] = toPendingReportIndexItem(intent, memberKey);
repairedMembers.add(intent.memberName);
}
}
for (const intent of Object.values((await this.readLegacyPendingFile(teamName)).intents)) {
if (!isReportIntentOwnedBy(teamName, intent.memberName, intent)) {
continue;
}
const memberKey = this.paths.getMemberKey(intent.memberName);
if (!index.items[intent.id]) {
await withFileLock(
@ -1300,11 +1446,17 @@ export class JsonMemberWorkSyncStore
for (const { memberName, outbox } of await this.scanMemberOutboxes(teamName)) {
const memberKey = this.paths.getMemberKey(memberName);
for (const item of Object.values(outbox.items)) {
if (!isOutboxItemOwnedBy(teamName, memberName, item)) {
continue;
}
index.items[item.id] = toOutboxIndexItem(item, memberKey);
repairedMembers.add(item.memberName);
}
}
for (const item of Object.values((await this.readLegacyOutboxFile(teamName)).items)) {
if (!isOutboxItemOwnedBy(teamName, item.memberName, item)) {
continue;
}
const memberKey = this.paths.getMemberKey(item.memberName);
if (!index.items[item.id]) {
await withFileLock(this.paths.getMemberOutboxPath(teamName, item.memberName), async () => {
@ -1382,26 +1534,54 @@ export class JsonMemberWorkSyncStore
return reports;
}
private async hasMissingIndexedPendingReport(
private async hasUnindexedOrStaleIndexedPendingReport(
teamName: string,
index: PendingReportsIndexFile
): Promise<boolean> {
const indexedIds = new Set(Object.keys(index.items));
for (const { reports } of await this.scanMemberReports(teamName)) {
const routes = index.items;
for (const { memberName, reports } of await this.scanMemberReports(teamName)) {
for (const intent of Object.values(reports.intents)) {
if (intent.status === 'pending' && !indexedIds.has(intent.id)) {
if (!isReportIntentOwnedBy(teamName, memberName, intent)) {
continue;
}
const route = routes[intent.id];
if (
intent.status === 'pending' &&
!this.isCurrentPendingReportRoute(teamName, route, intent)
) {
return true;
}
}
}
for (const intent of Object.values((await this.readLegacyPendingFile(teamName)).intents)) {
if (intent.status === 'pending' && !indexedIds.has(intent.id)) {
if (!isReportIntentOwnedBy(teamName, intent.memberName, intent)) {
continue;
}
const route = routes[intent.id];
if (
intent.status === 'pending' &&
!this.isCurrentPendingReportRoute(teamName, route, intent)
) {
return true;
}
}
return false;
}
private isCurrentPendingReportRoute(
teamName: string,
route: PendingReportsIndexFile['items'][string] | undefined,
intent: MemberWorkSyncReportIntent
): boolean {
return (
!!route &&
normalizeTeamKey(intent.teamName) === normalizeTeamKey(teamName) &&
route.status === 'pending' &&
normalizeMemberKey(route.memberName) === normalizeMemberKey(intent.memberName) &&
route.memberKey === this.paths.getMemberKey(intent.memberName)
);
}
private async scanMemberOutboxes(
teamName: string
): Promise<{ memberName: string; outbox: MemberOutboxFile }[]> {
@ -1412,27 +1592,56 @@ export class JsonMemberWorkSyncStore
return outboxes;
}
private async hasMissingIndexedDueOutboxItem(
private async hasUnindexedOrStaleIndexedDueOutboxItem(
teamName: string,
index: OutboxIndexFile,
nowIso: string
): Promise<boolean> {
const indexedIds = new Set(Object.keys(index.items));
for (const { outbox } of await this.scanMemberOutboxes(teamName)) {
const routes = index.items;
for (const { memberName, outbox } of await this.scanMemberOutboxes(teamName)) {
for (const item of Object.values(outbox.items)) {
if (canClaimOutboxItem(item, nowIso) && !indexedIds.has(item.id)) {
if (!isOutboxItemOwnedBy(teamName, memberName, item)) {
continue;
}
const route = routes[item.id];
if (
canClaimOutboxItem(item, nowIso) &&
!this.isCurrentDueOutboxRoute(teamName, route, item, nowIso)
) {
return true;
}
}
}
for (const item of Object.values((await this.readLegacyOutboxFile(teamName)).items)) {
if (canClaimOutboxItem(item, nowIso) && !indexedIds.has(item.id)) {
if (!isOutboxItemOwnedBy(teamName, item.memberName, item)) {
continue;
}
const route = routes[item.id];
if (
canClaimOutboxItem(item, nowIso) &&
!this.isCurrentDueOutboxRoute(teamName, route, item, nowIso)
) {
return true;
}
}
return false;
}
private isCurrentDueOutboxRoute(
teamName: string,
route: OutboxIndexRoute | undefined,
item: MemberWorkSyncOutboxItem,
nowIso: string
): boolean {
return (
!!route &&
normalizeTeamKey(item.teamName) === normalizeTeamKey(teamName) &&
normalizeMemberKey(route.memberName) === normalizeMemberKey(item.memberName) &&
route.memberKey === this.paths.getMemberKey(item.memberName) &&
canClaimOutboxRoute(route, nowIso)
);
}
private async appendAudit(input: {
teamName: string;
memberName: string;

View file

@ -61,6 +61,7 @@ interface QueueItem {
maxRunAt: number;
triggerReasons: Set<MemberWorkSyncTriggerReason>;
triggerReasonCounts: Map<MemberWorkSyncTriggerReason, number>;
retryCount: number;
recovery?: MemberWorkSyncReconcileContext['recovery'];
}
@ -87,6 +88,8 @@ export interface MemberWorkSyncEventQueueDeps {
quietWindowMs?: number;
triggerTiming?: Partial<Record<MemberWorkSyncTriggerReason, Partial<TriggerTimingPolicy>>>;
concurrency?: number;
retryDelayMs?: number;
maxRetryAttempts?: number;
now?: () => number;
nowIso?: () => string;
auditJournal?: MemberWorkSyncAuditJournalPort;
@ -107,6 +110,8 @@ export class MemberWorkSyncEventQueue {
private readonly inFlight = new Set<Promise<void>>();
private readonly quietWindowMs: number;
private readonly concurrency: number;
private readonly retryDelayMs: number;
private readonly maxRetryAttempts: number;
private readonly now: () => number;
private readonly nowIso: () => string;
private timer: ReturnType<typeof setTimeout> | null = null;
@ -122,6 +127,8 @@ export class MemberWorkSyncEventQueue {
constructor(private readonly deps: MemberWorkSyncEventQueueDeps) {
this.quietWindowMs = deps.quietWindowMs ?? 90_000;
this.concurrency = Math.max(1, deps.concurrency ?? 2);
this.retryDelayMs = Math.max(0, deps.retryDelayMs ?? 30_000);
this.maxRetryAttempts = Math.max(0, deps.maxRetryAttempts ?? 3);
this.now = deps.now ?? Date.now;
this.nowIso = deps.nowIso ?? (() => new Date().toISOString());
}
@ -209,6 +216,7 @@ export class MemberWorkSyncEventQueue {
? Math.min(existing.runAt, runAt)
: Math.min(Math.max(existing.runAt, runAt), existing.maxRunAt);
incrementReasonCount(existing.triggerReasonCounts, input.triggerReason);
existing.retryCount = 0;
this.counters.coalesced += 1;
this.appendAudit({
teamName,
@ -230,6 +238,7 @@ export class MemberWorkSyncEventQueue {
maxRunAt: now + timing.maxCoalesceWaitMs,
triggerReasons: new Set([input.triggerReason]),
triggerReasonCounts: new Map([[input.triggerReason, 1]]),
retryCount: 0,
...(input.recovery ? { recovery: input.recovery } : {}),
});
this.counters.enqueued += 1;
@ -366,8 +375,10 @@ export class MemberWorkSyncEventQueue {
};
this.running.set(key, running);
let failed = false;
const promise = this.executeItem(key, item, running)
.catch((error: unknown) => {
failed = true;
this.counters.failed += 1;
this.deps.logger?.warn('member work sync queue reconcile failed', {
teamName: item.teamName,
@ -380,6 +391,8 @@ export class MemberWorkSyncEventQueue {
this.inFlight.delete(promise);
if (running.rerunRequested && !this.stopped) {
this.enqueueFollowUp(item, running);
} else if (failed && !this.stopped) {
this.enqueueRetryAfterFailure(key, item, running);
}
this.pump();
});
@ -387,6 +400,53 @@ export class MemberWorkSyncEventQueue {
this.inFlight.add(promise);
}
private enqueueRetryAfterFailure(key: string, item: QueueItem, running: RunningItem): void {
if (item.retryCount >= this.maxRetryAttempts) {
this.counters.dropped += 1;
this.appendAudit({
teamName: item.teamName,
memberName: item.memberName,
event: 'queue_dropped',
source: 'event_queue',
reason: 'reconcile_failed_max_retries',
triggerReasons: [...running.triggerReasons].sort(),
metadata: {
retryCount: item.retryCount,
maxRetryAttempts: this.maxRetryAttempts,
},
});
return;
}
const now = this.now();
const retryCount = item.retryCount + 1;
const recovery = running.recovery ?? item.recovery;
this.items.set(key, {
...item,
lastQueuedAt: now,
runAt: now + this.retryDelayMs,
maxRunAt: now + this.retryDelayMs,
triggerReasons: new Set(running.triggerReasons),
triggerReasonCounts: new Map(item.triggerReasonCounts),
retryCount,
...(recovery ? { recovery } : {}),
});
this.appendAudit({
teamName: item.teamName,
memberName: item.memberName,
event: 'queue_retry_scheduled',
source: 'event_queue',
reason: 'reconcile_failed',
triggerReasons: [...running.triggerReasons].sort(),
metadata: {
retryCount,
retryDelayMs: this.retryDelayMs,
maxRetryAttempts: this.maxRetryAttempts,
},
});
this.schedule();
}
private enqueueFollowUp(item: QueueItem, running: RunningItem): void {
const reasons = [...running.triggerReasons].sort();
const recovery = running.recovery ?? item.recovery;

View file

@ -2146,17 +2146,8 @@ async function initializeServices(): Promise<void> {
return Number.isFinite(expiresAtMs) && expiresAtMs > Date.now();
});
scheduleStartupTask(() => {
void teamDataService
.listTeams()
.then(async (teams) => {
const lifecycleActiveTeamNames = teams
.filter(
(team) =>
!team.deletedAt &&
(teamProvisioningService.isTeamAlive(team.teamName) ||
teamProvisioningService.hasProvisioningRun(team.teamName))
)
.map((team) => team.teamName);
void listMemberWorkSyncLifecycleActiveTeamNames()
.then(async (lifecycleActiveTeamNames) => {
await memberWorkSyncFeature?.replayPendingReports(lifecycleActiveTeamNames);
await memberWorkSyncFeature?.enqueueStartupScan(lifecycleActiveTeamNames);
})

View file

@ -6162,6 +6162,27 @@ export class TeamProvisioningService {
return enabled;
}
private async markOpenCodePromptLedgerFailedTerminal(input: {
ledger: OpenCodePromptDeliveryLedgerStore;
id: string;
reason: string;
diagnostics?: string[];
failedAt: string;
eventContext?: Record<string, unknown>;
}): Promise<OpenCodePromptDeliveryLedgerRecord> {
const failed = await input.ledger.markFailedTerminal({
id: input.id,
reason: input.reason,
...(input.diagnostics ? { diagnostics: input.diagnostics } : {}),
failedAt: input.failedAt,
});
this.logOpenCodePromptDeliveryEvent('opencode_prompt_delivery_terminal_failure', failed, {
reason: input.reason,
...(input.eventContext ?? {}),
});
return failed;
}
private async findOpenCodeVisibleReplyByRelayOfMessageId(input: {
teamName: string;
replyRecipient?: string | null;
@ -7243,7 +7264,8 @@ export class TeamProvisioningService {
input.ledgerRecord.maxSessionRefreshAttempts ??
OPENCODE_PROMPT_DELIVERY_SESSION_REFRESH_MAX_ATTEMPTS;
if ((input.ledgerRecord.sessionRefreshAttempts ?? 0) >= maxSessionRefreshAttempts) {
return await input.ledger.markFailedTerminal({
return await this.markOpenCodePromptLedgerFailedTerminal({
ledger: input.ledger,
id: input.ledgerRecord.id,
reason: 'opencode_session_stale_observe_loop_after_accepted_prompt',
diagnostics: [
@ -7251,6 +7273,11 @@ export class TeamProvisioningService {
`OpenCode session stayed stale while observing an accepted prompt after ${maxSessionRefreshAttempts} attempt(s).`,
],
failedAt: now,
eventContext: {
observeOnlyAfterAcceptedPrompt: true,
sessionRefreshAttempts: input.ledgerRecord.sessionRefreshAttempts ?? 0,
maxSessionRefreshAttempts,
},
});
}
const delayMs = OPENCODE_PROMPT_DELIVERY_RETRY_DELAY_MS;
@ -7287,7 +7314,8 @@ export class TeamProvisioningService {
input.ledgerRecord.maxSessionRefreshAttempts ??
OPENCODE_PROMPT_DELIVERY_SESSION_REFRESH_MAX_ATTEMPTS;
if ((input.ledgerRecord.sessionRefreshAttempts ?? 0) >= maxSessionRefreshAttempts) {
return await input.ledger.markFailedTerminal({
return await this.markOpenCodePromptLedgerFailedTerminal({
ledger: input.ledger,
id: input.ledgerRecord.id,
reason: 'opencode_session_refresh_loop_after_resolved_behavior_changed',
diagnostics: [
@ -7295,6 +7323,11 @@ export class TeamProvisioningService {
`OpenCode session stayed stale after ${maxSessionRefreshAttempts} session refresh attempt(s).`,
],
failedAt: now,
eventContext: {
retry: true,
sessionRefreshAttempts: input.ledgerRecord.sessionRefreshAttempts ?? 0,
maxSessionRefreshAttempts,
},
});
}
const delayMs = this.getOpenCodeDeliveryNextDelayMs({
@ -7338,10 +7371,12 @@ export class TeamProvisioningService {
input.ledgerRecord.attempts >= input.ledgerRecord.maxAttempts &&
!canScheduleNoAssistantRecoveryRetry
) {
return await input.ledger.markFailedTerminal({
return await this.markOpenCodePromptLedgerFailedTerminal({
ledger: input.ledger,
id: input.ledgerRecord.id,
reason: input.reason,
failedAt: now,
eventContext: { retry: input.retry },
});
}
const delayMs = this.getOpenCodeDeliveryNextDelayMs({
@ -23493,6 +23528,29 @@ export class TeamProvisioningService {
.catch(() => []);
const targetMessage = inboxMessages.find((message) => message.messageId === onlyMessageId);
if (targetMessage?.read) {
if (targetMessage.messageKind === 'member_work_sync_nudge') {
this.scheduleOpenCodeMemberInboxDeliveryWake({
teamName,
memberName,
messageId: onlyMessageId,
delayMs: 500,
});
const diagnostic = `opencode_work_sync_read_commit_waiting_for_active_relay: ${onlyMessageId}`;
return {
relayed: 0,
attempted: 1,
delivered: 0,
failed: 0,
lastDelivery: {
delivered: true,
accepted: false,
responsePending: true,
reason: 'opencode_work_sync_read_commit_waiting_for_active_relay',
diagnostics: [diagnostic],
},
diagnostics: [diagnostic],
};
}
return {
relayed: 0,
attempted: 1,
@ -23576,7 +23634,7 @@ export class TeamProvisioningService {
const onlyMessageId = options.onlyMessageId?.trim();
if (onlyMessageId) {
const targetMessage = inboxMessages.find((message) => message.messageId === onlyMessageId);
if (targetMessage?.read) {
if (targetMessage?.read && targetMessage.messageKind !== 'member_work_sync_nudge') {
return {
relayed: 0,
attempted: 1,
@ -23603,8 +23661,13 @@ export class TeamProvisioningService {
}
const unread = inboxMessages
.filter((message): message is InboxMessage & { messageId: string } => {
if (message.read) return false;
if (onlyMessageId && message.messageId !== onlyMessageId) return false;
if (
message.read &&
(!onlyMessageId || message.messageKind !== 'member_work_sync_nudge')
) {
return false;
}
if (typeof message.text !== 'string' || message.text.trim().length === 0) return false;
return this.hasStableMessageId(message);
})
@ -23813,17 +23876,14 @@ export class TeamProvisioningService {
pendingRecord
);
}
failedRecord = await promptLedger.markFailedTerminal({
failedRecord = await this.markOpenCodePromptLedgerFailedTerminal({
ledger: promptLedger,
id: pendingRecord.id,
reason: attachmentPayloads.reason,
diagnostics: attachmentPayloads.diagnostics,
failedAt: nowIso(),
eventContext: { attachmentPayloadUnavailable: true },
});
this.logOpenCodePromptDeliveryEvent(
'opencode_prompt_delivery_response_observed',
failedRecord,
{ attachmentPayloadUnavailable: true }
);
} catch (error) {
const diagnostic = `opencode_inbox_attachment_terminal_ledger_failed: ${getErrorMessage(
error

View file

@ -902,9 +902,7 @@ describe('MemberWorkSync use cases', () => {
busySignal: {
isBusy: async () => {
busyChecks += 1;
return busyChecks > 1
? { busy: true, reason: 'recent_tool_activity' }
: { busy: false };
return busyChecks > 1 ? { busy: true, reason: 'recent_tool_activity' } : { busy: false };
},
},
});
@ -1093,6 +1091,31 @@ describe('MemberWorkSync use cases', () => {
expect(recoverySummary).toMatchObject({ claimed: 1, delivered: 1, retryable: 0 });
expect(inbox.inserted).toHaveLength(2);
expect(inbox.inserted[1]?.messageId).toContain('agenda-sync-still-stuck');
clock.set('2026-04-29T01:02:00.000Z');
store.metricsGeneratedAt = '2026-04-29T01:02:00.000Z';
await reconciler.execute(
{
teamName: 'team-a',
memberName: 'team-lead',
},
{ reconciledBy: 'queue', triggerReasons: ['manual_refresh'] }
);
const recoveryItems = [...outbox.items.values()].filter((item) =>
item.payload.workSyncIntentKey?.startsWith('agenda-sync-still-stuck:')
);
expect(recoveryItems).toHaveLength(2);
expect(new Set(recoveryItems.map((item) => item.id)).size).toBe(2);
const secondRecoverySummary = await new MemberWorkSyncNudgeDispatcher(deps).dispatchDue({
teamNames: ['team-a'],
claimedBy: 'test-dispatcher',
});
expect(secondRecoverySummary).toMatchObject({ claimed: 1, delivered: 1, retryable: 0 });
expect(inbox.inserted).toHaveLength(3);
expect(inbox.inserted[2]?.messageId).toContain('agenda-sync-still-stuck');
});
it('creates an agenda-sync refresh recovery when a delivered nudge has a stale payload hash', async () => {
@ -1396,6 +1419,184 @@ describe('MemberWorkSync use cases', () => {
expect(inbox.inserted).toHaveLength(3);
});
it('creates a delivered-still-stuck recovery after an accepted still_working lease expires', async () => {
const outbox = new InMemoryOutboxStore();
const inbox = new InMemoryInboxNudge();
const { clock, deps, store } = createDeps({
providerId: 'codex',
outboxStore: outbox,
inboxNudge: inbox,
});
store.phase2ReadinessState = 'shadow_ready';
const reconciler = new MemberWorkSyncReconciler(deps);
const reporter = new MemberWorkSyncReporter(deps);
const firstStatus = await reconciler.execute(
{
teamName: 'team-a',
memberName: 'bob',
},
{ reconciledBy: 'queue', triggerReasons: ['task_changed'] }
);
await new MemberWorkSyncNudgeDispatcher(deps).dispatchDue({
teamNames: ['team-a'],
claimedBy: 'test-dispatcher',
});
const baseId = `member-work-sync:team-a:bob:${firstStatus.agenda.fingerprint}`;
expect(outbox.items.get(baseId)).toMatchObject({ status: 'delivered' });
await reporter.execute({
teamName: 'team-a',
memberName: 'bob',
state: 'still_working',
agendaFingerprint: firstStatus.agenda.fingerprint,
reportToken: firstStatus.reportToken,
taskIds: ['task-1'],
leaseTtlMs: 120_000,
source: 'test',
});
clock.set('2026-04-29T00:10:00.000Z');
store.phase2ReadinessState = 'blocked';
store.phase2ReadinessReasons = ['would_nudge_rate_high'];
store.metricsGeneratedAt = '2026-04-29T00:10:00.000Z';
store.recentEvents = [
{
id: 'old-report-accepted',
teamName: 'team-a',
memberName: 'bob',
kind: 'report_accepted',
state: 'still_working',
agendaFingerprint: firstStatus.agenda.fingerprint,
recordedAt: '2026-04-29T00:01:00.000Z',
actionableCount: 1,
providerId: 'codex',
},
{
id: 'needs-sync-after-lease-expired',
teamName: 'team-a',
memberName: 'bob',
kind: 'status_evaluated',
state: 'needs_sync',
agendaFingerprint: firstStatus.agenda.fingerprint,
recordedAt: '2026-04-29T00:04:00.000Z',
actionableCount: 1,
providerId: 'codex',
},
];
const expiredStatus = await reconciler.execute(
{
teamName: 'team-a',
memberName: 'bob',
},
{ reconciledBy: 'queue', triggerReasons: ['manual_refresh'] }
);
expect(expiredStatus).toMatchObject({
state: 'needs_sync',
diagnostics: expect.arrayContaining(['report_lease_expired']),
});
expect(expiredStatus.report).toBeUndefined();
const recovery = [...outbox.items.values()].find((item) =>
item.payload.workSyncIntentKey?.startsWith('agenda-sync-still-stuck:')
);
expect(recovery).toMatchObject({
status: 'pending',
agendaFingerprint: firstStatus.agenda.fingerprint,
});
const summary = await new MemberWorkSyncNudgeDispatcher(deps).dispatchDue({
teamNames: ['team-a'],
claimedBy: 'test-dispatcher',
});
expect(summary).toMatchObject({ claimed: 1, delivered: 1, retryable: 0 });
expect(inbox.inserted).toHaveLength(2);
expect(inbox.inserted[1]?.messageId).toContain('agenda-sync-still-stuck');
});
it('creates a delivered-still-stuck recovery for mixed review pickup and native work under noisy metrics', async () => {
const outbox = new InMemoryOutboxStore();
const inbox = new InMemoryInboxNudge();
const inProgressItem: MemberWorkSyncActionableWorkItem = {
...workItem,
reason: 'owned_in_progress_task',
evidence: {
status: 'in_progress',
owner: 'bob',
},
};
const { clock, deps, store } = createDeps({
items: [reviewPickupItem, inProgressItem],
providerId: 'codex',
outboxStore: outbox,
inboxNudge: inbox,
});
store.phase2ReadinessState = 'shadow_ready';
const reconciler = new MemberWorkSyncReconciler(deps);
const firstStatus = await reconciler.execute(
{
teamName: 'team-a',
memberName: 'bob',
},
{ reconciledBy: 'queue', triggerReasons: ['task_changed'] }
);
await new MemberWorkSyncNudgeDispatcher(deps).dispatchDue({
teamNames: ['team-a'],
claimedBy: 'test-dispatcher',
});
const baseId = `member-work-sync:team-a:bob:${firstStatus.agenda.fingerprint}`;
expect(outbox.items.get(baseId)).toMatchObject({ status: 'delivered' });
clock.set('2026-04-29T00:10:00.000Z');
store.phase2ReadinessState = 'blocked';
store.phase2ReadinessReasons = ['would_nudge_rate_high'];
store.metricsGeneratedAt = '2026-04-29T00:10:00.000Z';
store.recentEvents = [
{
id: 'mixed-needs-sync-stable',
teamName: 'team-a',
memberName: 'bob',
kind: 'status_evaluated',
state: 'needs_sync',
agendaFingerprint: firstStatus.agenda.fingerprint,
recordedAt: '2026-04-29T00:02:00.000Z',
actionableCount: 2,
providerId: 'codex',
},
];
await reconciler.execute(
{
teamName: 'team-a',
memberName: 'bob',
},
{ reconciledBy: 'queue', triggerReasons: ['manual_refresh'] }
);
const recovery = [...outbox.items.values()].find((item) =>
item.payload.workSyncIntentKey?.startsWith('agenda-sync-still-stuck:')
);
expect(recovery).toMatchObject({
status: 'pending',
agendaFingerprint: firstStatus.agenda.fingerprint,
});
expect(recovery?.payload.text).toContain('still no accepted member_work_sync_report');
const summary = await new MemberWorkSyncNudgeDispatcher(deps).dispatchDue({
teamNames: ['team-a'],
claimedBy: 'test-dispatcher',
});
expect(summary).toMatchObject({ claimed: 1, delivered: 1, retryable: 0 });
expect(inbox.inserted).toHaveLength(2);
expect(inbox.inserted[1]?.messageId).toContain('agenda-sync-still-stuck');
});
it('records an existing delivered agenda nudge as skipped before still-stuck recovery age', async () => {
const outbox = new InMemoryOutboxStore();
const inbox = new InMemoryInboxNudge();

View file

@ -305,6 +305,107 @@ describe('JsonMemberWorkSyncStore', () => {
).toEqual(['bob', 'tom']);
});
it('repairs a stale processed pending-report index route when member report is pending', async () => {
const request = {
teamName: 'team-a',
memberName: 'bob',
state: 'still_working' as const,
agendaFingerprint: 'agenda:v1:abc',
reportToken: 'wrs:v1.test',
source: 'mcp' as const,
};
await store.appendPendingReport(request, 'control_api_unavailable');
const [intent] = await store.listPendingReports('team-a');
await store.markPendingReportProcessed('team-a', intent!.id, {
status: 'accepted',
resultCode: 'accepted',
processedAt: '2026-04-29T00:01:00.000Z',
});
const reportsPath = join(memberWorkSyncDir(root, 'team-a', 'bob'), 'reports.json');
const reports = JSON.parse(await readFile(reportsPath, 'utf8'));
reports.intents[intent!.id] = {
...reports.intents[intent!.id],
status: 'pending',
};
delete reports.intents[intent!.id].resultCode;
delete reports.intents[intent!.id].processedAt;
await writeFile(reportsPath, JSON.stringify(reports), 'utf8');
const pending = await store.listPendingReports('team-a');
expect(pending).toHaveLength(1);
expect(pending[0]).toMatchObject({
id: intent!.id,
memberName: 'bob',
status: 'pending',
});
});
it('repairs stale pending-report update routes before marking processed', async () => {
const request = {
teamName: 'team-a',
memberName: 'bob',
state: 'still_working' as const,
agendaFingerprint: 'agenda:v1:abc',
reportToken: 'wrs:v1.test',
source: 'mcp' as const,
};
await store.appendPendingReport(request, 'control_api_unavailable');
await mkdir(memberWorkSyncDir(root, 'team-a', 'tom'), { recursive: true });
const [intent] = await store.listPendingReports('team-a');
await writeFile(
join(memberWorkSyncDir(root, 'team-a', 'tom'), 'reports.json'),
JSON.stringify({
schemaVersion: 2,
intents: {
[intent!.id]: {
...intent!,
teamName: 'other-team',
memberName: 'tom',
},
},
}),
'utf8'
);
const indexPath = join(
root,
'team-a',
'.member-work-sync',
'indexes',
'pending-reports-index.json'
);
const index = JSON.parse(await readFile(indexPath, 'utf8'));
index.items[intent!.id] = {
...index.items[intent!.id],
memberKey: 'tom',
memberName: 'tom',
};
await writeFile(indexPath, JSON.stringify(index), 'utf8');
await store.markPendingReportProcessed('team-a', intent!.id, {
status: 'accepted',
resultCode: 'accepted',
processedAt: '2026-04-29T00:01:00.000Z',
});
const reports = JSON.parse(
await readFile(join(memberWorkSyncDir(root, 'team-a', 'bob'), 'reports.json'), 'utf8')
);
expect(reports.intents[intent!.id]).toMatchObject({
memberName: 'bob',
status: 'accepted',
resultCode: 'accepted',
});
const repaired = JSON.parse(await readFile(indexPath, 'utf8'));
expect(repaired.items[intent!.id]).toMatchObject({
memberKey: 'bob',
memberName: 'bob',
status: 'accepted',
});
});
it('records bounded shadow metrics from status writes', async () => {
await store.write(makeStatus({}));
await store.write(
@ -761,6 +862,34 @@ describe('JsonMemberWorkSyncStore', () => {
).resolves.toBeNull();
});
it('ignores superseded rows for logical recovery lookup', async () => {
const input = {
id: 'member-work-sync:team-a:bob:agenda:v1:superseded',
teamName: 'team-a',
memberName: 'bob',
agendaFingerprint: 'agenda:v1:superseded',
payloadHash: 'hash-a',
payload: makeNudgePayload({ workSyncIntentKey: 'proof-missing:message-1' }),
nowIso: '2026-04-29T00:00:00.000Z',
};
await store.ensurePending(input);
await store.markSuperseded({
teamName: 'team-a',
id: input.id,
reason: 'status_no_longer_matches_outbox',
nowIso: '2026-04-29T00:01:00.000Z',
});
await expect(
store.findRecentRecoveryByIntent({
teamName: 'team-a',
memberName: 'bob',
intentKey: 'proof-missing:message-1',
sinceIso: '2026-04-29T00:00:00.000Z',
})
).resolves.toBeNull();
});
it('claims due outbox items and fences terminal updates by attempt generation', async () => {
const input = {
id: 'member-work-sync:team-a:bob:agenda:v1:abc',
@ -838,6 +967,56 @@ describe('JsonMemberWorkSyncStore', () => {
});
});
it('reclaims stale claimed outbox items without waiting for a fresh reconcile', async () => {
const input = {
id: 'member-work-sync:team-a:bob:agenda:v1:stale-claim',
teamName: 'team-a',
memberName: 'bob',
agendaFingerprint: 'agenda:v1:stale-claim',
payloadHash: 'hash-a',
payload: makeNudgePayload(),
nowIso: '2026-04-29T00:00:00.000Z',
};
await store.ensurePending(input);
const [claimed] = await store.claimDue({
teamName: 'team-a',
claimedBy: 'dispatcher-a',
nowIso: '2026-04-29T00:01:00.000Z',
limit: 1,
});
expect(claimed).toMatchObject({
id: input.id,
status: 'claimed',
attemptGeneration: 1,
claimedBy: 'dispatcher-a',
claimedAt: '2026-04-29T00:01:00.000Z',
});
await expect(
store.claimDue({
teamName: 'team-a',
claimedBy: 'dispatcher-b',
nowIso: '2026-04-29T00:05:59.000Z',
limit: 1,
})
).resolves.toEqual([]);
const [reclaimed] = await store.claimDue({
teamName: 'team-a',
claimedBy: 'dispatcher-b',
nowIso: '2026-04-29T00:06:00.000Z',
limit: 1,
});
expect(reclaimed).toMatchObject({
id: input.id,
status: 'claimed',
attemptGeneration: 2,
claimedBy: 'dispatcher-b',
claimedAt: '2026-04-29T00:06:00.000Z',
});
});
it('claims due outbox items from the index without scanning unrelated member outboxes', async () => {
const bobInput = {
id: 'member-work-sync:team-a:bob:agenda:v1:abc',
@ -1210,6 +1389,221 @@ describe('JsonMemberWorkSyncStore', () => {
expect(claimed.map((item) => item.memberName).sort()).toEqual(['bob', 'tom']);
});
it('rewrites stale due outbox member keys while claiming', async () => {
const input = {
id: 'member-work-sync:team-a:bob:agenda:v1:abc',
teamName: 'team-a',
memberName: 'bob',
agendaFingerprint: 'agenda:v1:abc',
payloadHash: 'hash-a',
payload: makeNudgePayload(),
nowIso: '2026-04-29T00:00:00.000Z',
};
await store.ensurePending(input);
const indexPath = join(root, 'team-a', '.member-work-sync', 'indexes', 'outbox-index.json');
const index = JSON.parse(await readFile(indexPath, 'utf8'));
index.items[input.id] = {
...index.items[input.id],
memberKey: 'tom',
memberName: 'bob',
};
await writeFile(indexPath, JSON.stringify(index), 'utf8');
const [claimed] = await store.claimDue({
teamName: 'team-a',
claimedBy: 'dispatcher-a',
nowIso: '2026-04-29T00:01:00.000Z',
limit: 1,
});
expect(claimed).toMatchObject({
id: input.id,
memberName: 'bob',
status: 'claimed',
});
const repaired = JSON.parse(await readFile(indexPath, 'utf8'));
expect(repaired.items[input.id]).toMatchObject({
memberKey: 'bob',
memberName: 'bob',
status: 'claimed',
});
});
it('repairs stale outbox update routes before marking failures', async () => {
const input = {
id: 'member-work-sync:team-a:bob:agenda:v1:abc',
teamName: 'team-a',
memberName: 'bob',
agendaFingerprint: 'agenda:v1:abc',
payloadHash: 'hash-a',
payload: makeNudgePayload(),
nowIso: '2026-04-29T00:00:00.000Z',
};
await store.ensurePending(input);
await mkdir(memberWorkSyncDir(root, 'team-a', 'tom'), { recursive: true });
const [claimed] = await store.claimDue({
teamName: 'team-a',
claimedBy: 'dispatcher-a',
nowIso: '2026-04-29T00:01:00.000Z',
limit: 1,
});
await writeFile(
join(memberWorkSyncDir(root, 'team-a', 'tom'), 'outbox.json'),
JSON.stringify({
schemaVersion: 2,
items: {
[input.id]: {
...input,
teamName: 'other-team',
memberName: 'tom',
status: 'claimed',
attemptGeneration: claimed!.attemptGeneration,
claimedBy: 'dispatcher-a',
claimedAt: '2026-04-29T00:01:00.000Z',
updatedAt: '2026-04-29T00:01:00.000Z',
},
},
}),
'utf8'
);
const indexPath = join(root, 'team-a', '.member-work-sync', 'indexes', 'outbox-index.json');
const index = JSON.parse(await readFile(indexPath, 'utf8'));
index.items[input.id] = {
...index.items[input.id],
memberKey: 'tom',
memberName: 'tom',
};
await writeFile(indexPath, JSON.stringify(index), 'utf8');
await store.markFailed({
teamName: 'team-a',
id: input.id,
attemptGeneration: claimed!.attemptGeneration,
error: 'delivery failed',
retryable: true,
nextAttemptAt: '2026-04-29T00:10:00.000Z',
nowIso: '2026-04-29T00:02:00.000Z',
});
const memberOutbox = JSON.parse(
await readFile(join(memberWorkSyncDir(root, 'team-a', 'bob'), 'outbox.json'), 'utf8')
);
expect(memberOutbox.items[input.id]).toMatchObject({
status: 'failed_retryable',
lastError: 'delivery failed',
nextAttemptAt: '2026-04-29T00:10:00.000Z',
});
const repaired = JSON.parse(await readFile(indexPath, 'utf8'));
expect(repaired.items[input.id]).toMatchObject({
memberKey: 'bob',
memberName: 'bob',
status: 'failed_retryable',
});
});
it('repairs wrong-member due outbox index routes before returning a limited claim', async () => {
const bobInput = {
id: 'member-work-sync:team-a:bob:agenda:v1:abc',
teamName: 'team-a',
memberName: 'bob',
agendaFingerprint: 'agenda:v1:abc',
payloadHash: 'hash-a',
payload: makeNudgePayload(),
nowIso: '2026-04-29T00:00:00.000Z',
};
await store.ensurePending(bobInput);
await mkdir(memberWorkSyncDir(root, 'team-a', 'tom'), { recursive: true });
await writeFile(
join(memberWorkSyncDir(root, 'team-a', 'tom'), 'outbox.json'),
JSON.stringify({
schemaVersion: 2,
items: {
[bobInput.id]: {
...bobInput,
teamName: 'other-team',
memberName: 'tom',
status: 'pending',
createdAt: '2026-04-29T00:00:00.000Z',
updatedAt: '2026-04-29T00:00:00.000Z',
},
},
}),
'utf8'
);
const indexPath = join(root, 'team-a', '.member-work-sync', 'indexes', 'outbox-index.json');
const index = JSON.parse(await readFile(indexPath, 'utf8'));
index.items[bobInput.id] = {
...index.items[bobInput.id],
memberKey: 'tom',
memberName: 'tom',
};
await writeFile(indexPath, JSON.stringify(index), 'utf8');
const claimed = await store.claimDue({
teamName: 'team-a',
claimedBy: 'dispatcher-a',
nowIso: '2026-04-29T00:01:00.000Z',
limit: 1,
});
expect(claimed.map((item) => item.memberName)).toEqual(['bob']);
const repaired = JSON.parse(await readFile(indexPath, 'utf8'));
expect(repaired.items[bobInput.id]).toMatchObject({
memberKey: 'bob',
memberName: 'bob',
status: 'claimed',
});
});
it('repairs stale terminal outbox index routes when member-scoped item is due', async () => {
const input = {
id: 'member-work-sync:team-a:bob:agenda:v1:abc',
teamName: 'team-a',
memberName: 'bob',
agendaFingerprint: 'agenda:v1:abc',
payloadHash: 'hash-a',
payload: makeNudgePayload(),
nowIso: '2026-04-29T00:00:00.000Z',
};
await store.ensurePending(input);
const [claimed] = await store.claimDue({
teamName: 'team-a',
claimedBy: 'dispatcher-a',
nowIso: '2026-04-29T00:01:00.000Z',
limit: 1,
});
await store.markDelivered({
teamName: 'team-a',
id: input.id,
attemptGeneration: claimed!.attemptGeneration,
deliveredMessageId: input.id,
nowIso: '2026-04-29T00:02:00.000Z',
});
const memberOutboxPath = join(memberWorkSyncDir(root, 'team-a', 'bob'), 'outbox.json');
const memberOutbox = JSON.parse(await readFile(memberOutboxPath, 'utf8'));
memberOutbox.items[input.id] = {
...memberOutbox.items[input.id],
status: 'pending',
updatedAt: '2026-04-29T00:03:00.000Z',
};
delete memberOutbox.items[input.id].deliveredMessageId;
await writeFile(memberOutboxPath, JSON.stringify(memberOutbox), 'utf8');
const [reclaimed] = await store.claimDue({
teamName: 'team-a',
claimedBy: 'dispatcher-b',
nowIso: '2026-04-29T00:04:00.000Z',
limit: 1,
});
expect(reclaimed).toMatchObject({
id: input.id,
status: 'claimed',
attemptGeneration: 2,
claimedBy: 'dispatcher-b',
});
});
it('falls back to legacy v1 status and materializes legacy outbox during claim', async () => {
const auditEvents: MemberWorkSyncAuditEvent[] = [];
store = new JsonMemberWorkSyncStore(new MemberWorkSyncStorePaths(root), {

View file

@ -1,6 +1,5 @@
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { MemberWorkSyncEventQueue } from '@features/member-work-sync/main/infrastructure/MemberWorkSyncEventQueue';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
describe('MemberWorkSyncEventQueue', () => {
beforeEach(() => {
@ -370,4 +369,100 @@ describe('MemberWorkSyncEventQueue', () => {
expect(reconciles).toHaveLength(2);
await queue.stop();
});
it('retries a failed reconcile with bounded backoff', async () => {
const reconciles: unknown[] = [];
const auditEvents: string[] = [];
const queue = new MemberWorkSyncEventQueue({
quietWindowMs: 1,
retryDelayMs: 10,
maxRetryAttempts: 2,
reconcile: async (request) => {
reconciles.push(request);
if (reconciles.length === 1) {
throw new Error('transient');
}
},
isTeamActive: () => true,
auditJournal: {
append: async (event) => {
auditEvents.push(event.event);
},
},
});
queue.enqueue({ teamName: 'team-a', memberName: 'bob', triggerReason: 'turn_settled' });
await vi.advanceTimersByTimeAsync(1);
expect(reconciles).toHaveLength(1);
expect(queue.getDiagnostics()).toMatchObject({ failed: 1, queued: 1, reconciled: 0 });
expect(auditEvents).toEqual(['queue_enqueued', 'queue_retry_scheduled']);
await vi.advanceTimersByTimeAsync(9);
expect(reconciles).toHaveLength(1);
await vi.advanceTimersByTimeAsync(1);
expect(reconciles).toHaveLength(2);
expect(queue.getDiagnostics()).toMatchObject({ failed: 1, queued: 0, reconciled: 1 });
await queue.stop();
});
it('drops a failed reconcile after the retry budget is exhausted', async () => {
const reconcile = vi.fn(async () => {
throw new Error('still failing');
});
const queue = new MemberWorkSyncEventQueue({
quietWindowMs: 1,
retryDelayMs: 10,
maxRetryAttempts: 1,
reconcile,
isTeamActive: () => true,
});
queue.enqueue({ teamName: 'team-a', memberName: 'bob', triggerReason: 'turn_settled' });
await vi.advanceTimersByTimeAsync(1);
await vi.advanceTimersByTimeAsync(10);
await vi.advanceTimersByTimeAsync(1_000);
expect(reconcile).toHaveBeenCalledTimes(2);
expect(queue.getDiagnostics()).toMatchObject({
dropped: 1,
failed: 2,
queued: 0,
reconciled: 0,
});
await queue.stop();
});
it('resets retry budget when a fresh event joins a queued retry item', async () => {
const reconcile = vi.fn(async () => {
throw new Error('still failing');
});
const queue = new MemberWorkSyncEventQueue({
quietWindowMs: 1,
retryDelayMs: 10,
maxRetryAttempts: 1,
reconcile,
isTeamActive: () => true,
});
queue.enqueue({ teamName: 'team-a', memberName: 'bob', triggerReason: 'turn_settled' });
await vi.advanceTimersByTimeAsync(1);
expect(queue.getDiagnostics()).toMatchObject({ failed: 1, queued: 1, dropped: 0 });
queue.enqueue({ teamName: 'team-a', memberName: 'bob', triggerReason: 'task_changed' });
await vi.advanceTimersByTimeAsync(10);
expect(reconcile).toHaveBeenCalledTimes(2);
expect(queue.getDiagnostics()).toMatchObject({
dropped: 0,
failed: 2,
queued: 1,
reconciled: 0,
});
await queue.stop();
});
});

View file

@ -1,6 +1,5 @@
import { describe, expect, it, vi } from 'vitest';
import { MemberWorkSyncNudgeDispatchScheduler } from '@features/member-work-sync/main/infrastructure/MemberWorkSyncNudgeDispatchScheduler';
import { describe, expect, it, vi } from 'vitest';
describe('MemberWorkSyncNudgeDispatchScheduler', () => {
it('dispatches due nudges for unique active teams without overlapping runs', async () => {

View file

@ -711,9 +711,7 @@ type TeamProvisioningServicePrivateHarness = {
applyProcessBootstrapTransportOverlay: (
input: Record<string, unknown>
) => Record<string, unknown>;
reconcilePersistedLaunchState: (
teamName: string
) => Promise<{
reconcilePersistedLaunchState: (teamName: string) => Promise<{
snapshot: null;
statuses: Record<string, never>;
}>;
@ -1152,6 +1150,104 @@ describe('TeamProvisioningService', () => {
expect(nextRecord.status).toBe('retry_scheduled');
});
it('emits a terminal failure event when exhausted work-sync proof retries fail', async () => {
const svc = new TeamProvisioningService();
const taskRefs = [{ taskId: 'task-1', displayId: 'task-1', teamName: 'team-a' }];
const record = {
id: 'opencode-prompt:work-sync-proof-missing',
teamName: 'team-a',
memberName: 'atlas',
laneId: 'secondary:opencode:atlas',
runId: 'run-1',
runtimeSessionId: 'ses-1',
inboxMessageId: 'msg-work-sync-proof-missing',
inboxTimestamp: '2026-05-18T08:31:00.000Z',
source: 'watcher',
messageKind: 'member_work_sync_nudge',
workSyncIntent: 'agenda_sync',
replyRecipient: 'team-lead',
actionMode: 'do',
taskRefs,
payloadHash: 'sha256:work-sync',
status: 'retry_scheduled',
responseState: 'responded_non_visible_tool',
attempts: 3,
maxAttempts: 3,
acceptanceUnknown: false,
nextAttemptAt: null,
lastAttemptAt: '2026-05-18T08:31:30.000Z',
lastObservedAt: '2026-05-18T08:31:45.000Z',
acceptedAt: '2026-05-18T08:31:30.000Z',
respondedAt: '2026-05-18T08:31:45.000Z',
failedAt: null,
inboxReadCommittedAt: null,
inboxReadCommitError: null,
prePromptCursor: null,
postPromptCursor: null,
deliveredUserMessageId: 'delivered-1',
observedAssistantMessageId: 'assistant-1',
observedAssistantPreview: null,
observedToolCallNames: ['member_work_sync_status'],
observedVisibleMessageId: null,
visibleReplyMessageId: null,
visibleReplyInbox: null,
visibleReplyCorrelation: null,
lastReason: 'member_work_sync_report_required',
diagnostics: ['member_work_sync_report_required'],
createdAt: '2026-05-18T08:31:00.000Z',
updatedAt: '2026-05-18T08:31:45.000Z',
};
const failedRecord = {
...record,
status: 'failed_terminal',
failedAt: '2026-05-18T08:32:00.000Z',
updatedAt: '2026-05-18T08:32:00.000Z',
};
const ledger = {
markFailedTerminal: vi.fn(async () => failedRecord),
markNextAttemptScheduled: vi.fn(),
};
const harness = svc as unknown as {
scheduleOpenCodePromptDeliveryWatchdog: ReturnType<typeof vi.fn>;
logOpenCodePromptDeliveryEvent: ReturnType<typeof vi.fn>;
scheduleOpenCodePromptLedgerFollowUp(input: {
ledger: typeof ledger;
ledgerRecord: typeof record;
teamName: string;
memberName: string;
retry: boolean;
reason: string;
}): Promise<typeof failedRecord>;
};
harness.scheduleOpenCodePromptDeliveryWatchdog = vi.fn();
harness.logOpenCodePromptDeliveryEvent = vi.fn();
const nextRecord = await harness.scheduleOpenCodePromptLedgerFollowUp({
ledger,
ledgerRecord: record,
teamName: 'team-a',
memberName: 'atlas',
retry: true,
reason: 'member_work_sync_report_required',
});
expect(nextRecord).toBe(failedRecord);
expect(ledger.markFailedTerminal).toHaveBeenCalledWith(
expect.objectContaining({
id: record.id,
reason: 'member_work_sync_report_required',
})
);
expect(harness.logOpenCodePromptDeliveryEvent).toHaveBeenCalledWith(
'opencode_prompt_delivery_terminal_failure',
failedRecord,
expect.objectContaining({
reason: 'member_work_sync_report_required',
retry: true,
})
);
});
it('uses stamped OpenCode session-refresh evidence instead of stale historical diagnostics', async () => {
const svc = new TeamProvisioningService();
(svc as any).scheduleOpenCodePromptDeliveryWatchdog = vi.fn();
@ -16725,8 +16821,7 @@ describe('TeamProvisioningService', () => {
return launchIdentity;
});
(svc as any).buildTeamRuntimeLaunchArgsPlan = vi.fn(async (input) => ({
fastModeArgs:
input.launchIdentity === launchIdentity ? ['--test-codex-fast-mode'] : [],
fastModeArgs: input.launchIdentity === launchIdentity ? ['--test-codex-fast-mode'] : [],
runtimeTurnSettledHookArgs: [],
providerArgs: [],
settingsArgs: [],
@ -22260,9 +22355,10 @@ describe('TeamProvisioningService', () => {
expect(bobOutcome).toBeNull();
// The transcript tail is parsed once and shared: a single cache entry for the
// file rather than one parse per member.
expect((svc as unknown as Record<string, Map<string, unknown>>).parsedBootstrapTranscriptTailCache.size).toBe(
1
);
expect(
(svc as unknown as Record<string, Map<string, unknown>>).parsedBootstrapTranscriptTailCache
.size
).toBe(1);
});
it('caches persisted bootstrap transcript outcome lookup between close polling reads', async () => {
@ -24523,12 +24619,10 @@ describe('TeamProvisioningService', () => {
scheduled: true,
reason: 'scheduled',
}));
const sendMessageToRun = vi.fn(
async (targetRun: LeadRelayPriorityTestRun, message: string) => {
deliveredPrompt = message;
targetRun.leadRelayCapture?.resolveOnce('');
}
);
const sendMessageToRun = vi.fn(async (targetRun: LeadRelayPriorityTestRun, message: string) => {
deliveredPrompt = message;
targetRun.leadRelayCapture?.resolveOnce('');
});
harness.runs.set(run.runId, run);
harness.aliveRunByTeam.set(teamName, run.runId);
@ -25854,23 +25948,22 @@ describe('TeamProvisioningService', () => {
it('does not keep healed confirmed-bootstrap status alive when refreshed runtime metadata is an error', async () => {
const svc = new TeamProvisioningService();
const harness = privateHarness(svc);
harness.getLiveTeamAgentRuntimeMetadata = vi.fn(
() =>
Promise.resolve(
new Map([
[
'tom',
{
alive: false,
model: 'sonnet',
livenessKind: 'not_found',
pidSource: 'process_table',
runtimeDiagnostic: 'Runtime process crashed',
runtimeDiagnosticSeverity: 'error',
},
],
])
)
harness.getLiveTeamAgentRuntimeMetadata = vi.fn(() =>
Promise.resolve(
new Map([
[
'tom',
{
alive: false,
model: 'sonnet',
livenessKind: 'not_found',
pidSource: 'process_table',
runtimeDiagnostic: 'Runtime process crashed',
runtimeDiagnosticSeverity: 'error',
},
],
])
)
);
const result = await harness.attachLiveRuntimeMetadataToStatuses('signal-ops', {

View file

@ -3445,6 +3445,9 @@ Messages:
},
]);
const deliverSpy = vi.spyOn(service, 'deliverOpenCodeMemberMessage');
const logSpy = vi
.spyOn(service as any, 'logOpenCodePromptDeliveryEvent')
.mockImplementation(() => undefined);
const relay = await service.relayOpenCodeMemberInboxMessages(teamName, 'jack');
const expectedReason = 'opencode_inbox_attachment_payload_unavailable: att-1';
@ -3469,6 +3472,18 @@ Messages:
status: 'failed_terminal',
lastReason: expectedReason,
});
expect(logSpy).toHaveBeenCalledWith(
'opencode_prompt_delivery_terminal_failure',
expect.objectContaining({
inboxMessageId: 'opencode-attachment-1',
status: 'failed_terminal',
lastReason: expectedReason,
}),
expect.objectContaining({
attachmentPayloadUnavailable: true,
reason: expectedReason,
})
);
});
it('rebuilds missing OpenCode prompt ledger rows from unread inbox on startup scan', async () => {
@ -3721,6 +3736,101 @@ Messages:
}
});
it('keeps an already-read work-sync nudge pending when it is queued behind an active relay', async () => {
vi.useFakeTimers();
const service = new TeamProvisioningService();
const teamName = 'my-team';
try {
hoisted.files.set(
`/mock/teams/${teamName}/config.json`,
JSON.stringify({
name: teamName,
projectPath: '/tmp/my-team',
members: [
{ name: 'team-lead', agentType: 'team-lead' },
{ name: 'jack', role: 'developer', providerId: 'opencode', model: 'openrouter/test' },
],
})
);
seedMemberInbox(teamName, 'jack', [
{
from: 'bob',
to: 'jack',
text: 'Older watcher message.',
timestamp: '2026-02-23T17:00:00.000Z',
read: false,
messageId: 'opencode-inflight-old',
},
]);
const oldDeliveryStarted = createDeferred<void>();
const releaseOldDelivery = createDeferred<void>();
vi.spyOn(service, 'deliverOpenCodeMemberMessage').mockImplementation(
async (_teamName, input) => {
if (input.messageId === 'opencode-inflight-old') {
oldDeliveryStarted.resolve(undefined);
await releaseOldDelivery.promise;
}
return { delivered: true, diagnostics: [] };
}
);
const wakeSpy = vi
.spyOn(service, 'scheduleOpenCodeMemberInboxDeliveryWake')
.mockImplementation(() => undefined);
const watcherRelay = service.relayOpenCodeMemberInboxMessages(teamName, 'jack');
await oldDeliveryStarted.promise;
seedMemberInbox(teamName, 'jack', [
{
from: 'bob',
to: 'jack',
text: 'Older watcher message.',
timestamp: '2026-02-23T17:00:00.000Z',
read: false,
messageId: 'opencode-inflight-old',
},
{
from: 'system',
to: 'jack',
text: 'Call member_work_sync_status, then member_work_sync_report.',
timestamp: '2026-02-23T17:00:01.000Z',
read: true,
messageId: 'work-sync-read-queued',
messageKind: 'member_work_sync_nudge',
workSyncIntent: 'agenda_sync',
},
]);
await expect(
service.relayOpenCodeMemberInboxMessages(teamName, 'jack', {
onlyMessageId: 'work-sync-read-queued',
source: 'watchdog',
})
).resolves.toMatchObject({
attempted: 1,
delivered: 0,
failed: 0,
lastDelivery: {
delivered: true,
accepted: false,
responsePending: true,
reason: 'opencode_work_sync_read_commit_waiting_for_active_relay',
},
});
expect(wakeSpy).toHaveBeenCalledWith({
teamName,
memberName: 'jack',
messageId: 'work-sync-read-queued',
delayMs: 500,
});
releaseOldDelivery.resolve(undefined);
await watcherRelay;
} finally {
vi.useRealTimers();
}
});
it('treats an already-read specific OpenCode inbox row as delivered for UI-send relay', async () => {
const service = new TeamProvisioningService();
const teamName = 'my-team';
@ -3762,6 +3872,68 @@ Messages:
expect(deliverSpy).not.toHaveBeenCalled();
});
it('does not treat an already-read work-sync nudge as delivered without the work-sync proof path', async () => {
const service = new TeamProvisioningService();
const teamName = 'my-team';
hoisted.files.set(
`/mock/teams/${teamName}/config.json`,
JSON.stringify({
name: teamName,
projectPath: '/tmp/my-team',
members: [
{ name: 'team-lead', agentType: 'team-lead' },
{ name: 'jack', role: 'developer', providerId: 'opencode', model: 'openrouter/test' },
],
})
);
seedMemberInbox(teamName, 'jack', [
{
from: 'system',
to: 'jack',
text: 'Call member_work_sync_status, then member_work_sync_report.',
timestamp: '2026-02-23T17:02:00.000Z',
read: true,
messageId: 'work-sync-read-1',
messageKind: 'member_work_sync_nudge',
workSyncIntent: 'agenda_sync',
taskRefs: [{ taskId: 'task-1', teamName }],
},
]);
const deliverSpy = vi.spyOn(service, 'deliverOpenCodeMemberMessage').mockResolvedValue({
delivered: true,
accepted: false,
responsePending: true,
reason: 'member_work_sync_report_required',
diagnostics: ['member_work_sync_report_required'],
});
const relay = await service.relayOpenCodeMemberInboxMessages(teamName, 'jack', {
onlyMessageId: 'work-sync-read-1',
source: 'watchdog',
});
expect(deliverSpy).toHaveBeenCalledWith(
teamName,
expect.objectContaining({
memberName: 'jack',
messageId: 'work-sync-read-1',
messageKind: 'member_work_sync_nudge',
workSyncIntent: 'agenda_sync',
})
);
expect(relay).toMatchObject({
attempted: 1,
delivered: 0,
failed: 0,
lastDelivery: {
delivered: true,
accepted: false,
responsePending: true,
reason: 'member_work_sync_report_required',
},
});
});
it('routes watcher inbox changes for OpenCode members through direct runtime relay', async () => {
const service = new TeamProvisioningService();
const teamName = 'my-team';
@ -4357,7 +4529,10 @@ Messages:
],
})
);
hoisted.files.set(`${teamsBasePath}/${teamName}/inboxes/${memberName}.json`, JSON.stringify([]));
hoisted.files.set(
`${teamsBasePath}/${teamName}/inboxes/${memberName}.json`,
JSON.stringify([])
);
(service as any).resolveOpenCodeMemberDeliveryIdentity = vi.fn(async () => ({
ok: true,
canonicalMemberName: memberName,
@ -4415,7 +4590,10 @@ Messages:
],
})
);
hoisted.files.set(`${teamsBasePath}/${teamName}/inboxes/${memberName}.json`, JSON.stringify([]));
hoisted.files.set(
`${teamsBasePath}/${teamName}/inboxes/${memberName}.json`,
JSON.stringify([])
);
(service as any).resolveOpenCodeMemberDeliveryIdentity = vi.fn(async () => ({
ok: true,
canonicalMemberName: memberName,