feat: integrate runtime helpers and enhance lead session handling

- Replaced legacy references with new runtimeHelpers in context, processStore, tasks, and review modules for improved maintainability.
- Introduced leadSessionId handling in review requests and approvals, ensuring consistent tracking of session identifiers across operations.
- Added new utility functions in runtimeHelpers for path management and process checks, streamlining task-related functionalities.
- Updated tests to validate the inclusion of leadSessionId in various workflows, enhancing the robustness of the messaging system.
This commit is contained in:
iliya 2026-03-08 00:56:31 +02:00
parent df457eb9cd
commit 01dd87caaa
15 changed files with 523 additions and 60 deletions

View file

@ -1,4 +1,4 @@
const legacy = require('../legacy/teamctl.cli.js');
const runtimeHelpers = require('./runtimeHelpers.js');
function createControllerContext(options = {}) {
const teamName = String(options.teamName || '').trim();
@ -11,7 +11,7 @@ function createControllerContext(options = {}) {
flags['claude-dir'] = options.claudeDir.trim();
}
const paths = legacy.getPaths(flags, teamName);
const paths = runtimeHelpers.getPaths(flags, teamName);
return {
teamName,
claudeDir: paths.claudeDir,

View file

@ -2,7 +2,7 @@ const fs = require('fs');
const path = require('path');
const crypto = require('crypto');
const legacy = require('../legacy/teamctl.cli.js');
const runtimeHelpers = require('./runtimeHelpers.js');
function nowIso() {
return new Date().toISOString();
@ -39,7 +39,7 @@ function listProcesses(paths) {
const alive =
!entry.stoppedAt &&
Number.isFinite(Number(entry.pid)) &&
legacy.isProcessAlive(Number(entry.pid));
runtimeHelpers.isProcessAlive(Number(entry.pid));
if (!alive && !entry.stoppedAt) {
return {
@ -83,7 +83,7 @@ function registerProcess(paths, flags) {
existingActiveIndex >= 0
? {
...list[existingActiveIndex],
...(legacy.isProcessAlive(pid) ? {} : { stoppedAt: nowIso() }),
...(runtimeHelpers.isProcessAlive(pid) ? {} : { stoppedAt: nowIso() }),
}
: null;
if (existingActiveIndex >= 0 && existingActive && existingActive.stoppedAt) {

View file

@ -1,3 +1,6 @@
const fs = require('fs');
const path = require('path');
const kanban = require('./kanban.js');
const messages = require('./messages.js');
const tasks = require('./tasks.js');
@ -13,6 +16,22 @@ function getReviewer(context, flags) {
: null;
}
function resolveLeadSessionId(context, flags) {
if (typeof flags.leadSessionId === 'string' && flags.leadSessionId.trim()) {
return flags.leadSessionId.trim();
}
try {
const configPath = path.join(context.paths.teamDir, 'config.json');
const parsed = JSON.parse(fs.readFileSync(configPath, 'utf8'));
return typeof parsed.leadSessionId === 'string' && parsed.leadSessionId.trim()
? parsed.leadSessionId.trim()
: undefined;
} catch {
return undefined;
}
}
function requestReview(context, taskId, flags = {}) {
const task = tasks.getTask(context, taskId);
if (task.status !== 'completed') {
@ -22,6 +41,7 @@ function requestReview(context, taskId, flags = {}) {
const from =
typeof flags.from === 'string' && flags.from.trim() ? flags.from.trim() : 'team-lead';
const reviewer = getReviewer(context, flags);
const leadSessionId = resolveLeadSessionId(context, flags);
try {
kanban.setKanbanColumn(context, task.id, 'review');
@ -42,6 +62,7 @@ function requestReview(context, taskId, flags = {}) {
),
summary: `Review request for #${task.displayId || task.id}`,
source: 'system_notification',
...(leadSessionId ? { leadSessionId } : {}),
});
return tasks.getTask(context, task.id);
} catch (error) {
@ -59,6 +80,7 @@ function approveReview(context, taskId, flags = {}) {
const from =
typeof flags.from === 'string' && flags.from.trim() ? flags.from.trim() : 'team-lead';
const note = typeof flags.note === 'string' && flags.note.trim() ? flags.note.trim() : 'Approved';
const leadSessionId = resolveLeadSessionId(context, flags);
kanban.setKanbanColumn(context, task.id, 'approved');
tasks.addTaskComment(context, task.id, {
@ -77,6 +99,7 @@ function approveReview(context, taskId, flags = {}) {
: `Task ${task.displayId || task.id} approved.`,
summary: `Approved ${task.displayId || task.id}`,
source: 'system_notification',
...(leadSessionId ? { leadSessionId } : {}),
});
}
@ -95,6 +118,7 @@ function requestChanges(context, taskId, flags = {}) {
typeof flags.comment === 'string' && flags.comment.trim()
? flags.comment.trim()
: 'Reviewer requested changes.';
const leadSessionId = resolveLeadSessionId(context, flags);
kanban.clearKanban(context, task.id);
tasks.setTaskStatus(context, task.id, 'in_progress', from);
@ -111,6 +135,7 @@ function requestChanges(context, taskId, flags = {}) {
'Please fix and mark it as completed when ready.',
summary: `Fix request for ${task.displayId || task.id}`,
source: 'system_notification',
...(leadSessionId ? { leadSessionId } : {}),
});
return tasks.getTask(context, task.id);

View file

@ -0,0 +1,295 @@
const fs = require('fs');
const path = require('path');
const crypto = require('crypto');
const TASK_ATTACHMENTS_DIR = 'task-attachments';
const MAX_TASK_ATTACHMENT_BYTES = 20 * 1024 * 1024;
function nowIso() {
return new Date().toISOString();
}
function makeId() {
return crypto.randomUUID ? crypto.randomUUID() : `${Date.now()}-${Math.random()}`;
}
function ensureDir(dirPath) {
fs.mkdirSync(dirPath, { recursive: true });
}
function readJson(filePath, fallbackValue) {
try {
return JSON.parse(fs.readFileSync(filePath, 'utf8'));
} catch (error) {
if (error && error.code === 'ENOENT') {
return fallbackValue;
}
throw error;
}
}
function isSafePathSegment(value) {
const normalized = String(value == null ? '' : value);
if (normalized.length === 0 || normalized.trim().length === 0) return false;
if (normalized === '.' || normalized === '..') return false;
if (normalized.includes('/') || normalized.includes('\\')) return false;
if (normalized.includes('..')) return false;
if (normalized.includes('\0')) return false;
return true;
}
function assertSafePathSegment(label, value) {
const normalized = String(value == null ? '' : value);
if (!isSafePathSegment(normalized)) {
throw new Error(`Invalid ${String(label)}`);
}
return normalized;
}
function getHomeDir() {
if (process.env.HOME) return process.env.HOME;
if (process.env.USERPROFILE) return process.env.USERPROFILE;
if (process.env.HOMEDRIVE && process.env.HOMEPATH) {
return process.env.HOMEDRIVE + process.env.HOMEPATH;
}
try {
return require('os').homedir();
} catch {
return '';
}
}
function getClaudeDir(flags) {
const explicit =
(typeof flags['claude-dir'] === 'string' && flags['claude-dir']) ||
(typeof flags.claudeDir === 'string' && flags.claudeDir) ||
(typeof flags.claude_path === 'string' && flags.claude_path) ||
'';
if (explicit) {
return path.resolve(explicit);
}
const home = getHomeDir();
if (!home) {
throw new Error('HOME/USERPROFILE is not set');
}
return path.join(home, '.claude');
}
function getPaths(flags, teamName) {
const claudeDir = getClaudeDir(flags);
const safeTeam = assertSafePathSegment('team', teamName);
const teamDir = path.join(claudeDir, 'teams', safeTeam);
const tasksDir = path.join(claudeDir, 'tasks', safeTeam);
const kanbanPath = path.join(teamDir, 'kanban-state.json');
const processesPath = path.join(teamDir, 'processes.json');
return { claudeDir, teamDir, tasksDir, kanbanPath, processesPath };
}
function inferLeadName(paths) {
const config = readJson(path.join(paths.teamDir, 'config.json'), null);
if (!config || !Array.isArray(config.members)) {
return 'team-lead';
}
const lead = config.members.find(
(member) => member && member.role && String(member.role).toLowerCase().includes('lead')
);
if (lead) {
return String(lead.name);
}
return config.members[0] ? String(config.members[0].name) : 'team-lead';
}
function isProcessAlive(pid) {
try {
process.kill(pid, 0);
return true;
} catch (error) {
if (error && error.code === 'EPERM') {
return true;
}
return false;
}
}
function sanitizeFilename(original) {
const raw = String(original == null ? '' : original).trim();
const parts = raw.split(/[\\/]/);
const base = (parts.length ? parts[parts.length - 1] : raw).trim();
const cleaned = base
.replace(/\0/g, '')
.replace(/[\r\n\t]/g, ' ')
.replace(/[\\/]/g, '_')
.trim();
if (!cleaned) return 'attachment';
return cleaned.length > 180 ? cleaned.slice(0, 180) : cleaned;
}
function readFileHeader(filePath, maxBytes) {
const fd = fs.openSync(filePath, 'r');
try {
const buffer = Buffer.alloc(maxBytes);
const bytes = fs.readSync(fd, buffer, 0, maxBytes, 0);
return buffer.slice(0, bytes);
} finally {
try {
fs.closeSync(fd);
} catch {
// ignore
}
}
}
function startsWithBytes(buffer, bytes) {
if (!buffer || buffer.length < bytes.length) return false;
for (let i = 0; i < bytes.length; i += 1) {
if (buffer[i] !== bytes[i]) return false;
}
return true;
}
function detectMimeTypeFromPathAndHeader(filePath, filename) {
const name = String(filename || '').toLowerCase();
const ext = path.extname(name);
if (ext === '.png') return 'image/png';
if (ext === '.jpg' || ext === '.jpeg') return 'image/jpeg';
if (ext === '.gif') return 'image/gif';
if (ext === '.webp') return 'image/webp';
if (ext === '.pdf') return 'application/pdf';
if (ext === '.txt') return 'text/plain';
if (ext === '.md') return 'text/markdown';
if (ext === '.json') return 'application/json';
if (ext === '.zip') return 'application/zip';
let header;
try {
header = readFileHeader(filePath, 16);
} catch {
return 'application/octet-stream';
}
if (startsWithBytes(header, [0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a])) return 'image/png';
if (startsWithBytes(header, [0xff, 0xd8, 0xff])) return 'image/jpeg';
if (header.length >= 6) {
const signature6 = header.slice(0, 6).toString('ascii');
if (signature6 === 'GIF87a' || signature6 === 'GIF89a') return 'image/gif';
}
if (header.length >= 12) {
const riff = header.slice(0, 4).toString('ascii');
const webp = header.slice(8, 12).toString('ascii');
if (riff === 'RIFF' && webp === 'WEBP') return 'image/webp';
}
if (header.length >= 5 && header.slice(0, 5).toString('ascii') === '%PDF-') {
return 'application/pdf';
}
if (startsWithBytes(header, [0x50, 0x4b, 0x03, 0x04])) return 'application/zip';
return 'application/octet-stream';
}
function getTaskAttachmentsDir(paths, taskId) {
const safeTaskId = assertSafePathSegment('taskId', taskId);
return path.join(paths.teamDir, TASK_ATTACHMENTS_DIR, safeTaskId);
}
function getStoredAttachmentPath(paths, taskId, attachmentId, filename) {
const safeFilename = sanitizeFilename(filename);
return path.join(
getTaskAttachmentsDir(paths, taskId),
`${String(attachmentId)}--${safeFilename}`
);
}
function ensureSourceFileReadable(srcPath) {
const stats = fs.statSync(srcPath);
if (!stats.isFile()) {
throw new Error(`Not a file: ${String(srcPath)}`);
}
if (stats.size > MAX_TASK_ATTACHMENT_BYTES) {
throw new Error(
`Attachment too large: ${(stats.size / (1024 * 1024)).toFixed(1)} MB (max ${String(
MAX_TASK_ATTACHMENT_BYTES / (1024 * 1024)
)} MB)`
);
}
return stats;
}
function copyOrLinkFile(srcPath, destPath, mode, allowFallback) {
const normalizedMode = String(mode || 'copy').toLowerCase();
if (normalizedMode === 'link') {
try {
fs.linkSync(srcPath, destPath);
return { mode: 'link', fallbackUsed: false };
} catch (error) {
if (!allowFallback) throw error;
try {
fs.copyFileSync(srcPath, destPath);
return { mode: 'copy', fallbackUsed: true };
} catch (copyError) {
throw copyError || error;
}
}
}
fs.copyFileSync(srcPath, destPath);
return { mode: 'copy', fallbackUsed: false };
}
function saveTaskAttachmentFile(paths, taskId, flags) {
const rawFile =
(typeof flags.file === 'string' && flags.file.trim()) ||
(typeof flags.path === 'string' && flags.path.trim()) ||
'';
if (!rawFile) {
throw new Error('Missing --file <path>');
}
const srcPath = path.resolve(rawFile);
ensureSourceFileReadable(srcPath);
const filename =
(typeof flags.filename === 'string' && flags.filename.trim()) || path.basename(srcPath);
const mimeType =
(typeof flags['mime-type'] === 'string' && flags['mime-type'].trim()) ||
(typeof flags.mimeType === 'string' && flags.mimeType.trim()) ||
detectMimeTypeFromPathAndHeader(srcPath, filename);
const attachmentId = makeId();
const dir = getTaskAttachmentsDir(paths, taskId);
ensureDir(dir);
const destPath = getStoredAttachmentPath(paths, taskId, attachmentId, filename);
const allowFallback = !(flags['no-fallback'] === true);
if (fs.existsSync(destPath)) {
throw new Error('Attachment destination already exists');
}
const result = copyOrLinkFile(srcPath, destPath, flags.mode, allowFallback);
const stats = fs.statSync(destPath);
if (!stats.isFile() || stats.size < 0) {
throw new Error('Attachment write verification failed');
}
const meta = {
id: attachmentId,
filename,
mimeType,
size: stats.size,
addedAt: nowIso(),
};
return {
meta,
storedPath: destPath,
storageMode: result.mode,
fallbackUsed: result.fallbackUsed,
};
}
module.exports = {
getPaths,
inferLeadName,
isProcessAlive,
saveTaskAttachmentFile,
};

View file

@ -1,5 +1,5 @@
const legacy = require('../legacy/teamctl.cli.js');
const taskStore = require('./taskStore.js');
const runtimeHelpers = require('./runtimeHelpers.js');
function createTask(context, input) {
return taskStore.createTask(context.paths, input);
@ -56,7 +56,7 @@ function addTaskComment(context, taskId, flags) {
author:
typeof flags.from === 'string' && flags.from.trim()
? flags.from.trim()
: legacy.inferLeadName(context.paths),
: runtimeHelpers.inferLeadName(context.paths),
...(flags.id ? { id: flags.id } : {}),
...(flags.createdAt ? { createdAt: flags.createdAt } : {}),
...(flags.type ? { type: flags.type } : {}),
@ -75,7 +75,7 @@ function addTaskComment(context, taskId, flags) {
function attachTaskFile(context, taskId, flags) {
const canonicalTaskId = resolveTaskId(context, taskId);
const saved = legacy.saveTaskAttachmentFile(context.paths, canonicalTaskId, flags);
const saved = runtimeHelpers.saveTaskAttachmentFile(context.paths, canonicalTaskId, flags);
const task = taskStore.addTaskAttachmentMeta(context.paths, canonicalTaskId, saved.meta);
return {
...saved.meta,
@ -85,7 +85,7 @@ function attachTaskFile(context, taskId, flags) {
function attachCommentFile(context, taskId, commentId, flags) {
const canonicalTaskId = resolveTaskId(context, taskId);
const saved = legacy.saveTaskAttachmentFile(context.paths, canonicalTaskId, flags);
const saved = runtimeHelpers.saveTaskAttachmentFile(context.paths, canonicalTaskId, flags);
const task = taskStore.addCommentAttachmentMeta(context.paths, canonicalTaskId, commentId, saved.meta);
return {
...saved.meta,

View file

@ -14,6 +14,7 @@ describe('agent-teams-controller API', () => {
JSON.stringify(
{
name: 'my-team',
leadSessionId: 'lead-session-1',
members: [
{ name: 'alice', role: 'team-lead' },
{ name: 'bob', role: 'developer' },
@ -68,6 +69,11 @@ describe('agent-teams-controller API', () => {
});
expect(sent.leadSessionId).toBe('session-1');
const ownerInboxPath = path.join(claudeDir, 'teams', 'my-team', 'inboxes', 'bob.json');
const ownerInbox = JSON.parse(fs.readFileSync(ownerInboxPath, 'utf8'));
expect(ownerInbox.at(-1).summary).toContain('Approved');
expect(ownerInbox.at(-1).leadSessionId).toBe('lead-session-1');
const proc = controller.processes.registerProcess({
pid: process.pid,
label: 'dev-server',
@ -250,6 +256,7 @@ describe('agent-teams-controller API', () => {
expect(inbox[0].text).toContain('<info_for_agent>');
expect(inbox[0].text).toContain('review_approve');
expect(inbox[0].text).not.toContain('<agent-block>');
expect(inbox[0].leadSessionId).toBe('lead-session-1');
});
it('persists full inbox metadata through controller messages.sendMessage', () => {
@ -297,6 +304,7 @@ describe('agent-teams-controller API', () => {
const rows = JSON.parse(fs.readFileSync(inboxPath, 'utf8'));
expect(rows.at(-1).source).toBe('system_notification');
expect(rows.at(-1).summary).toContain('Fix request');
expect(rows.at(-1).leadSessionId).toBe('lead-session-1');
});
it('marks stale processes stopped during listing and supports unregister', () => {

View file

@ -18,13 +18,15 @@ export function registerReviewTools(server: Pick<FastMCP, 'addTool'>) {
taskId: z.string().min(1),
from: z.string().optional(),
reviewer: z.string().optional(),
leadSessionId: z.string().optional(),
}),
execute: async ({ teamName, claudeDir, taskId, from, reviewer }) =>
execute: async ({ teamName, claudeDir, taskId, from, reviewer, leadSessionId }) =>
await Promise.resolve(
jsonTextContent(
getController(teamName, claudeDir).review.requestReview(taskId, {
...(from ? { from } : {}),
...(reviewer ? { reviewer } : {}),
...(from ? { from } : {}),
...(reviewer ? { reviewer } : {}),
...(leadSessionId ? { leadSessionId } : {}),
})
)
),
@ -39,14 +41,16 @@ export function registerReviewTools(server: Pick<FastMCP, 'addTool'>) {
from: z.string().optional(),
note: z.string().optional(),
notifyOwner: z.boolean().optional(),
leadSessionId: z.string().optional(),
}),
execute: async ({ teamName, claudeDir, taskId, from, note, notifyOwner }) =>
execute: async ({ teamName, claudeDir, taskId, from, note, notifyOwner, leadSessionId }) =>
await Promise.resolve(
jsonTextContent(
getController(teamName, claudeDir).review.approveReview(taskId, {
...(from ? { from } : {}),
...(note ? { note } : {}),
...(notifyOwner !== false ? { 'notify-owner': true } : {}),
...(from ? { from } : {}),
...(note ? { note } : {}),
...(notifyOwner !== false ? { 'notify-owner': true } : {}),
...(leadSessionId ? { leadSessionId } : {}),
})
)
),
@ -60,13 +64,15 @@ export function registerReviewTools(server: Pick<FastMCP, 'addTool'>) {
taskId: z.string().min(1),
from: z.string().optional(),
comment: z.string().optional(),
leadSessionId: z.string().optional(),
}),
execute: async ({ teamName, claudeDir, taskId, from, comment }) =>
execute: async ({ teamName, claudeDir, taskId, from, comment, leadSessionId }) =>
await Promise.resolve(
jsonTextContent(
getController(teamName, claudeDir).review.requestChanges(taskId, {
...(from ? { from } : {}),
...(comment ? { comment } : {}),
...(from ? { from } : {}),
...(comment ? { comment } : {}),
...(leadSessionId ? { leadSessionId } : {}),
})
)
),

View file

@ -233,10 +233,14 @@ describe('agent-teams-mcp tools', () => {
taskId: createdTask.id,
from: 'lead',
reviewer: 'alice',
leadSessionId: 'session-review-1',
})
);
expect(reviewRequested.reviewState).toBe('review');
const reviewerInboxPath = path.join(claudeDir, 'teams', teamName, 'inboxes', 'alice.json');
const reviewerInbox = JSON.parse(fs.readFileSync(reviewerInboxPath, 'utf8'));
expect(reviewerInbox.at(-1).leadSessionId).toBe('session-review-1');
const approved = parseJsonToolResult(
await getTool('review_approve').execute({
@ -246,9 +250,13 @@ describe('agent-teams-mcp tools', () => {
from: 'lead',
note: 'Looks good',
notifyOwner: true,
leadSessionId: 'session-review-1',
})
);
expect(approved.reviewState).toBe('approved');
const ownerInboxPath = path.join(claudeDir, 'teams', teamName, 'inboxes', 'alice.json');
const ownerInbox = JSON.parse(fs.readFileSync(ownerInboxPath, 'utf8'));
expect(ownerInbox.at(-1).leadSessionId).toBe('session-review-1');
const kanbanState = parseJsonToolResult(
await getTool('kanban_get').execute({
@ -294,6 +302,7 @@ describe('agent-teams-mcp tools', () => {
taskId: createdTask.id,
from: 'lead',
reviewer: 'alice',
leadSessionId: 'session-review-2',
});
const changesRequested = parseJsonToolResult(
@ -303,11 +312,15 @@ describe('agent-teams-mcp tools', () => {
taskId: createdTask.id,
from: 'alice',
comment: 'Please revise this section.',
leadSessionId: 'session-review-2',
})
);
expect(changesRequested.status).toBe('in_progress');
expect(changesRequested.reviewState).toBe('none');
const ownerInboxPath = path.join(claudeDir, 'teams', teamName, 'inboxes', 'bob.json');
const ownerInbox = JSON.parse(fs.readFileSync(ownerInboxPath, 'utf8'));
expect(ownerInbox.at(-1).leadSessionId).toBe('session-review-2');
const kanbanCleared = parseJsonToolResult(
await getTool('kanban_clear').execute({

View file

@ -1096,6 +1096,20 @@ export class TeamDataService {
}
}
private async resolveLeadRuntimeContext(
teamName: string
): Promise<{ leadName: string; leadSessionId?: string }> {
try {
const config = await this.configReader.getConfig(teamName);
return {
leadName: this.resolveLeadNameFromConfig(config),
leadSessionId: config?.leadSessionId,
};
} catch {
return { leadName: 'team-lead' };
}
}
private isLeadOwner(owner: string, leadName: string): boolean {
const normalized = owner.trim().toLowerCase();
if (!normalized) return false;
@ -1161,8 +1175,11 @@ export class TeamDataService {
}
async requestReview(teamName: string, taskId: string): Promise<void> {
const leadName = await this.resolveLeadName(teamName);
this.getController(teamName).review.requestReview(taskId, { from: leadName });
const { leadName, leadSessionId } = await this.resolveLeadRuntimeContext(teamName);
this.getController(teamName).review.requestReview(taskId, {
from: leadName,
...(leadSessionId ? { leadSessionId } : {}),
});
}
async createTeamConfig(request: TeamCreateConfigRequest): Promise<void> {
@ -1391,23 +1408,28 @@ export class TeamDataService {
if (patch.op === 'set_column') {
if (patch.column === 'review') {
const leadName = await this.resolveLeadName(teamName);
controller.review.requestReview(taskId, { from: leadName });
const { leadName, leadSessionId } = await this.resolveLeadRuntimeContext(teamName);
controller.review.requestReview(taskId, {
from: leadName,
...(leadSessionId ? { leadSessionId } : {}),
});
} else {
const leadName = await this.resolveLeadName(teamName);
const { leadName, leadSessionId } = await this.resolveLeadRuntimeContext(teamName);
controller.review.approveReview(taskId, {
from: leadName,
note: 'Approved from kanban',
'notify-owner': true,
...(leadSessionId ? { leadSessionId } : {}),
});
}
return;
}
const leadName = await this.resolveLeadName(teamName);
const { leadName, leadSessionId } = await this.resolveLeadRuntimeContext(teamName);
controller.review.requestChanges(taskId, {
from: leadName,
comment: patch.comment?.trim() || 'Reviewer requested changes.',
...(leadSessionId ? { leadSessionId } : {}),
});
}

View file

@ -86,7 +86,9 @@ export class TeamInboxReader {
if (
typeof row.from !== 'string' ||
typeof row.text !== 'string' ||
typeof row.timestamp !== 'string'
typeof row.timestamp !== 'string' ||
typeof row.messageId !== 'string' ||
row.messageId.trim().length === 0
) {
continue;
}
@ -98,7 +100,7 @@ export class TeamInboxReader {
read: typeof row.read === 'boolean' ? row.read : false,
summary: typeof row.summary === 'string' ? row.summary : undefined,
color: typeof row.color === 'string' ? row.color : undefined,
messageId: typeof row.messageId === 'string' ? row.messageId : undefined,
messageId: row.messageId,
source: typeof row.source === 'string' ? (row.source as InboxMessage['source']) : undefined,
leadSessionId: typeof row.leadSessionId === 'string' ? row.leadSessionId : undefined,
attachments: Array.isArray(row.attachments) ? row.attachments : undefined,

View file

@ -1095,7 +1095,6 @@ export class TeamProvisioningService {
private readonly teamOpLocks = new Map<string, Promise<void>>();
private readonly leadInboxRelayInFlight = new Map<string, Promise<number>>();
private readonly relayedLeadInboxMessageIds = new Map<string, Set<string>>();
private readonly relayedLeadInboxFallbackKeys = new Map<string, Set<string>>();
private readonly liveLeadProcessMessages = new Map<string, InboxMessage[]>();
private teamChangeEmitter: ((event: TeamChangeEvent) => void) | null = null;
private helpOutputCache: string | null = null;
@ -2493,6 +2492,12 @@ export class TeamProvisioningService {
*
* Returns the number of messages relayed.
*/
private hasStableMessageId(
message: InboxMessage
): message is InboxMessage & { messageId: string } {
return typeof message.messageId === 'string' && message.messageId.trim().length > 0;
}
async relayLeadInboxMessages(teamName: string): Promise<number> {
const existing = this.leadInboxRelayInFlight.get(teamName);
if (existing) {
@ -2507,7 +2512,6 @@ export class TeamProvisioningService {
if (!run.provisioningComplete) return 0;
const relayedIds = this.relayedLeadInboxMessageIds.get(teamName) ?? new Set<string>();
const relayedFallback = this.relayedLeadInboxFallbackKeys.get(teamName) ?? new Set<string>();
let config: Awaited<ReturnType<TeamConfigReader['getConfig']>> | null = null;
try {
@ -2528,13 +2532,11 @@ export class TeamProvisioningService {
}
const unread = leadInboxMessages
.filter((m) => {
.filter((m): m is InboxMessage & { messageId: string } => {
if (m.read) return false;
if (typeof m.text !== 'string' || m.text.trim().length === 0) return false;
if (typeof m.messageId === 'string' && m.messageId.trim().length > 0) {
return !relayedIds.has(m.messageId);
}
return !relayedFallback.has(`${m.timestamp}\0${m.from}\0${m.text}`);
if (!this.hasStableMessageId(m)) return false;
return !relayedIds.has(m.messageId);
})
.sort((a, b) => Date.parse(a.timestamp) - Date.parse(b.timestamp));
@ -2630,14 +2632,9 @@ export class TeamProvisioningService {
}
for (const m of batch) {
if (typeof m.messageId === 'string' && m.messageId.trim().length > 0) {
relayedIds.add(m.messageId);
} else {
relayedFallback.add(`${m.timestamp}\0${m.from}\0${m.text}`);
}
relayedIds.add(m.messageId);
}
this.relayedLeadInboxMessageIds.set(teamName, this.trimRelayedSet(relayedIds));
this.relayedLeadInboxFallbackKeys.set(teamName, this.trimRelayedSet(relayedFallback));
try {
await this.markInboxMessagesRead(teamName, leadName, batch);
@ -2786,7 +2783,7 @@ export class TeamProvisioningService {
private async markInboxMessagesRead(
teamName: string,
member: string,
messages: { messageId?: string; timestamp: string; from: string; text: string }[]
messages: { messageId: string }[]
): Promise<void> {
const inboxPath = path.join(getTeamsBasePath(), teamName, 'inboxes', `${member}.json`);
@ -2807,27 +2804,14 @@ export class TeamProvisioningService {
}
if (!Array.isArray(parsed)) return;
const ids = new Set(messages.map((m) => m.messageId).filter((id): id is string => !!id));
const fallbackKeys = new Set(
messages.filter((m) => !m.messageId).map((m) => `${m.timestamp}\0${m.from}\0${m.text}`)
);
const ids = new Set(messages.map((m) => m.messageId).filter((id) => id.trim().length > 0));
let changed = false;
for (const item of parsed) {
if (!item || typeof item !== 'object') continue;
const row = item as Record<string, unknown>;
const msgId = typeof row.messageId === 'string' ? row.messageId : null;
const timestamp = typeof row.timestamp === 'string' ? row.timestamp : null;
const from = typeof row.from === 'string' ? row.from : null;
const text = typeof row.text === 'string' ? row.text : null;
const matchesId = msgId ? ids.has(msgId) : false;
const matchesFallback =
!msgId && timestamp && from && text
? fallbackKeys.has(`${timestamp}\0${from}\0${text}`)
: false;
if (!matchesId && !matchesFallback) continue;
if (!msgId || !ids.has(msgId)) continue;
if (row.read !== true) {
row.read = true;
@ -3873,7 +3857,6 @@ export class TeamProvisioningService {
this.activeByTeam.delete(run.teamName);
this.leadInboxRelayInFlight.delete(run.teamName);
this.relayedLeadInboxMessageIds.delete(run.teamName);
this.relayedLeadInboxFallbackKeys.delete(run.teamName);
this.liveLeadProcessMessages.delete(run.teamName);
// Dismiss any pending tool approvals for this run
if (run.pendingApprovals.size > 0) {

View file

@ -59,7 +59,9 @@ export class TeamSentMessagesStore {
if (
typeof row.from !== 'string' ||
typeof row.text !== 'string' ||
typeof row.timestamp !== 'string'
typeof row.timestamp !== 'string' ||
typeof row.messageId !== 'string' ||
row.messageId.trim().length === 0
) {
continue;
}
@ -71,7 +73,7 @@ export class TeamSentMessagesStore {
timestamp: row.timestamp,
read: typeof row.read === 'boolean' ? row.read : true,
summary: typeof row.summary === 'string' ? row.summary : undefined,
messageId: typeof row.messageId === 'string' ? row.messageId : undefined,
messageId: row.messageId,
color: typeof row.color === 'string' ? row.color : undefined,
attachments: Array.isArray(row.attachments) ? row.attachments : undefined,
source: typeof row.source === 'string' ? (row.source as InboxMessage['source']) : undefined,

View file

@ -376,6 +376,7 @@ describe('TeamDataService', () => {
getConfig: vi.fn(async () => ({
name: 'My team',
members: [{ name: 'lead', role: 'team lead' }],
leadSessionId: 'lead-1',
})),
} as never,
{} as never,
@ -397,6 +398,63 @@ describe('TeamDataService', () => {
await service.requestReview('my-team', 'task-1');
expect(requestReviewMock).toHaveBeenCalledWith('task-1', { from: 'lead' });
expect(requestReviewMock).toHaveBeenCalledWith('task-1', {
from: 'lead',
leadSessionId: 'lead-1',
});
});
it('propagates leadSessionId for kanban-driven review transitions', async () => {
const requestReviewMock = vi.fn();
const approveReviewMock = vi.fn();
const requestChangesMock = vi.fn();
const service = new TeamDataService(
{
listTeams: vi.fn(),
getConfig: vi.fn(async () => ({
name: 'My team',
members: [{ name: 'lead', role: 'team lead' }],
leadSessionId: 'lead-2',
})),
} as never,
{} as never,
{} as never,
{} as never,
{} as never,
{} as never,
{} as never,
{} as never,
{} as never,
{} as never,
() =>
({
review: {
requestReview: requestReviewMock,
approveReview: approveReviewMock,
requestChanges: requestChangesMock,
},
}) as never
);
await service.updateKanban('my-team', 'task-1', { op: 'set_column', column: 'review' });
await service.updateKanban('my-team', 'task-1', { op: 'set_column', column: 'approved' });
await service.updateKanban('my-team', 'task-1', { op: 'request_changes', comment: 'Needs fixes' });
expect(requestReviewMock).toHaveBeenCalledWith('task-1', {
from: 'lead',
leadSessionId: 'lead-2',
});
expect(approveReviewMock).toHaveBeenCalledWith('task-1', {
from: 'lead',
note: 'Approved from kanban',
'notify-owner': true,
leadSessionId: 'lead-2',
});
expect(requestChangesMock).toHaveBeenCalledWith('task-1', {
from: 'lead',
comment: 'Needs fixes',
leadSessionId: 'lead-2',
});
});
});

View file

@ -96,6 +96,7 @@ describe('TeamInboxReader', () => {
text: 'older',
timestamp: '2026-01-01T00:00:00.000Z',
read: false,
messageId: 'm-1',
},
])
);
@ -107,6 +108,7 @@ describe('TeamInboxReader', () => {
text: 'newer',
timestamp: '2026-01-02T00:00:00.000Z',
read: false,
messageId: 'm-2',
},
])
);
@ -116,4 +118,30 @@ describe('TeamInboxReader', () => {
expect(merged[0].text).toBe('newer');
expect(merged[1].text).toBe('older');
});
it('ignores legacy inbox rows without messageId', async () => {
hoisted.files.set(
'/mock/teams/my-team/inboxes/alice.json',
JSON.stringify([
{
from: 'alice',
text: 'legacy',
timestamp: '2026-01-01T00:00:00.000Z',
read: false,
},
{
from: 'alice',
text: 'supported',
timestamp: '2026-01-01T01:00:00.000Z',
read: false,
messageId: 'm-1',
},
])
);
const messages = await reader.getMessagesFor('my-team', 'alice');
expect(messages).toHaveLength(1);
expect(messages[0].text).toBe('supported');
expect(messages[0].messageId).toBe('m-1');
});
});

View file

@ -272,4 +272,25 @@ describe('TeamProvisioningService relayLeadInboxMessages', () => {
expect(second).toBe(1);
expect(writeSpy).toHaveBeenCalledTimes(1);
});
it('ignores unread lead inbox rows without messageId', async () => {
const service = new TeamProvisioningService();
const teamName = 'my-team';
seedConfig(teamName);
seedLeadInbox(teamName, [
{
from: 'bob',
text: 'Legacy row without id',
timestamp: '2026-02-23T10:00:00.000Z',
read: false,
},
]);
const { writeSpy } = attachAliveRun(service, teamName);
const relayed = await service.relayLeadInboxMessages(teamName);
expect(relayed).toBe(0);
expect(writeSpy).toHaveBeenCalledTimes(0);
expect(hoisted.appendSentMessage).not.toHaveBeenCalled();
});
});