fix(team): preserve live overlay in worker message pages

This commit is contained in:
777genius 2026-05-30 18:42:28 +03:00
parent 180bdb7575
commit a06423a574
7 changed files with 294 additions and 66 deletions

View file

@ -194,6 +194,7 @@ import type {
CreateTaskRequest,
EffortLevel,
GlobalTask,
InboxMessage,
IpcResult,
KanbanColumnId,
LeadActivitySnapshot,
@ -331,6 +332,33 @@ function noteHeavyTeamDataWorkerFallback(operation: string): void {
);
}
async function getNewestMessagesPageWithLiveOverlay(input: {
teamName: string;
limit: number;
liveMessages: InboxMessage[];
includeUndefinedCursorInFallback?: boolean;
}): Promise<MessagesPage> {
const { teamName, limit, liveMessages } = input;
const worker = getTeamDataWorkerClient();
const options = input.includeUndefinedCursorInFallback
? { cursor: undefined, limit, liveMessages }
: { limit, liveMessages };
if (worker.isAvailable()) {
try {
return await worker.getMessagesPage(teamName, options);
} catch (workerErr) {
logger.warn(
`[teams:getMessagesPage] worker failed for live overlay, falling back: ${
workerErr instanceof Error ? workerErr.message : workerErr
}`
);
}
}
noteHeavyTeamDataWorkerFallback('teams:getMessagesPage.liveOverlay');
return getTeamDataService().getMessagesPage(teamName, options);
}
function invalidateTeamRosterSnapshotCaches(teamName: string): void {
TeamConfigReader.invalidateTeam(teamName);
const teamDataService = getTeamDataService();
@ -992,7 +1020,8 @@ async function handleGetData(
let merged = mergeLiveLeadProcessMessages(durableMessages, live);
if (durableMessages.length >= 50) {
try {
const newestPage = await teamDataService.getMessagesPage(tn, {
const newestPage = await getNewestMessagesPageWithLiveOverlay({
teamName: tn,
limit: 50,
liveMessages: live,
});
@ -2722,10 +2751,11 @@ async function handleGetMessagesPage(
cursor == null ? getTeamProvisioningService().getLiveLeadProcessMessages(teamName) : [];
if (liveMessages.length > 0) {
page = await getTeamDataService().getMessagesPage(teamName, {
cursor,
page = await getNewestMessagesPageWithLiveOverlay({
teamName,
limit,
liveMessages,
includeUndefinedCursorInFallback: true,
});
scanNotifications(page);
return page;

View file

@ -33,10 +33,7 @@ import {
import { atomicWriteAsync } from './atomicWrite';
import { extractLeadSessionMessagesFromJsonl } from './leadSessionMessageExtractor';
import { MemberActivityMetaService } from './MemberActivityMetaService';
import {
getLiveLeadProcessMessageKey,
mergeLiveLeadProcessMessages,
} from './mergeLiveLeadProcessMessages';
import { mergeLiveLeadProcessMessagesPage } from './mergeLiveLeadProcessMessages';
import { buildTaskChangePresenceDescriptor } from './taskChangePresenceUtils';
import {
choosePreferredLaunchSnapshot,
@ -1504,9 +1501,6 @@ export class TeamDataService {
): Promise<MessagesPage> {
const feed = await this.messageFeedService.getFeed(teamName);
const newestDurableMessages = feed.messages;
const durableMessageIndexByKey = new Map(
newestDurableMessages.map((message, index) => [getLiveLeadProcessMessageKey(message), index])
);
let messages = newestDurableMessages;
if (options.cursor) {
@ -1533,55 +1527,12 @@ export class TeamDataService {
// Merge live lead thoughts against the full durable newest-page history so we do not
// re-introduce persisted thoughts that have simply paged off the first durable page.
const displayMessages = mergeLiveLeadProcessMessages(
newestDurableMessages,
options.liveMessages
).slice(0, options.limit);
if (displayMessages.length === 0) {
return {
messages: displayMessages,
nextCursor: null,
hasMore: false,
feedRevision: feed.feedRevision,
};
}
let lastDurableDisplayed: InboxMessage | null = null;
for (let index = displayMessages.length - 1; index >= 0; index -= 1) {
const candidate = displayMessages[index];
if (durableMessageIndexByKey.has(getLiveLeadProcessMessageKey(candidate))) {
lastDurableDisplayed = candidate;
break;
}
}
if (!lastDurableDisplayed) {
const boundary = displayMessages[displayMessages.length - 1];
return {
messages: displayMessages,
nextCursor:
newestDurableMessages.length > 0
? `${boundary.timestamp}|${boundary.messageId ?? ''}`
: null,
hasMore: newestDurableMessages.length > 0,
feedRevision: feed.feedRevision,
};
}
const durableIndex =
durableMessageIndexByKey.get(getLiveLeadProcessMessageKey(lastDurableDisplayed)) ??
Number.POSITIVE_INFINITY;
const durableHasMore = durableIndex < newestDurableMessages.length - 1;
return {
messages: displayMessages,
nextCursor: durableHasMore
? `${lastDurableDisplayed.timestamp}|${lastDurableDisplayed.messageId ?? ''}`
: null,
hasMore: durableHasMore,
return mergeLiveLeadProcessMessagesPage({
durableMessages: newestDurableMessages,
liveMessages: options.liveMessages,
limit: options.limit,
feedRevision: feed.feedRevision,
};
});
}
async getMessageFeed(
@ -3179,12 +3130,11 @@ export class TeamDataService {
}
} finally {
const current = this.fileWatchReconcileDiagnostics.get(teamName);
if (!current) {
return;
}
current.inFlight = Math.max(0, current.inFlight - 1);
if (current.inFlight === 0 && Date.now() - current.windowStartedAt > 30_000) {
this.fileWatchReconcileDiagnostics.delete(teamName);
if (current) {
current.inFlight = Math.max(0, current.inFlight - 1);
if (current.inFlight === 0 && Date.now() - current.windowStartedAt > 30_000) {
this.fileWatchReconcileDiagnostics.delete(teamName);
}
}
}
}

View file

@ -15,6 +15,7 @@ import { createLogger } from '@shared/utils/logger';
import type { TeamDataWorkerRequest, TeamDataWorkerResponse } from './teamDataWorkerTypes';
import type {
InboxMessage,
MemberLogSummary,
MessagesPage,
TeamGetDataOptions,
@ -81,6 +82,17 @@ function getTeamDataRequestPayload(
return normalizedOptions ? { teamName, options: normalizedOptions } : { teamName };
}
function getLiveMessagesRequestKey(liveMessages?: InboxMessage[]): unknown {
if (!liveMessages?.length) return undefined;
return liveMessages.map((message) => ({
messageId: message.messageId,
timestamp: message.timestamp,
source: message.source,
from: message.from,
text: message.text,
}));
}
function summarizeWorkerRequest(request: TeamDataWorkerRequest): Record<string, unknown> {
switch (request.op) {
case 'warmup':
@ -98,6 +110,7 @@ function summarizeWorkerRequest(request: TeamDataWorkerRequest): Record<string,
teamName,
cursor: typeof options.cursor === 'string' ? options.cursor.slice(0, 24) : options.cursor,
limit: options.limit,
liveMessages: options.liveMessages?.length,
};
}
case 'getMemberActivityMeta':
@ -336,13 +349,14 @@ export class TeamDataWorkerClient {
async getMessagesPage(
teamName: string,
options: { cursor?: string | null; limit: number }
options: { cursor?: string | null; limit: number; liveMessages?: InboxMessage[] }
): Promise<MessagesPage> {
if (!SAFE_NAME_RE.test(teamName)) throw new Error('Invalid teamName');
const key = JSON.stringify({
teamName,
cursor: options.cursor ?? null,
limit: options.limit,
liveMessages: getLiveMessagesRequestKey(options.liveMessages),
});
const existing = this.getMessagesPageInFlight.get(key);
if (existing) return existing;

View file

@ -1,4 +1,4 @@
import type { InboxMessage } from '@shared/types';
import type { InboxMessage, MessagesPage } from '@shared/types';
export function getLiveLeadProcessMessageKey(message: {
messageId?: string;
@ -71,3 +71,65 @@ export function mergeLiveLeadProcessMessages(
merged.sort((left, right) => Date.parse(right.timestamp) - Date.parse(left.timestamp));
return merged;
}
export function mergeLiveLeadProcessMessagesPage(input: {
durableMessages: InboxMessage[];
liveMessages: InboxMessage[];
limit: number;
feedRevision: string;
durableHasMoreAfterWindow?: boolean;
}): MessagesPage {
const displayMessages = mergeLiveLeadProcessMessages(
input.durableMessages,
input.liveMessages
).slice(0, input.limit);
if (displayMessages.length === 0) {
return {
messages: displayMessages,
nextCursor: null,
hasMore: false,
feedRevision: input.feedRevision,
};
}
const durableMessageIndexByKey = new Map(
input.durableMessages.map((message, index) => [getLiveLeadProcessMessageKey(message), index])
);
let lastDurableDisplayed: InboxMessage | null = null;
for (let index = displayMessages.length - 1; index >= 0; index -= 1) {
const candidate = displayMessages[index];
if (durableMessageIndexByKey.has(getLiveLeadProcessMessageKey(candidate))) {
lastDurableDisplayed = candidate;
break;
}
}
if (!lastDurableDisplayed) {
const boundary = displayMessages[displayMessages.length - 1];
return {
messages: displayMessages,
nextCursor:
input.durableMessages.length > 0 || input.durableHasMoreAfterWindow
? `${boundary.timestamp}|${boundary.messageId ?? ''}`
: null,
hasMore: input.durableMessages.length > 0 || Boolean(input.durableHasMoreAfterWindow),
feedRevision: input.feedRevision,
};
}
const durableIndex =
durableMessageIndexByKey.get(getLiveLeadProcessMessageKey(lastDurableDisplayed)) ??
Number.POSITIVE_INFINITY;
const durableHasMore =
durableIndex < input.durableMessages.length - 1 || Boolean(input.durableHasMoreAfterWindow);
return {
messages: displayMessages,
nextCursor: durableHasMore
? `${lastDurableDisplayed.timestamp}|${lastDurableDisplayed.messageId ?? ''}`
: null,
hasMore: durableHasMore,
feedRevision: input.feedRevision,
};
}

View file

@ -3,6 +3,7 @@
*/
import type {
InboxMessage,
MemberLogSummary,
MessagesPage,
TeamGetDataOptions,
@ -22,6 +23,7 @@ export interface GetMessagesPagePayload {
options: {
cursor?: string | null;
limit: number;
liveMessages?: InboxMessage[];
};
}

View file

@ -1896,6 +1896,53 @@ describe('ipc teams handlers', () => {
expect(service.getMessagesPage).not.toHaveBeenCalled();
});
it('keeps live message overlay on TEAM_GET_MESSAGES_PAGE in worker path', async () => {
mockTeamDataWorkerClient.isAvailable.mockReturnValue(true);
const liveMessage: InboxMessage = {
from: 'team-lead',
text: 'Команда поднята, приступаю к раздаче задач.',
timestamp: '2026-02-23T10:00:01.000Z',
read: true,
source: 'lead_process' as const,
messageId: 'live-1',
};
mockTeamDataWorkerClient.getMessagesPage.mockResolvedValueOnce({
messages: [
liveMessage,
{
from: 'user',
text: 'Ping',
timestamp: '2026-02-23T10:00:00.000Z',
read: true,
source: 'user_sent' as const,
messageId: 'durable-1',
},
],
nextCursor: null,
hasMore: false,
feedRevision: 'rev-worker',
});
provisioningService.getLiveLeadProcessMessages.mockReturnValueOnce([liveMessage]);
const handler = handlers.get(TEAM_GET_MESSAGES_PAGE)!;
const result = (await handler({} as never, 'my-team', {
limit: 20,
})) as { success: boolean; data: MessagesPage };
expect(result.success).toBe(true);
expect(result.data.messages.map((message) => message.messageId)).toEqual([
'live-1',
'durable-1',
]);
expect(result.data.feedRevision).toBe('rev-worker');
expect(mockTeamDataWorkerClient.getMessagesPage).toHaveBeenCalledWith('my-team', {
cursor: undefined,
limit: 20,
liveMessages: [liveMessage],
});
expect(service.getMessagesPage).not.toHaveBeenCalled();
});
it('scans rate-limit notifications from message-page results without hydrating TEAM_GET_DATA feed', async () => {
mockTeamDataWorkerClient.isAvailable.mockReturnValue(true);
mockTeamDataWorkerClient.getMessagesPage.mockResolvedValueOnce({
@ -2457,6 +2504,64 @@ describe('ipc teams handlers', () => {
expect(result.data.messages).toHaveLength(50);
});
it('rebuilds capped TEAM_GET_DATA live overlay through worker when available', async () => {
mockTeamDataWorkerClient.isAvailable.mockReturnValue(true);
const liveMessage: InboxMessage = {
from: 'team-lead',
text: 'Live thought',
timestamp: '2026-02-23T11:00:00.000Z',
read: true,
source: 'lead_process' as const,
messageId: 'live-1',
};
mockTeamDataWorkerClient.getTeamData.mockResolvedValueOnce({
teamName: 'my-team',
config: { name: 'My Team' },
tasks: [],
members: [],
messages: Array.from({ length: 50 }, (_, index) => ({
from: 'alice',
text: `filler-${index}`,
timestamp: `2026-02-23T10:${String(index).padStart(2, '0')}:00.000Z`,
read: true,
source: 'inbox' as const,
messageId: `durable-${index}`,
})),
kanbanState: { teamName: 'my-team', reviewers: [], tasks: {} },
processes: [],
});
mockTeamDataWorkerClient.getMessagesPage.mockResolvedValueOnce({
messages: [
{
from: 'alice',
text: 'filler-0',
timestamp: '2026-02-23T10:00:00.000Z',
read: true,
source: 'inbox' as const,
messageId: 'durable-0',
},
],
nextCursor: null,
hasMore: false,
feedRevision: 'rev-worker',
});
provisioningService.getLiveLeadProcessMessages.mockReturnValueOnce([liveMessage]);
const getDataHandler = handlers.get(TEAM_GET_DATA)!;
const result = (await getDataHandler({} as never, 'my-team')) as {
success: boolean;
data: { messages?: InboxMessage[] };
};
expect(result.success).toBe(true);
expect(mockTeamDataWorkerClient.getMessagesPage).toHaveBeenCalledWith('my-team', {
limit: 50,
liveMessages: [liveMessage],
});
expect(service.getMessagesPage).not.toHaveBeenCalled();
expect(result.data.messages).toHaveLength(50);
});
it('overlays live lead_process messages onto the newest messages page', async () => {
service.getMessagesPage.mockImplementationOnce(async (...args: unknown[]) => {
const { liveMessages = [] } = (args[1] ?? {}) as { liveMessages?: InboxMessage[] };

View file

@ -229,6 +229,71 @@ describe('TeamDataWorkerClient', () => {
client.dispose();
});
it('does not deduplicate getMessagesPage calls with different live overlays', async () => {
const { TeamDataWorkerClient } = await import(
'../../../../src/main/services/team/TeamDataWorkerClient'
);
const client = new TeamDataWorkerClient();
await Promise.all([
client.getMessagesPage('my-team', {
cursor: null,
limit: 50,
liveMessages: [
{
from: 'team-lead',
text: 'first',
timestamp: '2026-02-23T10:00:00.000Z',
read: true,
source: 'lead_process',
messageId: 'live-1',
},
],
}),
client.getMessagesPage('my-team', {
cursor: null,
limit: 50,
liveMessages: [
{
from: 'team-lead',
text: 'second',
timestamp: '2026-02-23T10:00:01.000Z',
read: true,
source: 'lead_process',
messageId: 'live-2',
},
],
}),
]);
expect(hoisted.workers).toHaveLength(1);
expect(hoisted.workers[0].messages).toHaveLength(2);
expect(hoisted.workers[0].messages[0]).toMatchObject({
op: 'getMessagesPage',
payload: {
teamName: 'my-team',
options: {
cursor: null,
limit: 50,
liveMessages: [expect.objectContaining({ messageId: 'live-1' })],
},
},
});
expect(hoisted.workers[0].messages[1]).toMatchObject({
op: 'getMessagesPage',
payload: {
teamName: 'my-team',
options: {
cursor: null,
limit: 50,
liveMessages: [expect.objectContaining({ messageId: 'live-2' })],
},
},
});
client.dispose();
});
it('sends best-effort message feed invalidation to the worker', async () => {
const { TeamDataWorkerClient } = await import(
'../../../../src/main/services/team/TeamDataWorkerClient'