diff --git a/src/features/member-work-sync/contracts/types.ts b/src/features/member-work-sync/contracts/types.ts index 8c2528f4..b3a7da68 100644 --- a/src/features/member-work-sync/contracts/types.ts +++ b/src/features/member-work-sync/contracts/types.ts @@ -66,6 +66,14 @@ export interface MemberWorkSyncReport { rejectionCode?: string; } +export interface MemberWorkSyncShadowDiagnostics { + reconciledBy: 'request' | 'queue' | 'report'; + wouldNudge: boolean; + fingerprintChanged: boolean; + previousFingerprint?: string; + triggerReasons?: string[]; +} + export interface MemberWorkSyncStatus { teamName: string; memberName: string; @@ -74,6 +82,7 @@ export interface MemberWorkSyncStatus { report?: MemberWorkSyncReport; reportToken?: string; reportTokenExpiresAt?: string; + shadow?: MemberWorkSyncShadowDiagnostics; evaluatedAt: string; diagnostics: string[]; providerId?: MemberWorkSyncProviderId; diff --git a/src/features/member-work-sync/core/application/MemberWorkSyncReconciler.ts b/src/features/member-work-sync/core/application/MemberWorkSyncReconciler.ts index 9ef7519f..a98d9cf6 100644 --- a/src/features/member-work-sync/core/application/MemberWorkSyncReconciler.ts +++ b/src/features/member-work-sync/core/application/MemberWorkSyncReconciler.ts @@ -7,6 +7,11 @@ import { import type { MemberWorkSyncStatus, MemberWorkSyncStatusRequest } from '../../contracts'; import type { MemberWorkSyncAgendaSourceResult, MemberWorkSyncUseCaseDeps } from './ports'; +export interface MemberWorkSyncReconcileContext { + reconciledBy?: 'request' | 'queue'; + triggerReasons?: string[]; +} + export function finalizeMemberWorkSyncAgenda( deps: MemberWorkSyncUseCaseDeps, source: MemberWorkSyncAgendaSourceResult @@ -30,16 +35,22 @@ export function finalizeMemberWorkSyncAgenda( export class MemberWorkSyncReconciler { constructor(private readonly deps: MemberWorkSyncUseCaseDeps) {} - async execute(request: MemberWorkSyncStatusRequest): Promise { + async execute( + request: MemberWorkSyncStatusRequest, + context: MemberWorkSyncReconcileContext = {} + ): Promise { const source = await this.deps.agendaSource.loadAgenda(request); const agenda = finalizeMemberWorkSyncAgenda(this.deps, source); const previous = await this.deps.statusStore.read(request); const nowIso = this.deps.clock.now().toISOString(); + const teamActive = this.deps.lifecycle + ? await this.deps.lifecycle.isTeamActive(agenda.teamName) + : true; const decision = decideMemberWorkSyncStatus({ agenda, latestAcceptedReport: previous?.report?.accepted ? previous.report : null, nowIso, - inactive: source.inactive, + inactive: source.inactive || !teamActive, }); const status = await attachMemberWorkSyncReportToken(this.deps, { @@ -48,8 +59,25 @@ export class MemberWorkSyncReconciler { state: decision.state, agenda, ...(decision.acceptedReport ? { report: decision.acceptedReport } : {}), + shadow: { + reconciledBy: context.reconciledBy ?? 'request', + wouldNudge: decision.state === 'needs_sync' && agenda.items.length > 0, + fingerprintChanged: + Boolean(previous?.agenda.fingerprint) && + previous?.agenda.fingerprint !== agenda.fingerprint, + ...(previous?.agenda.fingerprint + ? { previousFingerprint: previous.agenda.fingerprint } + : {}), + ...(context.triggerReasons?.length + ? { triggerReasons: [...new Set(context.triggerReasons)].sort() } + : {}), + }, evaluatedAt: nowIso, - diagnostics: [...agenda.diagnostics, ...decision.diagnostics], + diagnostics: [ + ...agenda.diagnostics, + ...(!teamActive ? ['team_runtime_inactive'] : []), + ...decision.diagnostics, + ], ...(source.providerId ? { providerId: source.providerId } : {}), }); diff --git a/src/features/member-work-sync/core/application/MemberWorkSyncReporter.ts b/src/features/member-work-sync/core/application/MemberWorkSyncReporter.ts index 06c733ae..f04e370f 100644 --- a/src/features/member-work-sync/core/application/MemberWorkSyncReporter.ts +++ b/src/features/member-work-sync/core/application/MemberWorkSyncReporter.ts @@ -32,6 +32,18 @@ export class MemberWorkSyncReporter { const nowIso = ( request.reportedAt ? new Date(request.reportedAt) : this.deps.clock.now() ).toISOString(); + const teamActive = this.deps.lifecycle + ? await this.deps.lifecycle.isTeamActive(agenda.teamName) + : true; + if (!teamActive) { + const status = await this.reconciler.execute(request); + return { + accepted: false, + code: 'team_runtime_inactive', + message: 'Team runtime is not active. Restart the team before reporting work sync state.', + status, + }; + } const tokenValidation = this.deps.reportToken ? await this.deps.reportToken.verify({ token: request.reportToken, @@ -86,6 +98,11 @@ export class MemberWorkSyncReporter { : ('still_working' as const), agenda, report, + shadow: { + reconciledBy: 'report', + wouldNudge: false, + fingerprintChanged: false, + }, evaluatedAt: nowIso, diagnostics: [...agenda.diagnostics, 'report_accepted'], ...(source.providerId ? { providerId: source.providerId } : {}), diff --git a/src/features/member-work-sync/core/application/ports.ts b/src/features/member-work-sync/core/application/ports.ts index 1df714db..665fafb1 100644 --- a/src/features/member-work-sync/core/application/ports.ts +++ b/src/features/member-work-sync/core/application/ports.ts @@ -43,6 +43,10 @@ export interface MemberWorkSyncReportTokenPort { ): Promise; } +export interface MemberWorkSyncLifecyclePort { + isTeamActive(teamName: string): Promise | boolean; +} + export interface MemberWorkSyncLoggerPort { debug(message: string, metadata?: Record): void; warn(message: string, metadata?: Record): void; @@ -80,6 +84,7 @@ export interface MemberWorkSyncUseCaseDeps { statusStore: MemberWorkSyncStatusStorePort; reportStore?: MemberWorkSyncReportStorePort; reportToken?: MemberWorkSyncReportTokenPort; + lifecycle?: MemberWorkSyncLifecyclePort; logger?: MemberWorkSyncLoggerPort; } diff --git a/src/features/member-work-sync/main/adapters/input/MemberWorkSyncTeamChangeRouter.ts b/src/features/member-work-sync/main/adapters/input/MemberWorkSyncTeamChangeRouter.ts new file mode 100644 index 00000000..24a2a899 --- /dev/null +++ b/src/features/member-work-sync/main/adapters/input/MemberWorkSyncTeamChangeRouter.ts @@ -0,0 +1,113 @@ +import type { + MemberWorkSyncEventQueue, + MemberWorkSyncTriggerReason, +} from '../../infrastructure/MemberWorkSyncEventQueue'; +import type { TeamChangeEvent, ToolActivityEventPayload } from '@shared/types'; + +interface MemberWorkSyncRosterSource { + loadActiveMemberNames(teamName: string): Promise; +} + +const TEAM_WIDE_REASONS: Partial> = { + config: 'config_changed', + task: 'task_changed', + 'task-log-change': 'runtime_activity', + 'log-source-change': 'runtime_activity', + process: 'runtime_activity', + 'lead-activity': 'runtime_activity', +}; + +function parseInboxRecipient(detail: string | undefined): string | null { + if (!detail) { + return null; + } + const match = /^inboxes\/(.+)\.json$/.exec(detail); + return match?.[1]?.trim() || null; +} + +function parseToolActivity(detail: string | undefined): ToolActivityEventPayload | null { + if (!detail) { + return null; + } + try { + const parsed = JSON.parse(detail) as ToolActivityEventPayload; + return parsed && typeof parsed === 'object' ? parsed : null; + } catch { + return null; + } +} + +export class MemberWorkSyncTeamChangeRouter { + constructor( + private readonly rosterSource: MemberWorkSyncRosterSource, + private readonly queue: MemberWorkSyncEventQueue + ) {} + + async enqueueStartupScan(teamNames: string[]): Promise { + await Promise.allSettled( + teamNames.map((teamName) => this.enqueueTeam(teamName, 'startup_scan', 30_000)) + ); + } + + noteTeamChange(event: TeamChangeEvent): void { + if (event.type === 'lead-activity' && event.detail === 'offline') { + this.queue.dropTeam(event.teamName); + return; + } + + if (event.type === 'member-spawn') { + const memberName = event.detail?.trim(); + if (memberName) { + this.queue.enqueue({ + teamName: event.teamName, + memberName, + triggerReason: 'member_spawned', + runAfterMs: 30_000, + }); + } else { + void this.enqueueTeam(event.teamName, 'member_spawned', 30_000).catch(() => undefined); + } + return; + } + + if (event.type === 'tool-activity') { + const payload = parseToolActivity(event.detail); + if (payload?.action === 'finish' && payload.memberName) { + this.queue.enqueue({ + teamName: event.teamName, + memberName: payload.memberName, + triggerReason: 'tool_finished', + }); + } + return; + } + + if (event.type === 'inbox' || event.type === 'lead-message') { + const recipient = parseInboxRecipient(event.detail); + if (recipient) { + this.queue.enqueue({ + teamName: event.teamName, + memberName: recipient, + triggerReason: 'inbox_changed', + }); + } + return; + } + + const teamWideReason = TEAM_WIDE_REASONS[event.type]; + if (teamWideReason) { + void this.enqueueTeam(event.teamName, teamWideReason).catch(() => undefined); + } + } + + private async enqueueTeam( + teamName: string, + triggerReason: MemberWorkSyncTriggerReason, + runAfterMs?: number + ): Promise { + const activeMembers = await this.rosterSource.loadActiveMemberNames(teamName); + for (const memberName of activeMembers) { + this.queue.enqueue({ teamName, memberName, triggerReason, runAfterMs }); + } + } +} diff --git a/src/features/member-work-sync/main/adapters/output/TeamTaskAgendaSource.ts b/src/features/member-work-sync/main/adapters/output/TeamTaskAgendaSource.ts index 33aae5bc..8293f4cd 100644 --- a/src/features/member-work-sync/main/adapters/output/TeamTaskAgendaSource.ts +++ b/src/features/member-work-sync/main/adapters/output/TeamTaskAgendaSource.ts @@ -1,5 +1,6 @@ import { buildActionableWorkAgenda, + isReservedMemberName, normalizeMemberName, type MemberWorkSyncMemberLike, } from '../../../core/domain'; @@ -65,6 +66,20 @@ function toMemberLike(member: TeamMember): MemberWorkSyncMemberLike { export class TeamTaskAgendaSource implements MemberWorkSyncAgendaSourcePort { constructor(private readonly deps: TeamTaskAgendaSourceDeps) {} + async loadActiveMemberNames(teamName: string): Promise { + const config = await this.deps.configReader.getConfig(teamName); + if (!config || config.deletedAt) { + return []; + } + + const metaMembers = await this.deps.membersMetaStore.getMembers(teamName); + return mergeMembers(config.members ?? [], metaMembers) + .filter((member) => !member.removedAt) + .map((member) => normalizeMemberName(member.name)) + .filter((memberName) => memberName.length > 0 && !isReservedMemberName(memberName)) + .sort((left, right) => left.localeCompare(right)); + } + async loadAgenda(input: { teamName: string; memberName: string; diff --git a/src/features/member-work-sync/main/composition/createMemberWorkSyncFeature.ts b/src/features/member-work-sync/main/composition/createMemberWorkSyncFeature.ts index d667cf1b..decdd310 100644 --- a/src/features/member-work-sync/main/composition/createMemberWorkSyncFeature.ts +++ b/src/features/member-work-sync/main/composition/createMemberWorkSyncFeature.ts @@ -4,9 +4,19 @@ import type { MemberWorkSyncStatus, MemberWorkSyncStatusRequest, } from '../../contracts'; -import { MemberWorkSyncDiagnosticsReader, MemberWorkSyncReporter } from '../../core/application'; +import { + MemberWorkSyncDiagnosticsReader, + MemberWorkSyncReconciler, + MemberWorkSyncReporter, + type MemberWorkSyncReconcileContext, +} from '../../core/application'; +import { MemberWorkSyncTeamChangeRouter } from '../adapters/input/MemberWorkSyncTeamChangeRouter'; import { TeamTaskAgendaSource } from '../adapters/output/TeamTaskAgendaSource'; import { HmacMemberWorkSyncReportTokenAdapter } from '../infrastructure/HmacMemberWorkSyncReportTokenAdapter'; +import { + MemberWorkSyncEventQueue, + type MemberWorkSyncQueueDiagnostics, +} from '../infrastructure/MemberWorkSyncEventQueue'; import { JsonMemberWorkSyncStore } from '../infrastructure/JsonMemberWorkSyncStore'; import { MemberWorkSyncStorePaths } from '../infrastructure/MemberWorkSyncStorePaths'; import { NodeHashAdapter } from '../infrastructure/NodeHashAdapter'; @@ -16,11 +26,16 @@ import type { TeamConfigReader } from '@main/services/team/TeamConfigReader'; import type { TeamKanbanManager } from '@main/services/team/TeamKanbanManager'; import type { TeamMembersMetaStore } from '@main/services/team/TeamMembersMetaStore'; import type { TeamTaskReader } from '@main/services/team/TeamTaskReader'; +import type { TeamChangeEvent } from '@shared/types'; import type { MemberWorkSyncLoggerPort } from '../../core/application'; export interface MemberWorkSyncFeatureFacade { getStatus(request: MemberWorkSyncStatusRequest): Promise; report(request: MemberWorkSyncReportRequest): Promise; + noteTeamChange(event: TeamChangeEvent): void; + enqueueStartupScan(teamNames: string[]): Promise; + getQueueDiagnostics(): MemberWorkSyncQueueDiagnostics; + dispose(): Promise; } export function createMemberWorkSyncFeature(deps: { @@ -29,6 +44,7 @@ export function createMemberWorkSyncFeature(deps: { taskReader: TeamTaskReader; kanbanManager: TeamKanbanManager; membersMetaStore: TeamMembersMetaStore; + isTeamActive?: (teamName: string) => Promise | boolean; logger?: MemberWorkSyncLoggerPort; }): MemberWorkSyncFeatureFacade { const clock = new SystemClockAdapter(); @@ -51,13 +67,27 @@ export function createMemberWorkSyncFeature(deps: { statusStore: store, reportStore: store, reportToken, + ...(deps.isTeamActive ? { lifecycle: { isTeamActive: deps.isTeamActive } } : {}), logger: deps.logger, }; const diagnosticsReader = new MemberWorkSyncDiagnosticsReader(useCaseDeps); const reporter = new MemberWorkSyncReporter(useCaseDeps); + const reconciler = new MemberWorkSyncReconciler(useCaseDeps); + const queue = new MemberWorkSyncEventQueue({ + reconcile: async (request, context: MemberWorkSyncReconcileContext) => { + await reconciler.execute(request, context); + }, + isTeamActive: deps.isTeamActive ?? (() => true), + logger: deps.logger, + }); + const router = new MemberWorkSyncTeamChangeRouter(agendaSource, queue); return { getStatus: (request) => diagnosticsReader.execute(request), report: (request) => reporter.execute(request), + noteTeamChange: (event) => router.noteTeamChange(event), + enqueueStartupScan: (teamNames) => router.enqueueStartupScan(teamNames), + getQueueDiagnostics: () => queue.getDiagnostics(), + dispose: () => queue.stop(), }; } diff --git a/src/features/member-work-sync/main/infrastructure/MemberWorkSyncEventQueue.ts b/src/features/member-work-sync/main/infrastructure/MemberWorkSyncEventQueue.ts new file mode 100644 index 00000000..6c50a694 --- /dev/null +++ b/src/features/member-work-sync/main/infrastructure/MemberWorkSyncEventQueue.ts @@ -0,0 +1,245 @@ +import type { MemberWorkSyncReconcileContext } from '../../core/application/MemberWorkSyncReconciler'; +import type { MemberWorkSyncLoggerPort } from '../../core/application'; + +export type MemberWorkSyncTriggerReason = + | 'startup_scan' + | 'config_changed' + | 'task_changed' + | 'inbox_changed' + | 'member_spawned' + | 'tool_finished' + | 'runtime_activity' + | 'manual_refresh'; + +export interface MemberWorkSyncQueueDiagnostics { + queued: number; + running: number; + enqueued: number; + coalesced: number; + reconciled: number; + dropped: number; + failed: number; +} + +interface QueueItem { + teamName: string; + memberName: string; + runAt: number; + triggerReasons: Set; +} + +interface RunningItem { + rerunRequested: boolean; + triggerReasons: Set; +} + +export interface MemberWorkSyncEventQueueDeps { + reconcile( + input: { teamName: string; memberName: string }, + context: MemberWorkSyncReconcileContext + ): Promise; + isTeamActive(teamName: string): Promise | boolean; + quietWindowMs?: number; + concurrency?: number; + now?: () => number; + logger?: MemberWorkSyncLoggerPort; +} + +function keyOf(teamName: string, memberName: string): string { + return `${teamName}\0${memberName.trim().toLowerCase()}`; +} + +function unrefTimer(timer: ReturnType): void { + timer.unref?.(); +} + +export class MemberWorkSyncEventQueue { + private readonly items = new Map(); + private readonly running = new Map(); + private readonly inFlight = new Set>(); + private readonly quietWindowMs: number; + private readonly concurrency: number; + private readonly now: () => number; + private timer: ReturnType | null = null; + private stopped = false; + private counters = { + enqueued: 0, + coalesced: 0, + reconciled: 0, + dropped: 0, + failed: 0, + }; + + constructor(private readonly deps: MemberWorkSyncEventQueueDeps) { + this.quietWindowMs = deps.quietWindowMs ?? 90_000; + this.concurrency = Math.max(1, deps.concurrency ?? 2); + this.now = deps.now ?? Date.now; + } + + enqueue(input: { + teamName: string; + memberName: string; + triggerReason: MemberWorkSyncTriggerReason; + runAfterMs?: number; + }): void { + if (this.stopped) { + return; + } + + const memberName = input.memberName.trim(); + if (!input.teamName.trim() || !memberName) { + this.counters.dropped += 1; + return; + } + + const key = keyOf(input.teamName, memberName); + const runAt = this.now() + (input.runAfterMs ?? this.quietWindowMs); + const running = this.running.get(key); + if (running) { + running.rerunRequested = true; + running.triggerReasons.add(input.triggerReason); + this.counters.coalesced += 1; + return; + } + + const existing = this.items.get(key); + if (existing) { + existing.triggerReasons.add(input.triggerReason); + existing.runAt = Math.max(existing.runAt, runAt); + this.counters.coalesced += 1; + this.schedule(); + return; + } + + this.items.set(key, { + teamName: input.teamName, + memberName, + runAt, + triggerReasons: new Set([input.triggerReason]), + }); + this.counters.enqueued += 1; + this.schedule(); + } + + dropTeam(teamName: string): void { + for (const [key, item] of this.items) { + if (item.teamName === teamName) { + this.items.delete(key); + this.counters.dropped += 1; + } + } + this.schedule(); + } + + getDiagnostics(): MemberWorkSyncQueueDiagnostics { + return { + queued: this.items.size, + running: this.running.size, + ...this.counters, + }; + } + + async stop(): Promise { + this.stopped = true; + if (this.timer) { + clearTimeout(this.timer); + this.timer = null; + } + this.items.clear(); + await Promise.allSettled([...this.inFlight]); + } + + private schedule(): void { + if (this.stopped) { + return; + } + if (this.timer) { + clearTimeout(this.timer); + this.timer = null; + } + if (this.items.size === 0) { + return; + } + if (this.running.size >= this.concurrency) { + return; + } + + const nextRunAt = Math.min(...[...this.items.values()].map((item) => item.runAt)); + const delayMs = Math.max(0, nextRunAt - this.now()); + this.timer = setTimeout(() => { + this.timer = null; + this.pump(); + }, delayMs); + unrefTimer(this.timer); + } + + private pump(): void { + if (this.stopped) { + return; + } + + const due = [...this.items.entries()] + .filter(([, item]) => item.runAt <= this.now()) + .sort((left, right) => left[1].runAt - right[1].runAt); + + for (const [key, item] of due) { + if (this.running.size >= this.concurrency) { + break; + } + this.items.delete(key); + this.runItem(key, item); + } + + this.schedule(); + } + + private runItem(key: string, item: QueueItem): void { + const running: RunningItem = { + rerunRequested: false, + triggerReasons: new Set(item.triggerReasons), + }; + this.running.set(key, running); + + const promise = this.executeItem(key, item, running) + .catch((error: unknown) => { + this.counters.failed += 1; + this.deps.logger?.warn('member work sync queue reconcile failed', { + teamName: item.teamName, + memberName: item.memberName, + error: String(error), + }); + }) + .finally(() => { + this.running.delete(key); + this.inFlight.delete(promise); + if (running.rerunRequested && !this.stopped) { + for (const reason of running.triggerReasons) { + this.enqueue({ + teamName: item.teamName, + memberName: item.memberName, + triggerReason: reason, + }); + } + } + this.pump(); + }); + + this.inFlight.add(promise); + } + + private async executeItem(_key: string, item: QueueItem, running: RunningItem): Promise { + if (!(await this.deps.isTeamActive(item.teamName))) { + this.counters.dropped += 1; + return; + } + + await this.deps.reconcile( + { teamName: item.teamName, memberName: item.memberName }, + { + reconciledBy: 'queue', + triggerReasons: [...running.triggerReasons].sort(), + } + ); + this.counters.reconciled += 1; + } +} diff --git a/src/main/index.ts b/src/main/index.ts index f7669d44..399c8f2c 100644 --- a/src/main/index.ts +++ b/src/main/index.ts @@ -567,7 +567,7 @@ let codexAccountFeature: CodexAccountFeatureFacade | null = null; let codexModelCatalogFeature: CodexModelCatalogFeatureFacade | null = null; let recentProjectsFeature: RecentProjectsFeatureFacade; let runtimeProviderManagementFeature: RuntimeProviderManagementFeatureFacade; -let memberWorkSyncFeature: MemberWorkSyncFeatureFacade; +let memberWorkSyncFeature: MemberWorkSyncFeatureFacade | null = null; let teamDataService: TeamDataService; let teamProvisioningService: TeamProvisioningService; let cliInstallerService: CliInstallerService; @@ -787,6 +787,7 @@ function wireFileWatcherEvents(context: ServiceContext): void { if (typeof row.teamName !== 'string' || row.teamName.trim().length === 0) return; const teamName = row.teamName.trim(); const detail = typeof row.detail === 'string' ? row.detail : ''; + memberWorkSyncFeature?.noteTeamChange(row as TeamChangeEvent); if ( teamDataService && @@ -1182,6 +1183,7 @@ async function initializeServices(): Promise { const teamChangeEmitter = (event: TeamChangeEvent): void => { forwardTeamChange(event); teamTaskStallMonitor?.noteTeamChange(event); + memberWorkSyncFeature?.noteTeamChange(event); if (event.type === 'lead-activity' && event.detail === 'offline') { teammateToolTracker?.handleTeamOffline(event.teamName); } @@ -1231,8 +1233,21 @@ async function initializeServices(): Promise { taskReader: new TeamTaskReader(), kanbanManager: new TeamKanbanManager(), membersMetaStore: new TeamMembersMetaStore(), + isTeamActive: (teamName) => + teamProvisioningService.isTeamAlive(teamName) || + teamProvisioningService.hasProvisioningRun(teamName), logger: createLogger('Feature:MemberWorkSync'), }); + void teamDataService + .listTeams() + .then((teams) => + memberWorkSyncFeature?.enqueueStartupScan( + teams.filter((team) => !team.deletedAt).map((team) => team.teamName) + ) + ) + .catch((error: unknown) => + logger.warn(`[Init] Member work sync startup scan failed: ${String(error)}`) + ); codexAccountFeature = createCodexAccountFeature({ logger: createLogger('Feature:CodexAccount'), configManager, @@ -1354,7 +1369,7 @@ async function startHttpServer( chunkBuilder: activeContext.chunkBuilder, dataCache: activeContext.dataCache, recentProjectsFeature, - memberWorkSyncFeature, + memberWorkSyncFeature: memberWorkSyncFeature ?? undefined, updaterService, sshConnectionManager, teamDataService, @@ -1477,6 +1492,8 @@ async function shutdownServices(): Promise { codexModelCatalogFeature = null; await runShutdownStep('Codex account dispose', () => codexAccountFeature?.dispose()); codexAccountFeature = null; + await runShutdownStep('member work sync dispose', () => memberWorkSyncFeature?.dispose()); + memberWorkSyncFeature = null; if (ptyTerminalService) { await runShutdownStep('PTY terminals kill', () => ptyTerminalService.killAll()); diff --git a/test/features/member-work-sync/core/MemberWorkSyncUseCases.test.ts b/test/features/member-work-sync/core/MemberWorkSyncUseCases.test.ts index afa598e9..2999fc71 100644 --- a/test/features/member-work-sync/core/MemberWorkSyncUseCases.test.ts +++ b/test/features/member-work-sync/core/MemberWorkSyncUseCases.test.ts @@ -60,6 +60,7 @@ function createDeps(options?: { items?: MemberWorkSyncActionableWorkItem[]; activeMemberNames?: string[]; inactive?: boolean; + teamActive?: boolean; providerId?: 'opencode' | 'codex'; }) { const clock = new MutableClock(); @@ -97,6 +98,9 @@ function createDeps(options?: { ? { ok: true } : { ok: false, reason: input.token ? 'invalid' : 'missing' }, }, + lifecycle: { + isTeamActive: () => options?.teamActive ?? true, + }, }; return { clock, deps, source, store }; } @@ -112,6 +116,12 @@ describe('MemberWorkSync use cases', () => { expect(status.state).toBe('needs_sync'); expect(status.agenda.items).toEqual([workItem]); expect(status.diagnostics).toContain('no_current_report'); + expect(status.reportToken).toBe(`token:team-a:bob:${status.agenda.fingerprint}`); + expect(status.shadow).toMatchObject({ + reconciledBy: 'request', + wouldNudge: true, + fingerprintChanged: false, + }); expect(store.pendingReports).toEqual([]); }); @@ -134,6 +144,7 @@ describe('MemberWorkSync use cases', () => { expect(result.accepted).toBe(true); expect(result.status.state).toBe('still_working'); + expect(result.status.shadow).toMatchObject({ reconciledBy: 'report', wouldNudge: false }); clock.set('2026-04-29T00:01:59.000Z'); expect((await reader.execute({ teamName: 'team-a', memberName: 'bob' })).state).toBe( @@ -182,6 +193,41 @@ describe('MemberWorkSync use cases', () => { expect(result.status.state).toBe('caught_up'); }); + it('marks status inactive when the team runtime is not active', async () => { + const { deps } = createDeps({ teamActive: false }); + const status = await new MemberWorkSyncDiagnosticsReader(deps).execute({ + teamName: 'team-a', + memberName: 'bob', + }); + + expect(status.state).toBe('inactive'); + expect(status.diagnostics).toContain('team_runtime_inactive'); + expect(status.shadow?.wouldNudge).toBe(false); + }); + + it('records fingerprint transitions without treating them as progress proof', async () => { + const { deps, source } = createDeps(); + const reader = new MemberWorkSyncDiagnosticsReader(deps); + await reader.execute({ teamName: 'team-a', memberName: 'bob' }); + + source.agenda.items = [ + { + ...workItem, + taskId: 'task-2', + displayId: '22222222', + subject: 'New work', + }, + ]; + const changed = await reader.execute({ teamName: 'team-a', memberName: 'bob' }); + + expect(changed.shadow).toMatchObject({ + fingerprintChanged: true, + wouldNudge: true, + }); + expect(changed.shadow?.previousFingerprint).toMatch(/^agenda:v1:/); + expect(changed.state).toBe('needs_sync'); + }); + it('rejects invalid report tokens without recording replayable intents', async () => { const { deps, store } = createDeps(); const reader = new MemberWorkSyncDiagnosticsReader(deps); diff --git a/test/features/member-work-sync/main/MemberWorkSyncEventQueue.test.ts b/test/features/member-work-sync/main/MemberWorkSyncEventQueue.test.ts new file mode 100644 index 00000000..dce25bb0 --- /dev/null +++ b/test/features/member-work-sync/main/MemberWorkSyncEventQueue.test.ts @@ -0,0 +1,123 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; + +import { MemberWorkSyncEventQueue } from '@features/member-work-sync/main/infrastructure/MemberWorkSyncEventQueue'; + +describe('MemberWorkSyncEventQueue', () => { + beforeEach(() => { + vi.useFakeTimers(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it('coalesces duplicate member events into one queue reconcile', async () => { + const reconciles: unknown[] = []; + const queue = new MemberWorkSyncEventQueue({ + quietWindowMs: 100, + reconcile: async (request, context) => { + reconciles.push({ request, context }); + }, + isTeamActive: () => true, + }); + + queue.enqueue({ teamName: 'team-a', memberName: 'bob', triggerReason: 'task_changed' }); + queue.enqueue({ teamName: 'team-a', memberName: 'bob', triggerReason: 'inbox_changed' }); + + await vi.advanceTimersByTimeAsync(100); + + expect(reconciles).toHaveLength(1); + expect(reconciles[0]).toMatchObject({ + request: { teamName: 'team-a', memberName: 'bob' }, + context: { + reconciledBy: 'queue', + triggerReasons: ['inbox_changed', 'task_changed'], + }, + }); + expect(queue.getDiagnostics()).toMatchObject({ reconciled: 1, coalesced: 1 }); + await queue.stop(); + }); + + it('drops queued work for inactive teams without reconciling', async () => { + const reconcile = vi.fn(); + const queue = new MemberWorkSyncEventQueue({ + quietWindowMs: 1, + reconcile, + isTeamActive: () => false, + }); + + queue.enqueue({ teamName: 'team-a', memberName: 'bob', triggerReason: 'task_changed' }); + await vi.advanceTimersByTimeAsync(1); + + expect(reconcile).not.toHaveBeenCalled(); + expect(queue.getDiagnostics()).toMatchObject({ dropped: 1, reconciled: 0 }); + await queue.stop(); + }); + + it('runs one follow-up pass when events arrive during an active reconcile', async () => { + let release: () => void = () => { + throw new Error('reconcile did not start'); + }; + const reconciles: unknown[] = []; + const queue = new MemberWorkSyncEventQueue({ + quietWindowMs: 1, + reconcile: async (request, context) => { + reconciles.push({ request, context }); + if (reconciles.length === 1) { + await new Promise((resolve) => { + release = resolve; + }); + } + }, + isTeamActive: () => true, + }); + + queue.enqueue({ teamName: 'team-a', memberName: 'bob', triggerReason: 'task_changed' }); + await vi.advanceTimersByTimeAsync(1); + queue.enqueue({ teamName: 'team-a', memberName: 'bob', triggerReason: 'tool_finished' }); + + release(); + await vi.advanceTimersByTimeAsync(1); + + expect(reconciles).toHaveLength(2); + expect(reconciles[1]).toMatchObject({ + context: { reconciledBy: 'queue', triggerReasons: ['task_changed', 'tool_finished'] }, + }); + await queue.stop(); + }); + + it('does not spin timers while concurrency is saturated', async () => { + let release: () => void = () => { + throw new Error('reconcile did not start'); + }; + const reconciles: unknown[] = []; + const queue = new MemberWorkSyncEventQueue({ + quietWindowMs: 1, + concurrency: 1, + reconcile: async (request) => { + reconciles.push(request); + if (reconciles.length === 1) { + await new Promise((resolve) => { + release = resolve; + }); + } + }, + isTeamActive: () => true, + }); + + queue.enqueue({ teamName: 'team-a', memberName: 'alice', triggerReason: 'task_changed' }); + queue.enqueue({ teamName: 'team-a', memberName: 'bob', triggerReason: 'task_changed' }); + + await vi.advanceTimersByTimeAsync(1); + await vi.advanceTimersByTimeAsync(1_000); + + expect(reconciles).toHaveLength(1); + expect(queue.getDiagnostics()).toMatchObject({ queued: 1, running: 1 }); + + release(); + await vi.advanceTimersByTimeAsync(1); + + expect(reconciles).toHaveLength(2); + await queue.stop(); + }); +}); diff --git a/test/features/member-work-sync/main/MemberWorkSyncTeamChangeRouter.test.ts b/test/features/member-work-sync/main/MemberWorkSyncTeamChangeRouter.test.ts new file mode 100644 index 00000000..f1b3421e --- /dev/null +++ b/test/features/member-work-sync/main/MemberWorkSyncTeamChangeRouter.test.ts @@ -0,0 +1,70 @@ +import { describe, expect, it, vi } from 'vitest'; + +import { MemberWorkSyncTeamChangeRouter } from '@features/member-work-sync/main/adapters/input/MemberWorkSyncTeamChangeRouter'; + +function createRouter(activeMembers: string[] = ['alice', 'bob']) { + const queue = { + enqueue: vi.fn(), + dropTeam: vi.fn(), + }; + const router = new MemberWorkSyncTeamChangeRouter( + { + loadActiveMemberNames: async () => activeMembers, + }, + queue as never + ); + return { queue, router }; +} + +describe('MemberWorkSyncTeamChangeRouter', () => { + it('routes task and config events to all active members', async () => { + const { queue, router } = createRouter(); + + router.noteTeamChange({ type: 'task', teamName: 'team-a', detail: 'task-1.json' }); + await Promise.resolve(); + + expect(queue.enqueue).toHaveBeenCalledWith({ + teamName: 'team-a', + memberName: 'alice', + triggerReason: 'task_changed', + runAfterMs: undefined, + }); + expect(queue.enqueue).toHaveBeenCalledWith({ + teamName: 'team-a', + memberName: 'bob', + triggerReason: 'task_changed', + runAfterMs: undefined, + }); + }); + + it('routes inbox and tool-finish events to the addressed member only', () => { + const { queue, router } = createRouter(); + + router.noteTeamChange({ type: 'inbox', teamName: 'team-a', detail: 'inboxes/bob.json' }); + router.noteTeamChange({ + type: 'tool-activity', + teamName: 'team-a', + detail: JSON.stringify({ action: 'finish', memberName: 'alice', toolUseId: 'tool-1' }), + }); + + expect(queue.enqueue).toHaveBeenCalledWith({ + teamName: 'team-a', + memberName: 'bob', + triggerReason: 'inbox_changed', + }); + expect(queue.enqueue).toHaveBeenCalledWith({ + teamName: 'team-a', + memberName: 'alice', + triggerReason: 'tool_finished', + }); + }); + + it('drops queued work when the team goes offline', () => { + const { queue, router } = createRouter(); + + router.noteTeamChange({ type: 'lead-activity', teamName: 'team-a', detail: 'offline' }); + + expect(queue.dropTeam).toHaveBeenCalledWith('team-a'); + expect(queue.enqueue).not.toHaveBeenCalled(); + }); +});