From 253ddf293db73afa4aeedc4a203ad055f659ce5a Mon Sep 17 00:00:00 2001 From: 777genius Date: Wed, 29 Apr 2026 14:02:49 +0300 Subject: [PATCH] feat(member-work-sync): replay pending report intents --- .../src/internal/workSync.js | 146 +++++++++++++++-- .../test/controller.test.js | 68 ++++++++ .../member-work-sync/contracts/types.ts | 14 ++ ...mberWorkSyncPendingReportIntentReplayer.ts | 81 ++++++++++ .../application/MemberWorkSyncReporter.ts | 15 +- .../core/application/index.ts | 1 + .../core/application/ports.ts | 8 + .../createMemberWorkSyncFeature.ts | 22 +++ .../infrastructure/JsonMemberWorkSyncStore.ts | 152 ++++++++++++++++-- .../MemberWorkSyncStorePaths.ts | 2 +- src/main/index.ts | 10 +- .../core/MemberWorkSyncUseCases.test.ts | 82 +++++++++- .../main/JsonMemberWorkSyncStore.test.ts | 72 +++++++++ 13 files changed, 630 insertions(+), 43 deletions(-) create mode 100644 src/features/member-work-sync/core/application/MemberWorkSyncPendingReportIntentReplayer.ts create mode 100644 test/features/member-work-sync/main/JsonMemberWorkSyncStore.test.ts diff --git a/agent-teams-controller/src/internal/workSync.js b/agent-teams-controller/src/internal/workSync.js index 43c38c90..63036fc6 100644 --- a/agent-teams-controller/src/internal/workSync.js +++ b/agent-teams-controller/src/internal/workSync.js @@ -1,5 +1,6 @@ const fs = require('fs'); const path = require('path'); +const crypto = require('crypto'); const runtimeHelpers = require('./runtimeHelpers.js'); const DEFAULT_WAIT_TIMEOUT_MS = 10000; @@ -66,7 +67,9 @@ async function requestJson(baseUrl, pathname, options = {}) { payload && typeof payload.error === 'string' && payload.error.trim() ? payload.error.trim() : `${response.status} ${response.statusText}`.trim(); - throw new Error(detail || 'Team control API request failed'); + const error = new Error(detail || 'Team control API request failed'); + error.controlApiStatus = response.status; + throw error; } return payload; } finally { @@ -80,6 +83,9 @@ async function requestJsonWithFallback(baseUrls, pathname, options = {}) { try { return await requestJson(baseUrl, pathname, options); } catch (error) { + if (error && error.controlApiStatus) { + throw error; + } lastError = error; } } @@ -103,6 +109,111 @@ function compactReportBody(context, memberName, flags = {}) { }; } +function stableStringify(value) { + if (value == null || typeof value !== 'object') { + return JSON.stringify(value); + } + if (Array.isArray(value)) { + return `[${value.map(stableStringify).join(',')}]`; + } + return `{${Object.keys(value) + .sort() + .map((key) => `${JSON.stringify(key)}:${stableStringify(value[key])}`) + .join(',')}}`; +} + +function buildPendingIntentId(body) { + const taskIds = Array.isArray(body.taskIds) + ? Array.from(new Set(body.taskIds.map((taskId) => String(taskId)).filter(Boolean))).sort() + : []; + const payload = { + teamName: body.teamName, + memberName: String(body.memberName || '').trim().toLowerCase(), + state: body.state, + agendaFingerprint: body.agendaFingerprint, + reportToken: body.reportToken || '', + ...(taskIds.length > 0 ? { taskIds } : {}), + ...(body.note ? { note: body.note } : {}), + ...(body.leaseTtlMs ? { leaseTtlMs: body.leaseTtlMs } : {}), + ...(body.source ? { source: body.source } : {}), + }; + return `member-work-sync-intent:${crypto + .createHash('sha256') + .update(stableStringify(payload)) + .digest('hex')}`; +} + +function readPendingReportFile(filePath) { + try { + const parsed = JSON.parse(fs.readFileSync(filePath, 'utf8')); + if ( + parsed && + typeof parsed === 'object' && + parsed.schemaVersion === 1 && + parsed.intents && + typeof parsed.intents === 'object' && + !Array.isArray(parsed.intents) + ) { + return parsed; + } + } catch (error) { + if (!error || error.code !== 'ENOENT') { + throw error; + } + } + return { schemaVersion: 1, intents: {} }; +} + +function writePendingReportFile(filePath, data) { + fs.mkdirSync(path.dirname(filePath), { recursive: true }); + const tempPath = `${filePath}.${process.pid}.${Date.now()}.tmp`; + fs.writeFileSync(tempPath, `${JSON.stringify(data, null, 2)}\n`, 'utf8'); + fs.renameSync(tempPath, filePath); +} + +function appendPendingReportIntent(context, body, reason) { + const filePath = path.join(context.paths.teamDir, '.member-work-sync', 'pending-reports.json'); + const data = readPendingReportFile(filePath); + const request = { + ...body, + source: 'mcp', + }; + const id = buildPendingIntentId(request); + const current = data.intents[id]; + if (!current || current.status === 'pending') { + data.intents[id] = { + id, + teamName: body.teamName, + memberName: body.memberName, + request, + reason, + status: 'pending', + recordedAt: current && current.recordedAt ? current.recordedAt : new Date().toISOString(), + }; + writePendingReportFile(filePath, data); + } + return { + accepted: false, + pendingValidation: true, + code: 'pending_validation', + message: + 'Member work sync report was recorded for app validation. Continue concrete task work; do not treat this as a confirmed lease yet.', + intentId: id, + }; +} + +function assertReportBody(body) { + if (!body.state || !['still_working', 'blocked', 'caught_up'].includes(body.state)) { + throw new Error('state must be still_working, blocked, or caught_up'); + } + if (!body.agendaFingerprint) { + throw new Error('agendaFingerprint is required'); + } + if (!body.reportToken) { + throw new Error('reportToken is required'); + } +} + async function memberWorkSyncStatus(context, flags = {}) { const memberName = runtimeHelpers.assertExplicitTeamMemberName( context.paths, @@ -125,16 +236,31 @@ async function memberWorkSyncReport(context, flags = {}) { flags.memberName || flags.member || flags.from, 'member work sync report member' ); - const baseUrls = resolveControlBaseUrls(context, flags); - return requestJsonWithFallback( - baseUrls, - `/api/teams/${encodeURIComponent(context.teamName)}/member-work-sync/report`, - { - method: 'POST', - body: compactReportBody(context, memberName, flags), - timeoutMs: normalizeTimeoutMs(flags.waitTimeoutMs || flags['wait-timeout-ms']), + const body = compactReportBody(context, memberName, flags); + assertReportBody(body); + + const pathname = `/api/teams/${encodeURIComponent(context.teamName)}/member-work-sync/report`; + const options = { + method: 'POST', + body, + timeoutMs: normalizeTimeoutMs(flags.waitTimeoutMs || flags['wait-timeout-ms']), + }; + + let baseUrls; + try { + baseUrls = resolveControlBaseUrls(context, flags); + } catch { + return appendPendingReportIntent(context, body, 'control_api_unavailable'); + } + + try { + return await requestJsonWithFallback(baseUrls, pathname, options); + } catch (error) { + if (error && error.controlApiStatus) { + throw error; } - ); + return appendPendingReportIntent(context, body, 'control_api_unavailable'); + } } module.exports = { diff --git a/agent-teams-controller/test/controller.test.js b/agent-teams-controller/test/controller.test.js index b3240e15..6a6a32b7 100644 --- a/agent-teams-controller/test/controller.test.js +++ b/agent-teams-controller/test/controller.test.js @@ -2329,6 +2329,74 @@ describe('agent-teams-controller API', () => { } }); + it('records member work sync report intents only when the app validator is unavailable', async () => { + const claudeDir = makeClaudeDir(); + const controller = createController({ teamName: 'my-team', claudeDir }); + + const pending = await controller.workSync.memberWorkSyncReport({ + memberName: 'bob', + state: 'still_working', + agendaFingerprint: 'agenda:v1:abc', + reportToken: 'wrs:v1.test.token', + taskIds: ['task-1'], + }); + + expect(pending.pendingValidation).toBe(true); + expect(pending.accepted).toBe(false); + + const intentFile = path.join( + claudeDir, + 'teams', + 'my-team', + '.member-work-sync', + 'pending-reports.json' + ); + const intents = JSON.parse(fs.readFileSync(intentFile, 'utf8')); + expect(Object.values(intents.intents)).toEqual([ + expect.objectContaining({ + teamName: 'my-team', + memberName: 'bob', + reason: 'control_api_unavailable', + status: 'pending', + request: expect.objectContaining({ + memberName: 'bob', + source: 'mcp', + reportToken: 'wrs:v1.test.token', + }), + }), + ]); + }); + + it('does not record pending work sync intents for app-side validation rejections', async () => { + const claudeDir = makeClaudeDir(); + const controller = createController({ teamName: 'my-team', claudeDir }); + + const server = await startControlServer(async () => ({ + statusCode: 400, + body: { error: 'stale_fingerprint' }, + })); + + try { + await expect( + controller.workSync.memberWorkSyncReport({ + controlUrl: server.baseUrl, + memberName: 'bob', + state: 'still_working', + agendaFingerprint: 'agenda:v1:stale', + reportToken: 'wrs:v1.test.token', + }) + ).rejects.toThrow('stale_fingerprint'); + + expect( + fs.existsSync( + path.join(claudeDir, 'teams', 'my-team', '.member-work-sync', 'pending-reports.json') + ) + ).toBe(false); + } finally { + await server.close(); + } + }); + it('prefers the published control endpoint over a stale env URL', async () => { const claudeDir = makeClaudeDir(); const controller = createController({ teamName: 'my-team', claudeDir }); diff --git a/src/features/member-work-sync/contracts/types.ts b/src/features/member-work-sync/contracts/types.ts index b3a7da68..e98fbe92 100644 --- a/src/features/member-work-sync/contracts/types.ts +++ b/src/features/member-work-sync/contracts/types.ts @@ -66,6 +66,20 @@ export interface MemberWorkSyncReport { rejectionCode?: string; } +export type MemberWorkSyncReportIntentStatus = 'pending' | 'accepted' | 'rejected' | 'superseded'; + +export interface MemberWorkSyncReportIntent { + id: string; + teamName: string; + memberName: string; + request: MemberWorkSyncReportRequest; + reason: string; + status: MemberWorkSyncReportIntentStatus; + recordedAt: string; + processedAt?: string; + resultCode?: string; +} + export interface MemberWorkSyncShadowDiagnostics { reconciledBy: 'request' | 'queue' | 'report'; wouldNudge: boolean; diff --git a/src/features/member-work-sync/core/application/MemberWorkSyncPendingReportIntentReplayer.ts b/src/features/member-work-sync/core/application/MemberWorkSyncPendingReportIntentReplayer.ts new file mode 100644 index 00000000..bf6ae37c --- /dev/null +++ b/src/features/member-work-sync/core/application/MemberWorkSyncPendingReportIntentReplayer.ts @@ -0,0 +1,81 @@ +import type { MemberWorkSyncReportIntentStatus } from '../../contracts'; +import { MemberWorkSyncReporter } from './MemberWorkSyncReporter'; +import type { MemberWorkSyncUseCaseDeps } from './ports'; + +export interface MemberWorkSyncPendingReportReplaySummary { + processed: number; + accepted: number; + rejected: number; + superseded: number; +} + +function statusForResult(input: { + accepted: boolean; + code: string; +}): MemberWorkSyncReportIntentStatus { + if (input.accepted) { + return 'accepted'; + } + if (input.code === 'member_inactive' || input.code === 'team_runtime_inactive') { + return 'superseded'; + } + return 'rejected'; +} + +export class MemberWorkSyncPendingReportIntentReplayer { + private readonly reporter: MemberWorkSyncReporter; + + constructor(private readonly deps: MemberWorkSyncUseCaseDeps) { + this.reporter = new MemberWorkSyncReporter(deps); + } + + async replayTeam(teamName: string): Promise { + const store = this.deps.reportStore; + if (!store?.listPendingReports || !store.markPendingReportProcessed) { + return { processed: 0, accepted: 0, rejected: 0, superseded: 0 }; + } + + const intents = await store.listPendingReports(teamName); + const summary: MemberWorkSyncPendingReportReplaySummary = { + processed: 0, + accepted: 0, + rejected: 0, + superseded: 0, + }; + + for (const intent of intents) { + let status: MemberWorkSyncReportIntentStatus = 'rejected'; + let resultCode = 'replay_failed'; + try { + const result = await this.reporter.execute({ + ...intent.request, + source: intent.request.source ?? 'mcp', + }); + status = statusForResult(result); + resultCode = result.code; + } catch (error) { + this.deps.logger?.warn('member work sync pending report replay failed', { + teamName, + intentId: intent.id, + error: String(error), + }); + continue; + } + summary.processed += 1; + if (status === 'accepted') { + summary.accepted += 1; + } else if (status === 'superseded') { + summary.superseded += 1; + } else { + summary.rejected += 1; + } + await store.markPendingReportProcessed(teamName, intent.id, { + status, + resultCode, + processedAt: this.deps.clock.now().toISOString(), + }); + } + + return summary; + } +} diff --git a/src/features/member-work-sync/core/application/MemberWorkSyncReporter.ts b/src/features/member-work-sync/core/application/MemberWorkSyncReporter.ts index f04e370f..3e401d5a 100644 --- a/src/features/member-work-sync/core/application/MemberWorkSyncReporter.ts +++ b/src/features/member-work-sync/core/application/MemberWorkSyncReporter.ts @@ -11,14 +11,6 @@ import { } from './MemberWorkSyncReconciler'; import type { MemberWorkSyncUseCaseDeps } from './ports'; -const TERMINAL_REPORT_REJECTION_CODES = new Set([ - 'reserved_or_invalid_member', - 'identity_mismatch', - 'member_inactive', - 'identity_untrusted', - 'invalid_report_token', -]); - export class MemberWorkSyncReporter { private readonly reconciler: MemberWorkSyncReconciler; @@ -29,9 +21,7 @@ export class MemberWorkSyncReporter { async execute(request: MemberWorkSyncReportRequest): Promise { const source = await this.deps.agendaSource.loadAgenda(request); const agenda = finalizeMemberWorkSyncAgenda(this.deps, source); - const nowIso = ( - request.reportedAt ? new Date(request.reportedAt) : this.deps.clock.now() - ).toISOString(); + const nowIso = this.deps.clock.now().toISOString(); const teamActive = this.deps.lifecycle ? await this.deps.lifecycle.isTeamActive(agenda.teamName) : true; @@ -63,9 +53,6 @@ export class MemberWorkSyncReporter { if (!validation.ok) { const status = await this.reconciler.execute(request); - if (!TERMINAL_REPORT_REJECTION_CODES.has(validation.code)) { - await this.deps.reportStore?.appendPendingReport?.(request, validation.code); - } return { accepted: false, code: validation.code, diff --git a/src/features/member-work-sync/core/application/index.ts b/src/features/member-work-sync/core/application/index.ts index ec75f2b2..6e5a81aa 100644 --- a/src/features/member-work-sync/core/application/index.ts +++ b/src/features/member-work-sync/core/application/index.ts @@ -1,4 +1,5 @@ export * from './MemberWorkSyncDiagnosticsReader'; +export * from './MemberWorkSyncPendingReportIntentReplayer'; export * from './MemberWorkSyncReconciler'; export * from './MemberWorkSyncReporter'; export * from './ports'; diff --git a/src/features/member-work-sync/core/application/ports.ts b/src/features/member-work-sync/core/application/ports.ts index 665fafb1..32aacc1f 100644 --- a/src/features/member-work-sync/core/application/ports.ts +++ b/src/features/member-work-sync/core/application/ports.ts @@ -2,6 +2,8 @@ import type { MemberWorkSyncAgenda, MemberWorkSyncProviderId, MemberWorkSyncReport, + MemberWorkSyncReportIntent, + MemberWorkSyncReportIntentStatus, MemberWorkSyncReportRequest, MemberWorkSyncStatus, } from '../../contracts'; @@ -75,6 +77,12 @@ export interface MemberWorkSyncStatusStorePort { export interface MemberWorkSyncReportStorePort { appendPendingReport?(request: MemberWorkSyncReportRequest, reason: string): Promise; + listPendingReports?(teamName: string): Promise; + markPendingReportProcessed?( + teamName: string, + id: string, + result: { status: MemberWorkSyncReportIntentStatus; resultCode: string; processedAt: string } + ): Promise; } export interface MemberWorkSyncUseCaseDeps { diff --git a/src/features/member-work-sync/main/composition/createMemberWorkSyncFeature.ts b/src/features/member-work-sync/main/composition/createMemberWorkSyncFeature.ts index decdd310..ffdbf949 100644 --- a/src/features/member-work-sync/main/composition/createMemberWorkSyncFeature.ts +++ b/src/features/member-work-sync/main/composition/createMemberWorkSyncFeature.ts @@ -6,6 +6,8 @@ import type { } from '../../contracts'; import { MemberWorkSyncDiagnosticsReader, + MemberWorkSyncPendingReportIntentReplayer, + type MemberWorkSyncPendingReportReplaySummary, MemberWorkSyncReconciler, MemberWorkSyncReporter, type MemberWorkSyncReconcileContext, @@ -34,6 +36,7 @@ export interface MemberWorkSyncFeatureFacade { report(request: MemberWorkSyncReportRequest): Promise; noteTeamChange(event: TeamChangeEvent): void; enqueueStartupScan(teamNames: string[]): Promise; + replayPendingReports(teamNames: string[]): Promise; getQueueDiagnostics(): MemberWorkSyncQueueDiagnostics; dispose(): Promise; } @@ -73,6 +76,7 @@ export function createMemberWorkSyncFeature(deps: { const diagnosticsReader = new MemberWorkSyncDiagnosticsReader(useCaseDeps); const reporter = new MemberWorkSyncReporter(useCaseDeps); const reconciler = new MemberWorkSyncReconciler(useCaseDeps); + const pendingReportReplayer = new MemberWorkSyncPendingReportIntentReplayer(useCaseDeps); const queue = new MemberWorkSyncEventQueue({ reconcile: async (request, context: MemberWorkSyncReconcileContext) => { await reconciler.execute(request, context); @@ -87,6 +91,24 @@ export function createMemberWorkSyncFeature(deps: { report: (request) => reporter.execute(request), noteTeamChange: (event) => router.noteTeamChange(event), enqueueStartupScan: (teamNames) => router.enqueueStartupScan(teamNames), + replayPendingReports: async (teamNames) => { + const summaries = await Promise.allSettled( + teamNames.map((teamName) => pendingReportReplayer.replayTeam(teamName)) + ); + return summaries.reduce( + (accumulator, summary) => { + if (summary.status !== 'fulfilled') { + return accumulator; + } + accumulator.processed += summary.value.processed; + accumulator.accepted += summary.value.accepted; + accumulator.rejected += summary.value.rejected; + accumulator.superseded += summary.value.superseded; + return accumulator; + }, + { processed: 0, accepted: 0, rejected: 0, superseded: 0 } + ); + }, getQueueDiagnostics: () => queue.getDiagnostics(), dispose: () => queue.stop(), }; diff --git a/src/features/member-work-sync/main/infrastructure/JsonMemberWorkSyncStore.ts b/src/features/member-work-sync/main/infrastructure/JsonMemberWorkSyncStore.ts index 9fa9954f..f5329e4d 100644 --- a/src/features/member-work-sync/main/infrastructure/JsonMemberWorkSyncStore.ts +++ b/src/features/member-work-sync/main/infrastructure/JsonMemberWorkSyncStore.ts @@ -1,7 +1,12 @@ import { atomicWriteAsync } from '@main/utils/atomicWrite'; -import { mkdir, readFile, appendFile } from 'fs/promises'; +import { createHash } from 'crypto'; +import { mkdir, readFile, rename } from 'fs/promises'; -import type { MemberWorkSyncReportRequest, MemberWorkSyncStatus } from '../../contracts'; +import type { + MemberWorkSyncReportIntent, + MemberWorkSyncReportRequest, + MemberWorkSyncStatus, +} from '../../contracts'; import type { MemberWorkSyncReportStorePort, MemberWorkSyncStatusStorePort, @@ -13,6 +18,11 @@ interface StoreFile { members: Record; } +interface PendingReportFile { + schemaVersion: 1; + intents: Record; +} + function normalizeMemberKey(memberName: string): string { return memberName.trim().toLowerCase(); } @@ -28,6 +38,57 @@ function isStoreFile(value: unknown): value is StoreFile { ); } +function isPendingReportFile(value: unknown): value is PendingReportFile { + return ( + value != null && + typeof value === 'object' && + (value as PendingReportFile).schemaVersion === 1 && + (value as PendingReportFile).intents != null && + typeof (value as PendingReportFile).intents === 'object' && + !Array.isArray((value as PendingReportFile).intents) + ); +} + +function stableStringify(value: unknown): string { + if (value == null || typeof value !== 'object') { + return JSON.stringify(value); + } + if (Array.isArray(value)) { + return `[${value.map(stableStringify).join(',')}]`; + } + const record = value as Record; + return `{${Object.keys(record) + .sort() + .map((key) => `${JSON.stringify(key)}:${stableStringify(record[key])}`) + .join(',')}}`; +} + +function buildPendingReportIntentId(request: MemberWorkSyncReportRequest): string { + const taskIds = [...new Set(request.taskIds ?? [])].sort(); + const payload = { + teamName: request.teamName, + memberName: normalizeMemberKey(request.memberName), + state: request.state, + agendaFingerprint: request.agendaFingerprint, + reportToken: request.reportToken ?? '', + ...(taskIds.length > 0 ? { taskIds } : {}), + ...(request.note ? { note: request.note } : {}), + ...(request.leaseTtlMs ? { leaseTtlMs: request.leaseTtlMs } : {}), + ...(request.source ? { source: request.source } : {}), + }; + return `member-work-sync-intent:${createHash('sha256') + .update(stableStringify(payload)) + .digest('hex')}`; +} + +async function quarantineFile(filePath: string): Promise { + try { + await rename(filePath, `${filePath}.invalid.${Date.now()}`); + } catch { + // If quarantine fails, keep the feature degraded but do not block team operation. + } +} + export class JsonMemberWorkSyncStore implements MemberWorkSyncStatusStorePort, MemberWorkSyncReportStorePort { @@ -56,29 +117,100 @@ export class JsonMemberWorkSyncStore } async appendPendingReport(request: MemberWorkSyncReportRequest, reason: string): Promise { - await mkdir(this.paths.getTeamDir(request.teamName), { recursive: true }); - await appendFile( - this.paths.getPendingReportsPath(request.teamName), - `${JSON.stringify({ schemaVersion: 1, reason, request, recordedAt: new Date().toISOString() })}\n`, - 'utf8' - ); + const id = buildPendingReportIntentId(request); + await this.enqueue(request.teamName, async () => { + const existing = await this.readPendingFile(request.teamName); + const current = existing.intents[id]; + if (current && current.status !== 'pending') { + return; + } + existing.intents[id] = { + id, + teamName: request.teamName, + memberName: request.memberName, + request, + reason: current?.reason ?? reason, + status: 'pending', + recordedAt: current?.recordedAt ?? new Date().toISOString(), + }; + await this.writePendingFile(request.teamName, existing); + }); + } + + async listPendingReports(teamName: string): Promise { + const file = await this.readPendingFile(teamName); + return Object.values(file.intents) + .filter((intent) => intent.status === 'pending') + .sort((left, right) => left.recordedAt.localeCompare(right.recordedAt)); + } + + async markPendingReportProcessed( + teamName: string, + id: string, + result: { + status: MemberWorkSyncReportIntent['status']; + resultCode: string; + processedAt: string; + } + ): Promise { + await this.enqueue(teamName, async () => { + const existing = await this.readPendingFile(teamName); + const current = existing.intents[id]; + if (!current || current.status !== 'pending') { + return; + } + existing.intents[id] = { + ...current, + status: result.status, + resultCode: result.resultCode, + processedAt: result.processedAt, + }; + await this.writePendingFile(teamName, existing); + }); } private async readFile(teamName: string): Promise { + const filePath = this.paths.getStatusPath(teamName); try { - const raw = await readFile(this.paths.getStatusPath(teamName), 'utf8'); + const raw = await readFile(filePath, 'utf8'); const parsed = JSON.parse(raw); if (isStoreFile(parsed)) { return parsed; } + await quarantineFile(filePath); } catch (error) { if ((error as NodeJS.ErrnoException).code !== 'ENOENT') { - throw error; + await quarantineFile(filePath); } } return { schemaVersion: 1, members: {} }; } + private async readPendingFile(teamName: string): Promise { + const filePath = this.paths.getPendingReportsPath(teamName); + try { + const raw = await readFile(filePath, 'utf8'); + const parsed = JSON.parse(raw); + if (isPendingReportFile(parsed)) { + return parsed; + } + await quarantineFile(filePath); + } catch (error) { + if ((error as NodeJS.ErrnoException).code !== 'ENOENT') { + await quarantineFile(filePath); + } + } + return { schemaVersion: 1, intents: {} }; + } + + private async writePendingFile(teamName: string, file: PendingReportFile): Promise { + await mkdir(this.paths.getTeamDir(teamName), { recursive: true }); + await atomicWriteAsync( + this.paths.getPendingReportsPath(teamName), + JSON.stringify(file, null, 2) + ); + } + private async enqueue(teamName: string, operation: () => Promise): Promise { const previous = this.writeQueues.get(teamName) ?? Promise.resolve(); const next = previous.then(operation, operation); diff --git a/src/features/member-work-sync/main/infrastructure/MemberWorkSyncStorePaths.ts b/src/features/member-work-sync/main/infrastructure/MemberWorkSyncStorePaths.ts index 79811045..6d72b670 100644 --- a/src/features/member-work-sync/main/infrastructure/MemberWorkSyncStorePaths.ts +++ b/src/features/member-work-sync/main/infrastructure/MemberWorkSyncStorePaths.ts @@ -12,7 +12,7 @@ export class MemberWorkSyncStorePaths { } getPendingReportsPath(teamName: string): string { - return join(this.getTeamDir(teamName), 'pending-reports.jsonl'); + return join(this.getTeamDir(teamName), 'pending-reports.json'); } getReportTokenSecretPath(teamName: string): string { diff --git a/src/main/index.ts b/src/main/index.ts index 399c8f2c..eb8f057f 100644 --- a/src/main/index.ts +++ b/src/main/index.ts @@ -1240,11 +1240,11 @@ async function initializeServices(): Promise { }); void teamDataService .listTeams() - .then((teams) => - memberWorkSyncFeature?.enqueueStartupScan( - teams.filter((team) => !team.deletedAt).map((team) => team.teamName) - ) - ) + .then(async (teams) => { + const activeTeamNames = teams.filter((team) => !team.deletedAt).map((team) => team.teamName); + await memberWorkSyncFeature?.replayPendingReports(activeTeamNames); + await memberWorkSyncFeature?.enqueueStartupScan(activeTeamNames); + }) .catch((error: unknown) => logger.warn(`[Init] Member work sync startup scan failed: ${String(error)}`) ); diff --git a/test/features/member-work-sync/core/MemberWorkSyncUseCases.test.ts b/test/features/member-work-sync/core/MemberWorkSyncUseCases.test.ts index 2999fc71..45e2d2cb 100644 --- a/test/features/member-work-sync/core/MemberWorkSyncUseCases.test.ts +++ b/test/features/member-work-sync/core/MemberWorkSyncUseCases.test.ts @@ -2,6 +2,7 @@ import { describe, expect, it } from 'vitest'; import { MemberWorkSyncDiagnosticsReader, + MemberWorkSyncPendingReportIntentReplayer, MemberWorkSyncReporter, type MemberWorkSyncAgendaSourceResult, type MemberWorkSyncStatusStorePort, @@ -9,6 +10,7 @@ import { } from '@features/member-work-sync/core/application'; import type { MemberWorkSyncActionableWorkItem, + MemberWorkSyncReportIntent, MemberWorkSyncReportRequest, MemberWorkSyncStatus, } from '@features/member-work-sync/contracts'; @@ -42,6 +44,7 @@ class MutableClock { class InMemoryStatusStore implements MemberWorkSyncStatusStorePort { readonly writes: MemberWorkSyncStatus[] = []; readonly pendingReports: Array<{ request: MemberWorkSyncReportRequest; reason: string }> = []; + readonly pendingIntents = new Map(); async read(): Promise { return this.writes.at(-1) ?? null; @@ -54,6 +57,25 @@ class InMemoryStatusStore implements MemberWorkSyncStatusStorePort { async appendPendingReport(request: MemberWorkSyncReportRequest, reason: string): Promise { this.pendingReports.push({ request, reason }); } + + async listPendingReports(): Promise { + return [...this.pendingIntents.values()].filter((intent) => intent.status === 'pending'); + } + + async markPendingReportProcessed( + _teamName: string, + id: string, + result: { + status: MemberWorkSyncReportIntent['status']; + resultCode: string; + processedAt: string; + } + ): Promise { + const current = this.pendingIntents.get(id); + if (current) { + this.pendingIntents.set(id, { ...current, ...result }); + } + } } function createDeps(options?: { @@ -157,7 +179,29 @@ describe('MemberWorkSync use cases', () => { expect(expired.diagnostics).toContain('report_lease_expired'); }); - it('rejects stale or unsafe reports and records pending intent only', async () => { + it('uses app clock instead of model supplied reportedAt for lease timing', async () => { + const { deps } = createDeps(); + const reader = new MemberWorkSyncDiagnosticsReader(deps); + const reporter = new MemberWorkSyncReporter(deps); + const current = await reader.execute({ teamName: 'team-a', memberName: 'bob' }); + + const result = await reporter.execute({ + teamName: 'team-a', + memberName: 'bob', + state: 'still_working', + agendaFingerprint: current.agenda.fingerprint, + reportToken: current.reportToken, + reportedAt: '2099-01-01T00:00:00.000Z', + leaseTtlMs: 120_000, + source: 'test', + }); + + expect(result.accepted).toBe(true); + expect(result.status.report?.reportedAt).toBe('2026-04-29T00:00:00.000Z'); + expect(result.status.report?.expiresAt).toBe('2026-04-29T00:02:00.000Z'); + }); + + it('rejects stale reports without turning app-side validation failures into pending intents', async () => { const { deps, store } = createDeps(); const result = await new MemberWorkSyncReporter(deps).execute({ teamName: 'team-a', @@ -170,8 +214,7 @@ describe('MemberWorkSync use cases', () => { expect(result.accepted).toBe(false); expect(result.code).toBe('stale_fingerprint'); expect(result.status.state).toBe('needs_sync'); - expect(store.pendingReports).toHaveLength(1); - expect(store.pendingReports[0].reason).toBe('stale_fingerprint'); + expect(store.pendingReports).toHaveLength(0); }); it('accepts caught_up only when the app-side agenda is empty', async () => { @@ -247,4 +290,37 @@ describe('MemberWorkSync use cases', () => { expect(result.code).toBe('invalid_report_token'); expect(store.pendingReports).toHaveLength(0); }); + + it('replays pending controller intents through the same app validator', async () => { + const { deps, store } = createDeps(); + const reader = new MemberWorkSyncDiagnosticsReader(deps); + const current = await reader.execute({ teamName: 'team-a', memberName: 'bob' }); + store.pendingIntents.set('intent-1', { + id: 'intent-1', + teamName: 'team-a', + memberName: 'bob', + status: 'pending', + reason: 'control_api_unavailable', + recordedAt: '2026-04-29T00:00:01.000Z', + request: { + teamName: 'team-a', + memberName: 'bob', + state: 'still_working', + agendaFingerprint: current.agenda.fingerprint, + reportToken: current.reportToken, + leaseTtlMs: 120_000, + source: 'mcp', + }, + }); + + const summary = await new MemberWorkSyncPendingReportIntentReplayer(deps).replayTeam('team-a'); + + expect(summary).toEqual({ processed: 1, accepted: 1, rejected: 0, superseded: 0 }); + expect(store.pendingIntents.get('intent-1')).toMatchObject({ + status: 'accepted', + resultCode: 'accepted', + processedAt: '2026-04-29T00:00:00.000Z', + }); + expect(store.writes.at(-1)?.state).toBe('still_working'); + }); }); diff --git a/test/features/member-work-sync/main/JsonMemberWorkSyncStore.test.ts b/test/features/member-work-sync/main/JsonMemberWorkSyncStore.test.ts new file mode 100644 index 00000000..847766fc --- /dev/null +++ b/test/features/member-work-sync/main/JsonMemberWorkSyncStore.test.ts @@ -0,0 +1,72 @@ +import { mkdir, mkdtemp, readFile, readdir, rm, writeFile } from 'fs/promises'; +import { join } from 'path'; +import { tmpdir } from 'os'; +import { afterEach, beforeEach, describe, expect, it } from 'vitest'; + +import { JsonMemberWorkSyncStore } from '@features/member-work-sync/main/infrastructure/JsonMemberWorkSyncStore'; +import { MemberWorkSyncStorePaths } from '@features/member-work-sync/main/infrastructure/MemberWorkSyncStorePaths'; + +describe('JsonMemberWorkSyncStore', () => { + let root: string; + let store: JsonMemberWorkSyncStore; + + beforeEach(async () => { + root = await mkdtemp(join(tmpdir(), 'member-work-sync-store-')); + store = new JsonMemberWorkSyncStore(new MemberWorkSyncStorePaths(root)); + }); + + afterEach(async () => { + await rm(root, { recursive: true, force: true }); + }); + + it('quarantines invalid status JSON and returns empty state', async () => { + const statusPath = join(root, 'team-a', '.member-work-sync', 'status.json'); + await mkdir(join(root, 'team-a', '.member-work-sync'), { recursive: true }); + await writeFile(statusPath, '{bad json', 'utf8'); + + await expect(store.read({ teamName: 'team-a', memberName: 'bob' })).resolves.toBeNull(); + + const teamDir = join(root, 'team-a', '.member-work-sync'); + const entries = await readdir(teamDir); + expect(entries.some((entry) => entry.startsWith('status.json.invalid.'))).toBe(true); + }); + + it('deduplicates pending report intents and marks them processed', async () => { + const request = { + teamName: 'team-a', + memberName: 'bob', + state: 'still_working' as const, + agendaFingerprint: 'agenda:v1:abc', + reportToken: 'wrs:v1.test', + taskIds: ['task-2', 'task-1', 'task-1'], + source: 'mcp' as const, + }; + + await store.appendPendingReport(request, 'control_api_unavailable'); + await store.appendPendingReport({ ...request, taskIds: ['task-1', 'task-2'] }, 'duplicate'); + + const pending = await store.listPendingReports('team-a'); + expect(pending).toHaveLength(1); + expect(pending[0]).toMatchObject({ + teamName: 'team-a', + memberName: 'bob', + reason: 'control_api_unavailable', + status: 'pending', + }); + + await store.markPendingReportProcessed('team-a', pending[0].id, { + status: 'accepted', + resultCode: 'accepted', + processedAt: '2026-04-29T00:00:00.000Z', + }); + + expect(await store.listPendingReports('team-a')).toEqual([]); + const file = JSON.parse( + await readFile(join(root, 'team-a', '.member-work-sync', 'pending-reports.json'), 'utf8') + ); + expect(file.intents[pending[0].id]).toMatchObject({ + status: 'accepted', + resultCode: 'accepted', + }); + }); +});