feat(member-work-sync): rate limit nudge retries
This commit is contained in:
parent
895c3afc5e
commit
99b636fd33
6 changed files with 152 additions and 2 deletions
|
|
@ -37,6 +37,7 @@ Current implementation note:
|
|||
- Phase 2 storage foundation is implemented as an inert durable outbox: idempotency key, payload hash conflict checks, claim generation fencing, retry/terminal states. It is not wired to dispatch inbox nudges yet.
|
||||
- Reconciler can plan a Phase 2 outbox item only when `phase2Readiness=shadow_ready`; otherwise it records normal shadow status and does nothing. This preserves the anti-spam gate before any dispatcher exists.
|
||||
- Dispatcher use case runs after queued reconcile and is also exposed through the facade. It claims due outbox rows, revalidates active team/status/current fingerprint/readiness/busy/watchdog cooldown, then writes one idempotent inbox nudge through a narrow port.
|
||||
- Dispatcher applies per-member hourly rate limiting and bounded deterministic retry backoff with jitter before retrying failed nudge attempts.
|
||||
- Phase 2 must not start until real shadow metrics confirm that `needs_sync` churn and false positives are acceptably low.
|
||||
|
||||
Patterns used:
|
||||
|
|
|
|||
|
|
@ -298,3 +298,9 @@ export interface MemberWorkSyncOutboxMarkFailedInput {
|
|||
nowIso: string;
|
||||
nextAttemptAt?: string;
|
||||
}
|
||||
|
||||
export interface MemberWorkSyncOutboxCountRecentDeliveredInput {
|
||||
teamName: string;
|
||||
memberName: string;
|
||||
sinceIso: string;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,10 @@
|
|||
import type { MemberWorkSyncOutboxItem } from '../../contracts';
|
||||
import type { MemberWorkSyncUseCaseDeps } from './ports';
|
||||
|
||||
const MEMBER_WORK_SYNC_MAX_NUDGES_PER_MEMBER_PER_HOUR = 2;
|
||||
const MEMBER_WORK_SYNC_RETRY_BASE_MINUTES = 10;
|
||||
const MEMBER_WORK_SYNC_RETRY_MAX_MINUTES = 60;
|
||||
|
||||
export interface MemberWorkSyncNudgeDispatchSummary {
|
||||
claimed: number;
|
||||
delivered: number;
|
||||
|
|
@ -23,6 +27,26 @@ function addMinutes(iso: string, minutes: number): string {
|
|||
return new Date(Date.parse(iso) + minutes * 60_000).toISOString();
|
||||
}
|
||||
|
||||
function subtractMinutes(iso: string, minutes: number): string {
|
||||
return new Date(Date.parse(iso) - minutes * 60_000).toISOString();
|
||||
}
|
||||
|
||||
function stableJitterMinutes(id: string, attemptGeneration: number): number {
|
||||
const seed = `${id}:${attemptGeneration}`;
|
||||
let value = 0;
|
||||
for (const char of seed) {
|
||||
value = (value * 31 + char.charCodeAt(0)) % 997;
|
||||
}
|
||||
return value % 5;
|
||||
}
|
||||
|
||||
function nextRetryAt(item: MemberWorkSyncOutboxItem, nowIso: string): string {
|
||||
const exponentialMinutes =
|
||||
MEMBER_WORK_SYNC_RETRY_BASE_MINUTES * 2 ** Math.max(0, item.attemptGeneration - 1);
|
||||
const cappedMinutes = Math.min(MEMBER_WORK_SYNC_RETRY_MAX_MINUTES, exponentialMinutes);
|
||||
return addMinutes(nowIso, cappedMinutes + stableJitterMinutes(item.id, item.attemptGeneration));
|
||||
}
|
||||
|
||||
export class MemberWorkSyncNudgeDispatcher {
|
||||
constructor(private readonly deps: MemberWorkSyncUseCaseDeps) {}
|
||||
|
||||
|
|
@ -71,7 +95,7 @@ export class MemberWorkSyncNudgeDispatcher {
|
|||
error: revalidation.reason,
|
||||
retryable: true,
|
||||
nowIso,
|
||||
nextAttemptAt: revalidation.nextAttemptAt ?? addMinutes(nowIso, 10),
|
||||
nextAttemptAt: revalidation.nextAttemptAt ?? nextRetryAt(item, nowIso),
|
||||
});
|
||||
return 'retryable';
|
||||
}
|
||||
|
|
@ -120,7 +144,7 @@ export class MemberWorkSyncNudgeDispatcher {
|
|||
error: String(error),
|
||||
retryable: true,
|
||||
nowIso,
|
||||
nextAttemptAt: addMinutes(nowIso, 10),
|
||||
nextAttemptAt: nextRetryAt(item, nowIso),
|
||||
});
|
||||
return 'retryable';
|
||||
}
|
||||
|
|
@ -160,6 +184,23 @@ export class MemberWorkSyncNudgeDispatcher {
|
|||
return { ok: false, reason: 'phase2_not_ready', retryable: true };
|
||||
}
|
||||
|
||||
const recentDelivered = await this.deps.outboxStore?.countRecentDelivered({
|
||||
teamName: item.teamName,
|
||||
memberName: item.memberName,
|
||||
sinceIso: subtractMinutes(nowIso, 60),
|
||||
});
|
||||
if (
|
||||
recentDelivered != null &&
|
||||
recentDelivered >= MEMBER_WORK_SYNC_MAX_NUDGES_PER_MEMBER_PER_HOUR
|
||||
) {
|
||||
return {
|
||||
ok: false,
|
||||
reason: 'member_nudge_rate_limited',
|
||||
retryable: true,
|
||||
nextAttemptAt: addMinutes(nowIso, 60),
|
||||
};
|
||||
}
|
||||
|
||||
const busy = await this.deps.busySignal?.isBusy({
|
||||
teamName: item.teamName,
|
||||
memberName: item.memberName,
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ import type {
|
|||
MemberWorkSyncTeamMetrics,
|
||||
MemberWorkSyncProviderId,
|
||||
MemberWorkSyncOutboxClaimInput,
|
||||
MemberWorkSyncOutboxCountRecentDeliveredInput,
|
||||
MemberWorkSyncOutboxEnsureInput,
|
||||
MemberWorkSyncOutboxEnsureResult,
|
||||
MemberWorkSyncOutboxItem,
|
||||
|
|
@ -100,6 +101,7 @@ export interface MemberWorkSyncOutboxStorePort {
|
|||
markDelivered(input: MemberWorkSyncOutboxMarkDeliveredInput): Promise<void>;
|
||||
markSuperseded(input: MemberWorkSyncOutboxMarkSupersededInput): Promise<void>;
|
||||
markFailed(input: MemberWorkSyncOutboxMarkFailedInput): Promise<void>;
|
||||
countRecentDelivered(input: MemberWorkSyncOutboxCountRecentDeliveredInput): Promise<number>;
|
||||
}
|
||||
|
||||
export interface MemberWorkSyncInboxNudgePort {
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ import { withFileLock } from '@main/services/team/fileLock';
|
|||
import type {
|
||||
MemberWorkSyncMetricEvent,
|
||||
MemberWorkSyncOutboxClaimInput,
|
||||
MemberWorkSyncOutboxCountRecentDeliveredInput,
|
||||
MemberWorkSyncOutboxEnsureInput,
|
||||
MemberWorkSyncOutboxEnsureResult,
|
||||
MemberWorkSyncOutboxItem,
|
||||
|
|
@ -513,6 +514,18 @@ export class JsonMemberWorkSyncStore
|
|||
});
|
||||
}
|
||||
|
||||
async countRecentDelivered(
|
||||
input: MemberWorkSyncOutboxCountRecentDeliveredInput
|
||||
): Promise<number> {
|
||||
const file = await this.readOutboxFile(input.teamName);
|
||||
return Object.values(file.items).filter(
|
||||
(item) =>
|
||||
item.memberName.trim().toLowerCase() === input.memberName.trim().toLowerCase() &&
|
||||
item.status === 'delivered' &&
|
||||
item.updatedAt >= input.sinceIso
|
||||
).length;
|
||||
}
|
||||
|
||||
private async readFile(teamName: string): Promise<StoreFile> {
|
||||
const filePath = this.paths.getStatusPath(teamName);
|
||||
try {
|
||||
|
|
|
|||
|
|
@ -171,6 +171,7 @@ class InMemoryOutboxStore implements MemberWorkSyncOutboxStorePort {
|
|||
...current,
|
||||
status: 'delivered',
|
||||
deliveredMessageId: input.deliveredMessageId,
|
||||
updatedAt: input.nowIso,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
@ -189,15 +190,33 @@ class InMemoryOutboxStore implements MemberWorkSyncOutboxStorePort {
|
|||
...current,
|
||||
status: input.retryable ? 'failed_retryable' : 'failed_terminal',
|
||||
lastError: input.error,
|
||||
...(input.nextAttemptAt ? { nextAttemptAt: input.nextAttemptAt } : {}),
|
||||
updatedAt: input.nowIso,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async countRecentDelivered(input: {
|
||||
memberName: string;
|
||||
sinceIso: string;
|
||||
}): Promise<number> {
|
||||
return [...this.items.values()].filter(
|
||||
(item) =>
|
||||
item.status === 'delivered' &&
|
||||
item.memberName === input.memberName &&
|
||||
item.updatedAt >= input.sinceIso
|
||||
).length;
|
||||
}
|
||||
}
|
||||
|
||||
class InMemoryInboxNudge implements MemberWorkSyncInboxNudgePort {
|
||||
readonly inserted: Array<Parameters<MemberWorkSyncInboxNudgePort['insertIfAbsent']>[0]> = [];
|
||||
fail = false;
|
||||
|
||||
async insertIfAbsent(input: Parameters<MemberWorkSyncInboxNudgePort['insertIfAbsent']>[0]) {
|
||||
if (this.fail) {
|
||||
throw new Error('inbox unavailable');
|
||||
}
|
||||
this.inserted.push(input);
|
||||
return { inserted: true, messageId: input.messageId };
|
||||
}
|
||||
|
|
@ -505,6 +524,74 @@ describe('MemberWorkSync use cases', () => {
|
|||
});
|
||||
});
|
||||
|
||||
it('rate-limits delivered nudges per member per hour', async () => {
|
||||
const outbox = new InMemoryOutboxStore();
|
||||
const inbox = new InMemoryInboxNudge();
|
||||
const { deps, store } = createDeps({ outboxStore: outbox, inboxNudge: inbox });
|
||||
store.phase2ReadinessState = 'shadow_ready';
|
||||
|
||||
const current = await new MemberWorkSyncDiagnosticsReader(deps).execute({
|
||||
teamName: 'team-a',
|
||||
memberName: 'bob',
|
||||
});
|
||||
const firstId = `member-work-sync:team-a:bob:${current.agenda.fingerprint}:old-1`;
|
||||
const secondId = `member-work-sync:team-a:bob:${current.agenda.fingerprint}:old-2`;
|
||||
const baseItem = outbox.items.get(`member-work-sync:team-a:bob:${current.agenda.fingerprint}`);
|
||||
expect(baseItem).toBeDefined();
|
||||
for (const id of [firstId, secondId]) {
|
||||
outbox.items.set(id, {
|
||||
...(baseItem as NonNullable<typeof baseItem>),
|
||||
id,
|
||||
status: 'delivered',
|
||||
deliveredMessageId: id,
|
||||
updatedAt: '2026-04-29T00:00:00.000Z',
|
||||
});
|
||||
}
|
||||
|
||||
const summary = await new MemberWorkSyncNudgeDispatcher(deps).dispatchDue({
|
||||
teamNames: ['team-a'],
|
||||
claimedBy: 'test-dispatcher',
|
||||
});
|
||||
|
||||
expect(summary).toMatchObject({ claimed: 1, delivered: 0, retryable: 1 });
|
||||
expect(inbox.inserted).toEqual([]);
|
||||
expect(outbox.items.get(`member-work-sync:team-a:bob:${current.agenda.fingerprint}`)).toMatchObject({
|
||||
status: 'failed_retryable',
|
||||
lastError: 'member_nudge_rate_limited',
|
||||
nextAttemptAt: '2026-04-29T01:00:00.000Z',
|
||||
});
|
||||
});
|
||||
|
||||
it('uses bounded retry backoff when inbox delivery fails', async () => {
|
||||
const outbox = new InMemoryOutboxStore();
|
||||
const inbox = new InMemoryInboxNudge();
|
||||
inbox.fail = true;
|
||||
const { deps, store } = createDeps({ outboxStore: outbox, inboxNudge: inbox });
|
||||
store.phase2ReadinessState = 'shadow_ready';
|
||||
|
||||
const current = await new MemberWorkSyncDiagnosticsReader(deps).execute({
|
||||
teamName: 'team-a',
|
||||
memberName: 'bob',
|
||||
});
|
||||
const summary = await new MemberWorkSyncNudgeDispatcher(deps).dispatchDue({
|
||||
teamNames: ['team-a'],
|
||||
claimedBy: 'test-dispatcher',
|
||||
});
|
||||
|
||||
const item = outbox.items.get(`member-work-sync:team-a:bob:${current.agenda.fingerprint}`);
|
||||
expect(summary).toMatchObject({ claimed: 1, delivered: 0, retryable: 1 });
|
||||
expect(item).toMatchObject({
|
||||
status: 'failed_retryable',
|
||||
lastError: 'Error: inbox unavailable',
|
||||
});
|
||||
expect(Date.parse(item?.nextAttemptAt ?? '')).toBeGreaterThan(
|
||||
Date.parse('2026-04-29T00:09:59.000Z')
|
||||
);
|
||||
expect(Date.parse(item?.nextAttemptAt ?? '')).toBeLessThanOrEqual(
|
||||
Date.parse('2026-04-29T00:14:00.000Z')
|
||||
);
|
||||
});
|
||||
|
||||
it('rejects invalid report tokens without recording replayable intents', async () => {
|
||||
const { deps, store } = createDeps();
|
||||
const reader = new MemberWorkSyncDiagnosticsReader(deps);
|
||||
|
|
|
|||
Loading…
Reference in a new issue