From a06423a57455a9d19c41b818349dbaecc4c50efd Mon Sep 17 00:00:00 2001 From: 777genius Date: Sat, 30 May 2026 18:42:28 +0300 Subject: [PATCH] fix(team): preserve live overlay in worker message pages --- src/main/ipc/teams.ts | 36 +++++- src/main/services/team/TeamDataService.ts | 72 ++---------- .../services/team/TeamDataWorkerClient.ts | 16 ++- .../team/mergeLiveLeadProcessMessages.ts | 64 ++++++++++- src/main/services/team/teamDataWorkerTypes.ts | 2 + test/main/ipc/teams.test.ts | 105 ++++++++++++++++++ .../team/TeamDataWorkerClient.test.ts | 65 +++++++++++ 7 files changed, 294 insertions(+), 66 deletions(-) diff --git a/src/main/ipc/teams.ts b/src/main/ipc/teams.ts index 17955bae..9f0d44df 100644 --- a/src/main/ipc/teams.ts +++ b/src/main/ipc/teams.ts @@ -194,6 +194,7 @@ import type { CreateTaskRequest, EffortLevel, GlobalTask, + InboxMessage, IpcResult, KanbanColumnId, LeadActivitySnapshot, @@ -331,6 +332,33 @@ function noteHeavyTeamDataWorkerFallback(operation: string): void { ); } +async function getNewestMessagesPageWithLiveOverlay(input: { + teamName: string; + limit: number; + liveMessages: InboxMessage[]; + includeUndefinedCursorInFallback?: boolean; +}): Promise { + const { teamName, limit, liveMessages } = input; + const worker = getTeamDataWorkerClient(); + const options = input.includeUndefinedCursorInFallback + ? { cursor: undefined, limit, liveMessages } + : { limit, liveMessages }; + if (worker.isAvailable()) { + try { + return await worker.getMessagesPage(teamName, options); + } catch (workerErr) { + logger.warn( + `[teams:getMessagesPage] worker failed for live overlay, falling back: ${ + workerErr instanceof Error ? workerErr.message : workerErr + }` + ); + } + } + + noteHeavyTeamDataWorkerFallback('teams:getMessagesPage.liveOverlay'); + return getTeamDataService().getMessagesPage(teamName, options); +} + function invalidateTeamRosterSnapshotCaches(teamName: string): void { TeamConfigReader.invalidateTeam(teamName); const teamDataService = getTeamDataService(); @@ -992,7 +1020,8 @@ async function handleGetData( let merged = mergeLiveLeadProcessMessages(durableMessages, live); if (durableMessages.length >= 50) { try { - const newestPage = await teamDataService.getMessagesPage(tn, { + const newestPage = await getNewestMessagesPageWithLiveOverlay({ + teamName: tn, limit: 50, liveMessages: live, }); @@ -2722,10 +2751,11 @@ async function handleGetMessagesPage( cursor == null ? getTeamProvisioningService().getLiveLeadProcessMessages(teamName) : []; if (liveMessages.length > 0) { - page = await getTeamDataService().getMessagesPage(teamName, { - cursor, + page = await getNewestMessagesPageWithLiveOverlay({ + teamName, limit, liveMessages, + includeUndefinedCursorInFallback: true, }); scanNotifications(page); return page; diff --git a/src/main/services/team/TeamDataService.ts b/src/main/services/team/TeamDataService.ts index 3859bb00..a2cc19fe 100644 --- a/src/main/services/team/TeamDataService.ts +++ b/src/main/services/team/TeamDataService.ts @@ -33,10 +33,7 @@ import { import { atomicWriteAsync } from './atomicWrite'; import { extractLeadSessionMessagesFromJsonl } from './leadSessionMessageExtractor'; import { MemberActivityMetaService } from './MemberActivityMetaService'; -import { - getLiveLeadProcessMessageKey, - mergeLiveLeadProcessMessages, -} from './mergeLiveLeadProcessMessages'; +import { mergeLiveLeadProcessMessagesPage } from './mergeLiveLeadProcessMessages'; import { buildTaskChangePresenceDescriptor } from './taskChangePresenceUtils'; import { choosePreferredLaunchSnapshot, @@ -1504,9 +1501,6 @@ export class TeamDataService { ): Promise { const feed = await this.messageFeedService.getFeed(teamName); const newestDurableMessages = feed.messages; - const durableMessageIndexByKey = new Map( - newestDurableMessages.map((message, index) => [getLiveLeadProcessMessageKey(message), index]) - ); let messages = newestDurableMessages; if (options.cursor) { @@ -1533,55 +1527,12 @@ export class TeamDataService { // Merge live lead thoughts against the full durable newest-page history so we do not // re-introduce persisted thoughts that have simply paged off the first durable page. - const displayMessages = mergeLiveLeadProcessMessages( - newestDurableMessages, - options.liveMessages - ).slice(0, options.limit); - - if (displayMessages.length === 0) { - return { - messages: displayMessages, - nextCursor: null, - hasMore: false, - feedRevision: feed.feedRevision, - }; - } - - let lastDurableDisplayed: InboxMessage | null = null; - for (let index = displayMessages.length - 1; index >= 0; index -= 1) { - const candidate = displayMessages[index]; - if (durableMessageIndexByKey.has(getLiveLeadProcessMessageKey(candidate))) { - lastDurableDisplayed = candidate; - break; - } - } - - if (!lastDurableDisplayed) { - const boundary = displayMessages[displayMessages.length - 1]; - return { - messages: displayMessages, - nextCursor: - newestDurableMessages.length > 0 - ? `${boundary.timestamp}|${boundary.messageId ?? ''}` - : null, - hasMore: newestDurableMessages.length > 0, - feedRevision: feed.feedRevision, - }; - } - - const durableIndex = - durableMessageIndexByKey.get(getLiveLeadProcessMessageKey(lastDurableDisplayed)) ?? - Number.POSITIVE_INFINITY; - const durableHasMore = durableIndex < newestDurableMessages.length - 1; - - return { - messages: displayMessages, - nextCursor: durableHasMore - ? `${lastDurableDisplayed.timestamp}|${lastDurableDisplayed.messageId ?? ''}` - : null, - hasMore: durableHasMore, + return mergeLiveLeadProcessMessagesPage({ + durableMessages: newestDurableMessages, + liveMessages: options.liveMessages, + limit: options.limit, feedRevision: feed.feedRevision, - }; + }); } async getMessageFeed( @@ -3179,12 +3130,11 @@ export class TeamDataService { } } finally { const current = this.fileWatchReconcileDiagnostics.get(teamName); - if (!current) { - return; - } - current.inFlight = Math.max(0, current.inFlight - 1); - if (current.inFlight === 0 && Date.now() - current.windowStartedAt > 30_000) { - this.fileWatchReconcileDiagnostics.delete(teamName); + if (current) { + current.inFlight = Math.max(0, current.inFlight - 1); + if (current.inFlight === 0 && Date.now() - current.windowStartedAt > 30_000) { + this.fileWatchReconcileDiagnostics.delete(teamName); + } } } } diff --git a/src/main/services/team/TeamDataWorkerClient.ts b/src/main/services/team/TeamDataWorkerClient.ts index 582c783d..276bdf9c 100644 --- a/src/main/services/team/TeamDataWorkerClient.ts +++ b/src/main/services/team/TeamDataWorkerClient.ts @@ -15,6 +15,7 @@ import { createLogger } from '@shared/utils/logger'; import type { TeamDataWorkerRequest, TeamDataWorkerResponse } from './teamDataWorkerTypes'; import type { + InboxMessage, MemberLogSummary, MessagesPage, TeamGetDataOptions, @@ -81,6 +82,17 @@ function getTeamDataRequestPayload( return normalizedOptions ? { teamName, options: normalizedOptions } : { teamName }; } +function getLiveMessagesRequestKey(liveMessages?: InboxMessage[]): unknown { + if (!liveMessages?.length) return undefined; + return liveMessages.map((message) => ({ + messageId: message.messageId, + timestamp: message.timestamp, + source: message.source, + from: message.from, + text: message.text, + })); +} + function summarizeWorkerRequest(request: TeamDataWorkerRequest): Record { switch (request.op) { case 'warmup': @@ -98,6 +110,7 @@ function summarizeWorkerRequest(request: TeamDataWorkerRequest): Record { if (!SAFE_NAME_RE.test(teamName)) throw new Error('Invalid teamName'); const key = JSON.stringify({ teamName, cursor: options.cursor ?? null, limit: options.limit, + liveMessages: getLiveMessagesRequestKey(options.liveMessages), }); const existing = this.getMessagesPageInFlight.get(key); if (existing) return existing; diff --git a/src/main/services/team/mergeLiveLeadProcessMessages.ts b/src/main/services/team/mergeLiveLeadProcessMessages.ts index 5b613caf..b72083f3 100644 --- a/src/main/services/team/mergeLiveLeadProcessMessages.ts +++ b/src/main/services/team/mergeLiveLeadProcessMessages.ts @@ -1,4 +1,4 @@ -import type { InboxMessage } from '@shared/types'; +import type { InboxMessage, MessagesPage } from '@shared/types'; export function getLiveLeadProcessMessageKey(message: { messageId?: string; @@ -71,3 +71,65 @@ export function mergeLiveLeadProcessMessages( merged.sort((left, right) => Date.parse(right.timestamp) - Date.parse(left.timestamp)); return merged; } + +export function mergeLiveLeadProcessMessagesPage(input: { + durableMessages: InboxMessage[]; + liveMessages: InboxMessage[]; + limit: number; + feedRevision: string; + durableHasMoreAfterWindow?: boolean; +}): MessagesPage { + const displayMessages = mergeLiveLeadProcessMessages( + input.durableMessages, + input.liveMessages + ).slice(0, input.limit); + + if (displayMessages.length === 0) { + return { + messages: displayMessages, + nextCursor: null, + hasMore: false, + feedRevision: input.feedRevision, + }; + } + + const durableMessageIndexByKey = new Map( + input.durableMessages.map((message, index) => [getLiveLeadProcessMessageKey(message), index]) + ); + let lastDurableDisplayed: InboxMessage | null = null; + for (let index = displayMessages.length - 1; index >= 0; index -= 1) { + const candidate = displayMessages[index]; + if (durableMessageIndexByKey.has(getLiveLeadProcessMessageKey(candidate))) { + lastDurableDisplayed = candidate; + break; + } + } + + if (!lastDurableDisplayed) { + const boundary = displayMessages[displayMessages.length - 1]; + return { + messages: displayMessages, + nextCursor: + input.durableMessages.length > 0 || input.durableHasMoreAfterWindow + ? `${boundary.timestamp}|${boundary.messageId ?? ''}` + : null, + hasMore: input.durableMessages.length > 0 || Boolean(input.durableHasMoreAfterWindow), + feedRevision: input.feedRevision, + }; + } + + const durableIndex = + durableMessageIndexByKey.get(getLiveLeadProcessMessageKey(lastDurableDisplayed)) ?? + Number.POSITIVE_INFINITY; + const durableHasMore = + durableIndex < input.durableMessages.length - 1 || Boolean(input.durableHasMoreAfterWindow); + + return { + messages: displayMessages, + nextCursor: durableHasMore + ? `${lastDurableDisplayed.timestamp}|${lastDurableDisplayed.messageId ?? ''}` + : null, + hasMore: durableHasMore, + feedRevision: input.feedRevision, + }; +} diff --git a/src/main/services/team/teamDataWorkerTypes.ts b/src/main/services/team/teamDataWorkerTypes.ts index 5ef127df..7438730f 100644 --- a/src/main/services/team/teamDataWorkerTypes.ts +++ b/src/main/services/team/teamDataWorkerTypes.ts @@ -3,6 +3,7 @@ */ import type { + InboxMessage, MemberLogSummary, MessagesPage, TeamGetDataOptions, @@ -22,6 +23,7 @@ export interface GetMessagesPagePayload { options: { cursor?: string | null; limit: number; + liveMessages?: InboxMessage[]; }; } diff --git a/test/main/ipc/teams.test.ts b/test/main/ipc/teams.test.ts index a43f261a..851c3bca 100644 --- a/test/main/ipc/teams.test.ts +++ b/test/main/ipc/teams.test.ts @@ -1896,6 +1896,53 @@ describe('ipc teams handlers', () => { expect(service.getMessagesPage).not.toHaveBeenCalled(); }); + it('keeps live message overlay on TEAM_GET_MESSAGES_PAGE in worker path', async () => { + mockTeamDataWorkerClient.isAvailable.mockReturnValue(true); + const liveMessage: InboxMessage = { + from: 'team-lead', + text: 'Команда поднята, приступаю к раздаче задач.', + timestamp: '2026-02-23T10:00:01.000Z', + read: true, + source: 'lead_process' as const, + messageId: 'live-1', + }; + mockTeamDataWorkerClient.getMessagesPage.mockResolvedValueOnce({ + messages: [ + liveMessage, + { + from: 'user', + text: 'Ping', + timestamp: '2026-02-23T10:00:00.000Z', + read: true, + source: 'user_sent' as const, + messageId: 'durable-1', + }, + ], + nextCursor: null, + hasMore: false, + feedRevision: 'rev-worker', + }); + provisioningService.getLiveLeadProcessMessages.mockReturnValueOnce([liveMessage]); + + const handler = handlers.get(TEAM_GET_MESSAGES_PAGE)!; + const result = (await handler({} as never, 'my-team', { + limit: 20, + })) as { success: boolean; data: MessagesPage }; + + expect(result.success).toBe(true); + expect(result.data.messages.map((message) => message.messageId)).toEqual([ + 'live-1', + 'durable-1', + ]); + expect(result.data.feedRevision).toBe('rev-worker'); + expect(mockTeamDataWorkerClient.getMessagesPage).toHaveBeenCalledWith('my-team', { + cursor: undefined, + limit: 20, + liveMessages: [liveMessage], + }); + expect(service.getMessagesPage).not.toHaveBeenCalled(); + }); + it('scans rate-limit notifications from message-page results without hydrating TEAM_GET_DATA feed', async () => { mockTeamDataWorkerClient.isAvailable.mockReturnValue(true); mockTeamDataWorkerClient.getMessagesPage.mockResolvedValueOnce({ @@ -2457,6 +2504,64 @@ describe('ipc teams handlers', () => { expect(result.data.messages).toHaveLength(50); }); + it('rebuilds capped TEAM_GET_DATA live overlay through worker when available', async () => { + mockTeamDataWorkerClient.isAvailable.mockReturnValue(true); + const liveMessage: InboxMessage = { + from: 'team-lead', + text: 'Live thought', + timestamp: '2026-02-23T11:00:00.000Z', + read: true, + source: 'lead_process' as const, + messageId: 'live-1', + }; + mockTeamDataWorkerClient.getTeamData.mockResolvedValueOnce({ + teamName: 'my-team', + config: { name: 'My Team' }, + tasks: [], + members: [], + messages: Array.from({ length: 50 }, (_, index) => ({ + from: 'alice', + text: `filler-${index}`, + timestamp: `2026-02-23T10:${String(index).padStart(2, '0')}:00.000Z`, + read: true, + source: 'inbox' as const, + messageId: `durable-${index}`, + })), + kanbanState: { teamName: 'my-team', reviewers: [], tasks: {} }, + processes: [], + }); + mockTeamDataWorkerClient.getMessagesPage.mockResolvedValueOnce({ + messages: [ + { + from: 'alice', + text: 'filler-0', + timestamp: '2026-02-23T10:00:00.000Z', + read: true, + source: 'inbox' as const, + messageId: 'durable-0', + }, + ], + nextCursor: null, + hasMore: false, + feedRevision: 'rev-worker', + }); + provisioningService.getLiveLeadProcessMessages.mockReturnValueOnce([liveMessage]); + + const getDataHandler = handlers.get(TEAM_GET_DATA)!; + const result = (await getDataHandler({} as never, 'my-team')) as { + success: boolean; + data: { messages?: InboxMessage[] }; + }; + + expect(result.success).toBe(true); + expect(mockTeamDataWorkerClient.getMessagesPage).toHaveBeenCalledWith('my-team', { + limit: 50, + liveMessages: [liveMessage], + }); + expect(service.getMessagesPage).not.toHaveBeenCalled(); + expect(result.data.messages).toHaveLength(50); + }); + it('overlays live lead_process messages onto the newest messages page', async () => { service.getMessagesPage.mockImplementationOnce(async (...args: unknown[]) => { const { liveMessages = [] } = (args[1] ?? {}) as { liveMessages?: InboxMessage[] }; diff --git a/test/main/services/team/TeamDataWorkerClient.test.ts b/test/main/services/team/TeamDataWorkerClient.test.ts index d3ef99ae..c2edbbd4 100644 --- a/test/main/services/team/TeamDataWorkerClient.test.ts +++ b/test/main/services/team/TeamDataWorkerClient.test.ts @@ -229,6 +229,71 @@ describe('TeamDataWorkerClient', () => { client.dispose(); }); + it('does not deduplicate getMessagesPage calls with different live overlays', async () => { + const { TeamDataWorkerClient } = await import( + '../../../../src/main/services/team/TeamDataWorkerClient' + ); + const client = new TeamDataWorkerClient(); + + await Promise.all([ + client.getMessagesPage('my-team', { + cursor: null, + limit: 50, + liveMessages: [ + { + from: 'team-lead', + text: 'first', + timestamp: '2026-02-23T10:00:00.000Z', + read: true, + source: 'lead_process', + messageId: 'live-1', + }, + ], + }), + client.getMessagesPage('my-team', { + cursor: null, + limit: 50, + liveMessages: [ + { + from: 'team-lead', + text: 'second', + timestamp: '2026-02-23T10:00:01.000Z', + read: true, + source: 'lead_process', + messageId: 'live-2', + }, + ], + }), + ]); + + expect(hoisted.workers).toHaveLength(1); + expect(hoisted.workers[0].messages).toHaveLength(2); + expect(hoisted.workers[0].messages[0]).toMatchObject({ + op: 'getMessagesPage', + payload: { + teamName: 'my-team', + options: { + cursor: null, + limit: 50, + liveMessages: [expect.objectContaining({ messageId: 'live-1' })], + }, + }, + }); + expect(hoisted.workers[0].messages[1]).toMatchObject({ + op: 'getMessagesPage', + payload: { + teamName: 'my-team', + options: { + cursor: null, + limit: 50, + liveMessages: [expect.objectContaining({ messageId: 'live-2' })], + }, + }, + }); + + client.dispose(); + }); + it('sends best-effort message feed invalidation to the worker', async () => { const { TeamDataWorkerClient } = await import( '../../../../src/main/services/team/TeamDataWorkerClient'