feat(member-work-sync): replay pending report intents
This commit is contained in:
parent
10e405573e
commit
253ddf293d
13 changed files with 630 additions and 43 deletions
|
|
@ -1,5 +1,6 @@
|
|||
const fs = require('fs');
|
||||
const path = require('path');
|
||||
const crypto = require('crypto');
|
||||
const runtimeHelpers = require('./runtimeHelpers.js');
|
||||
|
||||
const DEFAULT_WAIT_TIMEOUT_MS = 10000;
|
||||
|
|
@ -66,7 +67,9 @@ async function requestJson(baseUrl, pathname, options = {}) {
|
|||
payload && typeof payload.error === 'string' && payload.error.trim()
|
||||
? payload.error.trim()
|
||||
: `${response.status} ${response.statusText}`.trim();
|
||||
throw new Error(detail || 'Team control API request failed');
|
||||
const error = new Error(detail || 'Team control API request failed');
|
||||
error.controlApiStatus = response.status;
|
||||
throw error;
|
||||
}
|
||||
return payload;
|
||||
} finally {
|
||||
|
|
@ -80,6 +83,9 @@ async function requestJsonWithFallback(baseUrls, pathname, options = {}) {
|
|||
try {
|
||||
return await requestJson(baseUrl, pathname, options);
|
||||
} catch (error) {
|
||||
if (error && error.controlApiStatus) {
|
||||
throw error;
|
||||
}
|
||||
lastError = error;
|
||||
}
|
||||
}
|
||||
|
|
@ -103,6 +109,111 @@ function compactReportBody(context, memberName, flags = {}) {
|
|||
};
|
||||
}
|
||||
|
||||
function stableStringify(value) {
|
||||
if (value == null || typeof value !== 'object') {
|
||||
return JSON.stringify(value);
|
||||
}
|
||||
if (Array.isArray(value)) {
|
||||
return `[${value.map(stableStringify).join(',')}]`;
|
||||
}
|
||||
return `{${Object.keys(value)
|
||||
.sort()
|
||||
.map((key) => `${JSON.stringify(key)}:${stableStringify(value[key])}`)
|
||||
.join(',')}}`;
|
||||
}
|
||||
|
||||
function buildPendingIntentId(body) {
|
||||
const taskIds = Array.isArray(body.taskIds)
|
||||
? Array.from(new Set(body.taskIds.map((taskId) => String(taskId)).filter(Boolean))).sort()
|
||||
: [];
|
||||
const payload = {
|
||||
teamName: body.teamName,
|
||||
memberName: String(body.memberName || '').trim().toLowerCase(),
|
||||
state: body.state,
|
||||
agendaFingerprint: body.agendaFingerprint,
|
||||
reportToken: body.reportToken || '',
|
||||
...(taskIds.length > 0 ? { taskIds } : {}),
|
||||
...(body.note ? { note: body.note } : {}),
|
||||
...(body.leaseTtlMs ? { leaseTtlMs: body.leaseTtlMs } : {}),
|
||||
...(body.source ? { source: body.source } : {}),
|
||||
};
|
||||
return `member-work-sync-intent:${crypto
|
||||
.createHash('sha256')
|
||||
.update(stableStringify(payload))
|
||||
.digest('hex')}`;
|
||||
}
|
||||
|
||||
function readPendingReportFile(filePath) {
|
||||
try {
|
||||
const parsed = JSON.parse(fs.readFileSync(filePath, 'utf8'));
|
||||
if (
|
||||
parsed &&
|
||||
typeof parsed === 'object' &&
|
||||
parsed.schemaVersion === 1 &&
|
||||
parsed.intents &&
|
||||
typeof parsed.intents === 'object' &&
|
||||
!Array.isArray(parsed.intents)
|
||||
) {
|
||||
return parsed;
|
||||
}
|
||||
} catch (error) {
|
||||
if (!error || error.code !== 'ENOENT') {
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
return { schemaVersion: 1, intents: {} };
|
||||
}
|
||||
|
||||
function writePendingReportFile(filePath, data) {
|
||||
fs.mkdirSync(path.dirname(filePath), { recursive: true });
|
||||
const tempPath = `${filePath}.${process.pid}.${Date.now()}.tmp`;
|
||||
fs.writeFileSync(tempPath, `${JSON.stringify(data, null, 2)}\n`, 'utf8');
|
||||
fs.renameSync(tempPath, filePath);
|
||||
}
|
||||
|
||||
function appendPendingReportIntent(context, body, reason) {
|
||||
const filePath = path.join(context.paths.teamDir, '.member-work-sync', 'pending-reports.json');
|
||||
const data = readPendingReportFile(filePath);
|
||||
const request = {
|
||||
...body,
|
||||
source: 'mcp',
|
||||
};
|
||||
const id = buildPendingIntentId(request);
|
||||
const current = data.intents[id];
|
||||
if (!current || current.status === 'pending') {
|
||||
data.intents[id] = {
|
||||
id,
|
||||
teamName: body.teamName,
|
||||
memberName: body.memberName,
|
||||
request,
|
||||
reason,
|
||||
status: 'pending',
|
||||
recordedAt: current && current.recordedAt ? current.recordedAt : new Date().toISOString(),
|
||||
};
|
||||
writePendingReportFile(filePath, data);
|
||||
}
|
||||
return {
|
||||
accepted: false,
|
||||
pendingValidation: true,
|
||||
code: 'pending_validation',
|
||||
message:
|
||||
'Member work sync report was recorded for app validation. Continue concrete task work; do not treat this as a confirmed lease yet.',
|
||||
intentId: id,
|
||||
};
|
||||
}
|
||||
|
||||
function assertReportBody(body) {
|
||||
if (!body.state || !['still_working', 'blocked', 'caught_up'].includes(body.state)) {
|
||||
throw new Error('state must be still_working, blocked, or caught_up');
|
||||
}
|
||||
if (!body.agendaFingerprint) {
|
||||
throw new Error('agendaFingerprint is required');
|
||||
}
|
||||
if (!body.reportToken) {
|
||||
throw new Error('reportToken is required');
|
||||
}
|
||||
}
|
||||
|
||||
async function memberWorkSyncStatus(context, flags = {}) {
|
||||
const memberName = runtimeHelpers.assertExplicitTeamMemberName(
|
||||
context.paths,
|
||||
|
|
@ -125,16 +236,31 @@ async function memberWorkSyncReport(context, flags = {}) {
|
|||
flags.memberName || flags.member || flags.from,
|
||||
'member work sync report member'
|
||||
);
|
||||
const baseUrls = resolveControlBaseUrls(context, flags);
|
||||
return requestJsonWithFallback(
|
||||
baseUrls,
|
||||
`/api/teams/${encodeURIComponent(context.teamName)}/member-work-sync/report`,
|
||||
{
|
||||
method: 'POST',
|
||||
body: compactReportBody(context, memberName, flags),
|
||||
timeoutMs: normalizeTimeoutMs(flags.waitTimeoutMs || flags['wait-timeout-ms']),
|
||||
const body = compactReportBody(context, memberName, flags);
|
||||
assertReportBody(body);
|
||||
|
||||
const pathname = `/api/teams/${encodeURIComponent(context.teamName)}/member-work-sync/report`;
|
||||
const options = {
|
||||
method: 'POST',
|
||||
body,
|
||||
timeoutMs: normalizeTimeoutMs(flags.waitTimeoutMs || flags['wait-timeout-ms']),
|
||||
};
|
||||
|
||||
let baseUrls;
|
||||
try {
|
||||
baseUrls = resolveControlBaseUrls(context, flags);
|
||||
} catch {
|
||||
return appendPendingReportIntent(context, body, 'control_api_unavailable');
|
||||
}
|
||||
|
||||
try {
|
||||
return await requestJsonWithFallback(baseUrls, pathname, options);
|
||||
} catch (error) {
|
||||
if (error && error.controlApiStatus) {
|
||||
throw error;
|
||||
}
|
||||
);
|
||||
return appendPendingReportIntent(context, body, 'control_api_unavailable');
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
|
|
|
|||
|
|
@ -2329,6 +2329,74 @@ describe('agent-teams-controller API', () => {
|
|||
}
|
||||
});
|
||||
|
||||
it('records member work sync report intents only when the app validator is unavailable', async () => {
|
||||
const claudeDir = makeClaudeDir();
|
||||
const controller = createController({ teamName: 'my-team', claudeDir });
|
||||
|
||||
const pending = await controller.workSync.memberWorkSyncReport({
|
||||
memberName: 'bob',
|
||||
state: 'still_working',
|
||||
agendaFingerprint: 'agenda:v1:abc',
|
||||
reportToken: 'wrs:v1.test.token',
|
||||
taskIds: ['task-1'],
|
||||
});
|
||||
|
||||
expect(pending.pendingValidation).toBe(true);
|
||||
expect(pending.accepted).toBe(false);
|
||||
|
||||
const intentFile = path.join(
|
||||
claudeDir,
|
||||
'teams',
|
||||
'my-team',
|
||||
'.member-work-sync',
|
||||
'pending-reports.json'
|
||||
);
|
||||
const intents = JSON.parse(fs.readFileSync(intentFile, 'utf8'));
|
||||
expect(Object.values(intents.intents)).toEqual([
|
||||
expect.objectContaining({
|
||||
teamName: 'my-team',
|
||||
memberName: 'bob',
|
||||
reason: 'control_api_unavailable',
|
||||
status: 'pending',
|
||||
request: expect.objectContaining({
|
||||
memberName: 'bob',
|
||||
source: 'mcp',
|
||||
reportToken: 'wrs:v1.test.token',
|
||||
}),
|
||||
}),
|
||||
]);
|
||||
});
|
||||
|
||||
it('does not record pending work sync intents for app-side validation rejections', async () => {
|
||||
const claudeDir = makeClaudeDir();
|
||||
const controller = createController({ teamName: 'my-team', claudeDir });
|
||||
|
||||
const server = await startControlServer(async () => ({
|
||||
statusCode: 400,
|
||||
body: { error: 'stale_fingerprint' },
|
||||
}));
|
||||
|
||||
try {
|
||||
await expect(
|
||||
controller.workSync.memberWorkSyncReport({
|
||||
controlUrl: server.baseUrl,
|
||||
memberName: 'bob',
|
||||
state: 'still_working',
|
||||
agendaFingerprint: 'agenda:v1:stale',
|
||||
reportToken: 'wrs:v1.test.token',
|
||||
})
|
||||
).rejects.toThrow('stale_fingerprint');
|
||||
|
||||
expect(
|
||||
fs.existsSync(
|
||||
path.join(claudeDir, 'teams', 'my-team', '.member-work-sync', 'pending-reports.json')
|
||||
)
|
||||
).toBe(false);
|
||||
} finally {
|
||||
await server.close();
|
||||
}
|
||||
});
|
||||
|
||||
it('prefers the published control endpoint over a stale env URL', async () => {
|
||||
const claudeDir = makeClaudeDir();
|
||||
const controller = createController({ teamName: 'my-team', claudeDir });
|
||||
|
|
|
|||
|
|
@ -66,6 +66,20 @@ export interface MemberWorkSyncReport {
|
|||
rejectionCode?: string;
|
||||
}
|
||||
|
||||
export type MemberWorkSyncReportIntentStatus = 'pending' | 'accepted' | 'rejected' | 'superseded';
|
||||
|
||||
export interface MemberWorkSyncReportIntent {
|
||||
id: string;
|
||||
teamName: string;
|
||||
memberName: string;
|
||||
request: MemberWorkSyncReportRequest;
|
||||
reason: string;
|
||||
status: MemberWorkSyncReportIntentStatus;
|
||||
recordedAt: string;
|
||||
processedAt?: string;
|
||||
resultCode?: string;
|
||||
}
|
||||
|
||||
export interface MemberWorkSyncShadowDiagnostics {
|
||||
reconciledBy: 'request' | 'queue' | 'report';
|
||||
wouldNudge: boolean;
|
||||
|
|
|
|||
|
|
@ -0,0 +1,81 @@
|
|||
import type { MemberWorkSyncReportIntentStatus } from '../../contracts';
|
||||
import { MemberWorkSyncReporter } from './MemberWorkSyncReporter';
|
||||
import type { MemberWorkSyncUseCaseDeps } from './ports';
|
||||
|
||||
export interface MemberWorkSyncPendingReportReplaySummary {
|
||||
processed: number;
|
||||
accepted: number;
|
||||
rejected: number;
|
||||
superseded: number;
|
||||
}
|
||||
|
||||
function statusForResult(input: {
|
||||
accepted: boolean;
|
||||
code: string;
|
||||
}): MemberWorkSyncReportIntentStatus {
|
||||
if (input.accepted) {
|
||||
return 'accepted';
|
||||
}
|
||||
if (input.code === 'member_inactive' || input.code === 'team_runtime_inactive') {
|
||||
return 'superseded';
|
||||
}
|
||||
return 'rejected';
|
||||
}
|
||||
|
||||
export class MemberWorkSyncPendingReportIntentReplayer {
|
||||
private readonly reporter: MemberWorkSyncReporter;
|
||||
|
||||
constructor(private readonly deps: MemberWorkSyncUseCaseDeps) {
|
||||
this.reporter = new MemberWorkSyncReporter(deps);
|
||||
}
|
||||
|
||||
async replayTeam(teamName: string): Promise<MemberWorkSyncPendingReportReplaySummary> {
|
||||
const store = this.deps.reportStore;
|
||||
if (!store?.listPendingReports || !store.markPendingReportProcessed) {
|
||||
return { processed: 0, accepted: 0, rejected: 0, superseded: 0 };
|
||||
}
|
||||
|
||||
const intents = await store.listPendingReports(teamName);
|
||||
const summary: MemberWorkSyncPendingReportReplaySummary = {
|
||||
processed: 0,
|
||||
accepted: 0,
|
||||
rejected: 0,
|
||||
superseded: 0,
|
||||
};
|
||||
|
||||
for (const intent of intents) {
|
||||
let status: MemberWorkSyncReportIntentStatus = 'rejected';
|
||||
let resultCode = 'replay_failed';
|
||||
try {
|
||||
const result = await this.reporter.execute({
|
||||
...intent.request,
|
||||
source: intent.request.source ?? 'mcp',
|
||||
});
|
||||
status = statusForResult(result);
|
||||
resultCode = result.code;
|
||||
} catch (error) {
|
||||
this.deps.logger?.warn('member work sync pending report replay failed', {
|
||||
teamName,
|
||||
intentId: intent.id,
|
||||
error: String(error),
|
||||
});
|
||||
continue;
|
||||
}
|
||||
summary.processed += 1;
|
||||
if (status === 'accepted') {
|
||||
summary.accepted += 1;
|
||||
} else if (status === 'superseded') {
|
||||
summary.superseded += 1;
|
||||
} else {
|
||||
summary.rejected += 1;
|
||||
}
|
||||
await store.markPendingReportProcessed(teamName, intent.id, {
|
||||
status,
|
||||
resultCode,
|
||||
processedAt: this.deps.clock.now().toISOString(),
|
||||
});
|
||||
}
|
||||
|
||||
return summary;
|
||||
}
|
||||
}
|
||||
|
|
@ -11,14 +11,6 @@ import {
|
|||
} from './MemberWorkSyncReconciler';
|
||||
import type { MemberWorkSyncUseCaseDeps } from './ports';
|
||||
|
||||
const TERMINAL_REPORT_REJECTION_CODES = new Set([
|
||||
'reserved_or_invalid_member',
|
||||
'identity_mismatch',
|
||||
'member_inactive',
|
||||
'identity_untrusted',
|
||||
'invalid_report_token',
|
||||
]);
|
||||
|
||||
export class MemberWorkSyncReporter {
|
||||
private readonly reconciler: MemberWorkSyncReconciler;
|
||||
|
||||
|
|
@ -29,9 +21,7 @@ export class MemberWorkSyncReporter {
|
|||
async execute(request: MemberWorkSyncReportRequest): Promise<MemberWorkSyncReportResult> {
|
||||
const source = await this.deps.agendaSource.loadAgenda(request);
|
||||
const agenda = finalizeMemberWorkSyncAgenda(this.deps, source);
|
||||
const nowIso = (
|
||||
request.reportedAt ? new Date(request.reportedAt) : this.deps.clock.now()
|
||||
).toISOString();
|
||||
const nowIso = this.deps.clock.now().toISOString();
|
||||
const teamActive = this.deps.lifecycle
|
||||
? await this.deps.lifecycle.isTeamActive(agenda.teamName)
|
||||
: true;
|
||||
|
|
@ -63,9 +53,6 @@ export class MemberWorkSyncReporter {
|
|||
|
||||
if (!validation.ok) {
|
||||
const status = await this.reconciler.execute(request);
|
||||
if (!TERMINAL_REPORT_REJECTION_CODES.has(validation.code)) {
|
||||
await this.deps.reportStore?.appendPendingReport?.(request, validation.code);
|
||||
}
|
||||
return {
|
||||
accepted: false,
|
||||
code: validation.code,
|
||||
|
|
|
|||
|
|
@ -1,4 +1,5 @@
|
|||
export * from './MemberWorkSyncDiagnosticsReader';
|
||||
export * from './MemberWorkSyncPendingReportIntentReplayer';
|
||||
export * from './MemberWorkSyncReconciler';
|
||||
export * from './MemberWorkSyncReporter';
|
||||
export * from './ports';
|
||||
|
|
|
|||
|
|
@ -2,6 +2,8 @@ import type {
|
|||
MemberWorkSyncAgenda,
|
||||
MemberWorkSyncProviderId,
|
||||
MemberWorkSyncReport,
|
||||
MemberWorkSyncReportIntent,
|
||||
MemberWorkSyncReportIntentStatus,
|
||||
MemberWorkSyncReportRequest,
|
||||
MemberWorkSyncStatus,
|
||||
} from '../../contracts';
|
||||
|
|
@ -75,6 +77,12 @@ export interface MemberWorkSyncStatusStorePort {
|
|||
|
||||
export interface MemberWorkSyncReportStorePort {
|
||||
appendPendingReport?(request: MemberWorkSyncReportRequest, reason: string): Promise<void>;
|
||||
listPendingReports?(teamName: string): Promise<MemberWorkSyncReportIntent[]>;
|
||||
markPendingReportProcessed?(
|
||||
teamName: string,
|
||||
id: string,
|
||||
result: { status: MemberWorkSyncReportIntentStatus; resultCode: string; processedAt: string }
|
||||
): Promise<void>;
|
||||
}
|
||||
|
||||
export interface MemberWorkSyncUseCaseDeps {
|
||||
|
|
|
|||
|
|
@ -6,6 +6,8 @@ import type {
|
|||
} from '../../contracts';
|
||||
import {
|
||||
MemberWorkSyncDiagnosticsReader,
|
||||
MemberWorkSyncPendingReportIntentReplayer,
|
||||
type MemberWorkSyncPendingReportReplaySummary,
|
||||
MemberWorkSyncReconciler,
|
||||
MemberWorkSyncReporter,
|
||||
type MemberWorkSyncReconcileContext,
|
||||
|
|
@ -34,6 +36,7 @@ export interface MemberWorkSyncFeatureFacade {
|
|||
report(request: MemberWorkSyncReportRequest): Promise<MemberWorkSyncReportResult>;
|
||||
noteTeamChange(event: TeamChangeEvent): void;
|
||||
enqueueStartupScan(teamNames: string[]): Promise<void>;
|
||||
replayPendingReports(teamNames: string[]): Promise<MemberWorkSyncPendingReportReplaySummary>;
|
||||
getQueueDiagnostics(): MemberWorkSyncQueueDiagnostics;
|
||||
dispose(): Promise<void>;
|
||||
}
|
||||
|
|
@ -73,6 +76,7 @@ export function createMemberWorkSyncFeature(deps: {
|
|||
const diagnosticsReader = new MemberWorkSyncDiagnosticsReader(useCaseDeps);
|
||||
const reporter = new MemberWorkSyncReporter(useCaseDeps);
|
||||
const reconciler = new MemberWorkSyncReconciler(useCaseDeps);
|
||||
const pendingReportReplayer = new MemberWorkSyncPendingReportIntentReplayer(useCaseDeps);
|
||||
const queue = new MemberWorkSyncEventQueue({
|
||||
reconcile: async (request, context: MemberWorkSyncReconcileContext) => {
|
||||
await reconciler.execute(request, context);
|
||||
|
|
@ -87,6 +91,24 @@ export function createMemberWorkSyncFeature(deps: {
|
|||
report: (request) => reporter.execute(request),
|
||||
noteTeamChange: (event) => router.noteTeamChange(event),
|
||||
enqueueStartupScan: (teamNames) => router.enqueueStartupScan(teamNames),
|
||||
replayPendingReports: async (teamNames) => {
|
||||
const summaries = await Promise.allSettled(
|
||||
teamNames.map((teamName) => pendingReportReplayer.replayTeam(teamName))
|
||||
);
|
||||
return summaries.reduce<MemberWorkSyncPendingReportReplaySummary>(
|
||||
(accumulator, summary) => {
|
||||
if (summary.status !== 'fulfilled') {
|
||||
return accumulator;
|
||||
}
|
||||
accumulator.processed += summary.value.processed;
|
||||
accumulator.accepted += summary.value.accepted;
|
||||
accumulator.rejected += summary.value.rejected;
|
||||
accumulator.superseded += summary.value.superseded;
|
||||
return accumulator;
|
||||
},
|
||||
{ processed: 0, accepted: 0, rejected: 0, superseded: 0 }
|
||||
);
|
||||
},
|
||||
getQueueDiagnostics: () => queue.getDiagnostics(),
|
||||
dispose: () => queue.stop(),
|
||||
};
|
||||
|
|
|
|||
|
|
@ -1,7 +1,12 @@
|
|||
import { atomicWriteAsync } from '@main/utils/atomicWrite';
|
||||
import { mkdir, readFile, appendFile } from 'fs/promises';
|
||||
import { createHash } from 'crypto';
|
||||
import { mkdir, readFile, rename } from 'fs/promises';
|
||||
|
||||
import type { MemberWorkSyncReportRequest, MemberWorkSyncStatus } from '../../contracts';
|
||||
import type {
|
||||
MemberWorkSyncReportIntent,
|
||||
MemberWorkSyncReportRequest,
|
||||
MemberWorkSyncStatus,
|
||||
} from '../../contracts';
|
||||
import type {
|
||||
MemberWorkSyncReportStorePort,
|
||||
MemberWorkSyncStatusStorePort,
|
||||
|
|
@ -13,6 +18,11 @@ interface StoreFile {
|
|||
members: Record<string, MemberWorkSyncStatus>;
|
||||
}
|
||||
|
||||
interface PendingReportFile {
|
||||
schemaVersion: 1;
|
||||
intents: Record<string, MemberWorkSyncReportIntent>;
|
||||
}
|
||||
|
||||
function normalizeMemberKey(memberName: string): string {
|
||||
return memberName.trim().toLowerCase();
|
||||
}
|
||||
|
|
@ -28,6 +38,57 @@ function isStoreFile(value: unknown): value is StoreFile {
|
|||
);
|
||||
}
|
||||
|
||||
function isPendingReportFile(value: unknown): value is PendingReportFile {
|
||||
return (
|
||||
value != null &&
|
||||
typeof value === 'object' &&
|
||||
(value as PendingReportFile).schemaVersion === 1 &&
|
||||
(value as PendingReportFile).intents != null &&
|
||||
typeof (value as PendingReportFile).intents === 'object' &&
|
||||
!Array.isArray((value as PendingReportFile).intents)
|
||||
);
|
||||
}
|
||||
|
||||
function stableStringify(value: unknown): string {
|
||||
if (value == null || typeof value !== 'object') {
|
||||
return JSON.stringify(value);
|
||||
}
|
||||
if (Array.isArray(value)) {
|
||||
return `[${value.map(stableStringify).join(',')}]`;
|
||||
}
|
||||
const record = value as Record<string, unknown>;
|
||||
return `{${Object.keys(record)
|
||||
.sort()
|
||||
.map((key) => `${JSON.stringify(key)}:${stableStringify(record[key])}`)
|
||||
.join(',')}}`;
|
||||
}
|
||||
|
||||
function buildPendingReportIntentId(request: MemberWorkSyncReportRequest): string {
|
||||
const taskIds = [...new Set(request.taskIds ?? [])].sort();
|
||||
const payload = {
|
||||
teamName: request.teamName,
|
||||
memberName: normalizeMemberKey(request.memberName),
|
||||
state: request.state,
|
||||
agendaFingerprint: request.agendaFingerprint,
|
||||
reportToken: request.reportToken ?? '',
|
||||
...(taskIds.length > 0 ? { taskIds } : {}),
|
||||
...(request.note ? { note: request.note } : {}),
|
||||
...(request.leaseTtlMs ? { leaseTtlMs: request.leaseTtlMs } : {}),
|
||||
...(request.source ? { source: request.source } : {}),
|
||||
};
|
||||
return `member-work-sync-intent:${createHash('sha256')
|
||||
.update(stableStringify(payload))
|
||||
.digest('hex')}`;
|
||||
}
|
||||
|
||||
async function quarantineFile(filePath: string): Promise<void> {
|
||||
try {
|
||||
await rename(filePath, `${filePath}.invalid.${Date.now()}`);
|
||||
} catch {
|
||||
// If quarantine fails, keep the feature degraded but do not block team operation.
|
||||
}
|
||||
}
|
||||
|
||||
export class JsonMemberWorkSyncStore
|
||||
implements MemberWorkSyncStatusStorePort, MemberWorkSyncReportStorePort
|
||||
{
|
||||
|
|
@ -56,29 +117,100 @@ export class JsonMemberWorkSyncStore
|
|||
}
|
||||
|
||||
async appendPendingReport(request: MemberWorkSyncReportRequest, reason: string): Promise<void> {
|
||||
await mkdir(this.paths.getTeamDir(request.teamName), { recursive: true });
|
||||
await appendFile(
|
||||
this.paths.getPendingReportsPath(request.teamName),
|
||||
`${JSON.stringify({ schemaVersion: 1, reason, request, recordedAt: new Date().toISOString() })}\n`,
|
||||
'utf8'
|
||||
);
|
||||
const id = buildPendingReportIntentId(request);
|
||||
await this.enqueue(request.teamName, async () => {
|
||||
const existing = await this.readPendingFile(request.teamName);
|
||||
const current = existing.intents[id];
|
||||
if (current && current.status !== 'pending') {
|
||||
return;
|
||||
}
|
||||
existing.intents[id] = {
|
||||
id,
|
||||
teamName: request.teamName,
|
||||
memberName: request.memberName,
|
||||
request,
|
||||
reason: current?.reason ?? reason,
|
||||
status: 'pending',
|
||||
recordedAt: current?.recordedAt ?? new Date().toISOString(),
|
||||
};
|
||||
await this.writePendingFile(request.teamName, existing);
|
||||
});
|
||||
}
|
||||
|
||||
async listPendingReports(teamName: string): Promise<MemberWorkSyncReportIntent[]> {
|
||||
const file = await this.readPendingFile(teamName);
|
||||
return Object.values(file.intents)
|
||||
.filter((intent) => intent.status === 'pending')
|
||||
.sort((left, right) => left.recordedAt.localeCompare(right.recordedAt));
|
||||
}
|
||||
|
||||
async markPendingReportProcessed(
|
||||
teamName: string,
|
||||
id: string,
|
||||
result: {
|
||||
status: MemberWorkSyncReportIntent['status'];
|
||||
resultCode: string;
|
||||
processedAt: string;
|
||||
}
|
||||
): Promise<void> {
|
||||
await this.enqueue(teamName, async () => {
|
||||
const existing = await this.readPendingFile(teamName);
|
||||
const current = existing.intents[id];
|
||||
if (!current || current.status !== 'pending') {
|
||||
return;
|
||||
}
|
||||
existing.intents[id] = {
|
||||
...current,
|
||||
status: result.status,
|
||||
resultCode: result.resultCode,
|
||||
processedAt: result.processedAt,
|
||||
};
|
||||
await this.writePendingFile(teamName, existing);
|
||||
});
|
||||
}
|
||||
|
||||
private async readFile(teamName: string): Promise<StoreFile> {
|
||||
const filePath = this.paths.getStatusPath(teamName);
|
||||
try {
|
||||
const raw = await readFile(this.paths.getStatusPath(teamName), 'utf8');
|
||||
const raw = await readFile(filePath, 'utf8');
|
||||
const parsed = JSON.parse(raw);
|
||||
if (isStoreFile(parsed)) {
|
||||
return parsed;
|
||||
}
|
||||
await quarantineFile(filePath);
|
||||
} catch (error) {
|
||||
if ((error as NodeJS.ErrnoException).code !== 'ENOENT') {
|
||||
throw error;
|
||||
await quarantineFile(filePath);
|
||||
}
|
||||
}
|
||||
return { schemaVersion: 1, members: {} };
|
||||
}
|
||||
|
||||
private async readPendingFile(teamName: string): Promise<PendingReportFile> {
|
||||
const filePath = this.paths.getPendingReportsPath(teamName);
|
||||
try {
|
||||
const raw = await readFile(filePath, 'utf8');
|
||||
const parsed = JSON.parse(raw);
|
||||
if (isPendingReportFile(parsed)) {
|
||||
return parsed;
|
||||
}
|
||||
await quarantineFile(filePath);
|
||||
} catch (error) {
|
||||
if ((error as NodeJS.ErrnoException).code !== 'ENOENT') {
|
||||
await quarantineFile(filePath);
|
||||
}
|
||||
}
|
||||
return { schemaVersion: 1, intents: {} };
|
||||
}
|
||||
|
||||
private async writePendingFile(teamName: string, file: PendingReportFile): Promise<void> {
|
||||
await mkdir(this.paths.getTeamDir(teamName), { recursive: true });
|
||||
await atomicWriteAsync(
|
||||
this.paths.getPendingReportsPath(teamName),
|
||||
JSON.stringify(file, null, 2)
|
||||
);
|
||||
}
|
||||
|
||||
private async enqueue(teamName: string, operation: () => Promise<void>): Promise<void> {
|
||||
const previous = this.writeQueues.get(teamName) ?? Promise.resolve();
|
||||
const next = previous.then(operation, operation);
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@ export class MemberWorkSyncStorePaths {
|
|||
}
|
||||
|
||||
getPendingReportsPath(teamName: string): string {
|
||||
return join(this.getTeamDir(teamName), 'pending-reports.jsonl');
|
||||
return join(this.getTeamDir(teamName), 'pending-reports.json');
|
||||
}
|
||||
|
||||
getReportTokenSecretPath(teamName: string): string {
|
||||
|
|
|
|||
|
|
@ -1240,11 +1240,11 @@ async function initializeServices(): Promise<void> {
|
|||
});
|
||||
void teamDataService
|
||||
.listTeams()
|
||||
.then((teams) =>
|
||||
memberWorkSyncFeature?.enqueueStartupScan(
|
||||
teams.filter((team) => !team.deletedAt).map((team) => team.teamName)
|
||||
)
|
||||
)
|
||||
.then(async (teams) => {
|
||||
const activeTeamNames = teams.filter((team) => !team.deletedAt).map((team) => team.teamName);
|
||||
await memberWorkSyncFeature?.replayPendingReports(activeTeamNames);
|
||||
await memberWorkSyncFeature?.enqueueStartupScan(activeTeamNames);
|
||||
})
|
||||
.catch((error: unknown) =>
|
||||
logger.warn(`[Init] Member work sync startup scan failed: ${String(error)}`)
|
||||
);
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ import { describe, expect, it } from 'vitest';
|
|||
|
||||
import {
|
||||
MemberWorkSyncDiagnosticsReader,
|
||||
MemberWorkSyncPendingReportIntentReplayer,
|
||||
MemberWorkSyncReporter,
|
||||
type MemberWorkSyncAgendaSourceResult,
|
||||
type MemberWorkSyncStatusStorePort,
|
||||
|
|
@ -9,6 +10,7 @@ import {
|
|||
} from '@features/member-work-sync/core/application';
|
||||
import type {
|
||||
MemberWorkSyncActionableWorkItem,
|
||||
MemberWorkSyncReportIntent,
|
||||
MemberWorkSyncReportRequest,
|
||||
MemberWorkSyncStatus,
|
||||
} from '@features/member-work-sync/contracts';
|
||||
|
|
@ -42,6 +44,7 @@ class MutableClock {
|
|||
class InMemoryStatusStore implements MemberWorkSyncStatusStorePort {
|
||||
readonly writes: MemberWorkSyncStatus[] = [];
|
||||
readonly pendingReports: Array<{ request: MemberWorkSyncReportRequest; reason: string }> = [];
|
||||
readonly pendingIntents = new Map<string, MemberWorkSyncReportIntent>();
|
||||
|
||||
async read(): Promise<MemberWorkSyncStatus | null> {
|
||||
return this.writes.at(-1) ?? null;
|
||||
|
|
@ -54,6 +57,25 @@ class InMemoryStatusStore implements MemberWorkSyncStatusStorePort {
|
|||
async appendPendingReport(request: MemberWorkSyncReportRequest, reason: string): Promise<void> {
|
||||
this.pendingReports.push({ request, reason });
|
||||
}
|
||||
|
||||
async listPendingReports(): Promise<MemberWorkSyncReportIntent[]> {
|
||||
return [...this.pendingIntents.values()].filter((intent) => intent.status === 'pending');
|
||||
}
|
||||
|
||||
async markPendingReportProcessed(
|
||||
_teamName: string,
|
||||
id: string,
|
||||
result: {
|
||||
status: MemberWorkSyncReportIntent['status'];
|
||||
resultCode: string;
|
||||
processedAt: string;
|
||||
}
|
||||
): Promise<void> {
|
||||
const current = this.pendingIntents.get(id);
|
||||
if (current) {
|
||||
this.pendingIntents.set(id, { ...current, ...result });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function createDeps(options?: {
|
||||
|
|
@ -157,7 +179,29 @@ describe('MemberWorkSync use cases', () => {
|
|||
expect(expired.diagnostics).toContain('report_lease_expired');
|
||||
});
|
||||
|
||||
it('rejects stale or unsafe reports and records pending intent only', async () => {
|
||||
it('uses app clock instead of model supplied reportedAt for lease timing', async () => {
|
||||
const { deps } = createDeps();
|
||||
const reader = new MemberWorkSyncDiagnosticsReader(deps);
|
||||
const reporter = new MemberWorkSyncReporter(deps);
|
||||
const current = await reader.execute({ teamName: 'team-a', memberName: 'bob' });
|
||||
|
||||
const result = await reporter.execute({
|
||||
teamName: 'team-a',
|
||||
memberName: 'bob',
|
||||
state: 'still_working',
|
||||
agendaFingerprint: current.agenda.fingerprint,
|
||||
reportToken: current.reportToken,
|
||||
reportedAt: '2099-01-01T00:00:00.000Z',
|
||||
leaseTtlMs: 120_000,
|
||||
source: 'test',
|
||||
});
|
||||
|
||||
expect(result.accepted).toBe(true);
|
||||
expect(result.status.report?.reportedAt).toBe('2026-04-29T00:00:00.000Z');
|
||||
expect(result.status.report?.expiresAt).toBe('2026-04-29T00:02:00.000Z');
|
||||
});
|
||||
|
||||
it('rejects stale reports without turning app-side validation failures into pending intents', async () => {
|
||||
const { deps, store } = createDeps();
|
||||
const result = await new MemberWorkSyncReporter(deps).execute({
|
||||
teamName: 'team-a',
|
||||
|
|
@ -170,8 +214,7 @@ describe('MemberWorkSync use cases', () => {
|
|||
expect(result.accepted).toBe(false);
|
||||
expect(result.code).toBe('stale_fingerprint');
|
||||
expect(result.status.state).toBe('needs_sync');
|
||||
expect(store.pendingReports).toHaveLength(1);
|
||||
expect(store.pendingReports[0].reason).toBe('stale_fingerprint');
|
||||
expect(store.pendingReports).toHaveLength(0);
|
||||
});
|
||||
|
||||
it('accepts caught_up only when the app-side agenda is empty', async () => {
|
||||
|
|
@ -247,4 +290,37 @@ describe('MemberWorkSync use cases', () => {
|
|||
expect(result.code).toBe('invalid_report_token');
|
||||
expect(store.pendingReports).toHaveLength(0);
|
||||
});
|
||||
|
||||
it('replays pending controller intents through the same app validator', async () => {
|
||||
const { deps, store } = createDeps();
|
||||
const reader = new MemberWorkSyncDiagnosticsReader(deps);
|
||||
const current = await reader.execute({ teamName: 'team-a', memberName: 'bob' });
|
||||
store.pendingIntents.set('intent-1', {
|
||||
id: 'intent-1',
|
||||
teamName: 'team-a',
|
||||
memberName: 'bob',
|
||||
status: 'pending',
|
||||
reason: 'control_api_unavailable',
|
||||
recordedAt: '2026-04-29T00:00:01.000Z',
|
||||
request: {
|
||||
teamName: 'team-a',
|
||||
memberName: 'bob',
|
||||
state: 'still_working',
|
||||
agendaFingerprint: current.agenda.fingerprint,
|
||||
reportToken: current.reportToken,
|
||||
leaseTtlMs: 120_000,
|
||||
source: 'mcp',
|
||||
},
|
||||
});
|
||||
|
||||
const summary = await new MemberWorkSyncPendingReportIntentReplayer(deps).replayTeam('team-a');
|
||||
|
||||
expect(summary).toEqual({ processed: 1, accepted: 1, rejected: 0, superseded: 0 });
|
||||
expect(store.pendingIntents.get('intent-1')).toMatchObject({
|
||||
status: 'accepted',
|
||||
resultCode: 'accepted',
|
||||
processedAt: '2026-04-29T00:00:00.000Z',
|
||||
});
|
||||
expect(store.writes.at(-1)?.state).toBe('still_working');
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -0,0 +1,72 @@
|
|||
import { mkdir, mkdtemp, readFile, readdir, rm, writeFile } from 'fs/promises';
|
||||
import { join } from 'path';
|
||||
import { tmpdir } from 'os';
|
||||
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
|
||||
|
||||
import { JsonMemberWorkSyncStore } from '@features/member-work-sync/main/infrastructure/JsonMemberWorkSyncStore';
|
||||
import { MemberWorkSyncStorePaths } from '@features/member-work-sync/main/infrastructure/MemberWorkSyncStorePaths';
|
||||
|
||||
describe('JsonMemberWorkSyncStore', () => {
|
||||
let root: string;
|
||||
let store: JsonMemberWorkSyncStore;
|
||||
|
||||
beforeEach(async () => {
|
||||
root = await mkdtemp(join(tmpdir(), 'member-work-sync-store-'));
|
||||
store = new JsonMemberWorkSyncStore(new MemberWorkSyncStorePaths(root));
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
await rm(root, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
it('quarantines invalid status JSON and returns empty state', async () => {
|
||||
const statusPath = join(root, 'team-a', '.member-work-sync', 'status.json');
|
||||
await mkdir(join(root, 'team-a', '.member-work-sync'), { recursive: true });
|
||||
await writeFile(statusPath, '{bad json', 'utf8');
|
||||
|
||||
await expect(store.read({ teamName: 'team-a', memberName: 'bob' })).resolves.toBeNull();
|
||||
|
||||
const teamDir = join(root, 'team-a', '.member-work-sync');
|
||||
const entries = await readdir(teamDir);
|
||||
expect(entries.some((entry) => entry.startsWith('status.json.invalid.'))).toBe(true);
|
||||
});
|
||||
|
||||
it('deduplicates pending report intents and marks them processed', async () => {
|
||||
const request = {
|
||||
teamName: 'team-a',
|
||||
memberName: 'bob',
|
||||
state: 'still_working' as const,
|
||||
agendaFingerprint: 'agenda:v1:abc',
|
||||
reportToken: 'wrs:v1.test',
|
||||
taskIds: ['task-2', 'task-1', 'task-1'],
|
||||
source: 'mcp' as const,
|
||||
};
|
||||
|
||||
await store.appendPendingReport(request, 'control_api_unavailable');
|
||||
await store.appendPendingReport({ ...request, taskIds: ['task-1', 'task-2'] }, 'duplicate');
|
||||
|
||||
const pending = await store.listPendingReports('team-a');
|
||||
expect(pending).toHaveLength(1);
|
||||
expect(pending[0]).toMatchObject({
|
||||
teamName: 'team-a',
|
||||
memberName: 'bob',
|
||||
reason: 'control_api_unavailable',
|
||||
status: 'pending',
|
||||
});
|
||||
|
||||
await store.markPendingReportProcessed('team-a', pending[0].id, {
|
||||
status: 'accepted',
|
||||
resultCode: 'accepted',
|
||||
processedAt: '2026-04-29T00:00:00.000Z',
|
||||
});
|
||||
|
||||
expect(await store.listPendingReports('team-a')).toEqual([]);
|
||||
const file = JSON.parse(
|
||||
await readFile(join(root, 'team-a', '.member-work-sync', 'pending-reports.json'), 'utf8')
|
||||
);
|
||||
expect(file.intents[pending[0].id]).toMatchObject({
|
||||
status: 'accepted',
|
||||
resultCode: 'accepted',
|
||||
});
|
||||
});
|
||||
});
|
||||
Loading…
Reference in a new issue