fix worker lifecycle edge cases
This commit is contained in:
parent
dc0627c285
commit
0531fc1dbf
2 changed files with 24 additions and 16 deletions
|
|
@ -577,6 +577,12 @@ async function handleGetData(
|
|||
if (getDataMs >= 1500) {
|
||||
logger.warn(`[teams:getData] slow team=${tn} ms=${getDataMs}`);
|
||||
}
|
||||
const teamDataService = getTeamDataService();
|
||||
if (data.processes.some((process) => !process.stoppedAt)) {
|
||||
teamDataService.trackProcessHealthForTeam?.(tn);
|
||||
} else {
|
||||
teamDataService.untrackProcessHealthForTeam?.(tn);
|
||||
}
|
||||
const provisioning = getTeamProvisioningService();
|
||||
const isAlive = provisioning.isTeamAlive(tn);
|
||||
|
||||
|
|
|
|||
|
|
@ -60,6 +60,18 @@ export class TeamDataWorkerClient {
|
|||
private warnedUnavailable = false;
|
||||
private pending = new Map<string, PendingEntry>();
|
||||
|
||||
private failWorker(worker: Worker, error: Error): void {
|
||||
if (this.worker !== worker) return;
|
||||
|
||||
this.worker = null;
|
||||
const pendingEntries = Array.from(this.pending.values());
|
||||
this.pending.clear();
|
||||
|
||||
for (const entry of pendingEntries) {
|
||||
entry.reject(error);
|
||||
}
|
||||
}
|
||||
|
||||
isAvailable(): boolean {
|
||||
if (!this.workerPath && !this.warnedUnavailable) {
|
||||
this.warnedUnavailable = true;
|
||||
|
|
@ -90,23 +102,13 @@ export class TeamDataWorkerClient {
|
|||
// Without this guard, a stale worker's exit event can reject
|
||||
// pending requests that belong to a newer replacement worker.
|
||||
w.on('error', (err) => {
|
||||
if (this.worker !== w) return;
|
||||
logger.error('Worker error', err);
|
||||
for (const [, entry] of this.pending) {
|
||||
entry.reject(err instanceof Error ? err : new Error(String(err)));
|
||||
}
|
||||
this.pending.clear();
|
||||
this.worker = null;
|
||||
this.failWorker(w, err instanceof Error ? err : new Error(String(err)));
|
||||
});
|
||||
|
||||
w.on('exit', (code) => {
|
||||
if (this.worker !== w) return;
|
||||
if (code !== 0) logger.warn(`Worker exited with code ${code}`);
|
||||
for (const [, entry] of this.pending) {
|
||||
entry.reject(new Error(`Worker exited with code ${code}`));
|
||||
}
|
||||
this.pending.clear();
|
||||
this.worker = null;
|
||||
this.failWorker(w, new Error(`Worker exited with code ${code}`));
|
||||
});
|
||||
|
||||
return w;
|
||||
|
|
@ -121,10 +123,10 @@ export class TeamDataWorkerClient {
|
|||
|
||||
return new Promise((resolve, reject) => {
|
||||
const timeout = setTimeout(() => {
|
||||
this.pending.delete(id);
|
||||
this.worker?.terminate().catch(() => undefined);
|
||||
this.worker = null;
|
||||
reject(new Error(`Worker call timeout after ${WORKER_CALL_TIMEOUT_MS}ms`));
|
||||
const timeoutError = new Error(`Worker call timeout after ${WORKER_CALL_TIMEOUT_MS}ms`);
|
||||
this.failWorker(worker, timeoutError);
|
||||
worker.terminate().catch(() => undefined);
|
||||
reject(timeoutError);
|
||||
}, WORKER_CALL_TIMEOUT_MS);
|
||||
|
||||
this.pending.set(id, {
|
||||
|
|
|
|||
Loading…
Reference in a new issue