chore(foundry): workbench action responsiveness (#254)

* wip

* wip
This commit is contained in:
Nathan Flurry 2026-03-14 20:42:18 -07:00 committed by GitHub
parent 400f9a214e
commit 99abb9d42e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
171 changed files with 7260 additions and 7342 deletions

View file

@ -0,0 +1,33 @@
import type { TopicData, TopicKey, TopicParams } from "./topics.js";
export type TopicStatus = "loading" | "connected" | "error";
export interface DebugSubscriptionTopic {
topicKey: TopicKey;
cacheKey: string;
listenerCount: number;
status: TopicStatus;
lastRefreshAt: number | null;
}
export interface TopicState<K extends TopicKey> {
data: TopicData<K> | undefined;
status: TopicStatus;
error: Error | null;
}
/**
* The SubscriptionManager owns all realtime actor connections and cached state.
*
* Multiple subscribers to the same topic share one connection and one cache
* entry. After the last subscriber leaves, a short grace period keeps the
* connection warm so navigation does not thrash actor connections.
*/
export interface SubscriptionManager {
subscribe<K extends TopicKey>(topicKey: K, params: TopicParams<K>, listener: () => void): () => void;
getSnapshot<K extends TopicKey>(topicKey: K, params: TopicParams<K>): TopicData<K> | undefined;
getStatus<K extends TopicKey>(topicKey: K, params: TopicParams<K>): TopicStatus;
getError<K extends TopicKey>(topicKey: K, params: TopicParams<K>): Error | null;
listDebugTopics(): DebugSubscriptionTopic[];
dispose(): void;
}

View file

@ -0,0 +1,12 @@
import { createMockBackendClient } from "../mock/backend-client.js";
import { RemoteSubscriptionManager } from "./remote-manager.js";
/**
* Mock implementation shares the same subscription-manager harness as the remote
* path, but uses the in-memory mock backend that synthesizes actor events.
*/
export class MockSubscriptionManager extends RemoteSubscriptionManager {
constructor() {
super(createMockBackendClient());
}
}

View file

@ -0,0 +1,190 @@
import type { BackendClient } from "../backend-client.js";
import type { DebugSubscriptionTopic, SubscriptionManager, TopicStatus } from "./manager.js";
import { topicDefinitions, type TopicData, type TopicDefinition, type TopicKey, type TopicParams } from "./topics.js";
const GRACE_PERIOD_MS = 30_000;
/**
* Remote implementation of SubscriptionManager.
* Each cache entry owns one actor connection plus one materialized snapshot.
*/
export class RemoteSubscriptionManager implements SubscriptionManager {
private entries = new Map<string, TopicEntry<any, any, any>>();
constructor(private readonly backend: BackendClient) {}
subscribe<K extends TopicKey>(topicKey: K, params: TopicParams<K>, listener: () => void): () => void {
const definition = topicDefinitions[topicKey] as unknown as TopicDefinition<any, any, any>;
const cacheKey = definition.key(params as any);
let entry = this.entries.get(cacheKey);
if (!entry) {
entry = new TopicEntry(topicKey, cacheKey, definition, this.backend, params as any);
this.entries.set(cacheKey, entry);
}
entry.cancelTeardown();
entry.addListener(listener);
entry.ensureStarted();
return () => {
const current = this.entries.get(cacheKey);
if (!current) {
return;
}
current.removeListener(listener);
if (current.listenerCount === 0) {
current.scheduleTeardown(GRACE_PERIOD_MS, () => {
this.entries.delete(cacheKey);
});
}
};
}
getSnapshot<K extends TopicKey>(topicKey: K, params: TopicParams<K>): TopicData<K> | undefined {
return this.entries.get((topicDefinitions[topicKey] as any).key(params))?.data as TopicData<K> | undefined;
}
getStatus<K extends TopicKey>(topicKey: K, params: TopicParams<K>): TopicStatus {
return this.entries.get((topicDefinitions[topicKey] as any).key(params))?.status ?? "loading";
}
getError<K extends TopicKey>(topicKey: K, params: TopicParams<K>): Error | null {
return this.entries.get((topicDefinitions[topicKey] as any).key(params))?.error ?? null;
}
listDebugTopics(): DebugSubscriptionTopic[] {
return [...this.entries.values()]
.filter((entry) => entry.listenerCount > 0)
.map((entry) => entry.getDebugTopic())
.sort((left, right) => left.cacheKey.localeCompare(right.cacheKey));
}
dispose(): void {
for (const entry of this.entries.values()) {
entry.dispose();
}
this.entries.clear();
}
}
class TopicEntry<TData, TParams, TEvent> {
data: TData | undefined;
status: TopicStatus = "loading";
error: Error | null = null;
listenerCount = 0;
lastRefreshAt: number | null = null;
private readonly listeners = new Set<() => void>();
private conn: Awaited<ReturnType<TopicDefinition<TData, TParams, TEvent>["connect"]>> | null = null;
private unsubscribeEvent: (() => void) | null = null;
private unsubscribeError: (() => void) | null = null;
private teardownTimer: ReturnType<typeof setTimeout> | null = null;
private startPromise: Promise<void> | null = null;
private started = false;
constructor(
private readonly topicKey: TopicKey,
private readonly cacheKey: string,
private readonly definition: TopicDefinition<TData, TParams, TEvent>,
private readonly backend: BackendClient,
private readonly params: TParams,
) {}
getDebugTopic(): DebugSubscriptionTopic {
return {
topicKey: this.topicKey,
cacheKey: this.cacheKey,
listenerCount: this.listenerCount,
status: this.status,
lastRefreshAt: this.lastRefreshAt,
};
}
addListener(listener: () => void): void {
this.listeners.add(listener);
this.listenerCount = this.listeners.size;
}
removeListener(listener: () => void): void {
this.listeners.delete(listener);
this.listenerCount = this.listeners.size;
}
ensureStarted(): void {
if (this.started || this.startPromise) {
return;
}
this.startPromise = this.start().finally(() => {
this.startPromise = null;
});
}
scheduleTeardown(ms: number, onTeardown: () => void): void {
this.teardownTimer = setTimeout(() => {
this.dispose();
onTeardown();
}, ms);
}
cancelTeardown(): void {
if (this.teardownTimer) {
clearTimeout(this.teardownTimer);
this.teardownTimer = null;
}
}
dispose(): void {
this.cancelTeardown();
this.unsubscribeEvent?.();
this.unsubscribeError?.();
if (this.conn) {
void this.conn.dispose();
}
this.conn = null;
this.data = undefined;
this.status = "loading";
this.error = null;
this.lastRefreshAt = null;
this.started = false;
}
private async start(): Promise<void> {
this.status = "loading";
this.error = null;
this.notify();
try {
this.conn = await this.definition.connect(this.backend, this.params);
this.unsubscribeEvent = this.conn.on(this.definition.event, (event: TEvent) => {
if (this.data === undefined) {
return;
}
this.data = this.definition.applyEvent(this.data, event);
this.lastRefreshAt = Date.now();
this.notify();
});
this.unsubscribeError = this.conn.onError((error: unknown) => {
this.status = "error";
this.error = error instanceof Error ? error : new Error(String(error));
this.notify();
});
this.data = await this.definition.fetchInitial(this.backend, this.params);
this.status = "connected";
this.lastRefreshAt = Date.now();
this.started = true;
this.notify();
} catch (error) {
this.status = "error";
this.error = error instanceof Error ? error : new Error(String(error));
this.started = false;
this.notify();
}
}
private notify(): void {
for (const listener of [...this.listeners]) {
listener();
}
}
}

View file

@ -0,0 +1,147 @@
import type {
AppEvent,
FoundryAppSnapshot,
SandboxProviderId,
SandboxProcessesEvent,
SessionEvent,
TaskEvent,
WorkbenchSessionDetail,
WorkbenchTaskDetail,
OrganizationEvent,
OrganizationSummarySnapshot,
} from "@sandbox-agent/foundry-shared";
import type { ActorConn, BackendClient, SandboxProcessRecord } from "../backend-client.js";
/**
* Topic definitions for the subscription manager.
*
* Each topic describes one actor connection plus one materialized read model.
* Events always carry full replacement payloads for the changed entity so the
* client can replace cached state directly instead of reconstructing patches.
*/
export interface TopicDefinition<TData, TParams, TEvent> {
key: (params: TParams) => string;
event: string;
connect: (backend: BackendClient, params: TParams) => Promise<ActorConn>;
fetchInitial: (backend: BackendClient, params: TParams) => Promise<TData>;
applyEvent: (current: TData, event: TEvent) => TData;
}
export interface AppTopicParams {}
export interface OrganizationTopicParams {
organizationId: string;
}
export interface TaskTopicParams {
organizationId: string;
repoId: string;
taskId: string;
}
export interface SessionTopicParams {
organizationId: string;
repoId: string;
taskId: string;
sessionId: string;
}
export interface SandboxProcessesTopicParams {
organizationId: string;
sandboxProviderId: SandboxProviderId;
sandboxId: string;
}
function upsertById<T extends { id: string }>(items: T[], nextItem: T, sort: (left: T, right: T) => number): T[] {
const filtered = items.filter((item) => item.id !== nextItem.id);
return [...filtered, nextItem].sort(sort);
}
function upsertByPrId<T extends { prId: string }>(items: T[], nextItem: T, sort: (left: T, right: T) => number): T[] {
const filtered = items.filter((item) => item.prId !== nextItem.prId);
return [...filtered, nextItem].sort(sort);
}
export const topicDefinitions = {
app: {
key: () => "app",
event: "appUpdated",
connect: (backend: BackendClient, _params: AppTopicParams) => backend.connectOrganization("app"),
fetchInitial: (backend: BackendClient, _params: AppTopicParams) => backend.getAppSnapshot(),
applyEvent: (_current: FoundryAppSnapshot, event: AppEvent) => event.snapshot,
} satisfies TopicDefinition<FoundryAppSnapshot, AppTopicParams, AppEvent>,
organization: {
key: (params: OrganizationTopicParams) => `organization:${params.organizationId}`,
event: "organizationUpdated",
connect: (backend: BackendClient, params: OrganizationTopicParams) => backend.connectOrganization(params.organizationId),
fetchInitial: (backend: BackendClient, params: OrganizationTopicParams) => backend.getOrganizationSummary(params.organizationId),
applyEvent: (current: OrganizationSummarySnapshot, event: OrganizationEvent) => {
switch (event.type) {
case "taskSummaryUpdated":
return {
...current,
taskSummaries: upsertById(current.taskSummaries, event.taskSummary, (left, right) => right.updatedAtMs - left.updatedAtMs),
};
case "taskRemoved":
return {
...current,
taskSummaries: current.taskSummaries.filter((task) => task.id !== event.taskId),
};
case "repoAdded":
case "repoUpdated":
return {
...current,
repos: upsertById(current.repos, event.repo, (left, right) => right.latestActivityMs - left.latestActivityMs),
};
case "repoRemoved":
return {
...current,
repos: current.repos.filter((repo) => repo.id !== event.repoId),
};
case "pullRequestUpdated":
return {
...current,
openPullRequests: upsertByPrId(current.openPullRequests, event.pullRequest, (left, right) => right.updatedAtMs - left.updatedAtMs),
};
case "pullRequestRemoved":
return {
...current,
openPullRequests: current.openPullRequests.filter((pullRequest) => pullRequest.prId !== event.prId),
};
}
},
} satisfies TopicDefinition<OrganizationSummarySnapshot, OrganizationTopicParams, OrganizationEvent>,
task: {
key: (params: TaskTopicParams) => `task:${params.organizationId}:${params.taskId}`,
event: "taskUpdated",
connect: (backend: BackendClient, params: TaskTopicParams) => backend.connectTask(params.organizationId, params.repoId, params.taskId),
fetchInitial: (backend: BackendClient, params: TaskTopicParams) => backend.getTaskDetail(params.organizationId, params.repoId, params.taskId),
applyEvent: (_current: WorkbenchTaskDetail, event: TaskEvent) => event.detail,
} satisfies TopicDefinition<WorkbenchTaskDetail, TaskTopicParams, TaskEvent>,
session: {
key: (params: SessionTopicParams) => `session:${params.organizationId}:${params.taskId}:${params.sessionId}`,
event: "sessionUpdated",
connect: (backend: BackendClient, params: SessionTopicParams) => backend.connectTask(params.organizationId, params.repoId, params.taskId),
fetchInitial: (backend: BackendClient, params: SessionTopicParams) =>
backend.getSessionDetail(params.organizationId, params.repoId, params.taskId, params.sessionId),
applyEvent: (current: WorkbenchSessionDetail, event: SessionEvent) => {
if (event.session.sessionId !== current.sessionId) {
return current;
}
return event.session;
},
} satisfies TopicDefinition<WorkbenchSessionDetail, SessionTopicParams, SessionEvent>,
sandboxProcesses: {
key: (params: SandboxProcessesTopicParams) => `sandbox:${params.organizationId}:${params.sandboxProviderId}:${params.sandboxId}`,
event: "processesUpdated",
connect: (backend: BackendClient, params: SandboxProcessesTopicParams) =>
backend.connectSandbox(params.organizationId, params.sandboxProviderId, params.sandboxId),
fetchInitial: async (backend: BackendClient, params: SandboxProcessesTopicParams) =>
(await backend.listSandboxProcesses(params.organizationId, params.sandboxProviderId, params.sandboxId)).processes,
applyEvent: (_current: SandboxProcessRecord[], event: SandboxProcessesEvent) => event.processes,
} satisfies TopicDefinition<SandboxProcessRecord[], SandboxProcessesTopicParams, SandboxProcessesEvent>,
} as const;
export type TopicKey = keyof typeof topicDefinitions;
export type TopicParams<K extends TopicKey> = Parameters<(typeof topicDefinitions)[K]["fetchInitial"]>[1];
export type TopicData<K extends TopicKey> = Awaited<ReturnType<(typeof topicDefinitions)[K]["fetchInitial"]>>;

View file

@ -0,0 +1,56 @@
import { useMemo, useRef, useSyncExternalStore } from "react";
import type { SubscriptionManager, TopicState } from "./manager.js";
import { topicDefinitions, type TopicKey, type TopicParams } from "./topics.js";
/**
* React bridge for the subscription manager.
*
* `null` params disable the subscription entirely, which is how screens express
* conditional subscription in task/session/sandbox topics.
*/
export function useSubscription<K extends TopicKey>(manager: SubscriptionManager, topicKey: K, params: TopicParams<K> | null): TopicState<K> {
const paramsKey = params ? (topicDefinitions[topicKey] as any).key(params) : null;
const paramsRef = useRef<TopicParams<K> | null>(params);
paramsRef.current = params;
const subscribe = useMemo(() => {
return (listener: () => void) => {
const currentParams = paramsRef.current;
if (!currentParams) {
return () => {};
}
return manager.subscribe(topicKey, currentParams, listener);
};
}, [manager, topicKey, paramsKey]);
const getSnapshot = useMemo(() => {
let lastSnapshot: TopicState<K> | null = null;
return (): TopicState<K> => {
const currentParams = paramsRef.current;
const nextSnapshot: TopicState<K> = currentParams
? {
data: manager.getSnapshot(topicKey, currentParams),
status: manager.getStatus(topicKey, currentParams),
error: manager.getError(topicKey, currentParams),
}
: {
data: undefined,
status: "loading",
error: null,
};
// `useSyncExternalStore` requires referentially-stable snapshots when the
// underlying store has not changed. Reuse the previous object whenever
// the topic data/status/error triplet is unchanged.
if (lastSnapshot && lastSnapshot.data === nextSnapshot.data && lastSnapshot.status === nextSnapshot.status && lastSnapshot.error === nextSnapshot.error) {
return lastSnapshot;
}
lastSnapshot = nextSnapshot;
return nextSnapshot;
};
}, [manager, topicKey, paramsKey]);
return useSyncExternalStore(subscribe, getSnapshot, getSnapshot);
}