agent-ecosystem/src/main/services/team/TeamInboxWriter.ts

448 lines
16 KiB
TypeScript

import { getTeamsBasePath } from '@main/utils/pathDecoder';
import { randomUUID } from 'crypto';
import * as fs from 'fs';
import * as path from 'path';
import { atomicWriteAsync } from './atomicWrite';
import { withFileLock } from './fileLock';
import { withInboxLock } from './inboxLock';
import type { InboxMessage, SendMessageRequest, SendMessageResult, TaskRef } from '@shared/types';
export interface MergeRuntimeDeliveryTaskRefsRequest {
inboxName: string;
messageId: string;
relayOfMessageId: string;
from: string;
taskRefs: TaskRef[];
}
export interface MergeRuntimeDeliveryTaskRefsResult {
found: boolean;
updated: boolean;
message?: InboxMessage & { messageId: string };
}
export interface CorrelateRuntimeDeliveryReplyRequest {
inboxName: string;
messageId: string;
relayOfMessageId: string;
from: string;
taskRefs?: TaskRef[];
}
export interface CorrelateRuntimeDeliveryReplyResult {
found: boolean;
updated: boolean;
message?: InboxMessage & { messageId: string };
}
export class TeamInboxWriter {
async sendMessage(teamName: string, request: SendMessageRequest): Promise<SendMessageResult> {
const inboxPath = path.join(getTeamsBasePath(), teamName, 'inboxes', `${request.member}.json`);
const messageId = request.messageId?.trim() || randomUUID();
const attachmentMeta = request.attachments?.map((a) => ({
id: a.id,
filename: a.filename,
mimeType: a.mimeType,
size: a.size,
}));
const payload: InboxMessage = {
from: request.from ?? 'user',
to: request.to ?? request.member,
text: request.text,
timestamp: request.timestamp ?? new Date().toISOString(),
read: false,
taskRefs: request.taskRefs?.length ? request.taskRefs : undefined,
actionMode: request.actionMode,
commentId: typeof request.commentId === 'string' ? request.commentId : undefined,
summary: request.summary,
messageId,
...(request.relayOfMessageId && { relayOfMessageId: request.relayOfMessageId }),
attachments: attachmentMeta?.length ? attachmentMeta : undefined,
...(request.source && { source: request.source }),
...(request.leadSessionId && { leadSessionId: request.leadSessionId }),
...(request.color && { color: request.color }),
...(request.conversationId && { conversationId: request.conversationId }),
...(request.replyToConversationId && {
replyToConversationId: request.replyToConversationId,
}),
...(request.toolSummary && { toolSummary: request.toolSummary }),
...(request.toolCalls && { toolCalls: request.toolCalls }),
...(request.messageKind && { messageKind: request.messageKind }),
...(request.workSyncIntent && { workSyncIntent: request.workSyncIntent }),
...(request.workSyncIntentKey && { workSyncIntentKey: request.workSyncIntentKey }),
...(request.workSyncReviewRequestEventIds?.length
? { workSyncReviewRequestEventIds: request.workSyncReviewRequestEventIds }
: {}),
...(request.workSyncPayloadHash ? { workSyncPayloadHash: request.workSyncPayloadHash } : {}),
...(request.slashCommand && { slashCommand: request.slashCommand }),
...(request.commandOutput && { commandOutput: request.commandOutput }),
};
let resultMessageId = messageId;
let resultDeduplicated = false;
await withFileLock(inboxPath, async () => {
await withInboxLock(inboxPath, async () => {
for (let attempt = 0; attempt < 3; attempt++) {
const list = await this.readInbox(inboxPath);
const duplicateIndex = this.findRuntimeDeliveryDuplicateIndex(list, payload);
if (duplicateIndex >= 0) {
const duplicate = list[duplicateIndex];
const merged = this.mergeTaskRefs(duplicate.taskRefs, payload.taskRefs);
resultMessageId = duplicate.messageId ?? messageId;
resultDeduplicated = true;
if (merged.changed) {
list[duplicateIndex] = {
...duplicate,
taskRefs: merged.taskRefs,
};
await atomicWriteAsync(inboxPath, JSON.stringify(list, null, 2));
const written = await this.readInbox(inboxPath);
const writtenDuplicateIndex = this.findRuntimeDeliveryDuplicateIndex(
written,
payload
);
const writtenDuplicate =
writtenDuplicateIndex >= 0 ? written[writtenDuplicateIndex] : null;
if (
writtenDuplicate &&
this.taskRefsIncludeAll(writtenDuplicate.taskRefs, payload.taskRefs ?? [])
) {
return;
}
await new Promise((resolve) => setTimeout(resolve, 10 * 2 ** attempt));
continue;
}
return;
}
list.push(payload);
await atomicWriteAsync(inboxPath, JSON.stringify(list, null, 2));
const written = await this.readInbox(inboxPath);
if (written.some((msg) => msg.messageId === messageId)) {
return;
}
await new Promise((resolve) => setTimeout(resolve, 10 * 2 ** attempt));
}
throw new Error('Failed to verify inbox write');
});
});
return {
deliveredToInbox: true,
messageId: resultMessageId,
...(resultDeduplicated ? { deduplicated: true } : {}),
};
}
async mergeRuntimeDeliveryTaskRefs(
teamName: string,
request: MergeRuntimeDeliveryTaskRefsRequest
): Promise<MergeRuntimeDeliveryTaskRefsResult> {
const inboxName = request.inboxName.trim();
const messageId = request.messageId.trim();
const relayOfMessageId = request.relayOfMessageId.trim();
const taskRefs = this.normalizeTaskRefs(request.taskRefs);
if (!inboxName || !messageId || !relayOfMessageId || taskRefs.length === 0) {
return { found: false, updated: false };
}
const inboxPath = path.join(getTeamsBasePath(), teamName, 'inboxes', `${inboxName}.json`);
const expectedFrom = this.normalizeComparableParticipant(request.from);
if (!expectedFrom) {
return { found: false, updated: false };
}
let result: MergeRuntimeDeliveryTaskRefsResult = { found: false, updated: false };
await withFileLock(inboxPath, async () => {
await withInboxLock(inboxPath, async () => {
for (let attempt = 0; attempt < 3; attempt++) {
const list = await this.readInbox(inboxPath);
const index = list.findIndex((message) => {
const rowMessageId =
typeof message.messageId === 'string' ? message.messageId.trim() : '';
const rowRelayOf =
typeof message.relayOfMessageId === 'string' ? message.relayOfMessageId.trim() : '';
const rowSource = message.source;
return (
rowMessageId === messageId &&
rowRelayOf === relayOfMessageId &&
this.normalizeComparableParticipant(message.from) === expectedFrom &&
(rowSource === undefined || rowSource === 'runtime_delivery')
);
});
if (index < 0) {
result = { found: false, updated: false };
return;
}
const existing = list[index];
const merged = this.mergeTaskRefs(existing.taskRefs, taskRefs);
if (!merged.changed) {
result = {
found: true,
updated: false,
message: { ...existing, messageId },
};
return;
}
list[index] = { ...existing, taskRefs: merged.taskRefs };
await atomicWriteAsync(inboxPath, JSON.stringify(list, null, 2));
const written = await this.readInbox(inboxPath);
const verified = written.find((message) => {
const rowMessageId =
typeof message.messageId === 'string' ? message.messageId.trim() : '';
const rowRelayOf =
typeof message.relayOfMessageId === 'string' ? message.relayOfMessageId.trim() : '';
const rowSource = message.source;
return (
rowMessageId === messageId &&
rowRelayOf === relayOfMessageId &&
this.normalizeComparableParticipant(message.from) === expectedFrom &&
(rowSource === undefined || rowSource === 'runtime_delivery') &&
this.taskRefsIncludeAll(message.taskRefs, taskRefs)
);
});
if (verified) {
result = {
found: true,
updated: true,
message: { ...verified, messageId },
};
return;
}
await new Promise((resolve) => setTimeout(resolve, 10 * 2 ** attempt));
}
throw new Error('Failed to verify inbox taskRefs merge');
});
});
return result;
}
async correlateRuntimeDeliveryReply(
teamName: string,
request: CorrelateRuntimeDeliveryReplyRequest
): Promise<CorrelateRuntimeDeliveryReplyResult> {
const inboxName = request.inboxName.trim();
const messageId = request.messageId.trim();
const relayOfMessageId = request.relayOfMessageId.trim();
const expectedFrom = this.normalizeComparableParticipant(request.from);
if (!inboxName || !messageId || !relayOfMessageId || !expectedFrom) {
return { found: false, updated: false };
}
const inboxPath = path.join(getTeamsBasePath(), teamName, 'inboxes', `${inboxName}.json`);
const taskRefs = this.normalizeTaskRefs(request.taskRefs);
let result: CorrelateRuntimeDeliveryReplyResult = { found: false, updated: false };
await withFileLock(inboxPath, async () => {
await withInboxLock(inboxPath, async () => {
for (let attempt = 0; attempt < 3; attempt++) {
const list = await this.readInbox(inboxPath);
const index = list.findIndex((message) => {
const rowMessageId =
typeof message.messageId === 'string' ? message.messageId.trim() : '';
const rowSource = message.source;
return (
rowMessageId === messageId &&
this.normalizeComparableParticipant(message.from) === expectedFrom &&
(rowSource === undefined || rowSource === 'runtime_delivery')
);
});
if (index < 0) {
result = { found: false, updated: false };
return;
}
const existing = list[index];
const merged = this.mergeTaskRefs(existing.taskRefs, taskRefs);
const currentRelayOf =
typeof existing.relayOfMessageId === 'string' ? existing.relayOfMessageId.trim() : '';
if (currentRelayOf === relayOfMessageId && !merged.changed) {
result = {
found: true,
updated: false,
message: { ...existing, messageId },
};
return;
}
const nextMessage: InboxMessage = {
...existing,
relayOfMessageId,
...(merged.taskRefs ? { taskRefs: merged.taskRefs } : {}),
};
list[index] = nextMessage;
await atomicWriteAsync(inboxPath, JSON.stringify(list, null, 2));
const written = await this.readInbox(inboxPath);
const verified = written.find((message) => {
const rowMessageId =
typeof message.messageId === 'string' ? message.messageId.trim() : '';
const rowRelayOf =
typeof message.relayOfMessageId === 'string' ? message.relayOfMessageId.trim() : '';
const rowSource = message.source;
return (
rowMessageId === messageId &&
rowRelayOf === relayOfMessageId &&
this.normalizeComparableParticipant(message.from) === expectedFrom &&
(rowSource === undefined || rowSource === 'runtime_delivery') &&
this.taskRefsIncludeAll(message.taskRefs, taskRefs)
);
});
if (verified) {
result = {
found: true,
updated: true,
message: { ...verified, messageId },
};
return;
}
await new Promise((resolve) => setTimeout(resolve, 10 * 2 ** attempt));
}
throw new Error('Failed to verify inbox runtime delivery correlation update');
});
});
return result;
}
private findRuntimeDeliveryDuplicateIndex(
messages: readonly InboxMessage[],
payload: InboxMessage
): number {
if (
payload.source !== 'runtime_delivery' ||
typeof payload.relayOfMessageId !== 'string' ||
payload.relayOfMessageId.trim().length === 0
) {
return -1;
}
const relayOfMessageId = payload.relayOfMessageId.trim();
const from = this.normalizeComparableParticipant(payload.from);
const to = this.normalizeComparableParticipant(payload.to);
const text = this.normalizeComparableText(payload.text);
if (!from || !to || !text) {
return -1;
}
return messages.findIndex(
(candidate) =>
candidate.source === 'runtime_delivery' &&
(candidate.relayOfMessageId ?? '').trim() === relayOfMessageId &&
this.normalizeComparableParticipant(candidate.from) === from &&
this.normalizeComparableParticipant(candidate.to) === to &&
this.normalizeComparableText(candidate.text) === text
);
}
private mergeTaskRefs(
existing: readonly TaskRef[] | undefined,
incoming: readonly TaskRef[] | undefined
): { changed: boolean; taskRefs?: TaskRef[] } {
const normalizedExisting = this.normalizeTaskRefs(existing);
const normalizedIncoming = this.normalizeTaskRefs(incoming);
if (normalizedIncoming.length === 0) {
return {
changed: false,
taskRefs: normalizedExisting.length ? normalizedExisting : undefined,
};
}
const seen = new Set(normalizedExisting.map((taskRef) => this.taskRefKey(taskRef)));
const merged = [...normalizedExisting];
let changed = false;
for (const taskRef of normalizedIncoming) {
const key = this.taskRefKey(taskRef);
if (seen.has(key)) {
continue;
}
seen.add(key);
merged.push(taskRef);
changed = true;
}
return { changed, taskRefs: merged.length ? merged : undefined };
}
private taskRefsIncludeAll(
actual: readonly TaskRef[] | undefined,
expected: readonly TaskRef[]
): boolean {
const actualKeys = new Set(
this.normalizeTaskRefs(actual).map((taskRef) => this.taskRefKey(taskRef))
);
return this.normalizeTaskRefs(expected).every((taskRef) =>
actualKeys.has(this.taskRefKey(taskRef))
);
}
private normalizeTaskRefs(taskRefs: readonly TaskRef[] | undefined): TaskRef[] {
if (!Array.isArray(taskRefs)) {
return [];
}
const normalized: TaskRef[] = [];
for (const rawTaskRef of taskRefs as readonly unknown[]) {
if (!rawTaskRef || typeof rawTaskRef !== 'object') {
continue;
}
const taskRef = rawTaskRef as Record<string, unknown>;
const teamName = typeof taskRef.teamName === 'string' ? taskRef.teamName.trim() : '';
const taskId = typeof taskRef.taskId === 'string' ? taskRef.taskId.trim() : '';
const displayId = typeof taskRef.displayId === 'string' ? taskRef.displayId.trim() : '';
if (teamName && taskId && displayId) {
normalized.push({ teamName, taskId, displayId });
}
}
return normalized;
}
private taskRefKey(taskRef: TaskRef): string {
return `${taskRef.teamName.trim()}\u0000${taskRef.taskId.trim()}\u0000${taskRef.displayId.trim()}`;
}
private normalizeComparableParticipant(value: unknown): string {
return typeof value === 'string' ? value.trim().toLowerCase() : '';
}
private normalizeComparableText(value: unknown): string {
return typeof value === 'string'
? value
.trim()
.replace(/\r\n/g, '\n')
.replace(/[ \t]+/g, ' ')
: '';
}
private async readInbox(inboxPath: string): Promise<InboxMessage[]> {
let raw: string;
try {
raw = await fs.promises.readFile(inboxPath, 'utf8');
} catch (error) {
if ((error as NodeJS.ErrnoException).code === 'ENOENT') {
return [];
}
throw error;
}
const parsed = JSON.parse(raw) as unknown;
if (!Array.isArray(parsed)) {
return [];
}
return parsed.filter((item): item is InboxMessage => {
if (!item || typeof item !== 'object') {
return false;
}
const row = item as Partial<InboxMessage>;
return (
typeof row.from === 'string' &&
typeof row.text === 'string' &&
typeof row.timestamp === 'string' &&
typeof row.read === 'boolean'
);
});
}
}