feat(member-work-sync): add passive event queue
This commit is contained in:
parent
b346a1146c
commit
10e405573e
12 changed files with 724 additions and 6 deletions
|
|
@ -66,6 +66,14 @@ export interface MemberWorkSyncReport {
|
|||
rejectionCode?: string;
|
||||
}
|
||||
|
||||
export interface MemberWorkSyncShadowDiagnostics {
|
||||
reconciledBy: 'request' | 'queue' | 'report';
|
||||
wouldNudge: boolean;
|
||||
fingerprintChanged: boolean;
|
||||
previousFingerprint?: string;
|
||||
triggerReasons?: string[];
|
||||
}
|
||||
|
||||
export interface MemberWorkSyncStatus {
|
||||
teamName: string;
|
||||
memberName: string;
|
||||
|
|
@ -74,6 +82,7 @@ export interface MemberWorkSyncStatus {
|
|||
report?: MemberWorkSyncReport;
|
||||
reportToken?: string;
|
||||
reportTokenExpiresAt?: string;
|
||||
shadow?: MemberWorkSyncShadowDiagnostics;
|
||||
evaluatedAt: string;
|
||||
diagnostics: string[];
|
||||
providerId?: MemberWorkSyncProviderId;
|
||||
|
|
|
|||
|
|
@ -7,6 +7,11 @@ import {
|
|||
import type { MemberWorkSyncStatus, MemberWorkSyncStatusRequest } from '../../contracts';
|
||||
import type { MemberWorkSyncAgendaSourceResult, MemberWorkSyncUseCaseDeps } from './ports';
|
||||
|
||||
export interface MemberWorkSyncReconcileContext {
|
||||
reconciledBy?: 'request' | 'queue';
|
||||
triggerReasons?: string[];
|
||||
}
|
||||
|
||||
export function finalizeMemberWorkSyncAgenda(
|
||||
deps: MemberWorkSyncUseCaseDeps,
|
||||
source: MemberWorkSyncAgendaSourceResult
|
||||
|
|
@ -30,16 +35,22 @@ export function finalizeMemberWorkSyncAgenda(
|
|||
export class MemberWorkSyncReconciler {
|
||||
constructor(private readonly deps: MemberWorkSyncUseCaseDeps) {}
|
||||
|
||||
async execute(request: MemberWorkSyncStatusRequest): Promise<MemberWorkSyncStatus> {
|
||||
async execute(
|
||||
request: MemberWorkSyncStatusRequest,
|
||||
context: MemberWorkSyncReconcileContext = {}
|
||||
): Promise<MemberWorkSyncStatus> {
|
||||
const source = await this.deps.agendaSource.loadAgenda(request);
|
||||
const agenda = finalizeMemberWorkSyncAgenda(this.deps, source);
|
||||
const previous = await this.deps.statusStore.read(request);
|
||||
const nowIso = this.deps.clock.now().toISOString();
|
||||
const teamActive = this.deps.lifecycle
|
||||
? await this.deps.lifecycle.isTeamActive(agenda.teamName)
|
||||
: true;
|
||||
const decision = decideMemberWorkSyncStatus({
|
||||
agenda,
|
||||
latestAcceptedReport: previous?.report?.accepted ? previous.report : null,
|
||||
nowIso,
|
||||
inactive: source.inactive,
|
||||
inactive: source.inactive || !teamActive,
|
||||
});
|
||||
|
||||
const status = await attachMemberWorkSyncReportToken(this.deps, {
|
||||
|
|
@ -48,8 +59,25 @@ export class MemberWorkSyncReconciler {
|
|||
state: decision.state,
|
||||
agenda,
|
||||
...(decision.acceptedReport ? { report: decision.acceptedReport } : {}),
|
||||
shadow: {
|
||||
reconciledBy: context.reconciledBy ?? 'request',
|
||||
wouldNudge: decision.state === 'needs_sync' && agenda.items.length > 0,
|
||||
fingerprintChanged:
|
||||
Boolean(previous?.agenda.fingerprint) &&
|
||||
previous?.agenda.fingerprint !== agenda.fingerprint,
|
||||
...(previous?.agenda.fingerprint
|
||||
? { previousFingerprint: previous.agenda.fingerprint }
|
||||
: {}),
|
||||
...(context.triggerReasons?.length
|
||||
? { triggerReasons: [...new Set(context.triggerReasons)].sort() }
|
||||
: {}),
|
||||
},
|
||||
evaluatedAt: nowIso,
|
||||
diagnostics: [...agenda.diagnostics, ...decision.diagnostics],
|
||||
diagnostics: [
|
||||
...agenda.diagnostics,
|
||||
...(!teamActive ? ['team_runtime_inactive'] : []),
|
||||
...decision.diagnostics,
|
||||
],
|
||||
...(source.providerId ? { providerId: source.providerId } : {}),
|
||||
});
|
||||
|
||||
|
|
|
|||
|
|
@ -32,6 +32,18 @@ export class MemberWorkSyncReporter {
|
|||
const nowIso = (
|
||||
request.reportedAt ? new Date(request.reportedAt) : this.deps.clock.now()
|
||||
).toISOString();
|
||||
const teamActive = this.deps.lifecycle
|
||||
? await this.deps.lifecycle.isTeamActive(agenda.teamName)
|
||||
: true;
|
||||
if (!teamActive) {
|
||||
const status = await this.reconciler.execute(request);
|
||||
return {
|
||||
accepted: false,
|
||||
code: 'team_runtime_inactive',
|
||||
message: 'Team runtime is not active. Restart the team before reporting work sync state.',
|
||||
status,
|
||||
};
|
||||
}
|
||||
const tokenValidation = this.deps.reportToken
|
||||
? await this.deps.reportToken.verify({
|
||||
token: request.reportToken,
|
||||
|
|
@ -86,6 +98,11 @@ export class MemberWorkSyncReporter {
|
|||
: ('still_working' as const),
|
||||
agenda,
|
||||
report,
|
||||
shadow: {
|
||||
reconciledBy: 'report',
|
||||
wouldNudge: false,
|
||||
fingerprintChanged: false,
|
||||
},
|
||||
evaluatedAt: nowIso,
|
||||
diagnostics: [...agenda.diagnostics, 'report_accepted'],
|
||||
...(source.providerId ? { providerId: source.providerId } : {}),
|
||||
|
|
|
|||
|
|
@ -43,6 +43,10 @@ export interface MemberWorkSyncReportTokenPort {
|
|||
): Promise<MemberWorkSyncReportTokenVerification>;
|
||||
}
|
||||
|
||||
export interface MemberWorkSyncLifecyclePort {
|
||||
isTeamActive(teamName: string): Promise<boolean> | boolean;
|
||||
}
|
||||
|
||||
export interface MemberWorkSyncLoggerPort {
|
||||
debug(message: string, metadata?: Record<string, unknown>): void;
|
||||
warn(message: string, metadata?: Record<string, unknown>): void;
|
||||
|
|
@ -80,6 +84,7 @@ export interface MemberWorkSyncUseCaseDeps {
|
|||
statusStore: MemberWorkSyncStatusStorePort;
|
||||
reportStore?: MemberWorkSyncReportStorePort;
|
||||
reportToken?: MemberWorkSyncReportTokenPort;
|
||||
lifecycle?: MemberWorkSyncLifecyclePort;
|
||||
logger?: MemberWorkSyncLoggerPort;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,113 @@
|
|||
import type {
|
||||
MemberWorkSyncEventQueue,
|
||||
MemberWorkSyncTriggerReason,
|
||||
} from '../../infrastructure/MemberWorkSyncEventQueue';
|
||||
import type { TeamChangeEvent, ToolActivityEventPayload } from '@shared/types';
|
||||
|
||||
interface MemberWorkSyncRosterSource {
|
||||
loadActiveMemberNames(teamName: string): Promise<string[]>;
|
||||
}
|
||||
|
||||
const TEAM_WIDE_REASONS: Partial<Record<TeamChangeEvent['type'], MemberWorkSyncTriggerReason>> = {
|
||||
config: 'config_changed',
|
||||
task: 'task_changed',
|
||||
'task-log-change': 'runtime_activity',
|
||||
'log-source-change': 'runtime_activity',
|
||||
process: 'runtime_activity',
|
||||
'lead-activity': 'runtime_activity',
|
||||
};
|
||||
|
||||
function parseInboxRecipient(detail: string | undefined): string | null {
|
||||
if (!detail) {
|
||||
return null;
|
||||
}
|
||||
const match = /^inboxes\/(.+)\.json$/.exec(detail);
|
||||
return match?.[1]?.trim() || null;
|
||||
}
|
||||
|
||||
function parseToolActivity(detail: string | undefined): ToolActivityEventPayload | null {
|
||||
if (!detail) {
|
||||
return null;
|
||||
}
|
||||
try {
|
||||
const parsed = JSON.parse(detail) as ToolActivityEventPayload;
|
||||
return parsed && typeof parsed === 'object' ? parsed : null;
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
export class MemberWorkSyncTeamChangeRouter {
|
||||
constructor(
|
||||
private readonly rosterSource: MemberWorkSyncRosterSource,
|
||||
private readonly queue: MemberWorkSyncEventQueue
|
||||
) {}
|
||||
|
||||
async enqueueStartupScan(teamNames: string[]): Promise<void> {
|
||||
await Promise.allSettled(
|
||||
teamNames.map((teamName) => this.enqueueTeam(teamName, 'startup_scan', 30_000))
|
||||
);
|
||||
}
|
||||
|
||||
noteTeamChange(event: TeamChangeEvent): void {
|
||||
if (event.type === 'lead-activity' && event.detail === 'offline') {
|
||||
this.queue.dropTeam(event.teamName);
|
||||
return;
|
||||
}
|
||||
|
||||
if (event.type === 'member-spawn') {
|
||||
const memberName = event.detail?.trim();
|
||||
if (memberName) {
|
||||
this.queue.enqueue({
|
||||
teamName: event.teamName,
|
||||
memberName,
|
||||
triggerReason: 'member_spawned',
|
||||
runAfterMs: 30_000,
|
||||
});
|
||||
} else {
|
||||
void this.enqueueTeam(event.teamName, 'member_spawned', 30_000).catch(() => undefined);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (event.type === 'tool-activity') {
|
||||
const payload = parseToolActivity(event.detail);
|
||||
if (payload?.action === 'finish' && payload.memberName) {
|
||||
this.queue.enqueue({
|
||||
teamName: event.teamName,
|
||||
memberName: payload.memberName,
|
||||
triggerReason: 'tool_finished',
|
||||
});
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (event.type === 'inbox' || event.type === 'lead-message') {
|
||||
const recipient = parseInboxRecipient(event.detail);
|
||||
if (recipient) {
|
||||
this.queue.enqueue({
|
||||
teamName: event.teamName,
|
||||
memberName: recipient,
|
||||
triggerReason: 'inbox_changed',
|
||||
});
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
const teamWideReason = TEAM_WIDE_REASONS[event.type];
|
||||
if (teamWideReason) {
|
||||
void this.enqueueTeam(event.teamName, teamWideReason).catch(() => undefined);
|
||||
}
|
||||
}
|
||||
|
||||
private async enqueueTeam(
|
||||
teamName: string,
|
||||
triggerReason: MemberWorkSyncTriggerReason,
|
||||
runAfterMs?: number
|
||||
): Promise<void> {
|
||||
const activeMembers = await this.rosterSource.loadActiveMemberNames(teamName);
|
||||
for (const memberName of activeMembers) {
|
||||
this.queue.enqueue({ teamName, memberName, triggerReason, runAfterMs });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,5 +1,6 @@
|
|||
import {
|
||||
buildActionableWorkAgenda,
|
||||
isReservedMemberName,
|
||||
normalizeMemberName,
|
||||
type MemberWorkSyncMemberLike,
|
||||
} from '../../../core/domain';
|
||||
|
|
@ -65,6 +66,20 @@ function toMemberLike(member: TeamMember): MemberWorkSyncMemberLike {
|
|||
export class TeamTaskAgendaSource implements MemberWorkSyncAgendaSourcePort {
|
||||
constructor(private readonly deps: TeamTaskAgendaSourceDeps) {}
|
||||
|
||||
async loadActiveMemberNames(teamName: string): Promise<string[]> {
|
||||
const config = await this.deps.configReader.getConfig(teamName);
|
||||
if (!config || config.deletedAt) {
|
||||
return [];
|
||||
}
|
||||
|
||||
const metaMembers = await this.deps.membersMetaStore.getMembers(teamName);
|
||||
return mergeMembers(config.members ?? [], metaMembers)
|
||||
.filter((member) => !member.removedAt)
|
||||
.map((member) => normalizeMemberName(member.name))
|
||||
.filter((memberName) => memberName.length > 0 && !isReservedMemberName(memberName))
|
||||
.sort((left, right) => left.localeCompare(right));
|
||||
}
|
||||
|
||||
async loadAgenda(input: {
|
||||
teamName: string;
|
||||
memberName: string;
|
||||
|
|
|
|||
|
|
@ -4,9 +4,19 @@ import type {
|
|||
MemberWorkSyncStatus,
|
||||
MemberWorkSyncStatusRequest,
|
||||
} from '../../contracts';
|
||||
import { MemberWorkSyncDiagnosticsReader, MemberWorkSyncReporter } from '../../core/application';
|
||||
import {
|
||||
MemberWorkSyncDiagnosticsReader,
|
||||
MemberWorkSyncReconciler,
|
||||
MemberWorkSyncReporter,
|
||||
type MemberWorkSyncReconcileContext,
|
||||
} from '../../core/application';
|
||||
import { MemberWorkSyncTeamChangeRouter } from '../adapters/input/MemberWorkSyncTeamChangeRouter';
|
||||
import { TeamTaskAgendaSource } from '../adapters/output/TeamTaskAgendaSource';
|
||||
import { HmacMemberWorkSyncReportTokenAdapter } from '../infrastructure/HmacMemberWorkSyncReportTokenAdapter';
|
||||
import {
|
||||
MemberWorkSyncEventQueue,
|
||||
type MemberWorkSyncQueueDiagnostics,
|
||||
} from '../infrastructure/MemberWorkSyncEventQueue';
|
||||
import { JsonMemberWorkSyncStore } from '../infrastructure/JsonMemberWorkSyncStore';
|
||||
import { MemberWorkSyncStorePaths } from '../infrastructure/MemberWorkSyncStorePaths';
|
||||
import { NodeHashAdapter } from '../infrastructure/NodeHashAdapter';
|
||||
|
|
@ -16,11 +26,16 @@ import type { TeamConfigReader } from '@main/services/team/TeamConfigReader';
|
|||
import type { TeamKanbanManager } from '@main/services/team/TeamKanbanManager';
|
||||
import type { TeamMembersMetaStore } from '@main/services/team/TeamMembersMetaStore';
|
||||
import type { TeamTaskReader } from '@main/services/team/TeamTaskReader';
|
||||
import type { TeamChangeEvent } from '@shared/types';
|
||||
import type { MemberWorkSyncLoggerPort } from '../../core/application';
|
||||
|
||||
export interface MemberWorkSyncFeatureFacade {
|
||||
getStatus(request: MemberWorkSyncStatusRequest): Promise<MemberWorkSyncStatus>;
|
||||
report(request: MemberWorkSyncReportRequest): Promise<MemberWorkSyncReportResult>;
|
||||
noteTeamChange(event: TeamChangeEvent): void;
|
||||
enqueueStartupScan(teamNames: string[]): Promise<void>;
|
||||
getQueueDiagnostics(): MemberWorkSyncQueueDiagnostics;
|
||||
dispose(): Promise<void>;
|
||||
}
|
||||
|
||||
export function createMemberWorkSyncFeature(deps: {
|
||||
|
|
@ -29,6 +44,7 @@ export function createMemberWorkSyncFeature(deps: {
|
|||
taskReader: TeamTaskReader;
|
||||
kanbanManager: TeamKanbanManager;
|
||||
membersMetaStore: TeamMembersMetaStore;
|
||||
isTeamActive?: (teamName: string) => Promise<boolean> | boolean;
|
||||
logger?: MemberWorkSyncLoggerPort;
|
||||
}): MemberWorkSyncFeatureFacade {
|
||||
const clock = new SystemClockAdapter();
|
||||
|
|
@ -51,13 +67,27 @@ export function createMemberWorkSyncFeature(deps: {
|
|||
statusStore: store,
|
||||
reportStore: store,
|
||||
reportToken,
|
||||
...(deps.isTeamActive ? { lifecycle: { isTeamActive: deps.isTeamActive } } : {}),
|
||||
logger: deps.logger,
|
||||
};
|
||||
const diagnosticsReader = new MemberWorkSyncDiagnosticsReader(useCaseDeps);
|
||||
const reporter = new MemberWorkSyncReporter(useCaseDeps);
|
||||
const reconciler = new MemberWorkSyncReconciler(useCaseDeps);
|
||||
const queue = new MemberWorkSyncEventQueue({
|
||||
reconcile: async (request, context: MemberWorkSyncReconcileContext) => {
|
||||
await reconciler.execute(request, context);
|
||||
},
|
||||
isTeamActive: deps.isTeamActive ?? (() => true),
|
||||
logger: deps.logger,
|
||||
});
|
||||
const router = new MemberWorkSyncTeamChangeRouter(agendaSource, queue);
|
||||
|
||||
return {
|
||||
getStatus: (request) => diagnosticsReader.execute(request),
|
||||
report: (request) => reporter.execute(request),
|
||||
noteTeamChange: (event) => router.noteTeamChange(event),
|
||||
enqueueStartupScan: (teamNames) => router.enqueueStartupScan(teamNames),
|
||||
getQueueDiagnostics: () => queue.getDiagnostics(),
|
||||
dispose: () => queue.stop(),
|
||||
};
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,245 @@
|
|||
import type { MemberWorkSyncReconcileContext } from '../../core/application/MemberWorkSyncReconciler';
|
||||
import type { MemberWorkSyncLoggerPort } from '../../core/application';
|
||||
|
||||
export type MemberWorkSyncTriggerReason =
|
||||
| 'startup_scan'
|
||||
| 'config_changed'
|
||||
| 'task_changed'
|
||||
| 'inbox_changed'
|
||||
| 'member_spawned'
|
||||
| 'tool_finished'
|
||||
| 'runtime_activity'
|
||||
| 'manual_refresh';
|
||||
|
||||
export interface MemberWorkSyncQueueDiagnostics {
|
||||
queued: number;
|
||||
running: number;
|
||||
enqueued: number;
|
||||
coalesced: number;
|
||||
reconciled: number;
|
||||
dropped: number;
|
||||
failed: number;
|
||||
}
|
||||
|
||||
interface QueueItem {
|
||||
teamName: string;
|
||||
memberName: string;
|
||||
runAt: number;
|
||||
triggerReasons: Set<MemberWorkSyncTriggerReason>;
|
||||
}
|
||||
|
||||
interface RunningItem {
|
||||
rerunRequested: boolean;
|
||||
triggerReasons: Set<MemberWorkSyncTriggerReason>;
|
||||
}
|
||||
|
||||
export interface MemberWorkSyncEventQueueDeps {
|
||||
reconcile(
|
||||
input: { teamName: string; memberName: string },
|
||||
context: MemberWorkSyncReconcileContext
|
||||
): Promise<void>;
|
||||
isTeamActive(teamName: string): Promise<boolean> | boolean;
|
||||
quietWindowMs?: number;
|
||||
concurrency?: number;
|
||||
now?: () => number;
|
||||
logger?: MemberWorkSyncLoggerPort;
|
||||
}
|
||||
|
||||
function keyOf(teamName: string, memberName: string): string {
|
||||
return `${teamName}\0${memberName.trim().toLowerCase()}`;
|
||||
}
|
||||
|
||||
function unrefTimer(timer: ReturnType<typeof setTimeout>): void {
|
||||
timer.unref?.();
|
||||
}
|
||||
|
||||
export class MemberWorkSyncEventQueue {
|
||||
private readonly items = new Map<string, QueueItem>();
|
||||
private readonly running = new Map<string, RunningItem>();
|
||||
private readonly inFlight = new Set<Promise<void>>();
|
||||
private readonly quietWindowMs: number;
|
||||
private readonly concurrency: number;
|
||||
private readonly now: () => number;
|
||||
private timer: ReturnType<typeof setTimeout> | null = null;
|
||||
private stopped = false;
|
||||
private counters = {
|
||||
enqueued: 0,
|
||||
coalesced: 0,
|
||||
reconciled: 0,
|
||||
dropped: 0,
|
||||
failed: 0,
|
||||
};
|
||||
|
||||
constructor(private readonly deps: MemberWorkSyncEventQueueDeps) {
|
||||
this.quietWindowMs = deps.quietWindowMs ?? 90_000;
|
||||
this.concurrency = Math.max(1, deps.concurrency ?? 2);
|
||||
this.now = deps.now ?? Date.now;
|
||||
}
|
||||
|
||||
enqueue(input: {
|
||||
teamName: string;
|
||||
memberName: string;
|
||||
triggerReason: MemberWorkSyncTriggerReason;
|
||||
runAfterMs?: number;
|
||||
}): void {
|
||||
if (this.stopped) {
|
||||
return;
|
||||
}
|
||||
|
||||
const memberName = input.memberName.trim();
|
||||
if (!input.teamName.trim() || !memberName) {
|
||||
this.counters.dropped += 1;
|
||||
return;
|
||||
}
|
||||
|
||||
const key = keyOf(input.teamName, memberName);
|
||||
const runAt = this.now() + (input.runAfterMs ?? this.quietWindowMs);
|
||||
const running = this.running.get(key);
|
||||
if (running) {
|
||||
running.rerunRequested = true;
|
||||
running.triggerReasons.add(input.triggerReason);
|
||||
this.counters.coalesced += 1;
|
||||
return;
|
||||
}
|
||||
|
||||
const existing = this.items.get(key);
|
||||
if (existing) {
|
||||
existing.triggerReasons.add(input.triggerReason);
|
||||
existing.runAt = Math.max(existing.runAt, runAt);
|
||||
this.counters.coalesced += 1;
|
||||
this.schedule();
|
||||
return;
|
||||
}
|
||||
|
||||
this.items.set(key, {
|
||||
teamName: input.teamName,
|
||||
memberName,
|
||||
runAt,
|
||||
triggerReasons: new Set([input.triggerReason]),
|
||||
});
|
||||
this.counters.enqueued += 1;
|
||||
this.schedule();
|
||||
}
|
||||
|
||||
dropTeam(teamName: string): void {
|
||||
for (const [key, item] of this.items) {
|
||||
if (item.teamName === teamName) {
|
||||
this.items.delete(key);
|
||||
this.counters.dropped += 1;
|
||||
}
|
||||
}
|
||||
this.schedule();
|
||||
}
|
||||
|
||||
getDiagnostics(): MemberWorkSyncQueueDiagnostics {
|
||||
return {
|
||||
queued: this.items.size,
|
||||
running: this.running.size,
|
||||
...this.counters,
|
||||
};
|
||||
}
|
||||
|
||||
async stop(): Promise<void> {
|
||||
this.stopped = true;
|
||||
if (this.timer) {
|
||||
clearTimeout(this.timer);
|
||||
this.timer = null;
|
||||
}
|
||||
this.items.clear();
|
||||
await Promise.allSettled([...this.inFlight]);
|
||||
}
|
||||
|
||||
private schedule(): void {
|
||||
if (this.stopped) {
|
||||
return;
|
||||
}
|
||||
if (this.timer) {
|
||||
clearTimeout(this.timer);
|
||||
this.timer = null;
|
||||
}
|
||||
if (this.items.size === 0) {
|
||||
return;
|
||||
}
|
||||
if (this.running.size >= this.concurrency) {
|
||||
return;
|
||||
}
|
||||
|
||||
const nextRunAt = Math.min(...[...this.items.values()].map((item) => item.runAt));
|
||||
const delayMs = Math.max(0, nextRunAt - this.now());
|
||||
this.timer = setTimeout(() => {
|
||||
this.timer = null;
|
||||
this.pump();
|
||||
}, delayMs);
|
||||
unrefTimer(this.timer);
|
||||
}
|
||||
|
||||
private pump(): void {
|
||||
if (this.stopped) {
|
||||
return;
|
||||
}
|
||||
|
||||
const due = [...this.items.entries()]
|
||||
.filter(([, item]) => item.runAt <= this.now())
|
||||
.sort((left, right) => left[1].runAt - right[1].runAt);
|
||||
|
||||
for (const [key, item] of due) {
|
||||
if (this.running.size >= this.concurrency) {
|
||||
break;
|
||||
}
|
||||
this.items.delete(key);
|
||||
this.runItem(key, item);
|
||||
}
|
||||
|
||||
this.schedule();
|
||||
}
|
||||
|
||||
private runItem(key: string, item: QueueItem): void {
|
||||
const running: RunningItem = {
|
||||
rerunRequested: false,
|
||||
triggerReasons: new Set(item.triggerReasons),
|
||||
};
|
||||
this.running.set(key, running);
|
||||
|
||||
const promise = this.executeItem(key, item, running)
|
||||
.catch((error: unknown) => {
|
||||
this.counters.failed += 1;
|
||||
this.deps.logger?.warn('member work sync queue reconcile failed', {
|
||||
teamName: item.teamName,
|
||||
memberName: item.memberName,
|
||||
error: String(error),
|
||||
});
|
||||
})
|
||||
.finally(() => {
|
||||
this.running.delete(key);
|
||||
this.inFlight.delete(promise);
|
||||
if (running.rerunRequested && !this.stopped) {
|
||||
for (const reason of running.triggerReasons) {
|
||||
this.enqueue({
|
||||
teamName: item.teamName,
|
||||
memberName: item.memberName,
|
||||
triggerReason: reason,
|
||||
});
|
||||
}
|
||||
}
|
||||
this.pump();
|
||||
});
|
||||
|
||||
this.inFlight.add(promise);
|
||||
}
|
||||
|
||||
private async executeItem(_key: string, item: QueueItem, running: RunningItem): Promise<void> {
|
||||
if (!(await this.deps.isTeamActive(item.teamName))) {
|
||||
this.counters.dropped += 1;
|
||||
return;
|
||||
}
|
||||
|
||||
await this.deps.reconcile(
|
||||
{ teamName: item.teamName, memberName: item.memberName },
|
||||
{
|
||||
reconciledBy: 'queue',
|
||||
triggerReasons: [...running.triggerReasons].sort(),
|
||||
}
|
||||
);
|
||||
this.counters.reconciled += 1;
|
||||
}
|
||||
}
|
||||
|
|
@ -567,7 +567,7 @@ let codexAccountFeature: CodexAccountFeatureFacade | null = null;
|
|||
let codexModelCatalogFeature: CodexModelCatalogFeatureFacade | null = null;
|
||||
let recentProjectsFeature: RecentProjectsFeatureFacade;
|
||||
let runtimeProviderManagementFeature: RuntimeProviderManagementFeatureFacade;
|
||||
let memberWorkSyncFeature: MemberWorkSyncFeatureFacade;
|
||||
let memberWorkSyncFeature: MemberWorkSyncFeatureFacade | null = null;
|
||||
let teamDataService: TeamDataService;
|
||||
let teamProvisioningService: TeamProvisioningService;
|
||||
let cliInstallerService: CliInstallerService;
|
||||
|
|
@ -787,6 +787,7 @@ function wireFileWatcherEvents(context: ServiceContext): void {
|
|||
if (typeof row.teamName !== 'string' || row.teamName.trim().length === 0) return;
|
||||
const teamName = row.teamName.trim();
|
||||
const detail = typeof row.detail === 'string' ? row.detail : '';
|
||||
memberWorkSyncFeature?.noteTeamChange(row as TeamChangeEvent);
|
||||
|
||||
if (
|
||||
teamDataService &&
|
||||
|
|
@ -1182,6 +1183,7 @@ async function initializeServices(): Promise<void> {
|
|||
const teamChangeEmitter = (event: TeamChangeEvent): void => {
|
||||
forwardTeamChange(event);
|
||||
teamTaskStallMonitor?.noteTeamChange(event);
|
||||
memberWorkSyncFeature?.noteTeamChange(event);
|
||||
if (event.type === 'lead-activity' && event.detail === 'offline') {
|
||||
teammateToolTracker?.handleTeamOffline(event.teamName);
|
||||
}
|
||||
|
|
@ -1231,8 +1233,21 @@ async function initializeServices(): Promise<void> {
|
|||
taskReader: new TeamTaskReader(),
|
||||
kanbanManager: new TeamKanbanManager(),
|
||||
membersMetaStore: new TeamMembersMetaStore(),
|
||||
isTeamActive: (teamName) =>
|
||||
teamProvisioningService.isTeamAlive(teamName) ||
|
||||
teamProvisioningService.hasProvisioningRun(teamName),
|
||||
logger: createLogger('Feature:MemberWorkSync'),
|
||||
});
|
||||
void teamDataService
|
||||
.listTeams()
|
||||
.then((teams) =>
|
||||
memberWorkSyncFeature?.enqueueStartupScan(
|
||||
teams.filter((team) => !team.deletedAt).map((team) => team.teamName)
|
||||
)
|
||||
)
|
||||
.catch((error: unknown) =>
|
||||
logger.warn(`[Init] Member work sync startup scan failed: ${String(error)}`)
|
||||
);
|
||||
codexAccountFeature = createCodexAccountFeature({
|
||||
logger: createLogger('Feature:CodexAccount'),
|
||||
configManager,
|
||||
|
|
@ -1354,7 +1369,7 @@ async function startHttpServer(
|
|||
chunkBuilder: activeContext.chunkBuilder,
|
||||
dataCache: activeContext.dataCache,
|
||||
recentProjectsFeature,
|
||||
memberWorkSyncFeature,
|
||||
memberWorkSyncFeature: memberWorkSyncFeature ?? undefined,
|
||||
updaterService,
|
||||
sshConnectionManager,
|
||||
teamDataService,
|
||||
|
|
@ -1477,6 +1492,8 @@ async function shutdownServices(): Promise<void> {
|
|||
codexModelCatalogFeature = null;
|
||||
await runShutdownStep('Codex account dispose', () => codexAccountFeature?.dispose());
|
||||
codexAccountFeature = null;
|
||||
await runShutdownStep('member work sync dispose', () => memberWorkSyncFeature?.dispose());
|
||||
memberWorkSyncFeature = null;
|
||||
|
||||
if (ptyTerminalService) {
|
||||
await runShutdownStep('PTY terminals kill', () => ptyTerminalService.killAll());
|
||||
|
|
|
|||
|
|
@ -60,6 +60,7 @@ function createDeps(options?: {
|
|||
items?: MemberWorkSyncActionableWorkItem[];
|
||||
activeMemberNames?: string[];
|
||||
inactive?: boolean;
|
||||
teamActive?: boolean;
|
||||
providerId?: 'opencode' | 'codex';
|
||||
}) {
|
||||
const clock = new MutableClock();
|
||||
|
|
@ -97,6 +98,9 @@ function createDeps(options?: {
|
|||
? { ok: true }
|
||||
: { ok: false, reason: input.token ? 'invalid' : 'missing' },
|
||||
},
|
||||
lifecycle: {
|
||||
isTeamActive: () => options?.teamActive ?? true,
|
||||
},
|
||||
};
|
||||
return { clock, deps, source, store };
|
||||
}
|
||||
|
|
@ -112,6 +116,12 @@ describe('MemberWorkSync use cases', () => {
|
|||
expect(status.state).toBe('needs_sync');
|
||||
expect(status.agenda.items).toEqual([workItem]);
|
||||
expect(status.diagnostics).toContain('no_current_report');
|
||||
expect(status.reportToken).toBe(`token:team-a:bob:${status.agenda.fingerprint}`);
|
||||
expect(status.shadow).toMatchObject({
|
||||
reconciledBy: 'request',
|
||||
wouldNudge: true,
|
||||
fingerprintChanged: false,
|
||||
});
|
||||
expect(store.pendingReports).toEqual([]);
|
||||
});
|
||||
|
||||
|
|
@ -134,6 +144,7 @@ describe('MemberWorkSync use cases', () => {
|
|||
|
||||
expect(result.accepted).toBe(true);
|
||||
expect(result.status.state).toBe('still_working');
|
||||
expect(result.status.shadow).toMatchObject({ reconciledBy: 'report', wouldNudge: false });
|
||||
|
||||
clock.set('2026-04-29T00:01:59.000Z');
|
||||
expect((await reader.execute({ teamName: 'team-a', memberName: 'bob' })).state).toBe(
|
||||
|
|
@ -182,6 +193,41 @@ describe('MemberWorkSync use cases', () => {
|
|||
expect(result.status.state).toBe('caught_up');
|
||||
});
|
||||
|
||||
it('marks status inactive when the team runtime is not active', async () => {
|
||||
const { deps } = createDeps({ teamActive: false });
|
||||
const status = await new MemberWorkSyncDiagnosticsReader(deps).execute({
|
||||
teamName: 'team-a',
|
||||
memberName: 'bob',
|
||||
});
|
||||
|
||||
expect(status.state).toBe('inactive');
|
||||
expect(status.diagnostics).toContain('team_runtime_inactive');
|
||||
expect(status.shadow?.wouldNudge).toBe(false);
|
||||
});
|
||||
|
||||
it('records fingerprint transitions without treating them as progress proof', async () => {
|
||||
const { deps, source } = createDeps();
|
||||
const reader = new MemberWorkSyncDiagnosticsReader(deps);
|
||||
await reader.execute({ teamName: 'team-a', memberName: 'bob' });
|
||||
|
||||
source.agenda.items = [
|
||||
{
|
||||
...workItem,
|
||||
taskId: 'task-2',
|
||||
displayId: '22222222',
|
||||
subject: 'New work',
|
||||
},
|
||||
];
|
||||
const changed = await reader.execute({ teamName: 'team-a', memberName: 'bob' });
|
||||
|
||||
expect(changed.shadow).toMatchObject({
|
||||
fingerprintChanged: true,
|
||||
wouldNudge: true,
|
||||
});
|
||||
expect(changed.shadow?.previousFingerprint).toMatch(/^agenda:v1:/);
|
||||
expect(changed.state).toBe('needs_sync');
|
||||
});
|
||||
|
||||
it('rejects invalid report tokens without recording replayable intents', async () => {
|
||||
const { deps, store } = createDeps();
|
||||
const reader = new MemberWorkSyncDiagnosticsReader(deps);
|
||||
|
|
|
|||
|
|
@ -0,0 +1,123 @@
|
|||
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
|
||||
|
||||
import { MemberWorkSyncEventQueue } from '@features/member-work-sync/main/infrastructure/MemberWorkSyncEventQueue';
|
||||
|
||||
describe('MemberWorkSyncEventQueue', () => {
|
||||
beforeEach(() => {
|
||||
vi.useFakeTimers();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
vi.useRealTimers();
|
||||
});
|
||||
|
||||
it('coalesces duplicate member events into one queue reconcile', async () => {
|
||||
const reconciles: unknown[] = [];
|
||||
const queue = new MemberWorkSyncEventQueue({
|
||||
quietWindowMs: 100,
|
||||
reconcile: async (request, context) => {
|
||||
reconciles.push({ request, context });
|
||||
},
|
||||
isTeamActive: () => true,
|
||||
});
|
||||
|
||||
queue.enqueue({ teamName: 'team-a', memberName: 'bob', triggerReason: 'task_changed' });
|
||||
queue.enqueue({ teamName: 'team-a', memberName: 'bob', triggerReason: 'inbox_changed' });
|
||||
|
||||
await vi.advanceTimersByTimeAsync(100);
|
||||
|
||||
expect(reconciles).toHaveLength(1);
|
||||
expect(reconciles[0]).toMatchObject({
|
||||
request: { teamName: 'team-a', memberName: 'bob' },
|
||||
context: {
|
||||
reconciledBy: 'queue',
|
||||
triggerReasons: ['inbox_changed', 'task_changed'],
|
||||
},
|
||||
});
|
||||
expect(queue.getDiagnostics()).toMatchObject({ reconciled: 1, coalesced: 1 });
|
||||
await queue.stop();
|
||||
});
|
||||
|
||||
it('drops queued work for inactive teams without reconciling', async () => {
|
||||
const reconcile = vi.fn();
|
||||
const queue = new MemberWorkSyncEventQueue({
|
||||
quietWindowMs: 1,
|
||||
reconcile,
|
||||
isTeamActive: () => false,
|
||||
});
|
||||
|
||||
queue.enqueue({ teamName: 'team-a', memberName: 'bob', triggerReason: 'task_changed' });
|
||||
await vi.advanceTimersByTimeAsync(1);
|
||||
|
||||
expect(reconcile).not.toHaveBeenCalled();
|
||||
expect(queue.getDiagnostics()).toMatchObject({ dropped: 1, reconciled: 0 });
|
||||
await queue.stop();
|
||||
});
|
||||
|
||||
it('runs one follow-up pass when events arrive during an active reconcile', async () => {
|
||||
let release: () => void = () => {
|
||||
throw new Error('reconcile did not start');
|
||||
};
|
||||
const reconciles: unknown[] = [];
|
||||
const queue = new MemberWorkSyncEventQueue({
|
||||
quietWindowMs: 1,
|
||||
reconcile: async (request, context) => {
|
||||
reconciles.push({ request, context });
|
||||
if (reconciles.length === 1) {
|
||||
await new Promise<void>((resolve) => {
|
||||
release = resolve;
|
||||
});
|
||||
}
|
||||
},
|
||||
isTeamActive: () => true,
|
||||
});
|
||||
|
||||
queue.enqueue({ teamName: 'team-a', memberName: 'bob', triggerReason: 'task_changed' });
|
||||
await vi.advanceTimersByTimeAsync(1);
|
||||
queue.enqueue({ teamName: 'team-a', memberName: 'bob', triggerReason: 'tool_finished' });
|
||||
|
||||
release();
|
||||
await vi.advanceTimersByTimeAsync(1);
|
||||
|
||||
expect(reconciles).toHaveLength(2);
|
||||
expect(reconciles[1]).toMatchObject({
|
||||
context: { reconciledBy: 'queue', triggerReasons: ['task_changed', 'tool_finished'] },
|
||||
});
|
||||
await queue.stop();
|
||||
});
|
||||
|
||||
it('does not spin timers while concurrency is saturated', async () => {
|
||||
let release: () => void = () => {
|
||||
throw new Error('reconcile did not start');
|
||||
};
|
||||
const reconciles: unknown[] = [];
|
||||
const queue = new MemberWorkSyncEventQueue({
|
||||
quietWindowMs: 1,
|
||||
concurrency: 1,
|
||||
reconcile: async (request) => {
|
||||
reconciles.push(request);
|
||||
if (reconciles.length === 1) {
|
||||
await new Promise<void>((resolve) => {
|
||||
release = resolve;
|
||||
});
|
||||
}
|
||||
},
|
||||
isTeamActive: () => true,
|
||||
});
|
||||
|
||||
queue.enqueue({ teamName: 'team-a', memberName: 'alice', triggerReason: 'task_changed' });
|
||||
queue.enqueue({ teamName: 'team-a', memberName: 'bob', triggerReason: 'task_changed' });
|
||||
|
||||
await vi.advanceTimersByTimeAsync(1);
|
||||
await vi.advanceTimersByTimeAsync(1_000);
|
||||
|
||||
expect(reconciles).toHaveLength(1);
|
||||
expect(queue.getDiagnostics()).toMatchObject({ queued: 1, running: 1 });
|
||||
|
||||
release();
|
||||
await vi.advanceTimersByTimeAsync(1);
|
||||
|
||||
expect(reconciles).toHaveLength(2);
|
||||
await queue.stop();
|
||||
});
|
||||
});
|
||||
|
|
@ -0,0 +1,70 @@
|
|||
import { describe, expect, it, vi } from 'vitest';
|
||||
|
||||
import { MemberWorkSyncTeamChangeRouter } from '@features/member-work-sync/main/adapters/input/MemberWorkSyncTeamChangeRouter';
|
||||
|
||||
function createRouter(activeMembers: string[] = ['alice', 'bob']) {
|
||||
const queue = {
|
||||
enqueue: vi.fn(),
|
||||
dropTeam: vi.fn(),
|
||||
};
|
||||
const router = new MemberWorkSyncTeamChangeRouter(
|
||||
{
|
||||
loadActiveMemberNames: async () => activeMembers,
|
||||
},
|
||||
queue as never
|
||||
);
|
||||
return { queue, router };
|
||||
}
|
||||
|
||||
describe('MemberWorkSyncTeamChangeRouter', () => {
|
||||
it('routes task and config events to all active members', async () => {
|
||||
const { queue, router } = createRouter();
|
||||
|
||||
router.noteTeamChange({ type: 'task', teamName: 'team-a', detail: 'task-1.json' });
|
||||
await Promise.resolve();
|
||||
|
||||
expect(queue.enqueue).toHaveBeenCalledWith({
|
||||
teamName: 'team-a',
|
||||
memberName: 'alice',
|
||||
triggerReason: 'task_changed',
|
||||
runAfterMs: undefined,
|
||||
});
|
||||
expect(queue.enqueue).toHaveBeenCalledWith({
|
||||
teamName: 'team-a',
|
||||
memberName: 'bob',
|
||||
triggerReason: 'task_changed',
|
||||
runAfterMs: undefined,
|
||||
});
|
||||
});
|
||||
|
||||
it('routes inbox and tool-finish events to the addressed member only', () => {
|
||||
const { queue, router } = createRouter();
|
||||
|
||||
router.noteTeamChange({ type: 'inbox', teamName: 'team-a', detail: 'inboxes/bob.json' });
|
||||
router.noteTeamChange({
|
||||
type: 'tool-activity',
|
||||
teamName: 'team-a',
|
||||
detail: JSON.stringify({ action: 'finish', memberName: 'alice', toolUseId: 'tool-1' }),
|
||||
});
|
||||
|
||||
expect(queue.enqueue).toHaveBeenCalledWith({
|
||||
teamName: 'team-a',
|
||||
memberName: 'bob',
|
||||
triggerReason: 'inbox_changed',
|
||||
});
|
||||
expect(queue.enqueue).toHaveBeenCalledWith({
|
||||
teamName: 'team-a',
|
||||
memberName: 'alice',
|
||||
triggerReason: 'tool_finished',
|
||||
});
|
||||
});
|
||||
|
||||
it('drops queued work when the team goes offline', () => {
|
||||
const { queue, router } = createRouter();
|
||||
|
||||
router.noteTeamChange({ type: 'lead-activity', teamName: 'team-a', detail: 'offline' });
|
||||
|
||||
expect(queue.dropTeam).toHaveBeenCalledWith('team-a');
|
||||
expect(queue.enqueue).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
Loading…
Reference in a new issue