fix: serialize team launch-state persistence
This commit is contained in:
parent
5a8a934d8d
commit
44e7ca3cf8
1 changed files with 69 additions and 9 deletions
|
|
@ -4308,6 +4308,7 @@ export class TeamProvisioningService {
|
|||
{ expiresAtMs: number; metadata: Map<string, LiveTeamAgentRuntimeMetadata> }
|
||||
>();
|
||||
private readonly launchStateStore = new TeamLaunchStateStore();
|
||||
private readonly launchStateStoreQueue = new Map<string, Promise<unknown>>();
|
||||
private readonly memberLogsFinder: TeamMemberLogsFinder;
|
||||
private readonly transcriptProjectResolver: TeamTranscriptProjectResolver;
|
||||
private teamChangeEmitter: ((event: TeamChangeEvent) => void) | null = null;
|
||||
|
|
@ -8347,7 +8348,7 @@ export class TeamProvisioningService {
|
|||
},
|
||||
updatedAt: input.observedAt,
|
||||
});
|
||||
await this.launchStateStore.write(input.teamName, snapshot);
|
||||
await this.writeLaunchStateSnapshot(input.teamName, snapshot);
|
||||
this.agentRuntimeSnapshotCache.delete(input.teamName);
|
||||
this.liveTeamAgentRuntimeMetadataCache.delete(input.teamName);
|
||||
this.teamChangeEmitter?.({
|
||||
|
|
@ -10213,6 +10214,12 @@ export class TeamProvisioningService {
|
|||
'error',
|
||||
error instanceof Error ? error.message : String(error)
|
||||
);
|
||||
if (run.isLaunch) {
|
||||
await this.persistLaunchStateSnapshot(
|
||||
run,
|
||||
run.provisioningComplete ? 'finished' : 'active'
|
||||
);
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
|
@ -10242,6 +10249,12 @@ export class TeamProvisioningService {
|
|||
'error',
|
||||
error instanceof Error ? error.message : String(error)
|
||||
);
|
||||
if (run.isLaunch) {
|
||||
await this.persistLaunchStateSnapshot(
|
||||
run,
|
||||
run.provisioningComplete ? 'finished' : 'active'
|
||||
);
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
|
@ -10379,7 +10392,7 @@ export class TeamProvisioningService {
|
|||
members: nextMembers,
|
||||
updatedAt,
|
||||
});
|
||||
await this.launchStateStore.write(teamName, nextSnapshot);
|
||||
await this.writeLaunchStateSnapshot(teamName, nextSnapshot);
|
||||
this.agentRuntimeSnapshotCache.delete(teamName);
|
||||
this.liveTeamAgentRuntimeMetadataCache.delete(teamName);
|
||||
}
|
||||
|
|
@ -13514,7 +13527,7 @@ export class TeamProvisioningService {
|
|||
launchPhase: result.launchPhase,
|
||||
members,
|
||||
});
|
||||
await this.launchStateStore.write(input.teamName, snapshot);
|
||||
await this.writeLaunchStateSnapshot(input.teamName, snapshot);
|
||||
return snapshot;
|
||||
}
|
||||
|
||||
|
|
@ -16851,10 +16864,48 @@ export class TeamProvisioningService {
|
|||
}
|
||||
|
||||
private async clearPersistedLaunchState(teamName: string): Promise<void> {
|
||||
await this.enqueueLaunchStateStoreOperation(teamName, () =>
|
||||
this.clearPersistedLaunchStateNow(teamName)
|
||||
);
|
||||
}
|
||||
|
||||
private async clearPersistedLaunchStateNow(teamName: string): Promise<void> {
|
||||
await this.launchStateStore.clear(teamName);
|
||||
await clearBootstrapState(teamName);
|
||||
}
|
||||
|
||||
private async writeLaunchStateSnapshot(
|
||||
teamName: string,
|
||||
snapshot: PersistedTeamLaunchSnapshot
|
||||
): Promise<void> {
|
||||
await this.enqueueLaunchStateStoreOperation(teamName, () =>
|
||||
this.writeLaunchStateSnapshotNow(teamName, snapshot)
|
||||
);
|
||||
}
|
||||
|
||||
private async writeLaunchStateSnapshotNow(
|
||||
teamName: string,
|
||||
snapshot: PersistedTeamLaunchSnapshot
|
||||
): Promise<void> {
|
||||
await this.launchStateStore.write(teamName, snapshot);
|
||||
}
|
||||
|
||||
private async enqueueLaunchStateStoreOperation<T>(
|
||||
teamName: string,
|
||||
operation: () => Promise<T>
|
||||
): Promise<T> {
|
||||
const previous = this.launchStateStoreQueue.get(teamName);
|
||||
const queued = (previous ?? Promise.resolve()).catch(() => undefined).then(operation);
|
||||
this.launchStateStoreQueue.set(teamName, queued);
|
||||
try {
|
||||
return await queued;
|
||||
} finally {
|
||||
if (this.launchStateStoreQueue.get(teamName) === queued) {
|
||||
this.launchStateStoreQueue.delete(teamName);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private getFailedSpawnMembers(
|
||||
run: ProvisioningRun
|
||||
): { name: string; error?: string; updatedAt: string }[] {
|
||||
|
|
@ -17434,11 +17485,20 @@ export class TeamProvisioningService {
|
|||
launchPhase: 'active' | 'finished' | 'reconciled' = run.provisioningComplete
|
||||
? 'finished'
|
||||
: 'active'
|
||||
): Promise<PersistedTeamLaunchSnapshot | null> {
|
||||
return this.enqueueLaunchStateStoreOperation(run.teamName, () =>
|
||||
this.persistLaunchStateSnapshotNow(run, launchPhase)
|
||||
);
|
||||
}
|
||||
|
||||
private async persistLaunchStateSnapshotNow(
|
||||
run: ProvisioningRun,
|
||||
launchPhase: 'active' | 'finished' | 'reconciled'
|
||||
): Promise<PersistedTeamLaunchSnapshot | null> {
|
||||
const snapshot = this.buildLiveLaunchSnapshotForRun(run, launchPhase);
|
||||
if (!snapshot) {
|
||||
if (run.isLaunch) {
|
||||
await this.clearPersistedLaunchState(run.teamName);
|
||||
await this.clearPersistedLaunchStateNow(run.teamName);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
|
@ -17447,13 +17507,13 @@ export class TeamProvisioningService {
|
|||
const filteredSnapshot = this.filterRemovedMembersFromLaunchSnapshot(snapshot, metaMembers);
|
||||
|
||||
if (filteredSnapshot.teamLaunchState === 'clean_success' && launchPhase !== 'active') {
|
||||
await this.clearPersistedLaunchState(run.teamName);
|
||||
await this.clearPersistedLaunchStateNow(run.teamName);
|
||||
this.agentRuntimeSnapshotCache.delete(run.teamName);
|
||||
this.liveTeamAgentRuntimeMetadataCache.delete(run.teamName);
|
||||
return null;
|
||||
}
|
||||
|
||||
await this.launchStateStore.write(run.teamName, filteredSnapshot);
|
||||
await this.writeLaunchStateSnapshotNow(run.teamName, filteredSnapshot);
|
||||
this.agentRuntimeSnapshotCache.delete(run.teamName);
|
||||
this.liveTeamAgentRuntimeMetadataCache.delete(run.teamName);
|
||||
return filteredSnapshot;
|
||||
|
|
@ -18009,7 +18069,7 @@ export class TeamProvisioningService {
|
|||
primaryStatuses,
|
||||
secondaryMembers,
|
||||
});
|
||||
await this.launchStateStore.write(teamName, recoveredSnapshot);
|
||||
await this.writeLaunchStateSnapshot(teamName, recoveredSnapshot);
|
||||
return recoveredSnapshot;
|
||||
}
|
||||
|
||||
|
|
@ -18440,7 +18500,7 @@ export class TeamProvisioningService {
|
|||
return { snapshot: null, statuses: {} };
|
||||
}
|
||||
|
||||
await this.launchStateStore.write(teamName, reconciled);
|
||||
await this.writeLaunchStateSnapshot(teamName, reconciled);
|
||||
return {
|
||||
snapshot: reconciled,
|
||||
statuses: snapshotToMemberSpawnStatuses(reconciled),
|
||||
|
|
@ -19404,7 +19464,7 @@ export class TeamProvisioningService {
|
|||
previousLaunchState,
|
||||
force: true,
|
||||
});
|
||||
await this.launchStateStore.write(
|
||||
await this.writeLaunchStateSnapshot(
|
||||
teamName,
|
||||
createPersistedLaunchSnapshot({
|
||||
teamName,
|
||||
|
|
|
|||
Loading…
Reference in a new issue