fix(member-work-sync): preserve undelivered nudge retries
This commit is contained in:
parent
f705cd25cb
commit
d27c1bcc51
5 changed files with 108 additions and 8 deletions
|
|
@ -1,6 +1,6 @@
|
|||
# Member Work Sync Control Plane Plan
|
||||
|
||||
**Status:** Phase 1, Phase 1.5 observability, minimal read-only member details surface, and readiness-gated Phase 2 nudge outbox/dispatcher implemented
|
||||
**Status:** Phase 1, Phase 1.5 observability, minimal read-only member details surface, and readiness-gated Phase 2 nudge outbox/dispatcher/scheduler implemented
|
||||
**Scope:** Team management, task work synchronization, agent work coordination
|
||||
**Primary repo:** `claude_team`
|
||||
**Secondary write-boundary repo:** `agent_teams_orchestrator` / `agent-teams-controller`
|
||||
|
|
@ -38,8 +38,9 @@ Current implementation note:
|
|||
- Queue reconciles can plan a Phase 2 outbox item only when `phase2Readiness=shadow_ready`; read-only diagnostics never create outbox intents. This preserves the anti-spam gate and keeps UI/status reads passive.
|
||||
- Dispatcher use case runs after queued reconcile and is also exposed through the facade. It claims due outbox rows, revalidates active team/status/current fingerprint/readiness/busy/watchdog cooldown, then writes one idempotent inbox nudge through a narrow port.
|
||||
- Production busy revalidation is wired through a tool-activity busy signal adapter. Active or recently finished tool calls defer Phase 2 nudges instead of interrupting work.
|
||||
- A feature-owned dispatch scheduler wakes due retryable outbox items for active teams. It is bounded, unref'ed, and still relies on dispatcher revalidation before any inbox write.
|
||||
- A feature-owned dispatch scheduler wakes due retryable outbox items for lifecycle-active teams only. It is bounded, unref'ed, and still relies on dispatcher revalidation before any inbox write.
|
||||
- Dispatcher applies per-member hourly rate limiting and bounded deterministic retry backoff with jitter before retrying failed nudge attempts.
|
||||
- Superseded-but-undelivered outbox items can be revived by a fresh queued reconcile for the same agenda fingerprint. Delivered nudges remain one-per-fingerprint.
|
||||
- Phase 2 dispatch stays blocked until real shadow metrics confirm that `needs_sync` churn and false positives are acceptably low.
|
||||
|
||||
Patterns used:
|
||||
|
|
@ -3012,6 +3013,15 @@ Includes:
|
|||
- per-member token bucket;
|
||||
- shared cooldown with watchdog.
|
||||
|
||||
Implemented safety constraints:
|
||||
|
||||
- only queued reconciles plan outbox rows;
|
||||
- read-only diagnostics never plan outbox rows;
|
||||
- outbox planning requires `phase2Readiness.state === "shadow_ready"`;
|
||||
- dispatch revalidates lifecycle, current status, current fingerprint, readiness, busy state, rate limit, and watchdog cooldown immediately before inbox insert;
|
||||
- scheduled dispatch lists lifecycle-active teams only, not all stored teams;
|
||||
- undelivered `superseded` rows can be revived by a later fresh reconcile for the same fingerprint, while `delivered` rows remain one-per-fingerprint.
|
||||
|
||||
### Phase 3: Provider Accelerators
|
||||
|
||||
`🎯 8 🛡️ 8 🧠 5`, `300-600 LOC`.
|
||||
|
|
@ -3025,6 +3035,12 @@ Includes:
|
|||
|
||||
No accelerator is proof.
|
||||
|
||||
Current implementation:
|
||||
|
||||
- tool-finish enqueue and tool-activity busy suppression are implemented through `TeamChangeEvent` and the feature-owned busy signal;
|
||||
- The Claude Stop hook and OpenCode turn-settled hooks are intentionally not wired yet: the current feature boundary does not expose one authoritative cross-provider "turn settled and idle" signal, and an adapter built around prompt text, idle notifications, or provider-specific transcript heuristics would be less reliable than the current tool-finish + scheduled reconcile path;
|
||||
- manual "sync now" remains optional because details/status reads are passive by design, and explicit manual nudges should reuse the existing outbox/dispatcher instead of bypassing readiness guards.
|
||||
|
||||
---
|
||||
|
||||
## 22. Runtime Defaults And No Feature Flags
|
||||
|
|
|
|||
|
|
@ -97,6 +97,10 @@ function isOutboxTerminal(status: MemberWorkSyncOutboxItem['status']): boolean {
|
|||
return status === 'delivered' || status === 'superseded' || status === 'failed_terminal';
|
||||
}
|
||||
|
||||
/**
 * Reports whether an existing outbox item in the given status may be reset to `pending`.
 *
 * `isOutboxTerminal` treats `delivered`, `superseded`, and `failed_terminal` as terminal,
 * so `superseded` is allowed explicitly here, while `delivered` and `failed_terminal`
 * stay non-revivable. Any non-terminal status other than `pending` is also revivable;
 * `pending` itself is excluded because the item is already in the target state.
 *
 * @param status - Current status of the stored outbox item.
 * @returns `true` when the item may be moved back to `pending`.
 */
function canReviveOutboxItem(status: MemberWorkSyncOutboxItem['status']): boolean {
|
||||
return status === 'superseded' || (!isOutboxTerminal(status) && status !== 'pending');
|
||||
}
|
||||
|
||||
function canClaimOutboxItem(item: MemberWorkSyncOutboxItem, nowIso: string): boolean {
|
||||
if (item.status !== 'pending' && item.status !== 'failed_retryable') {
|
||||
return false;
|
||||
|
|
@ -379,7 +383,7 @@ export class JsonMemberWorkSyncStore
|
|||
return;
|
||||
}
|
||||
|
||||
if (!isOutboxTerminal(current.status) && current.status !== 'pending') {
|
||||
if (canReviveOutboxItem(current.status)) {
|
||||
const next: MemberWorkSyncOutboxItem = {
|
||||
...current,
|
||||
status: 'pending',
|
||||
|
|
|
|||
|
|
@ -1236,10 +1236,17 @@ async function initializeServices(): Promise<void> {
|
|||
isTeamActive: (teamName) =>
|
||||
teamProvisioningService.isTeamAlive(teamName) ||
|
||||
teamProvisioningService.hasProvisioningRun(teamName),
|
||||
listActiveTeamNames: async () =>
|
||||
(await teamDataService.listTeams())
|
||||
.filter((team) => !team.deletedAt)
|
||||
.map((team) => team.teamName),
|
||||
listActiveTeamNames: async () => {
|
||||
const teams = await teamDataService.listTeams();
|
||||
return teams
|
||||
.filter(
|
||||
(team) =>
|
||||
!team.deletedAt &&
|
||||
(teamProvisioningService.isTeamAlive(team.teamName) ||
|
||||
teamProvisioningService.hasProvisioningRun(team.teamName))
|
||||
)
|
||||
.map((team) => team.teamName);
|
||||
},
|
||||
logger: createLogger('Feature:MemberWorkSync'),
|
||||
});
|
||||
void teamDataService
|
||||
|
|
|
|||
|
|
@ -140,6 +140,18 @@ class InMemoryOutboxStore implements MemberWorkSyncOutboxStorePort {
|
|||
this.ensures.push(input);
|
||||
const current = this.items.get(input.id);
|
||||
if (current) {
|
||||
if (current.status === 'superseded') {
|
||||
const revived = {
|
||||
...current,
|
||||
status: 'pending' as const,
|
||||
updatedAt: input.nowIso,
|
||||
};
|
||||
delete revived.lastError;
|
||||
delete revived.claimedBy;
|
||||
delete revived.claimedAt;
|
||||
this.items.set(input.id, revived);
|
||||
return { ok: true as const, outcome: 'existing' as const, item: revived };
|
||||
}
|
||||
return { ok: true as const, outcome: 'existing' as const, item: current };
|
||||
}
|
||||
const item: MemberWorkSyncOutboxItem = {
|
||||
|
|
@ -520,7 +532,7 @@ describe('MemberWorkSync use cases', () => {
|
|||
it('does not dispatch stale outbox items after the member reports still working', async () => {
|
||||
const outbox = new InMemoryOutboxStore();
|
||||
const inbox = new InMemoryInboxNudge();
|
||||
const { deps, store } = createDeps({ outboxStore: outbox, inboxNudge: inbox });
|
||||
const { clock, deps, store } = createDeps({ outboxStore: outbox, inboxNudge: inbox });
|
||||
store.phase2ReadinessState = 'shadow_ready';
|
||||
|
||||
const reconciler = new MemberWorkSyncReconciler(deps);
|
||||
|
|
@ -550,6 +562,17 @@ describe('MemberWorkSync use cases', () => {
|
|||
status: 'superseded',
|
||||
lastError: 'status_no_longer_matches_outbox',
|
||||
});
|
||||
|
||||
clock.set('2026-04-29T00:03:00.000Z');
|
||||
const expired = await reconciler.execute(
|
||||
{ teamName: 'team-a', memberName: 'bob' },
|
||||
{ reconciledBy: 'queue', triggerReasons: ['task_changed'] }
|
||||
);
|
||||
|
||||
expect(expired.state).toBe('needs_sync');
|
||||
const revived = outbox.items.get(`member-work-sync:team-a:bob:${current.agenda.fingerprint}`);
|
||||
expect(revived).toMatchObject({ status: 'pending' });
|
||||
expect(revived).not.toHaveProperty('lastError');
|
||||
});
|
||||
|
||||
it('rate-limits delivered nudges per member per hour', async () => {
|
||||
|
|
|
|||
|
|
@ -197,6 +197,56 @@ describe('JsonMemberWorkSyncStore', () => {
|
|||
});
|
||||
});
|
||||
|
||||
// Exercises the revive path against the store: a superseded row goes back to `pending`
// on a later ensurePending for the same id, while a delivered row is returned unchanged.
it('revives superseded outbox items but keeps delivered nudges one-per-fingerprint', async () => {
|
||||
const input = {
|
||||
id: 'member-work-sync:team-a:bob:agenda:v1:abc',
|
||||
teamName: 'team-a',
|
||||
memberName: 'bob',
|
||||
agendaFingerprint: 'agenda:v1:abc',
|
||||
payloadHash: 'hash-a',
|
||||
payload: makeNudgePayload(),
|
||||
nowIso: '2026-04-29T00:00:00.000Z',
|
||||
};
|
||||
|
||||
// Step 1: create the row, then supersede it with a recorded reason.
await store.ensurePending(input);
|
||||
await store.markSuperseded({
|
||||
teamName: 'team-a',
|
||||
id: input.id,
|
||||
reason: 'status_no_longer_matches_outbox',
|
||||
nowIso: '2026-04-29T00:01:00.000Z',
|
||||
});
|
||||
|
||||
// Step 2: a later ensurePending with the same id must report the existing row,
// now back in `pending` and without the stale `lastError`.
const revived = await store.ensurePending({ ...input, nowIso: '2026-04-29T00:02:00.000Z' });
|
||||
expect(revived).toMatchObject({
|
||||
ok: true,
|
||||
outcome: 'existing',
|
||||
item: { status: 'pending' },
|
||||
});
|
||||
expect(revived.item).not.toHaveProperty('lastError');
|
||||
|
||||
// Step 3: claim the revived row and mark it delivered using the claimed attempt generation.
const [claimed] = await store.claimDue({
|
||||
teamName: 'team-a',
|
||||
claimedBy: 'dispatcher-a',
|
||||
nowIso: '2026-04-29T00:03:00.000Z',
|
||||
limit: 1,
|
||||
});
|
||||
await store.markDelivered({
|
||||
teamName: 'team-a',
|
||||
id: input.id,
|
||||
attemptGeneration: claimed.attemptGeneration,
|
||||
deliveredMessageId: 'message-1',
|
||||
nowIso: '2026-04-29T00:04:00.000Z',
|
||||
});
|
||||
|
||||
// Step 4: once delivered, ensurePending must not revive the row; it stays `delivered`
// with the original delivered message id.
await expect(
|
||||
store.ensurePending({ ...input, nowIso: '2026-04-29T00:05:00.000Z' })
|
||||
).resolves.toMatchObject({
|
||||
ok: true,
|
||||
outcome: 'existing',
|
||||
item: { status: 'delivered', deliveredMessageId: 'message-1' },
|
||||
});
|
||||
});
|
||||
|
||||
it('claims due outbox items and fences terminal updates by attempt generation', async () => {
|
||||
const input = {
|
||||
id: 'member-work-sync:team-a:bob:agenda:v1:abc',
|
||||
|
|
|
|||
Loading…
Reference in a new issue