refactor(team): extract message notification scanner
This commit is contained in:
parent
ec004e9328
commit
a8e7f1ccd5
5 changed files with 483 additions and 238 deletions
|
|
@ -94,10 +94,9 @@ import {
|
|||
TEAM_VALIDATE_CLI_ARGS,
|
||||
// eslint-disable-next-line boundaries/element-types -- IPC channel constants are shared between main and preload by design
|
||||
} from '@preload/constants/ipcChannels';
|
||||
import { AGENT_BLOCK_CLOSE, AGENT_BLOCK_OPEN, wrapAgentBlock } from '@shared/constants/agentBlocks';
|
||||
import { wrapAgentBlock } from '@shared/constants/agentBlocks';
|
||||
import { KANBAN_COLUMN_IDS } from '@shared/constants/kanban';
|
||||
import { MAX_TEXT_LENGTH } from '@shared/constants/teamLimits';
|
||||
import { isApiErrorMessage } from '@shared/utils/apiErrorDetector';
|
||||
import {
|
||||
extractFlagsFromHelp,
|
||||
extractUserFlags,
|
||||
|
|
@ -111,7 +110,6 @@ import { getErrorMessage } from '@shared/utils/errorHandling';
|
|||
import { isLeadMember } from '@shared/utils/leadDetection';
|
||||
import { createLogger } from '@shared/utils/logger';
|
||||
import { isTeamProviderBackendId, migrateProviderBackendId } from '@shared/utils/providerBackend';
|
||||
import { isRateLimitMessage } from '@shared/utils/rateLimitDetector';
|
||||
import {
|
||||
buildStandaloneSlashCommandMeta,
|
||||
parseStandaloneSlashCommand,
|
||||
|
|
@ -133,7 +131,6 @@ import {
|
|||
import {
|
||||
getAutoResumeService,
|
||||
initializeAutoResumeService,
|
||||
planRateLimitAutoResume,
|
||||
} from '../services/team/AutoResumeService';
|
||||
import {
|
||||
cloneLaunchIoGovernorPayload,
|
||||
|
|
@ -156,6 +153,7 @@ import {
|
|||
import { TeamTaskAttachmentStore } from '../services/team/TeamTaskAttachmentStore';
|
||||
import { TeamWorktreeGitService } from '../services/team/TeamWorktreeGitService';
|
||||
|
||||
import { teamMessageNotificationScanner } from './teams/teamMessageNotificationScanner';
|
||||
import {
|
||||
validateFromField,
|
||||
validateMemberName,
|
||||
|
|
@ -301,14 +299,6 @@ function validateTeamGetDataOptions(
|
|||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* In-memory set of rate-limit message keys already processed.
|
||||
* Independent of NotificationManager storage — survives notification deletion/pruning.
|
||||
* Without this, deleted rate-limit notifications would re-appear on next getData() scan.
|
||||
*/
|
||||
const seenRateLimitKeys = new Set<string>();
|
||||
const SEEN_RATE_LIMIT_KEYS_MAX = 500;
|
||||
|
||||
async function withTimeoutValue<T>(
|
||||
promise: Promise<T>,
|
||||
timeoutMs: number,
|
||||
|
|
@ -442,178 +432,6 @@ function buildLeadDirectDelegateAckBlock(actionMode?: AgentActionMode): string |
|
|||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* In-memory set of API error message keys already processed.
|
||||
* Independent of NotificationManager storage — survives notification deletion/pruning.
|
||||
*/
|
||||
const seenApiErrorKeys = new Set<string>();
|
||||
const SEEN_API_ERROR_KEYS_MAX = 500;
|
||||
|
||||
function formatNotificationClockTime(date: Date): string {
|
||||
return new Intl.DateTimeFormat(undefined, {
|
||||
hour: '2-digit',
|
||||
minute: '2-digit',
|
||||
hour12: false,
|
||||
}).format(date);
|
||||
}
|
||||
|
||||
function buildRateLimitNotificationBody(plan: ReturnType<typeof planRateLimitAutoResume>): string {
|
||||
if (plan.kind === 'scheduled') {
|
||||
return `Auto-resume scheduled at ${formatNotificationClockTime(new Date(plan.fireAtMs))}`;
|
||||
}
|
||||
return 'Manual restart needed';
|
||||
}
|
||||
|
||||
/**
|
||||
* Check messages for rate limit indicators and fire notifications for new ones.
|
||||
* Uses both in-memory seenRateLimitKeys (to prevent resurrection after deletion)
|
||||
* and NotificationManager dedupeKey (to prevent storage duplicates).
|
||||
*/
|
||||
function checkRateLimitMessages(
|
||||
messages: readonly {
|
||||
messageId?: string;
|
||||
from: string;
|
||||
text: string;
|
||||
timestamp: string;
|
||||
to?: string;
|
||||
source?: string;
|
||||
leadSessionId?: string;
|
||||
}[],
|
||||
teamName: string,
|
||||
teamDisplayName: string,
|
||||
projectPath?: string,
|
||||
teamIsAlive = true,
|
||||
currentLeadSessionId: string | null = null
|
||||
): void {
|
||||
const observedAt = new Date();
|
||||
const autoResumeEnabled =
|
||||
ConfigManager.getInstance().getConfig().notifications.autoResumeOnRateLimit;
|
||||
|
||||
for (const msg of messages) {
|
||||
if (msg.from === 'user') continue;
|
||||
if (!isRateLimitMessage(msg.text)) continue;
|
||||
|
||||
const rawKey = msg.messageId ?? `${msg.from}:${msg.timestamp}`;
|
||||
const dedupeKey = `rate-limit:${teamName}:${rawKey}`;
|
||||
const isLeadAutoResumeCandidate =
|
||||
!msg.to && (msg.source === 'lead_process' || msg.source === 'lead_session');
|
||||
const autoResumeSessionMatches =
|
||||
msg.source !== 'lead_session' ||
|
||||
(Boolean(currentLeadSessionId) && msg.leadSessionId === currentLeadSessionId);
|
||||
const autoResumePlan = planRateLimitAutoResume({
|
||||
enabled: autoResumeEnabled,
|
||||
canAutoResume: teamIsAlive && isLeadAutoResumeCandidate && autoResumeSessionMatches,
|
||||
messageText: msg.text,
|
||||
observedAt,
|
||||
messageTimestamp: new Date(msg.timestamp),
|
||||
});
|
||||
|
||||
// In-memory guard: prevents resurrection after user deletes the notification.
|
||||
if (!seenRateLimitKeys.has(dedupeKey)) {
|
||||
seenRateLimitKeys.add(dedupeKey);
|
||||
|
||||
// Evict oldest entries to prevent unbounded growth
|
||||
if (seenRateLimitKeys.size > SEEN_RATE_LIMIT_KEYS_MAX) {
|
||||
const first = seenRateLimitKeys.values().next().value;
|
||||
if (first) seenRateLimitKeys.delete(first);
|
||||
}
|
||||
|
||||
void NotificationManager.getInstance()
|
||||
.addTeamNotification({
|
||||
teamEventType: 'rate_limit',
|
||||
teamName,
|
||||
teamDisplayName,
|
||||
from: msg.from,
|
||||
summary: 'Rate limit',
|
||||
body: buildRateLimitNotificationBody(autoResumePlan),
|
||||
dedupeKey,
|
||||
target: { kind: 'member', teamName, memberName: msg.from, focus: 'logs' },
|
||||
projectPath,
|
||||
})
|
||||
.catch(() => undefined);
|
||||
}
|
||||
|
||||
// Only schedule auto-resume while a live team run currently exists.
|
||||
// Persisted history for an offline/stopped team may still contain the old
|
||||
// rate-limit message, but arming a new timer from that stale history would
|
||||
// resurrect the nudge into a later manual restart.
|
||||
if (autoResumePlan.kind === 'scheduled') {
|
||||
// Only let persisted lead_session history rebuild auto-resume when it
|
||||
// clearly belongs to the currently running lead session. Otherwise an old
|
||||
// rate-limit from a previous manual run can resurrect into a newer restart.
|
||||
// Pass the original message timestamp so relative reset windows survive restarts
|
||||
// and old history does not rebuild a fresh auto-resume timer from "now".
|
||||
getAutoResumeService().handleRateLimitMessage(
|
||||
teamName,
|
||||
msg.text,
|
||||
observedAt,
|
||||
new Date(msg.timestamp)
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check messages for API errors (e.g. "API Error: 429 ...") and fire OS notifications.
|
||||
* Mirrors the rate-limit approach: in-memory dedup + NotificationManager dedupeKey.
|
||||
* Skips rate-limit messages (they have their own notification path).
|
||||
*/
|
||||
function checkApiErrorMessages(
|
||||
messages: readonly { messageId?: string; from: string; text: string; timestamp: string }[],
|
||||
teamName: string,
|
||||
teamDisplayName: string,
|
||||
projectPath?: string
|
||||
): void {
|
||||
for (const msg of messages) {
|
||||
if (msg.from === 'user') continue;
|
||||
if (!isApiErrorMessage(msg.text)) continue;
|
||||
// Don't double-notify if it's also a rate limit message
|
||||
if (isRateLimitMessage(msg.text)) continue;
|
||||
|
||||
const rawKey = msg.messageId ?? `${msg.from}:${msg.timestamp}`;
|
||||
const dedupeKey = `api-error:${teamName}:${rawKey}`;
|
||||
|
||||
if (seenApiErrorKeys.has(dedupeKey)) continue;
|
||||
seenApiErrorKeys.add(dedupeKey);
|
||||
|
||||
if (seenApiErrorKeys.size > SEEN_API_ERROR_KEYS_MAX) {
|
||||
const first = seenApiErrorKeys.values().next().value;
|
||||
if (first) seenApiErrorKeys.delete(first);
|
||||
}
|
||||
|
||||
// Extract status code for summary
|
||||
const statusMatch = /^API Error:\s*(\d{3})/.exec(msg.text);
|
||||
const statusCode = statusMatch?.[1] ?? '???';
|
||||
|
||||
void NotificationManager.getInstance()
|
||||
.addTeamNotification({
|
||||
teamEventType: 'api_error',
|
||||
teamName,
|
||||
teamDisplayName,
|
||||
from: msg.from,
|
||||
summary: `API Error ${statusCode}`,
|
||||
body: 'Manual restart needed',
|
||||
dedupeKey,
|
||||
target: { kind: 'member', teamName, memberName: msg.from, focus: 'logs' },
|
||||
projectPath,
|
||||
})
|
||||
.catch(() => undefined);
|
||||
}
|
||||
}
|
||||
|
||||
function scanTeamMessageNotifications(
|
||||
messages: readonly { messageId?: string; from: string; text: string; timestamp: string }[],
|
||||
teamName: string,
|
||||
teamDisplayName: string,
|
||||
projectPath?: string
|
||||
): void {
|
||||
if (messages.length === 0) {
|
||||
return;
|
||||
}
|
||||
checkRateLimitMessages(messages, teamName, teamDisplayName, projectPath);
|
||||
checkApiErrorMessages(messages, teamName, teamDisplayName, projectPath);
|
||||
}
|
||||
|
||||
let teamDataService: TeamDataService | null = null;
|
||||
let teamProvisioningService: TeamProvisioningService | null = null;
|
||||
let teamMemberLogsFinder: TeamMemberLogsFinder | null = null;
|
||||
|
|
@ -1145,17 +963,24 @@ async function handleGetData(
|
|||
|
||||
if (live.length === 0) {
|
||||
if (durableMessages.length > 0) {
|
||||
checkRateLimitMessages(
|
||||
durableMessages,
|
||||
tn,
|
||||
displayName,
|
||||
teamMessageNotificationScanner.checkRateLimitMessages(durableMessages, {
|
||||
teamName: tn,
|
||||
teamDisplayName: displayName,
|
||||
projectPath,
|
||||
isAlive,
|
||||
currentLeadSessionId
|
||||
);
|
||||
checkApiErrorMessages(durableMessages, tn, displayName, projectPath);
|
||||
teamIsAlive: isAlive,
|
||||
currentLeadSessionId,
|
||||
});
|
||||
teamMessageNotificationScanner.checkApiErrorMessages(durableMessages, {
|
||||
teamName: tn,
|
||||
teamDisplayName: displayName,
|
||||
projectPath,
|
||||
});
|
||||
} else {
|
||||
scanTeamMessageNotifications(live, tn, displayName, projectPath);
|
||||
teamMessageNotificationScanner.scan(live, {
|
||||
teamName: tn,
|
||||
teamDisplayName: displayName,
|
||||
projectPath,
|
||||
});
|
||||
}
|
||||
return { success: true, data: { ...data, isAlive } };
|
||||
}
|
||||
|
|
@ -1177,8 +1002,18 @@ async function handleGetData(
|
|||
}
|
||||
}
|
||||
|
||||
checkRateLimitMessages(merged, tn, displayName, projectPath, isAlive, currentLeadSessionId);
|
||||
checkApiErrorMessages(merged, tn, displayName, projectPath);
|
||||
teamMessageNotificationScanner.checkRateLimitMessages(merged, {
|
||||
teamName: tn,
|
||||
teamDisplayName: displayName,
|
||||
projectPath,
|
||||
teamIsAlive: isAlive,
|
||||
currentLeadSessionId,
|
||||
});
|
||||
teamMessageNotificationScanner.checkApiErrorMessages(merged, {
|
||||
teamName: tn,
|
||||
teamDisplayName: displayName,
|
||||
projectPath,
|
||||
});
|
||||
return { success: true, data: { ...data, isAlive } };
|
||||
}
|
||||
|
||||
|
|
@ -2786,27 +2621,27 @@ function buildMessageDeliveryText(
|
|||
'Do NOT answer only with normal assistant text because that will not appear in the UI message thread.',
|
||||
];
|
||||
hiddenBlocks.push(
|
||||
[
|
||||
AGENT_BLOCK_OPEN,
|
||||
`You received a direct message from ${senderDescriptor} via the UI.`,
|
||||
...replyInstructionLines,
|
||||
`Please reply back to recipient "${replyRecipient}" with a short, human-readable answer.`,
|
||||
'If you cannot respond now, reply with a brief status (e.g. "Busy, will reply later").',
|
||||
...(canUseAgentTeamsMessageSend
|
||||
? [
|
||||
'If neither Agent Teams MCP message_send tool name is available before any visible-message tool attempt, write exactly the concise reply text as normal assistant text so the runtime can relay it.',
|
||||
]
|
||||
: []),
|
||||
...(isUserReplyRecipient
|
||||
? [
|
||||
'CRITICAL: If the user asks you to check with the lead or another teammate before you can fully answer, FIRST send a short acknowledgement to "user" so the human sees you started (for example: "Принял, сейчас уточню и вернусь с ответом.").',
|
||||
'Only after that first acknowledgement may you message the lead or another teammate.',
|
||||
'After you get the needed information, send the final answer back to "user".',
|
||||
'Do NOT stay silent while you go ask someone else.',
|
||||
]
|
||||
: []),
|
||||
AGENT_BLOCK_CLOSE,
|
||||
].join('\n')
|
||||
wrapAgentBlock(
|
||||
[
|
||||
`You received a direct message from ${senderDescriptor} via the UI.`,
|
||||
...replyInstructionLines,
|
||||
`Please reply back to recipient "${replyRecipient}" with a short, human-readable answer.`,
|
||||
'If you cannot respond now, reply with a brief status (e.g. "Busy, will reply later").',
|
||||
...(canUseAgentTeamsMessageSend
|
||||
? [
|
||||
'If neither Agent Teams MCP message_send tool name is available before any visible-message tool attempt, write exactly the concise reply text as normal assistant text so the runtime can relay it.',
|
||||
]
|
||||
: []),
|
||||
...(isUserReplyRecipient
|
||||
? [
|
||||
'CRITICAL: If the user asks you to check with the lead or another teammate before you can fully answer, FIRST send a short acknowledgement to "user" so the human sees you started (for example: "Принял, сейчас уточню и вернусь с ответом.").',
|
||||
'Only after that first acknowledgement may you message the lead or another teammate.',
|
||||
'After you get the needed information, send the final answer back to "user".',
|
||||
'Do NOT stay silent while you go ask someone else.',
|
||||
]
|
||||
: []),
|
||||
].join('\n')
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
|
|
@ -2844,12 +2679,11 @@ async function handleGetMessagesPage(
|
|||
.catch(() => ({ displayName: teamName }));
|
||||
void notificationContextPromise
|
||||
.then((notificationContext) => {
|
||||
scanTeamMessageNotifications(
|
||||
messagesPage.messages,
|
||||
teamMessageNotificationScanner.scan(messagesPage.messages, {
|
||||
teamName,
|
||||
notificationContext.displayName,
|
||||
notificationContext.projectPath
|
||||
);
|
||||
teamDisplayName: notificationContext.displayName,
|
||||
projectPath: notificationContext.projectPath,
|
||||
});
|
||||
})
|
||||
.catch((error: unknown) => {
|
||||
logger.debug(
|
||||
|
|
@ -3062,10 +2896,12 @@ async function handleSendMessage(
|
|||
`IMPORTANT: Your text response here is shown to the user in the Messages panel. Always include a brief human-readable reply. Do NOT respond with only an agent-only block.`,
|
||||
...(rosterContextBlock ? [rosterContextBlock] : []),
|
||||
...(delegateAckBlock ? [delegateAckBlock] : []),
|
||||
AGENT_BLOCK_OPEN,
|
||||
`MessageId: ${preGeneratedMessageId}`,
|
||||
`When creating a task from this user message, prefer task_create_from_message with messageId="${preGeneratedMessageId}" for reliable provenance. Only use this exact messageId — never guess or fabricate one.`,
|
||||
AGENT_BLOCK_CLOSE,
|
||||
wrapAgentBlock(
|
||||
[
|
||||
`MessageId: ${preGeneratedMessageId}`,
|
||||
`When creating a task from this user message, prefer task_create_from_message with messageId="${preGeneratedMessageId}" for reliable provenance. Only use this exact messageId — never guess or fabricate one.`,
|
||||
].join('\n')
|
||||
),
|
||||
``,
|
||||
`Message from user:`,
|
||||
buildMessageDeliveryText(payload.text!, {
|
||||
|
|
|
|||
240
src/main/ipc/teams/teamMessageNotificationScanner.ts
Normal file
240
src/main/ipc/teams/teamMessageNotificationScanner.ts
Normal file
|
|
@ -0,0 +1,240 @@
|
|||
import { ConfigManager } from '@main/services/infrastructure/ConfigManager';
|
||||
import { NotificationManager } from '@main/services/infrastructure/NotificationManager';
|
||||
import {
|
||||
getAutoResumeService,
|
||||
planRateLimitAutoResume,
|
||||
type RateLimitAutoResumePlan,
|
||||
} from '@main/services/team/AutoResumeService';
|
||||
import { isApiErrorMessage } from '@shared/utils/apiErrorDetector';
|
||||
import { isRateLimitMessage } from '@shared/utils/rateLimitDetector';
|
||||
|
||||
import type { TeamNotificationPayload } from '@main/utils/teamNotificationBuilder';
|
||||
|
||||
export interface TeamNotificationMessage {
|
||||
messageId?: string;
|
||||
from: string;
|
||||
text: string;
|
||||
timestamp: string;
|
||||
to?: string;
|
||||
source?: string;
|
||||
leadSessionId?: string;
|
||||
}
|
||||
|
||||
interface TeamNotificationSink {
|
||||
addTeamNotification(payload: TeamNotificationPayload): Promise<unknown>;
|
||||
}
|
||||
|
||||
interface AutoResumeSink {
|
||||
handleRateLimitMessage(
|
||||
teamName: string,
|
||||
messageText: string,
|
||||
observedAt: Date,
|
||||
messageTimestamp: Date
|
||||
): void;
|
||||
}
|
||||
|
||||
interface ConfigReader {
|
||||
getConfig(): {
|
||||
notifications: {
|
||||
autoResumeOnRateLimit: boolean;
|
||||
};
|
||||
};
|
||||
}
|
||||
|
||||
export interface TeamMessageNotificationScannerDeps {
|
||||
configReader?: ConfigReader;
|
||||
notificationSink?: TeamNotificationSink;
|
||||
autoResumeSink?: AutoResumeSink;
|
||||
planAutoResume?: typeof planRateLimitAutoResume;
|
||||
isRateLimit?: (text: string) => boolean;
|
||||
isApiError?: (text: string) => boolean;
|
||||
now?: () => Date;
|
||||
formatClockTime?: (date: Date) => string;
|
||||
}
|
||||
|
||||
export interface TeamMessageNotificationContext {
|
||||
teamName: string;
|
||||
teamDisplayName: string;
|
||||
projectPath?: string;
|
||||
teamIsAlive?: boolean;
|
||||
currentLeadSessionId?: string | null;
|
||||
}
|
||||
|
||||
const SEEN_RATE_LIMIT_KEYS_MAX = 500;
|
||||
const SEEN_API_ERROR_KEYS_MAX = 500;
|
||||
|
||||
function formatNotificationClockTime(date: Date): string {
|
||||
return new Intl.DateTimeFormat(undefined, {
|
||||
hour: '2-digit',
|
||||
minute: '2-digit',
|
||||
hour12: false,
|
||||
}).format(date);
|
||||
}
|
||||
|
||||
function buildRateLimitNotificationBody(
|
||||
plan: RateLimitAutoResumePlan,
|
||||
formatClockTime: (date: Date) => string
|
||||
): string {
|
||||
if (plan.kind === 'scheduled') {
|
||||
return `Auto-resume scheduled at ${formatClockTime(new Date(plan.fireAtMs))}`;
|
||||
}
|
||||
return 'Manual restart needed';
|
||||
}
|
||||
|
||||
function evictOldestIfNeeded(keys: Set<string>, maxSize: number): void {
|
||||
if (keys.size <= maxSize) {
|
||||
return;
|
||||
}
|
||||
|
||||
const first = keys.values().next().value;
|
||||
if (first) {
|
||||
keys.delete(first);
|
||||
}
|
||||
}
|
||||
|
||||
function createDefaultNotificationSink(): TeamNotificationSink {
|
||||
return {
|
||||
addTeamNotification: (payload) => NotificationManager.getInstance().addTeamNotification(payload),
|
||||
};
|
||||
}
|
||||
|
||||
export class TeamMessageNotificationScanner {
|
||||
readonly #seenRateLimitKeys = new Set<string>();
|
||||
readonly #seenApiErrorKeys = new Set<string>();
|
||||
readonly #configReader: ConfigReader;
|
||||
readonly #notificationSink: TeamNotificationSink;
|
||||
readonly #planAutoResume: typeof planRateLimitAutoResume;
|
||||
readonly #isRateLimit: (text: string) => boolean;
|
||||
readonly #isApiError: (text: string) => boolean;
|
||||
readonly #now: () => Date;
|
||||
readonly #formatClockTime: (date: Date) => string;
|
||||
readonly #autoResumeSink: AutoResumeSink | null;
|
||||
|
||||
constructor(deps: TeamMessageNotificationScannerDeps = {}) {
|
||||
this.#configReader = deps.configReader ?? ConfigManager.getInstance();
|
||||
this.#notificationSink = deps.notificationSink ?? createDefaultNotificationSink();
|
||||
this.#planAutoResume = deps.planAutoResume ?? planRateLimitAutoResume;
|
||||
this.#isRateLimit = deps.isRateLimit ?? isRateLimitMessage;
|
||||
this.#isApiError = deps.isApiError ?? isApiErrorMessage;
|
||||
this.#now = deps.now ?? (() => new Date());
|
||||
this.#formatClockTime = deps.formatClockTime ?? formatNotificationClockTime;
|
||||
this.#autoResumeSink = deps.autoResumeSink ?? null;
|
||||
}
|
||||
|
||||
checkRateLimitMessages(
|
||||
messages: readonly TeamNotificationMessage[],
|
||||
context: TeamMessageNotificationContext
|
||||
): void {
|
||||
const observedAt = this.#now();
|
||||
const autoResumeEnabled = this.#configReader.getConfig().notifications.autoResumeOnRateLimit;
|
||||
|
||||
for (const msg of messages) {
|
||||
if (msg.from === 'user') continue;
|
||||
if (!this.#isRateLimit(msg.text)) continue;
|
||||
|
||||
const rawKey = msg.messageId ?? `${msg.from}:${msg.timestamp}`;
|
||||
const dedupeKey = `rate-limit:${context.teamName}:${rawKey}`;
|
||||
const isLeadAutoResumeCandidate =
|
||||
!msg.to && (msg.source === 'lead_process' || msg.source === 'lead_session');
|
||||
const currentLeadSessionId = context.currentLeadSessionId ?? null;
|
||||
const autoResumeSessionMatches =
|
||||
msg.source !== 'lead_session' ||
|
||||
(Boolean(currentLeadSessionId) && msg.leadSessionId === currentLeadSessionId);
|
||||
const autoResumePlan = this.#planAutoResume({
|
||||
enabled: autoResumeEnabled,
|
||||
canAutoResume:
|
||||
(context.teamIsAlive ?? true) &&
|
||||
isLeadAutoResumeCandidate &&
|
||||
autoResumeSessionMatches,
|
||||
messageText: msg.text,
|
||||
observedAt,
|
||||
messageTimestamp: new Date(msg.timestamp),
|
||||
});
|
||||
|
||||
if (!this.#seenRateLimitKeys.has(dedupeKey)) {
|
||||
this.#seenRateLimitKeys.add(dedupeKey);
|
||||
evictOldestIfNeeded(this.#seenRateLimitKeys, SEEN_RATE_LIMIT_KEYS_MAX);
|
||||
|
||||
void this.#notificationSink
|
||||
.addTeamNotification({
|
||||
teamEventType: 'rate_limit',
|
||||
teamName: context.teamName,
|
||||
teamDisplayName: context.teamDisplayName,
|
||||
from: msg.from,
|
||||
summary: 'Rate limit',
|
||||
body: buildRateLimitNotificationBody(autoResumePlan, this.#formatClockTime),
|
||||
dedupeKey,
|
||||
target: {
|
||||
kind: 'member',
|
||||
teamName: context.teamName,
|
||||
memberName: msg.from,
|
||||
focus: 'logs',
|
||||
},
|
||||
projectPath: context.projectPath,
|
||||
})
|
||||
.catch(() => undefined);
|
||||
}
|
||||
|
||||
if (autoResumePlan.kind === 'scheduled') {
|
||||
const autoResumeSink = this.#autoResumeSink ?? getAutoResumeService();
|
||||
autoResumeSink.handleRateLimitMessage(
|
||||
context.teamName,
|
||||
msg.text,
|
||||
observedAt,
|
||||
new Date(msg.timestamp)
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
checkApiErrorMessages(
|
||||
messages: readonly TeamNotificationMessage[],
|
||||
context: TeamMessageNotificationContext
|
||||
): void {
|
||||
for (const msg of messages) {
|
||||
if (msg.from === 'user') continue;
|
||||
if (!this.#isApiError(msg.text)) continue;
|
||||
if (this.#isRateLimit(msg.text)) continue;
|
||||
|
||||
const rawKey = msg.messageId ?? `${msg.from}:${msg.timestamp}`;
|
||||
const dedupeKey = `api-error:${context.teamName}:${rawKey}`;
|
||||
|
||||
if (this.#seenApiErrorKeys.has(dedupeKey)) continue;
|
||||
this.#seenApiErrorKeys.add(dedupeKey);
|
||||
evictOldestIfNeeded(this.#seenApiErrorKeys, SEEN_API_ERROR_KEYS_MAX);
|
||||
|
||||
const statusMatch = /^API Error:\s*(\d{3})/.exec(msg.text);
|
||||
const statusCode = statusMatch?.[1] ?? '???';
|
||||
|
||||
void this.#notificationSink
|
||||
.addTeamNotification({
|
||||
teamEventType: 'api_error',
|
||||
teamName: context.teamName,
|
||||
teamDisplayName: context.teamDisplayName,
|
||||
from: msg.from,
|
||||
summary: `API Error ${statusCode}`,
|
||||
body: 'Manual restart needed',
|
||||
dedupeKey,
|
||||
target: {
|
||||
kind: 'member',
|
||||
teamName: context.teamName,
|
||||
memberName: msg.from,
|
||||
focus: 'logs',
|
||||
},
|
||||
projectPath: context.projectPath,
|
||||
})
|
||||
.catch(() => undefined);
|
||||
}
|
||||
}
|
||||
|
||||
scan(messages: readonly TeamNotificationMessage[], context: TeamMessageNotificationContext): void {
|
||||
if (messages.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
this.checkRateLimitMessages(messages, context);
|
||||
this.checkApiErrorMessages(messages, context);
|
||||
}
|
||||
}
|
||||
|
||||
export const teamMessageNotificationScanner = new TeamMessageNotificationScanner();
|
||||
|
|
@ -2,12 +2,7 @@ import { fromProvisioningMembers, isMixedOpenCodeSideLanePlan } from '@features/
|
|||
import { yieldToEventLoop } from '@main/utils/asyncYield';
|
||||
import { getClaudeBasePath, getTasksBasePath, getTeamsBasePath } from '@main/utils/pathDecoder';
|
||||
import { killProcessByPid } from '@main/utils/processKill';
|
||||
import {
|
||||
AGENT_BLOCK_CLOSE,
|
||||
AGENT_BLOCK_OPEN,
|
||||
stripAgentBlocks,
|
||||
wrapAgentBlock,
|
||||
} from '@shared/constants/agentBlocks';
|
||||
import { stripAgentBlocks, wrapAgentBlock } from '@shared/constants/agentBlocks';
|
||||
import { getMemberColorByName } from '@shared/constants/memberColors';
|
||||
import { isTeamEffortLevel } from '@shared/utils/effortLevels';
|
||||
import { classifyIdleNotificationText } from '@shared/utils/idleNotificationSemantics';
|
||||
|
|
@ -2053,11 +2048,14 @@ export class TeamDataService {
|
|||
parts.push(`\nDetails:\n${task.description.trim()}`);
|
||||
}
|
||||
parts.push(
|
||||
`\n${AGENT_BLOCK_OPEN}`,
|
||||
`Begin work on this task immediately. Keep it moving until it is completed or clearly blocked. Do not leave it idle.`,
|
||||
`Update task status using the board MCP tools:`,
|
||||
`task_complete { teamName: "${teamName}", taskId: "${task.id}" }`,
|
||||
AGENT_BLOCK_CLOSE
|
||||
'',
|
||||
wrapAgentBlock(
|
||||
[
|
||||
`Begin work on this task immediately. Keep it moving until it is completed or clearly blocked. Do not leave it idle.`,
|
||||
`Update task status using the board MCP tools:`,
|
||||
`task_complete { teamName: "${teamName}", taskId: "${task.id}" }`,
|
||||
].join('\n')
|
||||
)
|
||||
);
|
||||
await this.sendMessage(teamName, {
|
||||
member: task.owner,
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
import { AGENT_BLOCK_CLOSE, AGENT_BLOCK_OPEN } from '@shared/constants/agentBlocks';
|
||||
import { wrapAgentBlock } from '@shared/constants/agentBlocks';
|
||||
import * as agentTeamsControllerModule from 'agent-teams-controller';
|
||||
|
||||
import type { AgentActionMode } from '@shared/types';
|
||||
|
|
@ -46,7 +46,7 @@ export function buildActionModeAgentBlock(mode: AgentActionMode | undefined): st
|
|||
}
|
||||
|
||||
const lines = ACTION_MODE_BLOCKS[mode];
|
||||
return `${AGENT_BLOCK_OPEN}\n${lines.join('\n')}\n${AGENT_BLOCK_CLOSE}`;
|
||||
return wrapAgentBlock(lines.join('\n'));
|
||||
}
|
||||
|
||||
export function isAgentActionMode(value: unknown): value is AgentActionMode {
|
||||
|
|
|
|||
171
test/main/ipc/teams/teamMessageNotificationScanner.test.ts
Normal file
171
test/main/ipc/teams/teamMessageNotificationScanner.test.ts
Normal file
|
|
@ -0,0 +1,171 @@
|
|||
import { beforeEach, describe, expect, it, vi } from 'vitest';
|
||||
|
||||
import {
|
||||
TeamMessageNotificationScanner,
|
||||
type TeamNotificationMessage,
|
||||
} from '../../../../src/main/ipc/teams/teamMessageNotificationScanner';
|
||||
|
||||
import type { RateLimitAutoResumePlan } from '../../../../src/main/services/team/AutoResumeService';
|
||||
import type { TeamNotificationPayload } from '../../../../src/main/utils/teamNotificationBuilder';
|
||||
|
||||
function createMessage(overrides: Partial<TeamNotificationMessage> = {}): TeamNotificationMessage {
|
||||
return {
|
||||
from: 'team-lead',
|
||||
text: "You've hit your limit. Resets in 5 minutes.",
|
||||
timestamp: '2026-04-17T12:00:00.000Z',
|
||||
messageId: 'msg-1',
|
||||
source: 'lead_session',
|
||||
leadSessionId: 'sess-live',
|
||||
...overrides,
|
||||
};
|
||||
}
|
||||
|
||||
describe('TeamMessageNotificationScanner', () => {
|
||||
const notificationSink = {
|
||||
addTeamNotification: vi.fn<() => Promise<unknown>>(),
|
||||
};
|
||||
const autoResumeSink = {
|
||||
handleRateLimitMessage: vi.fn(),
|
||||
};
|
||||
let autoResumeEnabled = true;
|
||||
|
||||
beforeEach(() => {
|
||||
notificationSink.addTeamNotification.mockReset();
|
||||
notificationSink.addTeamNotification.mockResolvedValue(null);
|
||||
autoResumeSink.handleRateLimitMessage.mockReset();
|
||||
autoResumeEnabled = true;
|
||||
});
|
||||
|
||||
function createScanner(options?: {
|
||||
isRateLimit?: (text: string) => boolean;
|
||||
isApiError?: (text: string) => boolean;
|
||||
planAutoResume?: (input: {
|
||||
enabled: boolean;
|
||||
canAutoResume: boolean;
|
||||
messageText: string;
|
||||
observedAt: Date;
|
||||
messageTimestamp?: Date;
|
||||
}) => RateLimitAutoResumePlan;
|
||||
}): TeamMessageNotificationScanner {
|
||||
return new TeamMessageNotificationScanner({
|
||||
configReader: {
|
||||
getConfig: () => ({ notifications: { autoResumeOnRateLimit: autoResumeEnabled } }),
|
||||
},
|
||||
notificationSink,
|
||||
autoResumeSink,
|
||||
now: () => new Date('2026-04-17T12:02:00.000Z'),
|
||||
formatClockTime: () => '12:05',
|
||||
isRateLimit: options?.isRateLimit ?? ((text) => text.includes('limit')),
|
||||
isApiError: options?.isApiError ?? ((text) => text.startsWith('API Error:')),
|
||||
planAutoResume:
|
||||
options?.planAutoResume ??
|
||||
((input) =>
|
||||
input.enabled && input.canAutoResume
|
||||
? {
|
||||
kind: 'scheduled',
|
||||
resetTime: new Date('2026-04-17T12:05:00.000Z'),
|
||||
delayMs: 180_000,
|
||||
fireAtMs: Date.parse('2026-04-17T12:05:30.000Z'),
|
||||
rawDelayMs: 180_000,
|
||||
}
|
||||
: { kind: 'manual', reason: 'disabled' }),
|
||||
});
|
||||
}
|
||||
|
||||
it('notifies and schedules auto-resume for a live lead rate-limit message', () => {
|
||||
const scanner = createScanner();
|
||||
|
||||
scanner.checkRateLimitMessages([createMessage()], {
|
||||
teamName: 'my-team',
|
||||
teamDisplayName: 'My Team',
|
||||
projectPath: '/tmp/project',
|
||||
teamIsAlive: true,
|
||||
currentLeadSessionId: 'sess-live',
|
||||
});
|
||||
|
||||
expect(notificationSink.addTeamNotification).toHaveBeenCalledWith(
|
||||
expect.objectContaining<TeamNotificationPayload>({
|
||||
teamEventType: 'rate_limit',
|
||||
teamName: 'my-team',
|
||||
teamDisplayName: 'My Team',
|
||||
from: 'team-lead',
|
||||
summary: 'Rate limit',
|
||||
body: 'Auto-resume scheduled at 12:05',
|
||||
dedupeKey: 'rate-limit:my-team:msg-1',
|
||||
target: { kind: 'member', teamName: 'my-team', memberName: 'team-lead', focus: 'logs' },
|
||||
projectPath: '/tmp/project',
|
||||
})
|
||||
);
|
||||
expect(autoResumeSink.handleRateLimitMessage).toHaveBeenCalledWith(
|
||||
'my-team',
|
||||
"You've hit your limit. Resets in 5 minutes.",
|
||||
new Date('2026-04-17T12:02:00.000Z'),
|
||||
new Date('2026-04-17T12:00:00.000Z')
|
||||
);
|
||||
});
|
||||
|
||||
it('dedupes notification storage but still re-evaluates auto-resume later', () => {
|
||||
const scanner = createScanner();
|
||||
const context = {
|
||||
teamName: 'my-team',
|
||||
teamDisplayName: 'My Team',
|
||||
teamIsAlive: true,
|
||||
currentLeadSessionId: 'sess-live',
|
||||
};
|
||||
|
||||
autoResumeEnabled = false;
|
||||
scanner.checkRateLimitMessages([createMessage()], context);
|
||||
expect(notificationSink.addTeamNotification).toHaveBeenCalledTimes(1);
|
||||
expect(autoResumeSink.handleRateLimitMessage).not.toHaveBeenCalled();
|
||||
|
||||
autoResumeEnabled = true;
|
||||
scanner.checkRateLimitMessages([createMessage()], context);
|
||||
|
||||
expect(notificationSink.addTeamNotification).toHaveBeenCalledTimes(1);
|
||||
expect(autoResumeSink.handleRateLimitMessage).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('does not schedule auto-resume from an older lead session', () => {
|
||||
const scanner = createScanner();
|
||||
|
||||
scanner.checkRateLimitMessages(
|
||||
[createMessage({ leadSessionId: 'sess-old', messageId: 'old-session' })],
|
||||
{
|
||||
teamName: 'my-team',
|
||||
teamDisplayName: 'My Team',
|
||||
teamIsAlive: true,
|
||||
currentLeadSessionId: 'sess-live',
|
||||
}
|
||||
);
|
||||
|
||||
expect(notificationSink.addTeamNotification).toHaveBeenCalledTimes(1);
|
||||
expect(autoResumeSink.handleRateLimitMessage).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('sends API-error notifications while leaving rate limits to the rate-limit path', () => {
|
||||
const scanner = createScanner({
|
||||
isRateLimit: (text) => text.includes('429'),
|
||||
isApiError: (text) => text.startsWith('API Error:'),
|
||||
});
|
||||
|
||||
scanner.checkApiErrorMessages(
|
||||
[
|
||||
createMessage({ text: 'API Error: 429 rate limited', messageId: 'rate-limit-api' }),
|
||||
createMessage({ text: 'API Error: 500 server failed', messageId: 'api-500' }),
|
||||
],
|
||||
{
|
||||
teamName: 'my-team',
|
||||
teamDisplayName: 'My Team',
|
||||
}
|
||||
);
|
||||
|
||||
expect(notificationSink.addTeamNotification).toHaveBeenCalledTimes(1);
|
||||
expect(notificationSink.addTeamNotification).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
teamEventType: 'api_error',
|
||||
summary: 'API Error 500',
|
||||
dedupeKey: 'api-error:my-team:api-500',
|
||||
})
|
||||
);
|
||||
});
|
||||
});
|
||||
Loading…
Reference in a new issue