feat(member-work-sync): add durable nudge outbox store

This commit is contained in:
777genius 2026-04-29 15:15:42 +03:00
parent c5a97fd796
commit ec7935e593
6 changed files with 465 additions and 2 deletions

View file

@ -34,6 +34,7 @@ Current implementation note:
- Phase 1 is intentionally shadow-only: it computes agendas, fingerprints, report tokens, reports, persisted status, passive queue reconciliation, startup replay, diagnostics, metrics, and a neutral read-only member details surface.
- Phase 1 does not insert inbox messages, send nudges, mark tasks/messages read, or change `TeamTaskStallMonitor` semantics.
- Phase 1.5 exposes a machine-readable `phase2Readiness` assessment from shadow metrics. It can say `collecting_shadow_data`, `blocked`, or `shadow_ready`; it still does not dispatch nudges.
- Phase 2 storage foundation is implemented as an inert durable outbox: idempotency key, payload hash conflict checks, claim generation fencing, retry/terminal states. It is not wired to dispatch inbox nudges yet.
- Phase 2 must not start until real shadow metrics confirm that `needs_sync` churn and false positives are acceptably low.
Patterns used:

View file

@ -205,3 +205,96 @@ export interface MemberWorkSyncStatusRequest {
export interface MemberWorkSyncMetricsRequest {
teamName: string;
}
export type MemberWorkSyncOutboxStatus =
| 'pending'
| 'claimed'
| 'delivered'
| 'superseded'
| 'failed_retryable'
| 'failed_terminal';
export interface MemberWorkSyncNudgePayload {
from: 'system';
to: string;
messageKind: 'member_work_sync_nudge';
source: 'member-work-sync';
actionMode: 'do';
text: string;
taskRefs: Array<{
taskId: string;
displayId: string;
teamName: string;
}>;
}
export interface MemberWorkSyncOutboxItem {
id: string;
teamName: string;
memberName: string;
agendaFingerprint: string;
payloadHash: string;
payload: MemberWorkSyncNudgePayload;
status: MemberWorkSyncOutboxStatus;
attemptGeneration: number;
claimedBy?: string;
claimedAt?: string;
deliveredMessageId?: string;
lastError?: string;
nextAttemptAt?: string;
createdAt: string;
updatedAt: string;
}
export type MemberWorkSyncOutboxEnsureResult =
| { ok: true; outcome: 'created' | 'existing'; item: MemberWorkSyncOutboxItem }
| {
ok: false;
outcome: 'payload_conflict';
item: MemberWorkSyncOutboxItem;
existingPayloadHash: string;
requestedPayloadHash: string;
};
export interface MemberWorkSyncOutboxEnsureInput {
id: string;
teamName: string;
memberName: string;
agendaFingerprint: string;
payloadHash: string;
payload: MemberWorkSyncNudgePayload;
nowIso: string;
nextAttemptAt?: string;
}
export interface MemberWorkSyncOutboxClaimInput {
teamName: string;
claimedBy: string;
nowIso: string;
limit: number;
}
export interface MemberWorkSyncOutboxMarkDeliveredInput {
teamName: string;
id: string;
attemptGeneration: number;
deliveredMessageId: string;
nowIso: string;
}
export interface MemberWorkSyncOutboxMarkSupersededInput {
teamName: string;
id: string;
reason: string;
nowIso: string;
}
export interface MemberWorkSyncOutboxMarkFailedInput {
teamName: string;
id: string;
attemptGeneration: number;
error: string;
retryable: boolean;
nowIso: string;
nextAttemptAt?: string;
}

View file

@ -2,6 +2,13 @@ import type {
MemberWorkSyncAgenda,
MemberWorkSyncTeamMetrics,
MemberWorkSyncProviderId,
MemberWorkSyncOutboxClaimInput,
MemberWorkSyncOutboxEnsureInput,
MemberWorkSyncOutboxEnsureResult,
MemberWorkSyncOutboxItem,
MemberWorkSyncOutboxMarkDeliveredInput,
MemberWorkSyncOutboxMarkFailedInput,
MemberWorkSyncOutboxMarkSupersededInput,
MemberWorkSyncReport,
MemberWorkSyncReportIntent,
MemberWorkSyncReportIntentStatus,
@ -87,12 +94,21 @@ export interface MemberWorkSyncReportStorePort {
): Promise<void>;
}
export interface MemberWorkSyncOutboxStorePort {
ensurePending(input: MemberWorkSyncOutboxEnsureInput): Promise<MemberWorkSyncOutboxEnsureResult>;
claimDue(input: MemberWorkSyncOutboxClaimInput): Promise<MemberWorkSyncOutboxItem[]>;
markDelivered(input: MemberWorkSyncOutboxMarkDeliveredInput): Promise<void>;
markSuperseded(input: MemberWorkSyncOutboxMarkSupersededInput): Promise<void>;
markFailed(input: MemberWorkSyncOutboxMarkFailedInput): Promise<void>;
}
export interface MemberWorkSyncUseCaseDeps {
clock: MemberWorkSyncClockPort;
hash: MemberWorkSyncHashPort;
agendaSource: MemberWorkSyncAgendaSourcePort;
statusStore: MemberWorkSyncStatusStorePort;
reportStore?: MemberWorkSyncReportStorePort;
outboxStore?: MemberWorkSyncOutboxStorePort;
reportToken?: MemberWorkSyncReportTokenPort;
lifecycle?: MemberWorkSyncLifecyclePort;
logger?: MemberWorkSyncLoggerPort;

View file

@ -5,6 +5,13 @@ import { mkdir, readFile, rename } from 'fs/promises';
import { withFileLock } from '@main/services/team/fileLock';
import type {
MemberWorkSyncMetricEvent,
MemberWorkSyncOutboxClaimInput,
MemberWorkSyncOutboxEnsureInput,
MemberWorkSyncOutboxEnsureResult,
MemberWorkSyncOutboxItem,
MemberWorkSyncOutboxMarkDeliveredInput,
MemberWorkSyncOutboxMarkFailedInput,
MemberWorkSyncOutboxMarkSupersededInput,
MemberWorkSyncReportIntent,
MemberWorkSyncReportRequest,
MemberWorkSyncStatus,
@ -13,6 +20,7 @@ import type {
} from '../../contracts';
import { assessMemberWorkSyncPhase2Readiness } from '../../core/domain';
import type {
MemberWorkSyncOutboxStorePort,
MemberWorkSyncReportStorePort,
MemberWorkSyncStatusStorePort,
} from '../../core/application';
@ -31,6 +39,11 @@ interface PendingReportFile {
intents: Record<string, MemberWorkSyncReportIntent>;
}
interface OutboxFile {
schemaVersion: 1;
items: Record<string, MemberWorkSyncOutboxItem>;
}
function normalizeMemberKey(memberName: string): string {
return memberName.trim().toLowerCase();
}
@ -68,6 +81,31 @@ function isPendingReportFile(value: unknown): value is PendingReportFile {
);
}
function isOutboxFile(value: unknown): value is OutboxFile {
return (
value != null &&
typeof value === 'object' &&
(value as OutboxFile).schemaVersion === 1 &&
(value as OutboxFile).items != null &&
typeof (value as OutboxFile).items === 'object' &&
!Array.isArray((value as OutboxFile).items)
);
}
function isOutboxTerminal(status: MemberWorkSyncOutboxItem['status']): boolean {
return status === 'delivered' || status === 'superseded' || status === 'failed_terminal';
}
function canClaimOutboxItem(item: MemberWorkSyncOutboxItem, nowIso: string): boolean {
if (item.status !== 'pending' && item.status !== 'failed_retryable') {
return false;
}
if (!item.nextAttemptAt) {
return true;
}
return item.nextAttemptAt <= nowIso;
}
function stableStringify(value: unknown): string {
if (value == null || typeof value !== 'object') {
return JSON.stringify(value);
@ -194,7 +232,10 @@ async function quarantineFile(filePath: string): Promise<void> {
}
export class JsonMemberWorkSyncStore
implements MemberWorkSyncStatusStorePort, MemberWorkSyncReportStorePort
implements
MemberWorkSyncStatusStorePort,
MemberWorkSyncReportStorePort,
MemberWorkSyncOutboxStorePort
{
private readonly writeQueues = new Map<string, Promise<void>>();
@ -317,6 +358,161 @@ export class JsonMemberWorkSyncStore
});
}
async ensurePending(
input: MemberWorkSyncOutboxEnsureInput
): Promise<MemberWorkSyncOutboxEnsureResult> {
let result: MemberWorkSyncOutboxEnsureResult | null = null;
await this.enqueue(input.teamName, async () => {
await withFileLock(this.paths.getOutboxPath(input.teamName), async () => {
const existing = await this.readOutboxFile(input.teamName);
const current = existing.items[input.id];
if (current) {
if (current.payloadHash !== input.payloadHash) {
result = {
ok: false,
outcome: 'payload_conflict',
item: current,
existingPayloadHash: current.payloadHash,
requestedPayloadHash: input.payloadHash,
};
return;
}
if (!isOutboxTerminal(current.status) && current.status !== 'pending') {
const next: MemberWorkSyncOutboxItem = {
...current,
status: 'pending',
updatedAt: input.nowIso,
};
const nextAttemptAt = input.nextAttemptAt ?? current.nextAttemptAt;
if (nextAttemptAt) {
next.nextAttemptAt = nextAttemptAt;
} else {
delete next.nextAttemptAt;
}
delete next.claimedBy;
delete next.claimedAt;
delete next.lastError;
existing.items[input.id] = next;
await this.writeOutboxFile(input.teamName, existing);
result = { ok: true, outcome: 'existing', item: existing.items[input.id] };
return;
}
result = { ok: true, outcome: 'existing', item: current };
return;
}
const item: MemberWorkSyncOutboxItem = {
id: input.id,
teamName: input.teamName,
memberName: input.memberName,
agendaFingerprint: input.agendaFingerprint,
payloadHash: input.payloadHash,
payload: input.payload,
status: 'pending',
attemptGeneration: 0,
...(input.nextAttemptAt ? { nextAttemptAt: input.nextAttemptAt } : {}),
createdAt: input.nowIso,
updatedAt: input.nowIso,
};
existing.items[input.id] = item;
await this.writeOutboxFile(input.teamName, existing);
result = { ok: true, outcome: 'created', item };
});
});
if (!result) {
throw new Error('Member work sync outbox write did not produce a result');
}
return result;
}
async claimDue(input: MemberWorkSyncOutboxClaimInput): Promise<MemberWorkSyncOutboxItem[]> {
const claimed: MemberWorkSyncOutboxItem[] = [];
await this.enqueue(input.teamName, async () => {
await withFileLock(this.paths.getOutboxPath(input.teamName), async () => {
const existing = await this.readOutboxFile(input.teamName);
const due = Object.values(existing.items)
.filter((item) => canClaimOutboxItem(item, input.nowIso))
.sort((left, right) => {
const leftTime = left.nextAttemptAt ?? left.updatedAt;
const rightTime = right.nextAttemptAt ?? right.updatedAt;
return leftTime.localeCompare(rightTime);
})
.slice(0, Math.max(0, input.limit));
for (const item of due) {
const next: MemberWorkSyncOutboxItem = {
...item,
status: 'claimed',
attemptGeneration: item.attemptGeneration + 1,
claimedBy: input.claimedBy,
claimedAt: input.nowIso,
updatedAt: input.nowIso,
};
delete next.lastError;
existing.items[item.id] = next;
claimed.push(next);
}
if (due.length > 0) {
await this.writeOutboxFile(input.teamName, existing);
}
});
});
return claimed;
}
async markDelivered(input: MemberWorkSyncOutboxMarkDeliveredInput): Promise<void> {
await this.updateOutboxItem(input.teamName, input.id, (current) => {
if (!current || current.attemptGeneration !== input.attemptGeneration) {
return current;
}
const next: MemberWorkSyncOutboxItem = {
...current,
status: 'delivered',
deliveredMessageId: input.deliveredMessageId,
updatedAt: input.nowIso,
};
delete next.lastError;
return next;
});
}
async markSuperseded(input: MemberWorkSyncOutboxMarkSupersededInput): Promise<void> {
await this.updateOutboxItem(input.teamName, input.id, (current) => {
if (!current || isOutboxTerminal(current.status)) {
return current;
}
return {
...current,
status: 'superseded',
lastError: input.reason,
updatedAt: input.nowIso,
};
});
}
async markFailed(input: MemberWorkSyncOutboxMarkFailedInput): Promise<void> {
await this.updateOutboxItem(input.teamName, input.id, (current) => {
if (!current || current.attemptGeneration !== input.attemptGeneration) {
return current;
}
const next: MemberWorkSyncOutboxItem = {
...current,
status: input.retryable ? 'failed_retryable' : 'failed_terminal',
lastError: input.error,
...(input.retryable && input.nextAttemptAt ? { nextAttemptAt: input.nextAttemptAt } : {}),
updatedAt: input.nowIso,
};
if (!input.retryable) {
delete next.nextAttemptAt;
}
return next;
});
}
private async readFile(teamName: string): Promise<StoreFile> {
const filePath = this.paths.getStatusPath(teamName);
try {
@ -351,6 +547,48 @@ export class JsonMemberWorkSyncStore
return { schemaVersion: 1, intents: {} };
}
private async readOutboxFile(teamName: string): Promise<OutboxFile> {
const filePath = this.paths.getOutboxPath(teamName);
try {
const raw = await readFile(filePath, 'utf8');
const parsed = JSON.parse(raw);
if (isOutboxFile(parsed)) {
return parsed;
}
await quarantineFile(filePath);
} catch (error) {
if ((error as NodeJS.ErrnoException).code !== 'ENOENT') {
await quarantineFile(filePath);
}
}
return { schemaVersion: 1, items: {} };
}
private async writeOutboxFile(teamName: string, file: OutboxFile): Promise<void> {
await mkdir(this.paths.getTeamDir(teamName), { recursive: true });
await atomicWriteAsync(this.paths.getOutboxPath(teamName), JSON.stringify(file, null, 2));
}
private async updateOutboxItem(
teamName: string,
id: string,
updater: (
current: MemberWorkSyncOutboxItem | undefined
) => MemberWorkSyncOutboxItem | undefined
): Promise<void> {
await this.enqueue(teamName, async () => {
await withFileLock(this.paths.getOutboxPath(teamName), async () => {
const existing = await this.readOutboxFile(teamName);
const next = updater(existing.items[id]);
if (!next) {
return;
}
existing.items[id] = next;
await this.writeOutboxFile(teamName, existing);
});
});
}
private async writePendingFile(teamName: string, file: PendingReportFile): Promise<void> {
await mkdir(this.paths.getTeamDir(teamName), { recursive: true });
await atomicWriteAsync(

View file

@ -15,6 +15,10 @@ export class MemberWorkSyncStorePaths {
return join(this.getTeamDir(teamName), 'pending-reports.json');
}
getOutboxPath(teamName: string): string {
return join(this.getTeamDir(teamName), 'outbox.json');
}
getReportTokenSecretPath(teamName: string): string {
return join(this.getTeamDir(teamName), 'report-token-secret.json');
}

View file

@ -5,7 +5,10 @@ import { afterEach, beforeEach, describe, expect, it } from 'vitest';
import { JsonMemberWorkSyncStore } from '@features/member-work-sync/main/infrastructure/JsonMemberWorkSyncStore';
import { MemberWorkSyncStorePaths } from '@features/member-work-sync/main/infrastructure/MemberWorkSyncStorePaths';
import type { MemberWorkSyncStatus } from '@features/member-work-sync/contracts';
import type {
MemberWorkSyncNudgePayload,
MemberWorkSyncStatus,
} from '@features/member-work-sync/contracts';
function makeStatus(overrides: Partial<MemberWorkSyncStatus>): MemberWorkSyncStatus {
return {
@ -42,6 +45,19 @@ function makeStatus(overrides: Partial<MemberWorkSyncStatus>): MemberWorkSyncSta
};
}
function makeNudgePayload(overrides: Partial<MemberWorkSyncNudgePayload> = {}): MemberWorkSyncNudgePayload {
return {
from: 'system',
to: 'bob',
messageKind: 'member_work_sync_nudge',
source: 'member-work-sync',
actionMode: 'do',
text: 'Work sync check: continue the current task or report a blocker.',
taskRefs: [{ teamName: 'team-a', taskId: 'task-1', displayId: '11111111' }],
...overrides,
};
}
describe('JsonMemberWorkSyncStore', () => {
let root: string;
let store: JsonMemberWorkSyncStore;
@ -152,4 +168,99 @@ describe('JsonMemberWorkSyncStore', () => {
]),
});
});
it('deduplicates outbox items by id and rejects payload hash conflicts', async () => {
const input = {
id: 'member-work-sync:team-a:bob:agenda:v1:abc',
teamName: 'team-a',
memberName: 'bob',
agendaFingerprint: 'agenda:v1:abc',
payloadHash: 'hash-a',
payload: makeNudgePayload(),
nowIso: '2026-04-29T00:00:00.000Z',
};
await expect(store.ensurePending(input)).resolves.toMatchObject({
ok: true,
outcome: 'created',
item: { status: 'pending', attemptGeneration: 0 },
});
await expect(store.ensurePending(input)).resolves.toMatchObject({
ok: true,
outcome: 'existing',
});
await expect(store.ensurePending({ ...input, payloadHash: 'hash-b' })).resolves.toMatchObject({
ok: false,
outcome: 'payload_conflict',
existingPayloadHash: 'hash-a',
requestedPayloadHash: 'hash-b',
});
});
it('claims due outbox items and fences terminal updates by attempt generation', async () => {
const input = {
id: 'member-work-sync:team-a:bob:agenda:v1:abc',
teamName: 'team-a',
memberName: 'bob',
agendaFingerprint: 'agenda:v1:abc',
payloadHash: 'hash-a',
payload: makeNudgePayload(),
nowIso: '2026-04-29T00:00:00.000Z',
};
await store.ensurePending(input);
const claimed = await store.claimDue({
teamName: 'team-a',
claimedBy: 'dispatcher-a',
nowIso: '2026-04-29T00:01:00.000Z',
limit: 5,
});
expect(claimed).toHaveLength(1);
expect(claimed[0]).toMatchObject({
id: input.id,
status: 'claimed',
attemptGeneration: 1,
claimedBy: 'dispatcher-a',
});
await store.markDelivered({
teamName: 'team-a',
id: input.id,
attemptGeneration: 0,
deliveredMessageId: 'wrong-generation',
nowIso: '2026-04-29T00:02:00.000Z',
});
await expect(
store.ensurePending({
...input,
nowIso: '2026-04-29T00:03:00.000Z',
})
).resolves.toMatchObject({
ok: true,
item: { status: 'pending', attemptGeneration: 1 },
});
const claimedAgain = await store.claimDue({
teamName: 'team-a',
claimedBy: 'dispatcher-a',
nowIso: '2026-04-29T00:04:00.000Z',
limit: 1,
});
await store.markDelivered({
teamName: 'team-a',
id: input.id,
attemptGeneration: claimedAgain[0].attemptGeneration,
deliveredMessageId: 'message-1',
nowIso: '2026-04-29T00:05:00.000Z',
});
const file = JSON.parse(
await readFile(join(root, 'team-a', '.member-work-sync', 'outbox.json'), 'utf8')
);
expect(file.items[input.id]).toMatchObject({
status: 'delivered',
deliveredMessageId: 'message-1',
attemptGeneration: 2,
});
});
});