diff --git a/.env.development.example b/.env.development.example index c4dac97..c4132f4 100644 --- a/.env.development.example +++ b/.env.development.example @@ -8,7 +8,7 @@ APP_URL=http://localhost:4173 BETTER_AUTH_URL=http://localhost:4173 BETTER_AUTH_SECRET=sandbox-agent-foundry-development-only-change-me -GITHUB_REDIRECT_URI=http://localhost:4173/v1/auth/github/callback +GITHUB_REDIRECT_URI=http://localhost:4173/v1/auth/callback/github # Fill these in when enabling live GitHub OAuth. GITHUB_CLIENT_ID= diff --git a/docs/deploy/foundry-self-hosting.mdx b/docs/deploy/foundry-self-hosting.mdx index 04b0e9f..172d680 100644 --- a/docs/deploy/foundry-self-hosting.mdx +++ b/docs/deploy/foundry-self-hosting.mdx @@ -38,7 +38,7 @@ These values can be safely defaulted for local development: - `APP_URL=http://localhost:4173` - `BETTER_AUTH_URL=http://localhost:7741` - `BETTER_AUTH_SECRET=sandbox-agent-foundry-development-only-change-me` -- `GITHUB_REDIRECT_URI=http://localhost:7741/v1/auth/github/callback` +- `GITHUB_REDIRECT_URI=http://localhost:7741/v1/auth/callback/github` These should be treated as development-only values. diff --git a/foundry/CLAUDE.md b/foundry/CLAUDE.md index 2e6e82e..c0e7871 100644 --- a/foundry/CLAUDE.md +++ b/foundry/CLAUDE.md @@ -65,6 +65,58 @@ Use `pnpm` workspaces and Turborepo. - When asked for screenshots, capture all relevant affected screens and modal states, not just a single viewport. Include empty, populated, success, and blocked/error states when they are part of the changed flow. - If a screenshot catches a transition frame, blank modal, or otherwise misleading state, retake it before reporting it. +## Realtime Data Architecture + +### Core pattern: fetch initial state + subscribe to deltas + +All client data flows follow the same pattern: + +1. **Connect** to the actor via WebSocket. +2. **Fetch initial state** via an action call to get the current materialized snapshot. +3. **Subscribe to events** on the connection. Events carry **full replacement payloads** for the changed entity (not empty notifications, not patches — the complete new state of the thing that changed). +4. **Unsubscribe** after a 30-second grace period when interest ends (screen navigation, component unmount). The grace period prevents thrashing during screen transitions and React double-renders. + +Do not use polling (`refetchInterval`), empty "go re-fetch" broadcast events, or full-snapshot re-fetches on every mutation. Every mutation broadcasts the new absolute state of the changed entity to connected clients. + +### Materialized state in coordinator actors + +- **Workspace actor** materializes sidebar-level data in its own SQLite: repo catalog, task summaries (title, status, branch, PR, updatedAt), repo summaries (overview/branch state), and session summaries (id, name, status, unread, model — no transcript). Task actors push summary changes to the workspace actor when they mutate. The workspace actor broadcasts the updated entity to connected clients. `getWorkspaceSummary` reads from local tables only — no fan-out to child actors. +- **Task actor** materializes its own detail state (session summaries, sandbox info, diffs, file tree). `getTaskDetail` reads from the task actor's own SQLite. The task actor broadcasts updates directly to clients connected to it. +- **Session data** lives on the task actor but is a separate subscription topic. The task topic includes `sessions_summary` (list without content). The `session` topic provides full transcript and draft state. Clients subscribe to the `session` topic for whichever session tab is active, and filter `sessionUpdated` events by session ID (ignoring events for other sessions on the same actor). +- The expensive fan-out (querying every project/task actor) only exists as a background reconciliation/rebuild path, never on the hot read path. + +### Interest manager + +The interest manager (`packages/client`) is a global singleton that manages WebSocket connections, cached state, and subscriptions for all topics. It: + +- **Deduplicates** — multiple subscribers to the same topic share one connection and one cached state. +- **Grace period (30s)** — when the last subscriber leaves, the connection and state stay alive for 30 seconds before teardown. This keeps data warm for back-navigation and prevents thrashing. +- **Exposes a single hook** — `useInterest(topicKey, params)` returns `{ data, status, error }`. Null params = no subscription (conditional interest). +- **Shared harness, separate implementations** — the `InterestManager` interface is shared between mock and remote implementations. The mock implementation uses in-memory state. The remote implementation uses WebSocket connections. The API/client exposure is identical for both. + +### Topics + +Each topic maps to one actor connection and one event stream: + +| Topic | Actor | Event | Data | +|---|---|---|---| +| `app` | Workspace `"app"` | `appUpdated` | Auth, orgs, onboarding | +| `workspace` | Workspace `{workspaceId}` | `workspaceUpdated` | Repo catalog, task summaries, repo summaries | +| `task` | Task `{workspaceId, repoId, taskId}` | `taskUpdated` | Session summaries, sandbox info, diffs, file tree | +| `session` | Task `{workspaceId, repoId, taskId}` (filtered by sessionId) | `sessionUpdated` | Transcript, draft state | +| `sandboxProcesses` | SandboxInstance | `processesUpdated` | Process list | + +The client subscribes to `app` always, `workspace` when entering a workspace, `task` when viewing a task, and `session` when viewing a specific session tab. At most 4 actor connections at a time (app + workspace + task + sandbox if terminal is open). The `session` topic reuses the task actor connection and filters by session ID. + +### Rules + +- Do not add `useQuery` with `refetchInterval` for data that should be push-based. +- Do not broadcast empty notification events. Events must carry the full new state of the changed entity. +- Do not re-fetch full snapshots after mutations. The mutation triggers a server-side broadcast with the new entity state; the client replaces it in local state. +- All event subscriptions go through the interest manager. Do not create ad-hoc `handle.connect()` + `conn.on()` patterns. +- Backend mutations that affect sidebar data (task title, status, branch, PR state) must push the updated summary to the parent workspace actor, which broadcasts to workspace subscribers. +- Comment architecture-related code: add doc comments explaining the materialized state pattern, why deltas flow the way they do, and the relationship between parent/child actor broadcasts. New contributors should understand the data flow from comments alone. + ## Runtime Policy - Runtime is Bun-native. diff --git a/foundry/packages/backend/src/actors/sandbox-instance/index.ts b/foundry/packages/backend/src/actors/sandbox-instance/index.ts index 2fa84fc..566a378 100644 --- a/foundry/packages/backend/src/actors/sandbox-instance/index.ts +++ b/foundry/packages/backend/src/actors/sandbox-instance/index.ts @@ -278,10 +278,12 @@ async function getSandboxAgentClient(c: any) { }); } -function broadcastProcessesUpdated(c: any): void { +async function broadcastProcessesUpdated(c: any): Promise { + const client = await getSandboxAgentClient(c); + const { processes } = await client.listProcesses(); c.broadcast("processesUpdated", { - sandboxId: c.state.sandboxId, - at: Date.now(), + type: "processesUpdated", + processes, }); } @@ -475,7 +477,7 @@ export const sandboxInstance = actor({ async createProcess(c: any, request: ProcessCreateRequest): Promise { const client = await getSandboxAgentClient(c); const created = await client.createProcess(request); - broadcastProcessesUpdated(c); + await broadcastProcessesUpdated(c); return created; }, @@ -492,21 +494,21 @@ export const sandboxInstance = actor({ async stopProcess(c: any, request: { processId: string; query?: ProcessSignalQuery }): Promise { const client = await getSandboxAgentClient(c); const stopped = await client.stopProcess(request.processId, request.query); - broadcastProcessesUpdated(c); + await broadcastProcessesUpdated(c); return stopped; }, async killProcess(c: any, request: { processId: string; query?: ProcessSignalQuery }): Promise { const client = await getSandboxAgentClient(c); const killed = await client.killProcess(request.processId, request.query); - broadcastProcessesUpdated(c); + await broadcastProcessesUpdated(c); return killed; }, async deleteProcess(c: any, request: { processId: string }): Promise { const client = await getSandboxAgentClient(c); await client.deleteProcess(request.processId); - broadcastProcessesUpdated(c); + await broadcastProcessesUpdated(c); }, async providerState(c: any): Promise<{ providerId: ProviderId; sandboxId: string; state: string; at: number }> { diff --git a/foundry/packages/backend/src/actors/task/index.ts b/foundry/packages/backend/src/actors/task/index.ts index d8bf069..674ac84 100644 --- a/foundry/packages/backend/src/actors/task/index.ts +++ b/foundry/packages/backend/src/actors/task/index.ts @@ -19,7 +19,9 @@ import { changeWorkbenchModel, closeWorkbenchSession, createWorkbenchSession, - getWorkbenchTask, + getSessionDetail, + getTaskDetail, + getTaskSummary, markWorkbenchUnread, publishWorkbenchPr, renameWorkbenchBranch, @@ -228,8 +230,16 @@ export const task = actor({ return await getCurrentRecord({ db: c.db, state: c.state }); }, - async getWorkbench(c) { - return await getWorkbenchTask(c); + async getTaskSummary(c) { + return await getTaskSummary(c); + }, + + async getTaskDetail(c) { + return await getTaskDetail(c); + }, + + async getSessionDetail(c, input: { sessionId: string }) { + return await getSessionDetail(c, input.sessionId); }, async markWorkbenchUnread(c): Promise { diff --git a/foundry/packages/backend/src/actors/task/workbench.ts b/foundry/packages/backend/src/actors/task/workbench.ts index 7de8f00..e243e70 100644 --- a/foundry/packages/backend/src/actors/task/workbench.ts +++ b/foundry/packages/backend/src/actors/task/workbench.ts @@ -286,11 +286,6 @@ async function requireReadySessionMeta(c: any, tabId: string): Promise { return meta; } -async function notifyWorkbenchUpdated(c: any): Promise { - const workspace = await getOrCreateWorkspace(c, c.state.workspaceId); - await workspace.notifyWorkbenchUpdated({}); -} - function shellFragment(parts: string[]): string { return parts.join(" && "); } @@ -600,42 +595,60 @@ export async function ensureWorkbenchSeeded(c: any): Promise { return record; } -export async function getWorkbenchTask(c: any): Promise { +function buildSessionSummary(record: any, meta: any): any { + const derivedSandboxSessionId = meta.sandboxSessionId ?? (meta.status === "pending_provision" && record.activeSessionId ? record.activeSessionId : null); + const sessionStatus = + meta.status === "ready" && derivedSandboxSessionId ? activeSessionStatus(record, derivedSandboxSessionId) : meta.status === "error" ? "error" : "idle"; + let thinkingSinceMs = meta.thinkingSinceMs ?? null; + let unread = Boolean(meta.unread); + if (thinkingSinceMs && sessionStatus !== "running") { + thinkingSinceMs = null; + unread = true; + } + + return { + id: meta.id, + sessionId: derivedSandboxSessionId, + sessionName: meta.sessionName, + agent: agentKindForModel(meta.model), + model: meta.model, + status: sessionStatus, + thinkingSinceMs: sessionStatus === "running" ? thinkingSinceMs : null, + unread, + created: Boolean(meta.created || derivedSandboxSessionId), + }; +} + +function buildSessionDetailFromMeta(record: any, meta: any): any { + const summary = buildSessionSummary(record, meta); + return { + sessionId: meta.tabId, + tabId: meta.tabId, + sandboxSessionId: summary.sessionId, + sessionName: summary.sessionName, + agent: summary.agent, + model: summary.model, + status: summary.status, + thinkingSinceMs: summary.thinkingSinceMs, + unread: summary.unread, + created: summary.created, + draft: { + text: meta.draftText ?? "", + attachments: Array.isArray(meta.draftAttachments) ? meta.draftAttachments : [], + updatedAtMs: meta.draftUpdatedAtMs ?? null, + }, + transcript: meta.transcript ?? [], + }; +} + +/** + * Builds a WorkbenchTaskSummary from local task actor state. Task actors push + * this to the parent workspace actor so workspace sidebar reads stay local. + */ +export async function buildTaskSummary(c: any): Promise { const record = await ensureWorkbenchSeeded(c); - const gitState = await readCachedGitState(c); const sessions = await listSessionMetaRows(c); await maybeScheduleWorkbenchRefreshes(c, record, sessions); - const tabs = []; - - for (const meta of sessions) { - const derivedSandboxSessionId = meta.sandboxSessionId ?? (meta.status === "pending_provision" && record.activeSessionId ? record.activeSessionId : null); - const sessionStatus = - meta.status === "ready" && derivedSandboxSessionId ? activeSessionStatus(record, derivedSandboxSessionId) : meta.status === "error" ? "error" : "idle"; - let thinkingSinceMs = meta.thinkingSinceMs ?? null; - let unread = Boolean(meta.unread); - if (thinkingSinceMs && sessionStatus !== "running") { - thinkingSinceMs = null; - unread = true; - } - - tabs.push({ - id: meta.id, - sessionId: derivedSandboxSessionId, - sessionName: meta.sessionName, - agent: agentKindForModel(meta.model), - model: meta.model, - status: sessionStatus, - thinkingSinceMs: sessionStatus === "running" ? thinkingSinceMs : null, - unread, - created: Boolean(meta.created || derivedSandboxSessionId), - draft: { - text: meta.draftText ?? "", - attachments: Array.isArray(meta.draftAttachments) ? meta.draftAttachments : [], - updatedAtMs: meta.draftUpdatedAtMs ?? null, - }, - transcript: meta.transcript ?? [], - }); - } return { id: c.state.taskId, @@ -646,19 +659,98 @@ export async function getWorkbenchTask(c: any): Promise { updatedAtMs: record.updatedAt, branch: record.branchName, pullRequest: await readPullRequestSummary(c, record.branchName), - tabs, + sessionsSummary: sessions.map((meta) => buildSessionSummary(record, meta)), + }; +} + +/** + * Builds a WorkbenchTaskDetail from local task actor state for direct task + * subscribers. This is a full replacement payload, not a patch. + */ +export async function buildTaskDetail(c: any): Promise { + const record = await ensureWorkbenchSeeded(c); + const gitState = await readCachedGitState(c); + const sessions = await listSessionMetaRows(c); + await maybeScheduleWorkbenchRefreshes(c, record, sessions); + const summary = await buildTaskSummary(c); + + return { + ...summary, + task: record.task, + agentType: record.agentType === "claude" || record.agentType === "codex" ? record.agentType : null, + runtimeStatus: record.status, + statusMessage: record.statusMessage ?? null, + activeSessionId: record.activeSessionId ?? null, + diffStat: record.diffStat ?? null, + prUrl: record.prUrl ?? null, + reviewStatus: record.reviewStatus ?? null, fileChanges: gitState.fileChanges, diffs: gitState.diffs, fileTree: gitState.fileTree, minutesUsed: 0, + sandboxes: (record.sandboxes ?? []).map((sandbox: any) => ({ + providerId: sandbox.providerId, + sandboxId: sandbox.sandboxId, + cwd: sandbox.cwd ?? null, + })), + activeSandboxId: record.activeSandboxId ?? null, }; } +/** + * Builds a WorkbenchSessionDetail for a specific session tab. + */ +export async function buildSessionDetail(c: any, tabId: string): Promise { + const record = await ensureWorkbenchSeeded(c); + const meta = await readSessionMeta(c, tabId); + if (!meta || meta.closed) { + throw new Error(`Unknown workbench session tab: ${tabId}`); + } + + return buildSessionDetailFromMeta(record, meta); +} + +export async function getTaskSummary(c: any): Promise { + return await buildTaskSummary(c); +} + +export async function getTaskDetail(c: any): Promise { + return await buildTaskDetail(c); +} + +export async function getSessionDetail(c: any, tabId: string): Promise { + return await buildSessionDetail(c, tabId); +} + +/** + * Replaces the old notifyWorkbenchUpdated pattern. + * + * The task actor emits two kinds of updates: + * - Push summary state up to the parent workspace actor so the sidebar + * materialized projection stays current. + * - Broadcast full detail/session payloads down to direct task subscribers. + */ +export async function broadcastTaskUpdate(c: any, options?: { sessionId?: string }): Promise { + const workspace = await getOrCreateWorkspace(c, c.state.workspaceId); + await workspace.applyTaskSummaryUpdate({ taskSummary: await buildTaskSummary(c) }); + c.broadcast("taskUpdated", { + type: "taskDetailUpdated", + detail: await buildTaskDetail(c), + }); + + if (options?.sessionId) { + c.broadcast("sessionUpdated", { + type: "sessionUpdated", + session: await buildSessionDetail(c, options.sessionId), + }); + } +} + export async function refreshWorkbenchDerivedState(c: any): Promise { const record = await ensureWorkbenchSeeded(c); const gitState = await collectWorkbenchGitState(c, record); await writeCachedGitState(c, gitState); - await notifyWorkbenchUpdated(c); + await broadcastTaskUpdate(c); } export async function refreshWorkbenchSessionTranscript(c: any, sessionId: string): Promise { @@ -670,7 +762,7 @@ export async function refreshWorkbenchSessionTranscript(c: any, sessionId: strin const transcript = await readSessionTranscript(c, record, meta.sandboxSessionId); await writeSessionTranscript(c, meta.tabId, transcript); - await notifyWorkbenchUpdated(c); + await broadcastTaskUpdate(c, { sessionId: meta.tabId }); } export async function renameWorkbenchTask(c: any, value: string): Promise { @@ -688,7 +780,7 @@ export async function renameWorkbenchTask(c: any, value: string): Promise .where(eq(taskTable.id, 1)) .run(); c.state.title = nextTitle; - await notifyWorkbenchUpdated(c); + await broadcastTaskUpdate(c); } export async function renameWorkbenchBranch(c: any, value: string): Promise { @@ -739,7 +831,7 @@ export async function renameWorkbenchBranch(c: any, value: string): Promise { @@ -755,7 +847,7 @@ export async function createWorkbenchSession(c: any, model?: string): Promise<{ sessionName: "Session 1", status: "ready", }); - await notifyWorkbenchUpdated(c); + await broadcastTaskUpdate(c, { sessionId: record.activeSessionId }); return { tabId: record.activeSessionId }; } } @@ -780,7 +872,7 @@ export async function createWorkbenchSession(c: any, model?: string): Promise<{ wait: false, }, ); - await notifyWorkbenchUpdated(c); + await broadcastTaskUpdate(c, { sessionId: tabId }); return { tabId }; } @@ -815,7 +907,7 @@ export async function ensureWorkbenchSession(c: any, tabId: string, model?: stri await enqueueWorkbenchRefresh(c, "task.command.workbench.refresh_session_transcript", { sessionId: record.activeSessionId, }); - await notifyWorkbenchUpdated(c); + await broadcastTaskUpdate(c, { sessionId: tabId }); return; } @@ -827,7 +919,7 @@ export async function ensureWorkbenchSession(c: any, tabId: string, model?: stri await enqueueWorkbenchRefresh(c, "task.command.workbench.refresh_session_transcript", { sessionId: meta.sandboxSessionId, }); - await notifyWorkbenchUpdated(c); + await broadcastTaskUpdate(c, { sessionId: tabId }); return; } @@ -838,7 +930,7 @@ export async function ensureWorkbenchSession(c: any, tabId: string, model?: stri status: "error", errorMessage: "cannot create session without a sandbox cwd", }); - await notifyWorkbenchUpdated(c); + await broadcastTaskUpdate(c, { sessionId: tabId }); return; } @@ -873,7 +965,7 @@ export async function ensureWorkbenchSession(c: any, tabId: string, model?: stri }); } - await notifyWorkbenchUpdated(c); + await broadcastTaskUpdate(c, { sessionId: tabId }); } export async function enqueuePendingWorkbenchSessions(c: any): Promise { @@ -904,14 +996,14 @@ export async function renameWorkbenchSession(c: any, sessionId: string, title: s await updateSessionMeta(c, sessionId, { sessionName: trimmed, }); - await notifyWorkbenchUpdated(c); + await broadcastTaskUpdate(c, { sessionId }); } export async function setWorkbenchSessionUnread(c: any, sessionId: string, unread: boolean): Promise { await updateSessionMeta(c, sessionId, { unread: unread ? 1 : 0, }); - await notifyWorkbenchUpdated(c); + await broadcastTaskUpdate(c, { sessionId }); } export async function updateWorkbenchDraft(c: any, sessionId: string, text: string, attachments: Array): Promise { @@ -920,14 +1012,14 @@ export async function updateWorkbenchDraft(c: any, sessionId: string, text: stri draftAttachmentsJson: JSON.stringify(attachments), draftUpdatedAt: Date.now(), }); - await notifyWorkbenchUpdated(c); + await broadcastTaskUpdate(c, { sessionId }); } export async function changeWorkbenchModel(c: any, sessionId: string, model: string): Promise { await updateSessionMeta(c, sessionId, { model, }); - await notifyWorkbenchUpdated(c); + await broadcastTaskUpdate(c, { sessionId }); } export async function sendWorkbenchMessage(c: any, sessionId: string, text: string, attachments: Array): Promise { @@ -984,7 +1076,7 @@ export async function sendWorkbenchMessage(c: any, sessionId: string, text: stri await enqueueWorkbenchRefresh(c, "task.command.workbench.refresh_session_transcript", { sessionId: meta.sandboxSessionId, }); - await notifyWorkbenchUpdated(c); + await broadcastTaskUpdate(c, { sessionId }); } export async function stopWorkbenchSession(c: any, sessionId: string): Promise { @@ -998,7 +1090,7 @@ export async function stopWorkbenchSession(c: any, sessionId: string): Promise { @@ -1063,7 +1155,7 @@ export async function syncWorkbenchSessionStatus(c: any, sessionId: string, stat }); await enqueueWorkbenchRefresh(c, "task.command.workbench.refresh_derived", {}); } - await notifyWorkbenchUpdated(c); + await broadcastTaskUpdate(c, { sessionId: meta.tabId }); } } @@ -1096,7 +1188,7 @@ export async function closeWorkbenchSession(c: any, sessionId: string): Promise< .where(eq(taskRuntime.id, 1)) .run(); } - await notifyWorkbenchUpdated(c); + await broadcastTaskUpdate(c); } export async function markWorkbenchUnread(c: any): Promise { @@ -1108,7 +1200,7 @@ export async function markWorkbenchUnread(c: any): Promise { await updateSessionMeta(c, latest.tabId, { unread: 1, }); - await notifyWorkbenchUpdated(c); + await broadcastTaskUpdate(c, { sessionId: latest.tabId }); } export async function publishWorkbenchPr(c: any): Promise { @@ -1129,7 +1221,7 @@ export async function publishWorkbenchPr(c: any): Promise { }) .where(eq(taskTable.id, 1)) .run(); - await notifyWorkbenchUpdated(c); + await broadcastTaskUpdate(c); } export async function revertWorkbenchFile(c: any, path: string): Promise { @@ -1152,5 +1244,5 @@ export async function revertWorkbenchFile(c: any, path: string): Promise { throw new Error(`file revert failed (${result.exitCode}): ${result.result}`); } await enqueueWorkbenchRefresh(c, "task.command.workbench.refresh_derived", {}); - await notifyWorkbenchUpdated(c); + await broadcastTaskUpdate(c); } diff --git a/foundry/packages/backend/src/actors/task/workflow/common.ts b/foundry/packages/backend/src/actors/task/workflow/common.ts index 251c288..0dfc667 100644 --- a/foundry/packages/backend/src/actors/task/workflow/common.ts +++ b/foundry/packages/backend/src/actors/task/workflow/common.ts @@ -1,9 +1,9 @@ // @ts-nocheck import { eq } from "drizzle-orm"; import type { TaskRecord, TaskStatus } from "@sandbox-agent/foundry-shared"; -import { getOrCreateWorkspace } from "../../handles.js"; import { task as taskTable, taskRuntime, taskSandboxes } from "../db/schema.js"; import { historyKey } from "../../keys.js"; +import { broadcastTaskUpdate } from "../workbench.js"; export const TASK_ROW_ID = 1; @@ -83,8 +83,7 @@ export async function setTaskState(ctx: any, status: TaskStatus, statusMessage?: .run(); } - const workspace = await getOrCreateWorkspace(ctx, ctx.state.workspaceId); - await workspace.notifyWorkbenchUpdated({}); + await broadcastTaskUpdate(ctx); } export async function getCurrentRecord(ctx: any): Promise { @@ -176,6 +175,5 @@ export async function appendHistory(ctx: any, kind: string, payload: Record { + await db.execute(`ALTER TABLE task_runtime ADD COLUMN git_state_json text`).catch(() => {}); + await db.execute(`ALTER TABLE task_runtime ADD COLUMN git_state_updated_at integer`).catch(() => {}); + await db.execute(`ALTER TABLE task_runtime ADD COLUMN provision_stage text`).catch(() => {}); + await db.execute(`ALTER TABLE task_runtime ADD COLUMN provision_stage_updated_at integer`).catch(() => {}); +} + async function withActivityTimeout(timeoutMs: number, label: string, run: () => Promise): Promise { let timer: ReturnType | null = null; try { @@ -61,6 +68,8 @@ export async function initBootstrapDbActivity(loopCtx: any, body: any): Promise< const initialStatusMessage = loopCtx.state.branchName && loopCtx.state.title ? "provisioning" : "naming"; try { + await ensureTaskRuntimeCacheColumns(db); + await db .insert(taskTable) .values({ diff --git a/foundry/packages/backend/src/actors/workspace/actions.ts b/foundry/packages/backend/src/actors/workspace/actions.ts index 51843fb..0ba55f8 100644 --- a/foundry/packages/backend/src/actors/workspace/actions.ts +++ b/foundry/packages/backend/src/actors/workspace/actions.ts @@ -4,6 +4,17 @@ import { Loop } from "rivetkit/workflow"; import type { AddRepoInput, CreateTaskInput, + HistoryEvent, + HistoryQueryInput, + ListTasksInput, + ProviderId, + RepoOverview, + RepoRecord, + RepoStackActionInput, + RepoStackActionResult, + StarSandboxAgentRepoInput, + StarSandboxAgentRepoResult, + SwitchResult, TaskRecord, TaskSummary, TaskWorkbenchChangeModelInput, @@ -14,20 +25,13 @@ import type { TaskWorkbenchSelectInput, TaskWorkbenchSetSessionUnreadInput, TaskWorkbenchSendMessageInput, - TaskWorkbenchSnapshot, TaskWorkbenchTabInput, TaskWorkbenchUpdateDraftInput, - HistoryEvent, - HistoryQueryInput, - ListTasksInput, - ProviderId, - RepoOverview, - RepoStackActionInput, - RepoStackActionResult, - RepoRecord, - StarSandboxAgentRepoInput, - StarSandboxAgentRepoResult, - SwitchResult, + WorkbenchRepoSummary, + WorkbenchSessionSummary, + WorkbenchTaskSummary, + WorkspaceEvent, + WorkspaceSummarySnapshot, WorkspaceUseInput, } from "@sandbox-agent/foundry-shared"; import { getActorRuntimeContext } from "../context.js"; @@ -35,7 +39,7 @@ import { getTask, getOrCreateHistory, getOrCreateProject, selfWorkspace } from " import { logActorWarning, resolveErrorMessage } from "../logging.js"; import { normalizeRemoteUrl, repoIdFromRemote } from "../../services/repo.js"; import { resolveWorkspaceGithubAuth } from "../../services/github-auth.js"; -import { taskLookup, repos, providerProfiles } from "./db/schema.js"; +import { taskLookup, repos, providerProfiles, taskSummaries } from "./db/schema.js"; import { agentTypeForModel } from "../task/workbench.js"; import { expectQueueResponse } from "../../services/queue.js"; import { workspaceAppActions } from "./app-shell.js"; @@ -109,6 +113,18 @@ async function upsertTaskLookupRow(c: any, taskId: string, repoId: string): Prom .run(); } +function parseJsonValue(value: string | null | undefined, fallback: T): T { + if (!value) { + return fallback; + } + + try { + return JSON.parse(value) as T; + } catch { + return fallback; + } +} + async function collectAllTaskSummaries(c: any): Promise { const repoRows = await c.db.select({ repoId: repos.repoId, remoteUrl: repos.remoteUrl }).from(repos).orderBy(desc(repos.updatedAt)).all(); @@ -145,17 +161,55 @@ function repoLabelFromRemote(remoteUrl: string): string { return remoteUrl; } -async function buildWorkbenchSnapshot(c: any): Promise { +function buildRepoSummary(repoRow: { repoId: string; remoteUrl: string; updatedAt: number }, taskRows: WorkbenchTaskSummary[]): WorkbenchRepoSummary { + const repoTasks = taskRows.filter((task) => task.repoId === repoRow.repoId); + const latestActivityMs = repoTasks.reduce((latest, task) => Math.max(latest, task.updatedAtMs), repoRow.updatedAt); + + return { + id: repoRow.repoId, + label: repoLabelFromRemote(repoRow.remoteUrl), + taskCount: repoTasks.length, + latestActivityMs, + }; +} + +function taskSummaryRowFromSummary(taskSummary: WorkbenchTaskSummary) { + return { + taskId: taskSummary.id, + repoId: taskSummary.repoId, + title: taskSummary.title, + status: taskSummary.status, + repoName: taskSummary.repoName, + updatedAtMs: taskSummary.updatedAtMs, + branch: taskSummary.branch, + pullRequestJson: JSON.stringify(taskSummary.pullRequest), + sessionsSummaryJson: JSON.stringify(taskSummary.sessionsSummary), + }; +} + +function taskSummaryFromRow(row: any): WorkbenchTaskSummary { + return { + id: row.taskId, + repoId: row.repoId, + title: row.title, + status: row.status, + repoName: row.repoName, + updatedAtMs: row.updatedAtMs, + branch: row.branch ?? null, + pullRequest: parseJsonValue(row.pullRequestJson, null), + sessionsSummary: parseJsonValue(row.sessionsSummaryJson, []), + }; +} + +async function reconcileWorkbenchProjection(c: any): Promise { const repoRows = await c.db .select({ repoId: repos.repoId, remoteUrl: repos.remoteUrl, updatedAt: repos.updatedAt }) .from(repos) .orderBy(desc(repos.updatedAt)) .all(); - const tasks: Array = []; - const projects: Array = []; + const taskRows: WorkbenchTaskSummary[] = []; for (const row of repoRows) { - const projectTasks: Array = []; try { const project = await getOrCreateProject(c, c.state.workspaceId, row.repoId, row.remoteUrl); const summaries = await project.listTaskSummaries({ includeArchived: true }); @@ -163,11 +217,18 @@ async function buildWorkbenchSnapshot(c: any): Promise { try { await upsertTaskLookupRow(c, summary.taskId, row.repoId); const task = getTask(c, c.state.workspaceId, row.repoId, summary.taskId); - const snapshot = await task.getWorkbench({}); - tasks.push(snapshot); - projectTasks.push(snapshot); + const taskSummary = await task.getTaskSummary({}); + taskRows.push(taskSummary); + await c.db + .insert(taskSummaries) + .values(taskSummaryRowFromSummary(taskSummary)) + .onConflictDoUpdate({ + target: taskSummaries.taskId, + set: taskSummaryRowFromSummary(taskSummary), + }) + .run(); } catch (error) { - logActorWarning("workspace", "failed collecting workbench task", { + logActorWarning("workspace", "failed collecting task summary during reconciliation", { workspaceId: c.state.workspaceId, repoId: row.repoId, taskId: summary.taskId, @@ -175,17 +236,8 @@ async function buildWorkbenchSnapshot(c: any): Promise { }); } } - - if (projectTasks.length > 0) { - projects.push({ - id: row.repoId, - label: repoLabelFromRemote(row.remoteUrl), - updatedAtMs: projectTasks[0]?.updatedAtMs ?? row.updatedAt, - tasks: projectTasks.sort((left, right) => right.updatedAtMs - left.updatedAtMs), - }); - } } catch (error) { - logActorWarning("workspace", "failed collecting workbench repo snapshot", { + logActorWarning("workspace", "failed collecting repo during workbench reconciliation", { workspaceId: c.state.workspaceId, repoId: row.repoId, error: resolveErrorMessage(error), @@ -193,16 +245,11 @@ async function buildWorkbenchSnapshot(c: any): Promise { } } - tasks.sort((left, right) => right.updatedAtMs - left.updatedAtMs); - projects.sort((left, right) => right.updatedAtMs - left.updatedAtMs); + taskRows.sort((left, right) => right.updatedAtMs - left.updatedAtMs); return { workspaceId: c.state.workspaceId, - repos: repoRows.map((row) => ({ - id: row.repoId, - label: repoLabelFromRemote(row.remoteUrl), - })), - projects, - tasks, + repos: repoRows.map((row) => buildRepoSummary(row, taskRows)).sort((left, right) => right.latestActivityMs - left.latestActivityMs), + taskSummaries: taskRows, }; } @@ -211,6 +258,41 @@ async function requireWorkbenchTask(c: any, taskId: string) { return getTask(c, c.state.workspaceId, repoId, taskId); } +/** + * Reads the workspace sidebar snapshot from the workspace actor's local SQLite + * only. Task actors push summary updates into `task_summaries`, so clients do + * not need this action to fan out to every child actor on the hot read path. + */ +async function getWorkspaceSummarySnapshot(c: any): Promise { + const repoRows = await c.db + .select({ + repoId: repos.repoId, + remoteUrl: repos.remoteUrl, + updatedAt: repos.updatedAt, + }) + .from(repos) + .orderBy(desc(repos.updatedAt)) + .all(); + const taskRows = await c.db.select().from(taskSummaries).orderBy(desc(taskSummaries.updatedAtMs)).all(); + const summaries = taskRows.map(taskSummaryFromRow); + + return { + workspaceId: c.state.workspaceId, + repos: repoRows.map((row) => buildRepoSummary(row, summaries)).sort((left, right) => right.latestActivityMs - left.latestActivityMs), + taskSummaries: summaries, + }; +} + +async function broadcastRepoSummary( + c: any, + type: "repoAdded" | "repoUpdated", + repoRow: { repoId: string; remoteUrl: string; updatedAt: number }, +): Promise { + const matchingTaskRows = await c.db.select().from(taskSummaries).where(eq(taskSummaries.repoId, repoRow.repoId)).all(); + const repo = buildRepoSummary(repoRow, matchingTaskRows.map(taskSummaryFromRow)); + c.broadcast("workspaceUpdated", { type, repo } satisfies WorkspaceEvent); +} + async function addRepoMutation(c: any, input: AddRepoInput): Promise { assertWorkspace(c, input.workspaceId); @@ -225,6 +307,7 @@ async function addRepoMutation(c: any, input: AddRepoInput): Promise const repoId = repoIdFromRemote(remoteUrl); const now = Date.now(); + const existing = await c.db.select({ repoId: repos.repoId }).from(repos).where(eq(repos.repoId, repoId)).get(); await c.db .insert(repos) @@ -243,7 +326,11 @@ async function addRepoMutation(c: any, input: AddRepoInput): Promise }) .run(); - await workspaceActions.notifyWorkbenchUpdated(c); + await broadcastRepoSummary(c, existing ? "repoUpdated" : "repoAdded", { + repoId, + remoteUrl, + updatedAt: now, + }); return { workspaceId: c.state.workspaceId, repoId, @@ -306,7 +393,20 @@ async function createTaskMutation(c: any, input: CreateTaskInput): Promise { - assertWorkspace(c, input.workspaceId); - return await buildWorkbenchSnapshot(c); + /** + * Called by task actors when their summary-level state changes. + * This is the write path for the local materialized projection; clients read + * the projection via `getWorkspaceSummary`, but only task actors should push + * rows into it. + */ + async applyTaskSummaryUpdate(c: any, input: { taskSummary: WorkbenchTaskSummary }): Promise { + await c.db + .insert(taskSummaries) + .values(taskSummaryRowFromSummary(input.taskSummary)) + .onConflictDoUpdate({ + target: taskSummaries.taskId, + set: taskSummaryRowFromSummary(input.taskSummary), + }) + .run(); + c.broadcast("workspaceUpdated", { type: "taskSummaryUpdated", taskSummary: input.taskSummary } satisfies WorkspaceEvent); }, - async notifyWorkbenchUpdated(c: any): Promise { - c.broadcast("workbenchUpdated", { at: Date.now() }); + async removeTaskSummary(c: any, input: { taskId: string }): Promise { + await c.db.delete(taskSummaries).where(eq(taskSummaries.taskId, input.taskId)).run(); + c.broadcast("workspaceUpdated", { type: "taskRemoved", taskId: input.taskId } satisfies WorkspaceEvent); + }, + + async getWorkspaceSummary(c: any, input: WorkspaceUseInput): Promise { + assertWorkspace(c, input.workspaceId); + return await getWorkspaceSummarySnapshot(c); + }, + + async reconcileWorkbenchState(c: any, input: WorkspaceUseInput): Promise { + assertWorkspace(c, input.workspaceId); + return await reconcileWorkbenchProjection(c); }, async createWorkbenchTask(c: any, input: TaskWorkbenchCreateTaskInput): Promise<{ taskId: string; tabId?: string }> { diff --git a/foundry/packages/backend/src/actors/workspace/app-shell.ts b/foundry/packages/backend/src/actors/workspace/app-shell.ts index e9fd32e..c259474 100644 --- a/foundry/packages/backend/src/actors/workspace/app-shell.ts +++ b/foundry/packages/backend/src/actors/workspace/app-shell.ts @@ -152,6 +152,10 @@ function encodeEligibleOrganizationIds(value: string[]): string { return JSON.stringify([...new Set(value)]); } +function errorMessage(error: unknown): string { + return error instanceof Error ? error.message : String(error); +} + function seatsIncludedForPlan(planId: FoundryBillingPlanId): number { switch (planId) { case "free": @@ -217,7 +221,76 @@ async function getOrganizationState(workspace: any) { return await workspace.getOrganizationShellState({}); } -async function buildAppSnapshot(c: any, sessionId: string): Promise { +async function getOrganizationStateIfInitialized(workspace: any) { + return await workspace.getOrganizationShellStateIfInitialized({}); +} + +async function listSnapshotOrganizations(c: any, sessionId: string, organizationIds: string[]) { + const results = await Promise.all( + organizationIds.map(async (organizationId) => { + const organizationStartedAt = performance.now(); + try { + const workspace = await getOrCreateWorkspace(c, organizationId); + const organizationState = await getOrganizationStateIfInitialized(workspace); + if (!organizationState) { + logger.warn( + { + sessionId, + workspaceId: c.state.workspaceId, + organizationId, + durationMs: roundDurationMs(organizationStartedAt), + }, + "build_app_snapshot_organization_uninitialized", + ); + return { organizationId, snapshot: null, status: "uninitialized" as const }; + } + logger.info( + { + sessionId, + workspaceId: c.state.workspaceId, + organizationId, + durationMs: roundDurationMs(organizationStartedAt), + }, + "build_app_snapshot_organization_completed", + ); + return { organizationId, snapshot: organizationState.snapshot, status: "ok" as const }; + } catch (error) { + const message = errorMessage(error); + if (!message.includes("Actor not found")) { + logger.error( + { + sessionId, + workspaceId: c.state.workspaceId, + organizationId, + durationMs: roundDurationMs(organizationStartedAt), + errorMessage: message, + errorStack: error instanceof Error ? error.stack : undefined, + }, + "build_app_snapshot_organization_failed", + ); + throw error; + } + logger.info( + { + sessionId, + workspaceId: c.state.workspaceId, + organizationId, + durationMs: roundDurationMs(organizationStartedAt), + }, + "build_app_snapshot_organization_missing", + ); + return { organizationId, snapshot: null, status: "missing" as const }; + } + }), + ); + + return { + organizations: results.map((result) => result.snapshot).filter((organization): organization is FoundryOrganization => organization !== null), + uninitializedOrganizationIds: results.filter((result) => result.status === "uninitialized").map((result) => result.organizationId), + }; +} + +async function buildAppSnapshot(c: any, sessionId: string, allowOrganizationRepair = true): Promise { assertAppWorkspace(c); const startedAt = performance.now(); const auth = getBetterAuthService(); @@ -252,53 +325,31 @@ async function buildAppSnapshot(c: any, sessionId: string): Promise { - const organizationStartedAt = performance.now(); - try { - const workspace = await getOrCreateWorkspace(c, organizationId); - const organizationState = await getOrganizationState(workspace); - logger.info( - { - sessionId, - workspaceId: c.state.workspaceId, - organizationId, - durationMs: roundDurationMs(organizationStartedAt), - }, - "build_app_snapshot_organization_completed", - ); - return organizationState.snapshot; - } catch (error) { - const message = error instanceof Error ? error.message : String(error); - if (!message.includes("Actor not found")) { - logger.error( - { - sessionId, - workspaceId: c.state.workspaceId, - organizationId, - durationMs: roundDurationMs(organizationStartedAt), - errorMessage: message, - errorStack: error instanceof Error ? error.stack : undefined, - }, - "build_app_snapshot_organization_failed", - ); - throw error; - } - logger.info( - { - sessionId, - workspaceId: c.state.workspaceId, - organizationId, - durationMs: roundDurationMs(organizationStartedAt), - }, - "build_app_snapshot_organization_missing", - ); - return null; - } - }), - ) - ).filter((organization): organization is FoundryOrganization => organization !== null); + let { organizations, uninitializedOrganizationIds } = await listSnapshotOrganizations(c, sessionId, eligibleOrganizationIds); + + if (allowOrganizationRepair && uninitializedOrganizationIds.length > 0) { + const token = await auth.getAccessTokenForSession(sessionId); + if (token?.accessToken) { + logger.info( + { + sessionId, + workspaceId: c.state.workspaceId, + organizationIds: uninitializedOrganizationIds, + }, + "build_app_snapshot_repairing_organizations", + ); + await syncGithubOrganizationsInternal(c, { sessionId, accessToken: token.accessToken }, { broadcast: false }); + return await buildAppSnapshot(c, sessionId, false); + } + logger.warn( + { + sessionId, + workspaceId: c.state.workspaceId, + organizationIds: uninitializedOrganizationIds, + }, + "build_app_snapshot_repair_skipped_no_access_token", + ); + } const currentUser: FoundryUser | null = user ? { @@ -466,6 +517,10 @@ async function safeListInstallations(accessToken: string): Promise { * already returned a redirect to the browser. */ export async function syncGithubOrganizations(c: any, input: { sessionId: string; accessToken: string }): Promise { + await syncGithubOrganizationsInternal(c, input, { broadcast: true }); +} + +async function syncGithubOrganizationsInternal(c: any, input: { sessionId: string; accessToken: string }, options: { broadcast: boolean }): Promise { assertAppWorkspace(c); const auth = getBetterAuthService(); const { appShell } = getActorRuntimeContext(); @@ -532,7 +587,13 @@ export async function syncGithubOrganizations(c: any, input: { sessionId: string roleLabel: "GitHub user", eligibleOrganizationIdsJson: encodeEligibleOrganizationIds(linkedOrganizationIds), }); - c.broadcast("appUpdated", { at: Date.now(), sessionId }); + if (!options.broadcast) { + return; + } + c.broadcast("appUpdated", { + type: "appUpdated", + snapshot: await buildAppSnapshot(c, sessionId), + }); } export async function syncGithubOrganizationRepos(c: any, input: { sessionId: string; organizationId: string }): Promise { @@ -639,6 +700,19 @@ async function listOrganizationRepoCatalog(c: any): Promise { async function buildOrganizationState(c: any) { const startedAt = performance.now(); const row = await requireOrganizationProfileRow(c); + return await buildOrganizationStateFromRow(c, row, startedAt); +} + +async function buildOrganizationStateIfInitialized(c: any) { + const startedAt = performance.now(); + const row = await readOrganizationProfileRow(c); + if (!row) { + return null; + } + return await buildOrganizationStateFromRow(c, row, startedAt); +} + +async function buildOrganizationStateFromRow(c: any, row: any, startedAt: number) { const repoCatalog = await listOrganizationRepoCatalog(c); const members = await listOrganizationMembers(c); const seatAssignmentEmails = await listOrganizationSeatAssignments(c); @@ -1579,6 +1653,11 @@ export const workspaceAppActions = { return await buildOrganizationState(c); }, + async getOrganizationShellStateIfInitialized(c: any): Promise { + assertOrganizationWorkspace(c); + return await buildOrganizationStateIfInitialized(c); + }, + async updateOrganizationShellProfile(c: any, input: Pick): Promise { assertOrganizationWorkspace(c); const existing = await requireOrganizationProfileRow(c); diff --git a/foundry/packages/backend/src/actors/workspace/db/migrations.ts b/foundry/packages/backend/src/actors/workspace/db/migrations.ts index a0f2f74..607eb19 100644 --- a/foundry/packages/backend/src/actors/workspace/db/migrations.ts +++ b/foundry/packages/backend/src/actors/workspace/db/migrations.ts @@ -16,6 +16,12 @@ const journal = { tag: "0001_auth_index_tables", breakpoints: true, }, + { + idx: 2, + when: 1773720000000, + tag: "0002_task_summaries", + breakpoints: true, + }, ], } as const; @@ -150,6 +156,18 @@ CREATE TABLE IF NOT EXISTS \`auth_verification\` ( \`created_at\` integer NOT NULL, \`updated_at\` integer NOT NULL ); +`, + m0002: `CREATE TABLE IF NOT EXISTS \`task_summaries\` ( + \`task_id\` text PRIMARY KEY NOT NULL, + \`repo_id\` text NOT NULL, + \`title\` text NOT NULL, + \`status\` text NOT NULL, + \`repo_name\` text NOT NULL, + \`updated_at_ms\` integer NOT NULL, + \`branch\` text, + \`pull_request_json\` text, + \`sessions_summary_json\` text DEFAULT '[]' NOT NULL +); `, } as const, }; diff --git a/foundry/packages/backend/src/actors/workspace/db/schema.ts b/foundry/packages/backend/src/actors/workspace/db/schema.ts index ca40a88..93082af 100644 --- a/foundry/packages/backend/src/actors/workspace/db/schema.ts +++ b/foundry/packages/backend/src/actors/workspace/db/schema.ts @@ -20,6 +20,23 @@ export const taskLookup = sqliteTable("task_lookup", { repoId: text("repo_id").notNull(), }); +/** + * Materialized sidebar projection maintained by task actors. + * The source of truth still lives on each task actor; this table exists so + * workspace reads can stay local and avoid fan-out across child actors. + */ +export const taskSummaries = sqliteTable("task_summaries", { + taskId: text("task_id").notNull().primaryKey(), + repoId: text("repo_id").notNull(), + title: text("title").notNull(), + status: text("status").notNull(), + repoName: text("repo_name").notNull(), + updatedAtMs: integer("updated_at_ms").notNull(), + branch: text("branch"), + pullRequestJson: text("pull_request_json"), + sessionsSummaryJson: text("sessions_summary_json").notNull().default("[]"), +}); + export const organizationProfile = sqliteTable("organization_profile", { id: text("id").notNull().primaryKey(), kind: text("kind").notNull(), diff --git a/foundry/packages/client/package.json b/foundry/packages/client/package.json index 7d558f0..98079d5 100644 --- a/foundry/packages/client/package.json +++ b/foundry/packages/client/package.json @@ -15,10 +15,12 @@ }, "dependencies": { "@sandbox-agent/foundry-shared": "workspace:*", + "react": "^19.1.1", "rivetkit": "2.1.6", "sandbox-agent": "workspace:*" }, "devDependencies": { + "@types/react": "^19.1.12", "tsup": "^8.5.0" } } diff --git a/foundry/packages/client/src/backend-client.ts b/foundry/packages/client/src/backend-client.ts index 19c119f..6d27504 100644 --- a/foundry/packages/client/src/backend-client.ts +++ b/foundry/packages/client/src/backend-client.ts @@ -6,6 +6,9 @@ import type { FoundryAppSnapshot, FoundryBillingPlanId, CreateTaskInput, + AppEvent, + SessionEvent, + SandboxProcessesEvent, TaskRecord, TaskSummary, TaskWorkbenchChangeModelInput, @@ -20,6 +23,12 @@ import type { TaskWorkbenchSnapshot, TaskWorkbenchTabInput, TaskWorkbenchUpdateDraftInput, + TaskEvent, + WorkbenchTaskDetail, + WorkbenchTaskSummary, + WorkbenchSessionDetail, + WorkspaceEvent, + WorkspaceSummarySnapshot, HistoryEvent, HistoryQueryInput, ProviderId, @@ -34,7 +43,7 @@ import type { } from "@sandbox-agent/foundry-shared"; import type { ProcessCreateRequest, ProcessInfo, ProcessLogFollowQuery, ProcessLogsResponse, ProcessSignalQuery } from "sandbox-agent"; import { createMockBackendClient } from "./mock/backend-client.js"; -import { sandboxInstanceKey, workspaceKey } from "./keys.js"; +import { sandboxInstanceKey, taskKey, workspaceKey } from "./keys.js"; export type TaskAction = "push" | "sync" | "merge" | "archive" | "kill"; @@ -60,7 +69,14 @@ export interface SandboxSessionEventRecord { export type SandboxProcessRecord = ProcessInfo; +export interface ActorConn { + on(event: string, listener: (payload: any) => void): () => void; + onError(listener: (error: unknown) => void): () => void; + dispose(): Promise; +} + interface WorkspaceHandle { + connect(): ActorConn; addRepo(input: AddRepoInput): Promise; listRepos(input: { workspaceId: string }): Promise; createTask(input: CreateTaskInput): Promise; @@ -78,7 +94,10 @@ interface WorkspaceHandle { killTask(input: { workspaceId: string; taskId: string; reason?: string }): Promise; useWorkspace(input: { workspaceId: string }): Promise<{ workspaceId: string }>; starSandboxAgentRepo(input: StarSandboxAgentRepoInput): Promise; - getWorkbench(input: { workspaceId: string }): Promise; + getWorkspaceSummary(input: { workspaceId: string }): Promise; + applyTaskSummaryUpdate(input: { taskSummary: WorkbenchTaskSummary }): Promise; + removeTaskSummary(input: { taskId: string }): Promise; + reconcileWorkbenchState(input: { workspaceId: string }): Promise; createWorkbenchTask(input: TaskWorkbenchCreateTaskInput): Promise; markWorkbenchUnread(input: TaskWorkbenchSelectInput): Promise; renameWorkbenchTask(input: TaskWorkbenchRenameInput): Promise; @@ -95,7 +114,15 @@ interface WorkspaceHandle { revertWorkbenchFile(input: TaskWorkbenchDiffInput): Promise; } +interface TaskHandle { + getTaskSummary(): Promise; + getTaskDetail(): Promise; + getSessionDetail(input: { sessionId: string }): Promise; + connect(): ActorConn; +} + interface SandboxInstanceHandle { + connect(): ActorConn; createSession(input: { prompt: string; cwd?: string; @@ -119,6 +146,10 @@ interface RivetClient { workspace: { getOrCreate(key?: string | string[], opts?: { createWithInput?: unknown }): WorkspaceHandle; }; + task: { + get(key?: string | string[]): TaskHandle; + getOrCreate(key?: string | string[], opts?: { createWithInput?: unknown }): TaskHandle; + }; sandboxInstance: { getOrCreate(key?: string | string[], opts?: { createWithInput?: unknown }): SandboxInstanceHandle; }; @@ -132,6 +163,9 @@ export interface BackendClientOptions { export interface BackendClient { getAppSnapshot(): Promise; + connectWorkspace(workspaceId: string): Promise; + connectTask(workspaceId: string, repoId: string, taskId: string): Promise; + connectSandbox(workspaceId: string, providerId: ProviderId, sandboxId: string): Promise; subscribeApp(listener: () => void): () => void; signInWithGithub(): Promise; signOutApp(): Promise; @@ -222,6 +256,9 @@ export interface BackendClient { sandboxId: string, ): Promise<{ providerId: ProviderId; sandboxId: string; state: string; at: number }>; getSandboxAgentConnection(workspaceId: string, providerId: ProviderId, sandboxId: string): Promise<{ endpoint: string; token?: string }>; + getWorkspaceSummary(workspaceId: string): Promise; + getTaskDetail(workspaceId: string, repoId: string, taskId: string): Promise; + getSessionDetail(workspaceId: string, repoId: string, taskId: string, sessionId: string): Promise; getWorkbench(workspaceId: string): Promise; subscribeWorkbench(workspaceId: string, listener: () => void): () => void; createWorkbenchTask(workspaceId: string, input: TaskWorkbenchCreateTaskInput): Promise; @@ -337,6 +374,8 @@ export function createBackendClient(options: BackendClientOptions): BackendClien createWithInput: workspaceId, }); + const task = async (workspaceId: string, repoId: string, taskId: string): Promise => client.task.get(taskKey(workspaceId, repoId, taskId)); + const sandboxByKey = async (workspaceId: string, providerId: ProviderId, sandboxId: string): Promise => { return (client as any).sandboxInstance.get(sandboxInstanceKey(workspaceId, providerId, sandboxId)); }; @@ -400,6 +439,91 @@ export function createBackendClient(options: BackendClientOptions): BackendClien } }; + const connectWorkspace = async (workspaceId: string): Promise => { + return (await workspace(workspaceId)).connect() as ActorConn; + }; + + const connectTask = async (workspaceId: string, repoId: string, taskIdValue: string): Promise => { + return (await task(workspaceId, repoId, taskIdValue)).connect() as ActorConn; + }; + + const connectSandbox = async (workspaceId: string, providerId: ProviderId, sandboxId: string): Promise => { + try { + return (await sandboxByKey(workspaceId, providerId, sandboxId)).connect() as ActorConn; + } catch (error) { + if (!isActorNotFoundError(error)) { + throw error; + } + const fallback = await sandboxByActorIdFromTask(workspaceId, providerId, sandboxId); + if (!fallback) { + throw error; + } + return fallback.connect() as ActorConn; + } + }; + + const getWorkbenchCompat = async (workspaceId: string): Promise => { + const summary = await (await workspace(workspaceId)).getWorkspaceSummary({ workspaceId }); + const tasks = await Promise.all( + summary.taskSummaries.map(async (taskSummary) => { + const detail = await (await task(workspaceId, taskSummary.repoId, taskSummary.id)).getTaskDetail(); + const sessionDetails = await Promise.all( + detail.sessionsSummary.map(async (session) => { + const full = await (await task(workspaceId, detail.repoId, detail.id)).getSessionDetail({ sessionId: session.id }); + return [session.id, full] as const; + }), + ); + const sessionDetailsById = new Map(sessionDetails); + return { + id: detail.id, + repoId: detail.repoId, + title: detail.title, + status: detail.status, + repoName: detail.repoName, + updatedAtMs: detail.updatedAtMs, + branch: detail.branch, + pullRequest: detail.pullRequest, + tabs: detail.sessionsSummary.map((session) => { + const full = sessionDetailsById.get(session.id); + return { + id: session.id, + sessionId: session.sessionId, + sessionName: session.sessionName, + agent: session.agent, + model: session.model, + status: session.status, + thinkingSinceMs: session.thinkingSinceMs, + unread: session.unread, + created: session.created, + draft: full?.draft ?? { text: "", attachments: [], updatedAtMs: null }, + transcript: full?.transcript ?? [], + }; + }), + fileChanges: detail.fileChanges, + diffs: detail.diffs, + fileTree: detail.fileTree, + minutesUsed: detail.minutesUsed, + }; + }), + ); + + const projects = summary.repos + .map((repo) => ({ + id: repo.id, + label: repo.label, + updatedAtMs: tasks.filter((task) => task.repoId === repo.id).reduce((latest, task) => Math.max(latest, task.updatedAtMs), repo.latestActivityMs), + tasks: tasks.filter((task) => task.repoId === repo.id).sort((left, right) => right.updatedAtMs - left.updatedAtMs), + })) + .filter((repo) => repo.tasks.length > 0); + + return { + workspaceId, + repos: summary.repos.map((repo) => ({ id: repo.id, label: repo.label })), + projects, + tasks: tasks.sort((left, right) => right.updatedAtMs - left.updatedAtMs), + }; + }; + const subscribeWorkbench = (workspaceId: string, listener: () => void): (() => void) => { let entry = workbenchSubscriptions.get(workspaceId); if (!entry) { @@ -544,6 +668,18 @@ export function createBackendClient(options: BackendClientOptions): BackendClien return await appRequest("/app/snapshot"); }, + async connectWorkspace(workspaceId: string): Promise { + return await connectWorkspace(workspaceId); + }, + + async connectTask(workspaceId: string, repoId: string, taskIdValue: string): Promise { + return await connectTask(workspaceId, repoId, taskIdValue); + }, + + async connectSandbox(workspaceId: string, providerId: ProviderId, sandboxId: string): Promise { + return await connectSandbox(workspaceId, providerId, sandboxId); + }, + subscribeApp(listener: () => void): () => void { return subscribeApp(listener); }, @@ -861,8 +997,20 @@ export function createBackendClient(options: BackendClientOptions): BackendClien return await withSandboxHandle(workspaceId, providerId, sandboxId, async (handle) => handle.sandboxAgentConnection()); }, + async getWorkspaceSummary(workspaceId: string): Promise { + return (await workspace(workspaceId)).getWorkspaceSummary({ workspaceId }); + }, + + async getTaskDetail(workspaceId: string, repoId: string, taskIdValue: string): Promise { + return (await task(workspaceId, repoId, taskIdValue)).getTaskDetail(); + }, + + async getSessionDetail(workspaceId: string, repoId: string, taskIdValue: string, sessionId: string): Promise { + return (await task(workspaceId, repoId, taskIdValue)).getSessionDetail({ sessionId }); + }, + async getWorkbench(workspaceId: string): Promise { - return (await workspace(workspaceId)).getWorkbench({ workspaceId }); + return await getWorkbenchCompat(workspaceId); }, subscribeWorkbench(workspaceId: string, listener: () => void): () => void { diff --git a/foundry/packages/client/src/index.ts b/foundry/packages/client/src/index.ts index a959744..7605986 100644 --- a/foundry/packages/client/src/index.ts +++ b/foundry/packages/client/src/index.ts @@ -1,5 +1,10 @@ export * from "./app-client.js"; export * from "./backend-client.js"; +export * from "./interest/manager.js"; +export * from "./interest/mock-manager.js"; +export * from "./interest/remote-manager.js"; +export * from "./interest/topics.js"; +export * from "./interest/use-interest.js"; export * from "./keys.js"; export * from "./mock-app.js"; export * from "./view-model.js"; diff --git a/foundry/packages/client/src/interest/manager.ts b/foundry/packages/client/src/interest/manager.ts new file mode 100644 index 0000000..b2aab57 --- /dev/null +++ b/foundry/packages/client/src/interest/manager.ts @@ -0,0 +1,24 @@ +import type { TopicData, TopicKey, TopicParams } from "./topics.js"; + +export type TopicStatus = "loading" | "connected" | "error"; + +export interface TopicState { + data: TopicData | undefined; + status: TopicStatus; + error: Error | null; +} + +/** + * The InterestManager owns all realtime actor connections and cached state. + * + * Multiple subscribers to the same topic share one connection and one cache + * entry. After the last subscriber leaves, a short grace period keeps the + * connection warm so navigation does not thrash actor connections. + */ +export interface InterestManager { + subscribe(topicKey: K, params: TopicParams, listener: () => void): () => void; + getSnapshot(topicKey: K, params: TopicParams): TopicData | undefined; + getStatus(topicKey: K, params: TopicParams): TopicStatus; + getError(topicKey: K, params: TopicParams): Error | null; + dispose(): void; +} diff --git a/foundry/packages/client/src/interest/mock-manager.ts b/foundry/packages/client/src/interest/mock-manager.ts new file mode 100644 index 0000000..f1c065e --- /dev/null +++ b/foundry/packages/client/src/interest/mock-manager.ts @@ -0,0 +1,12 @@ +import { createMockBackendClient } from "../mock/backend-client.js"; +import { RemoteInterestManager } from "./remote-manager.js"; + +/** + * Mock implementation shares the same interest-manager harness as the remote + * path, but uses the in-memory mock backend that synthesizes actor events. + */ +export class MockInterestManager extends RemoteInterestManager { + constructor() { + super(createMockBackendClient()); + } +} diff --git a/foundry/packages/client/src/interest/remote-manager.ts b/foundry/packages/client/src/interest/remote-manager.ts new file mode 100644 index 0000000..3016ad0 --- /dev/null +++ b/foundry/packages/client/src/interest/remote-manager.ts @@ -0,0 +1,167 @@ +import type { BackendClient } from "../backend-client.js"; +import type { InterestManager, TopicStatus } from "./manager.js"; +import { topicDefinitions, type TopicData, type TopicDefinition, type TopicKey, type TopicParams } from "./topics.js"; + +const GRACE_PERIOD_MS = 30_000; + +/** + * Remote implementation of InterestManager. + * Each cache entry owns one actor connection plus one materialized snapshot. + */ +export class RemoteInterestManager implements InterestManager { + private entries = new Map>(); + + constructor(private readonly backend: BackendClient) {} + + subscribe(topicKey: K, params: TopicParams, listener: () => void): () => void { + const definition = topicDefinitions[topicKey] as unknown as TopicDefinition; + const cacheKey = definition.key(params as any); + let entry = this.entries.get(cacheKey); + + if (!entry) { + entry = new TopicEntry(definition, this.backend, params as any); + this.entries.set(cacheKey, entry); + } + + entry.cancelTeardown(); + entry.addListener(listener); + entry.ensureStarted(); + + return () => { + const current = this.entries.get(cacheKey); + if (!current) { + return; + } + current.removeListener(listener); + if (current.listenerCount === 0) { + current.scheduleTeardown(GRACE_PERIOD_MS, () => { + this.entries.delete(cacheKey); + }); + } + }; + } + + getSnapshot(topicKey: K, params: TopicParams): TopicData | undefined { + return this.entries.get((topicDefinitions[topicKey] as any).key(params))?.data as TopicData | undefined; + } + + getStatus(topicKey: K, params: TopicParams): TopicStatus { + return this.entries.get((topicDefinitions[topicKey] as any).key(params))?.status ?? "loading"; + } + + getError(topicKey: K, params: TopicParams): Error | null { + return this.entries.get((topicDefinitions[topicKey] as any).key(params))?.error ?? null; + } + + dispose(): void { + for (const entry of this.entries.values()) { + entry.dispose(); + } + this.entries.clear(); + } +} + +class TopicEntry { + data: TData | undefined; + status: TopicStatus = "loading"; + error: Error | null = null; + listenerCount = 0; + + private readonly listeners = new Set<() => void>(); + private conn: Awaited["connect"]>> | null = null; + private unsubscribeEvent: (() => void) | null = null; + private unsubscribeError: (() => void) | null = null; + private teardownTimer: ReturnType | null = null; + private startPromise: Promise | null = null; + private started = false; + + constructor( + private readonly definition: TopicDefinition, + private readonly backend: BackendClient, + private readonly params: TParams, + ) {} + + addListener(listener: () => void): void { + this.listeners.add(listener); + this.listenerCount = this.listeners.size; + } + + removeListener(listener: () => void): void { + this.listeners.delete(listener); + this.listenerCount = this.listeners.size; + } + + ensureStarted(): void { + if (this.started || this.startPromise) { + return; + } + this.startPromise = this.start().finally(() => { + this.startPromise = null; + }); + } + + scheduleTeardown(ms: number, onTeardown: () => void): void { + this.teardownTimer = setTimeout(() => { + this.dispose(); + onTeardown(); + }, ms); + } + + cancelTeardown(): void { + if (this.teardownTimer) { + clearTimeout(this.teardownTimer); + this.teardownTimer = null; + } + } + + dispose(): void { + this.cancelTeardown(); + this.unsubscribeEvent?.(); + this.unsubscribeError?.(); + if (this.conn) { + void this.conn.dispose(); + } + this.conn = null; + this.data = undefined; + this.status = "loading"; + this.error = null; + this.started = false; + } + + private async start(): Promise { + this.status = "loading"; + this.error = null; + this.notify(); + + try { + this.conn = await this.definition.connect(this.backend, this.params); + this.unsubscribeEvent = this.conn.on(this.definition.event, (event: TEvent) => { + if (this.data === undefined) { + return; + } + this.data = this.definition.applyEvent(this.data, event); + this.notify(); + }); + this.unsubscribeError = this.conn.onError((error: unknown) => { + this.status = "error"; + this.error = error instanceof Error ? error : new Error(String(error)); + this.notify(); + }); + this.data = await this.definition.fetchInitial(this.backend, this.params); + this.status = "connected"; + this.started = true; + this.notify(); + } catch (error) { + this.status = "error"; + this.error = error instanceof Error ? error : new Error(String(error)); + this.started = false; + this.notify(); + } + } + + private notify(): void { + for (const listener of [...this.listeners]) { + listener(); + } + } +} diff --git a/foundry/packages/client/src/interest/topics.ts b/foundry/packages/client/src/interest/topics.ts new file mode 100644 index 0000000..a111248 --- /dev/null +++ b/foundry/packages/client/src/interest/topics.ts @@ -0,0 +1,131 @@ +import type { + AppEvent, + FoundryAppSnapshot, + ProviderId, + SandboxProcessesEvent, + SessionEvent, + TaskEvent, + WorkbenchSessionDetail, + WorkbenchTaskDetail, + WorkspaceEvent, + WorkspaceSummarySnapshot, +} from "@sandbox-agent/foundry-shared"; +import type { ActorConn, BackendClient, SandboxProcessRecord } from "../backend-client.js"; + +/** + * Topic definitions for the interest manager. + * + * Each topic describes one actor connection plus one materialized read model. + * Events always carry full replacement payloads for the changed entity so the + * client can replace cached state directly instead of reconstructing patches. + */ +export interface TopicDefinition { + key: (params: TParams) => string; + event: string; + connect: (backend: BackendClient, params: TParams) => Promise; + fetchInitial: (backend: BackendClient, params: TParams) => Promise; + applyEvent: (current: TData, event: TEvent) => TData; +} + +export interface AppTopicParams {} +export interface WorkspaceTopicParams { + workspaceId: string; +} +export interface TaskTopicParams { + workspaceId: string; + repoId: string; + taskId: string; +} +export interface SessionTopicParams { + workspaceId: string; + repoId: string; + taskId: string; + sessionId: string; +} +export interface SandboxProcessesTopicParams { + workspaceId: string; + providerId: ProviderId; + sandboxId: string; +} + +function upsertById(items: T[], nextItem: T, sort: (left: T, right: T) => number): T[] { + const filtered = items.filter((item) => item.id !== nextItem.id); + return [...filtered, nextItem].sort(sort); +} + +export const topicDefinitions = { + app: { + key: () => "app", + event: "appUpdated", + connect: (backend: BackendClient, _params: AppTopicParams) => backend.connectWorkspace("app"), + fetchInitial: (backend: BackendClient, _params: AppTopicParams) => backend.getAppSnapshot(), + applyEvent: (_current: FoundryAppSnapshot, event: AppEvent) => event.snapshot, + } satisfies TopicDefinition, + + workspace: { + key: (params: WorkspaceTopicParams) => `workspace:${params.workspaceId}`, + event: "workspaceUpdated", + connect: (backend: BackendClient, params: WorkspaceTopicParams) => backend.connectWorkspace(params.workspaceId), + fetchInitial: (backend: BackendClient, params: WorkspaceTopicParams) => backend.getWorkspaceSummary(params.workspaceId), + applyEvent: (current: WorkspaceSummarySnapshot, event: WorkspaceEvent) => { + switch (event.type) { + case "taskSummaryUpdated": + return { + ...current, + taskSummaries: upsertById(current.taskSummaries, event.taskSummary, (left, right) => right.updatedAtMs - left.updatedAtMs), + }; + case "taskRemoved": + return { + ...current, + taskSummaries: current.taskSummaries.filter((task) => task.id !== event.taskId), + }; + case "repoAdded": + case "repoUpdated": + return { + ...current, + repos: upsertById(current.repos, event.repo, (left, right) => right.latestActivityMs - left.latestActivityMs), + }; + case "repoRemoved": + return { + ...current, + repos: current.repos.filter((repo) => repo.id !== event.repoId), + }; + } + }, + } satisfies TopicDefinition, + + task: { + key: (params: TaskTopicParams) => `task:${params.workspaceId}:${params.taskId}`, + event: "taskUpdated", + connect: (backend: BackendClient, params: TaskTopicParams) => backend.connectTask(params.workspaceId, params.repoId, params.taskId), + fetchInitial: (backend: BackendClient, params: TaskTopicParams) => backend.getTaskDetail(params.workspaceId, params.repoId, params.taskId), + applyEvent: (_current: WorkbenchTaskDetail, event: TaskEvent) => event.detail, + } satisfies TopicDefinition, + + session: { + key: (params: SessionTopicParams) => `session:${params.workspaceId}:${params.taskId}:${params.sessionId}`, + event: "sessionUpdated", + connect: (backend: BackendClient, params: SessionTopicParams) => backend.connectTask(params.workspaceId, params.repoId, params.taskId), + fetchInitial: (backend: BackendClient, params: SessionTopicParams) => + backend.getSessionDetail(params.workspaceId, params.repoId, params.taskId, params.sessionId), + applyEvent: (current: WorkbenchSessionDetail, event: SessionEvent) => { + if (event.session.sessionId !== current.sessionId) { + return current; + } + return event.session; + }, + } satisfies TopicDefinition, + + sandboxProcesses: { + key: (params: SandboxProcessesTopicParams) => `sandbox:${params.workspaceId}:${params.providerId}:${params.sandboxId}`, + event: "processesUpdated", + connect: (backend: BackendClient, params: SandboxProcessesTopicParams) => backend.connectSandbox(params.workspaceId, params.providerId, params.sandboxId), + fetchInitial: async (backend: BackendClient, params: SandboxProcessesTopicParams) => + (await backend.listSandboxProcesses(params.workspaceId, params.providerId, params.sandboxId)).processes, + applyEvent: (_current: SandboxProcessRecord[], event: SandboxProcessesEvent) => event.processes, + } satisfies TopicDefinition, +} as const; + +export type TopicKey = keyof typeof topicDefinitions; +export type TopicParams = Parameters<(typeof topicDefinitions)[K]["fetchInitial"]>[1]; +export type TopicData = Awaited>; diff --git a/foundry/packages/client/src/interest/use-interest.ts b/foundry/packages/client/src/interest/use-interest.ts new file mode 100644 index 0000000..4ffd733 --- /dev/null +++ b/foundry/packages/client/src/interest/use-interest.ts @@ -0,0 +1,56 @@ +import { useMemo, useRef, useSyncExternalStore } from "react"; +import type { InterestManager, TopicState } from "./manager.js"; +import { topicDefinitions, type TopicKey, type TopicParams } from "./topics.js"; + +/** + * React bridge for the interest manager. + * + * `null` params disable the subscription entirely, which is how screens express + * conditional interest in task/session/sandbox topics. + */ +export function useInterest(manager: InterestManager, topicKey: K, params: TopicParams | null): TopicState { + const paramsKey = params ? (topicDefinitions[topicKey] as any).key(params) : null; + const paramsRef = useRef | null>(params); + paramsRef.current = params; + + const subscribe = useMemo(() => { + return (listener: () => void) => { + const currentParams = paramsRef.current; + if (!currentParams) { + return () => {}; + } + return manager.subscribe(topicKey, currentParams, listener); + }; + }, [manager, topicKey, paramsKey]); + + const getSnapshot = useMemo(() => { + let lastSnapshot: TopicState | null = null; + + return (): TopicState => { + const currentParams = paramsRef.current; + const nextSnapshot: TopicState = currentParams + ? { + data: manager.getSnapshot(topicKey, currentParams), + status: manager.getStatus(topicKey, currentParams), + error: manager.getError(topicKey, currentParams), + } + : { + data: undefined, + status: "loading", + error: null, + }; + + // `useSyncExternalStore` requires referentially-stable snapshots when the + // underlying store has not changed. Reuse the previous object whenever + // the topic data/status/error triplet is unchanged. + if (lastSnapshot && lastSnapshot.data === nextSnapshot.data && lastSnapshot.status === nextSnapshot.status && lastSnapshot.error === nextSnapshot.error) { + return lastSnapshot; + } + + lastSnapshot = nextSnapshot; + return nextSnapshot; + }; + }, [manager, topicKey, paramsKey]); + + return useSyncExternalStore(subscribe, getSnapshot, getSnapshot); +} diff --git a/foundry/packages/client/src/mock/backend-client.ts b/foundry/packages/client/src/mock/backend-client.ts index 6f5e7d3..2048a60 100644 --- a/foundry/packages/client/src/mock/backend-client.ts +++ b/foundry/packages/client/src/mock/backend-client.ts @@ -1,7 +1,10 @@ import type { AddRepoInput, + AppEvent, CreateTaskInput, FoundryAppSnapshot, + SandboxProcessesEvent, + SessionEvent, TaskRecord, TaskSummary, TaskWorkbenchChangeModelInput, @@ -16,6 +19,12 @@ import type { TaskWorkbenchSnapshot, TaskWorkbenchTabInput, TaskWorkbenchUpdateDraftInput, + TaskEvent, + WorkbenchSessionDetail, + WorkbenchTaskDetail, + WorkbenchTaskSummary, + WorkspaceEvent, + WorkspaceSummarySnapshot, HistoryEvent, HistoryQueryInput, ProviderId, @@ -27,7 +36,7 @@ import type { SwitchResult, } from "@sandbox-agent/foundry-shared"; import type { ProcessCreateRequest, ProcessLogFollowQuery, ProcessLogsResponse, ProcessSignalQuery } from "sandbox-agent"; -import type { BackendClient, SandboxProcessRecord, SandboxSessionEventRecord, SandboxSessionRecord } from "../backend-client.js"; +import type { ActorConn, BackendClient, SandboxProcessRecord, SandboxSessionEventRecord, SandboxSessionRecord } from "../backend-client.js"; import { getSharedMockWorkbenchClient } from "./workbench-client.js"; interface MockProcessRecord extends SandboxProcessRecord { @@ -86,6 +95,7 @@ export function createMockBackendClient(defaultWorkspaceId = "default"): Backend const workbench = getSharedMockWorkbenchClient(); const listenersBySandboxId = new Map void>>(); const processesBySandboxId = new Map(); + const connectionListeners = new Map void>>(); let nextPid = 4000; let nextProcessId = 1; @@ -110,11 +120,174 @@ export function createMockBackendClient(defaultWorkspaceId = "default"): Backend const notifySandbox = (sandboxId: string): void => { const listeners = listenersBySandboxId.get(sandboxId); if (!listeners) { + emitSandboxProcessesUpdate(sandboxId); return; } for (const listener of [...listeners]) { listener(); } + emitSandboxProcessesUpdate(sandboxId); + }; + + const connectionChannel = (scope: string, event: string): string => `${scope}:${event}`; + + const emitConnectionEvent = (scope: string, event: string, payload: any): void => { + const listeners = connectionListeners.get(connectionChannel(scope, event)); + if (!listeners) { + return; + } + for (const listener of [...listeners]) { + listener(payload); + } + }; + + const createConn = (scope: string): ActorConn => ({ + on(event: string, listener: (payload: any) => void): () => void { + const channel = connectionChannel(scope, event); + let listeners = connectionListeners.get(channel); + if (!listeners) { + listeners = new Set(); + connectionListeners.set(channel, listeners); + } + listeners.add(listener); + return () => { + const current = connectionListeners.get(channel); + if (!current) { + return; + } + current.delete(listener); + if (current.size === 0) { + connectionListeners.delete(channel); + } + }; + }, + onError(): () => void { + return () => {}; + }, + async dispose(): Promise {}, + }); + + const buildTaskSummary = (task: TaskWorkbenchSnapshot["tasks"][number]): WorkbenchTaskSummary => ({ + id: task.id, + repoId: task.repoId, + title: task.title, + status: task.status, + repoName: task.repoName, + updatedAtMs: task.updatedAtMs, + branch: task.branch, + pullRequest: task.pullRequest, + sessionsSummary: task.tabs.map((tab) => ({ + id: tab.id, + sessionId: tab.sessionId, + sessionName: tab.sessionName, + agent: tab.agent, + model: tab.model, + status: tab.status, + thinkingSinceMs: tab.thinkingSinceMs, + unread: tab.unread, + created: tab.created, + })), + }); + + const buildTaskDetail = (task: TaskWorkbenchSnapshot["tasks"][number]): WorkbenchTaskDetail => ({ + ...buildTaskSummary(task), + task: task.title, + agentType: task.tabs[0]?.agent === "Codex" ? "codex" : "claude", + runtimeStatus: toTaskStatus(task.status === "archived" ? "archived" : "running", task.status === "archived"), + statusMessage: task.status === "archived" ? "archived" : "mock sandbox ready", + activeSessionId: task.tabs[0]?.sessionId ?? null, + diffStat: task.fileChanges.length > 0 ? `+${task.fileChanges.length}/-${task.fileChanges.length}` : "+0/-0", + prUrl: task.pullRequest ? `https://example.test/pr/${task.pullRequest.number}` : null, + reviewStatus: null, + fileChanges: task.fileChanges, + diffs: task.diffs, + fileTree: task.fileTree, + minutesUsed: task.minutesUsed, + sandboxes: [ + { + providerId: "local", + sandboxId: task.id, + cwd: mockCwd(task.repoName, task.id), + }, + ], + activeSandboxId: task.id, + }); + + const buildSessionDetail = (task: TaskWorkbenchSnapshot["tasks"][number], tabId: string): WorkbenchSessionDetail => { + const tab = task.tabs.find((candidate) => candidate.id === tabId); + if (!tab) { + throw new Error(`Unknown mock tab ${tabId} for task ${task.id}`); + } + return { + sessionId: tab.id, + tabId: tab.id, + sandboxSessionId: tab.sessionId, + sessionName: tab.sessionName, + agent: tab.agent, + model: tab.model, + status: tab.status, + thinkingSinceMs: tab.thinkingSinceMs, + unread: tab.unread, + created: tab.created, + draft: tab.draft, + transcript: tab.transcript, + }; + }; + + const buildWorkspaceSummary = (): WorkspaceSummarySnapshot => { + const snapshot = workbench.getSnapshot(); + const taskSummaries = snapshot.tasks.map(buildTaskSummary); + return { + workspaceId: defaultWorkspaceId, + repos: snapshot.repos.map((repo) => { + const repoTasks = taskSummaries.filter((task) => task.repoId === repo.id); + return { + id: repo.id, + label: repo.label, + taskCount: repoTasks.length, + latestActivityMs: repoTasks.reduce((latest, task) => Math.max(latest, task.updatedAtMs), 0), + }; + }), + taskSummaries, + }; + }; + + const workspaceScope = (workspaceId: string): string => `workspace:${workspaceId}`; + const taskScope = (workspaceId: string, repoId: string, taskId: string): string => `task:${workspaceId}:${repoId}:${taskId}`; + const sandboxScope = (workspaceId: string, providerId: string, sandboxId: string): string => `sandbox:${workspaceId}:${providerId}:${sandboxId}`; + + const emitWorkspaceSnapshot = (): void => { + const summary = buildWorkspaceSummary(); + const latestTask = [...summary.taskSummaries].sort((left, right) => right.updatedAtMs - left.updatedAtMs)[0] ?? null; + if (latestTask) { + emitConnectionEvent(workspaceScope(defaultWorkspaceId), "workspaceUpdated", { + type: "taskSummaryUpdated", + taskSummary: latestTask, + } satisfies WorkspaceEvent); + } + }; + + const emitTaskUpdate = (taskId: string): void => { + const task = requireTask(taskId); + emitConnectionEvent(taskScope(defaultWorkspaceId, task.repoId, task.id), "taskUpdated", { + type: "taskDetailUpdated", + detail: buildTaskDetail(task), + } satisfies TaskEvent); + }; + + const emitSessionUpdate = (taskId: string, tabId: string): void => { + const task = requireTask(taskId); + emitConnectionEvent(taskScope(defaultWorkspaceId, task.repoId, task.id), "sessionUpdated", { + type: "sessionUpdated", + session: buildSessionDetail(task, tabId), + } satisfies SessionEvent); + }; + + const emitSandboxProcessesUpdate = (sandboxId: string): void => { + emitConnectionEvent(sandboxScope(defaultWorkspaceId, "local", sandboxId), "processesUpdated", { + type: "processesUpdated", + processes: ensureProcessList(sandboxId).map((process) => cloneProcess(process)), + } satisfies SandboxProcessesEvent); }; const buildTaskRecord = (taskId: string): TaskRecord => { @@ -192,7 +365,19 @@ export function createMockBackendClient(defaultWorkspaceId = "default"): Backend return unsupportedAppSnapshot(); }, - subscribeApp(_listener: () => void): () => void { + async connectWorkspace(workspaceId: string): Promise { + return createConn(workspaceScope(workspaceId)); + }, + + async connectTask(workspaceId: string, repoId: string, taskId: string): Promise { + return createConn(taskScope(workspaceId, repoId, taskId)); + }, + + async connectSandbox(workspaceId: string, providerId: ProviderId, sandboxId: string): Promise { + return createConn(sandboxScope(workspaceId, providerId, sandboxId)); + }, + + subscribeApp(): () => void { return () => {}; }, @@ -462,6 +647,18 @@ export function createMockBackendClient(defaultWorkspaceId = "default"): Backend return { endpoint: "mock://terminal-unavailable" }; }, + async getWorkspaceSummary(): Promise { + return buildWorkspaceSummary(); + }, + + async getTaskDetail(_workspaceId: string, _repoId: string, taskId: string): Promise { + return buildTaskDetail(requireTask(taskId)); + }, + + async getSessionDetail(_workspaceId: string, _repoId: string, taskId: string, sessionId: string): Promise { + return buildSessionDetail(requireTask(taskId), sessionId); + }, + async getWorkbench(): Promise { return workbench.getSnapshot(); }, @@ -471,59 +668,99 @@ export function createMockBackendClient(defaultWorkspaceId = "default"): Backend }, async createWorkbenchTask(_workspaceId: string, input: TaskWorkbenchCreateTaskInput): Promise { - return await workbench.createTask(input); + const created = await workbench.createTask(input); + emitWorkspaceSnapshot(); + emitTaskUpdate(created.taskId); + if (created.tabId) { + emitSessionUpdate(created.taskId, created.tabId); + } + return created; }, async markWorkbenchUnread(_workspaceId: string, input: TaskWorkbenchSelectInput): Promise { await workbench.markTaskUnread(input); + emitWorkspaceSnapshot(); + emitTaskUpdate(input.taskId); }, async renameWorkbenchTask(_workspaceId: string, input: TaskWorkbenchRenameInput): Promise { await workbench.renameTask(input); + emitWorkspaceSnapshot(); + emitTaskUpdate(input.taskId); }, async renameWorkbenchBranch(_workspaceId: string, input: TaskWorkbenchRenameInput): Promise { await workbench.renameBranch(input); + emitWorkspaceSnapshot(); + emitTaskUpdate(input.taskId); }, async createWorkbenchSession(_workspaceId: string, input: TaskWorkbenchSelectInput & { model?: string }): Promise<{ tabId: string }> { - return await workbench.addTab(input); + const created = await workbench.addTab(input); + emitWorkspaceSnapshot(); + emitTaskUpdate(input.taskId); + emitSessionUpdate(input.taskId, created.tabId); + return created; }, async renameWorkbenchSession(_workspaceId: string, input: TaskWorkbenchRenameSessionInput): Promise { await workbench.renameSession(input); + emitWorkspaceSnapshot(); + emitTaskUpdate(input.taskId); + emitSessionUpdate(input.taskId, input.tabId); }, async setWorkbenchSessionUnread(_workspaceId: string, input: TaskWorkbenchSetSessionUnreadInput): Promise { await workbench.setSessionUnread(input); + emitWorkspaceSnapshot(); + emitTaskUpdate(input.taskId); + emitSessionUpdate(input.taskId, input.tabId); }, async updateWorkbenchDraft(_workspaceId: string, input: TaskWorkbenchUpdateDraftInput): Promise { await workbench.updateDraft(input); + emitWorkspaceSnapshot(); + emitTaskUpdate(input.taskId); + emitSessionUpdate(input.taskId, input.tabId); }, async changeWorkbenchModel(_workspaceId: string, input: TaskWorkbenchChangeModelInput): Promise { await workbench.changeModel(input); + emitWorkspaceSnapshot(); + emitTaskUpdate(input.taskId); + emitSessionUpdate(input.taskId, input.tabId); }, async sendWorkbenchMessage(_workspaceId: string, input: TaskWorkbenchSendMessageInput): Promise { await workbench.sendMessage(input); + emitWorkspaceSnapshot(); + emitTaskUpdate(input.taskId); + emitSessionUpdate(input.taskId, input.tabId); }, async stopWorkbenchSession(_workspaceId: string, input: TaskWorkbenchTabInput): Promise { await workbench.stopAgent(input); + emitWorkspaceSnapshot(); + emitTaskUpdate(input.taskId); + emitSessionUpdate(input.taskId, input.tabId); }, async closeWorkbenchSession(_workspaceId: string, input: TaskWorkbenchTabInput): Promise { await workbench.closeTab(input); + emitWorkspaceSnapshot(); + emitTaskUpdate(input.taskId); }, async publishWorkbenchPr(_workspaceId: string, input: TaskWorkbenchSelectInput): Promise { await workbench.publishPr(input); + emitWorkspaceSnapshot(); + emitTaskUpdate(input.taskId); }, async revertWorkbenchFile(_workspaceId: string, input: TaskWorkbenchDiffInput): Promise { await workbench.revertFile(input); + emitWorkspaceSnapshot(); + emitTaskUpdate(input.taskId); }, async health(): Promise<{ ok: true }> { diff --git a/foundry/packages/client/test/interest-manager.test.ts b/foundry/packages/client/test/interest-manager.test.ts new file mode 100644 index 0000000..188195c --- /dev/null +++ b/foundry/packages/client/test/interest-manager.test.ts @@ -0,0 +1,171 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import type { WorkspaceEvent, WorkspaceSummarySnapshot } from "@sandbox-agent/foundry-shared"; +import type { ActorConn, BackendClient } from "../src/backend-client.js"; +import { RemoteInterestManager } from "../src/interest/remote-manager.js"; + +class FakeActorConn implements ActorConn { + private readonly listeners = new Map void>>(); + private readonly errorListeners = new Set<(error: unknown) => void>(); + disposeCount = 0; + + on(event: string, listener: (payload: any) => void): () => void { + let current = this.listeners.get(event); + if (!current) { + current = new Set(); + this.listeners.set(event, current); + } + current.add(listener); + return () => { + current?.delete(listener); + if (current?.size === 0) { + this.listeners.delete(event); + } + }; + } + + onError(listener: (error: unknown) => void): () => void { + this.errorListeners.add(listener); + return () => { + this.errorListeners.delete(listener); + }; + } + + emit(event: string, payload: unknown): void { + for (const listener of this.listeners.get(event) ?? []) { + listener(payload); + } + } + + emitError(error: unknown): void { + for (const listener of this.errorListeners) { + listener(error); + } + } + + async dispose(): Promise { + this.disposeCount += 1; + } +} + +function workspaceSnapshot(): WorkspaceSummarySnapshot { + return { + workspaceId: "ws-1", + repos: [{ id: "repo-1", label: "repo-1", taskCount: 1, latestActivityMs: 10 }], + taskSummaries: [ + { + id: "task-1", + repoId: "repo-1", + title: "Initial task", + status: "idle", + repoName: "repo-1", + updatedAtMs: 10, + branch: "main", + pullRequest: null, + sessionsSummary: [], + }, + ], + }; +} + +function createBackend(conn: FakeActorConn, snapshot: WorkspaceSummarySnapshot): BackendClient { + return { + connectWorkspace: vi.fn(async () => conn), + getWorkspaceSummary: vi.fn(async () => snapshot), + } as unknown as BackendClient; +} + +async function flushAsyncWork(): Promise { + await Promise.resolve(); + await Promise.resolve(); +} + +describe("RemoteInterestManager", () => { + beforeEach(() => { + vi.useFakeTimers(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it("shares one connection per topic key and applies incoming events", async () => { + const conn = new FakeActorConn(); + const backend = createBackend(conn, workspaceSnapshot()); + const manager = new RemoteInterestManager(backend); + const params = { workspaceId: "ws-1" } as const; + const listenerA = vi.fn(); + const listenerB = vi.fn(); + + const unsubscribeA = manager.subscribe("workspace", params, listenerA); + const unsubscribeB = manager.subscribe("workspace", params, listenerB); + await flushAsyncWork(); + + expect(backend.connectWorkspace).toHaveBeenCalledTimes(1); + expect(backend.getWorkspaceSummary).toHaveBeenCalledTimes(1); + expect(manager.getStatus("workspace", params)).toBe("connected"); + expect(manager.getSnapshot("workspace", params)?.taskSummaries[0]?.title).toBe("Initial task"); + + conn.emit("workspaceUpdated", { + type: "taskSummaryUpdated", + taskSummary: { + id: "task-1", + repoId: "repo-1", + title: "Updated task", + status: "running", + repoName: "repo-1", + updatedAtMs: 20, + branch: "feature/live", + pullRequest: null, + sessionsSummary: [], + }, + } satisfies WorkspaceEvent); + + expect(manager.getSnapshot("workspace", params)?.taskSummaries[0]?.title).toBe("Updated task"); + expect(listenerA).toHaveBeenCalled(); + expect(listenerB).toHaveBeenCalled(); + + unsubscribeA(); + unsubscribeB(); + manager.dispose(); + }); + + it("keeps a topic warm during the grace period and tears it down afterwards", async () => { + const conn = new FakeActorConn(); + const backend = createBackend(conn, workspaceSnapshot()); + const manager = new RemoteInterestManager(backend); + const params = { workspaceId: "ws-1" } as const; + + const unsubscribeA = manager.subscribe("workspace", params, () => {}); + await flushAsyncWork(); + unsubscribeA(); + + vi.advanceTimersByTime(29_000); + + const unsubscribeB = manager.subscribe("workspace", params, () => {}); + await flushAsyncWork(); + + expect(backend.connectWorkspace).toHaveBeenCalledTimes(1); + expect(conn.disposeCount).toBe(0); + + unsubscribeB(); + vi.advanceTimersByTime(30_000); + + expect(conn.disposeCount).toBe(1); + expect(manager.getSnapshot("workspace", params)).toBeUndefined(); + }); + + it("surfaces connection errors to subscribers", async () => { + const conn = new FakeActorConn(); + const backend = createBackend(conn, workspaceSnapshot()); + const manager = new RemoteInterestManager(backend); + const params = { workspaceId: "ws-1" } as const; + + manager.subscribe("workspace", params, () => {}); + await flushAsyncWork(); + + conn.emitError(new Error("socket dropped")); + + expect(manager.getStatus("workspace", params)).toBe("error"); + expect(manager.getError("workspace", params)?.message).toBe("socket dropped"); + }); +}); diff --git a/foundry/packages/frontend/src/app/router.tsx b/foundry/packages/frontend/src/app/router.tsx index af8de1d..d6f8161 100644 --- a/foundry/packages/frontend/src/app/router.tsx +++ b/foundry/packages/frontend/src/app/router.tsx @@ -1,6 +1,7 @@ import { type ReactNode, useEffect } from "react"; import { setFrontendErrorContext } from "@sandbox-agent/foundry-frontend-errors/client"; import type { FoundryBillingPlanId } from "@sandbox-agent/foundry-shared"; +import { useInterest } from "@sandbox-agent/foundry-client"; import { Navigate, Outlet, createRootRoute, createRoute, createRouter, useRouterState } from "@tanstack/react-router"; import { DevPanel } from "../components/dev-panel"; import { MockLayout } from "../components/mock-layout"; @@ -13,8 +14,8 @@ import { MockSignInPage, } from "../components/mock-onboarding"; import { defaultWorkspaceId, isMockFrontendClient } from "../lib/env"; +import { interestManager } from "../lib/interest"; import { activeMockOrganization, getMockOrganizationById, isAppSnapshotBootstrapping, useMockAppClient, useMockAppSnapshot } from "../lib/mock-app"; -import { getTaskWorkbenchClient } from "../lib/workbench"; const rootRoute = createRootRoute({ component: RootLayout, @@ -325,7 +326,7 @@ function AppWorkspaceGate({ workspaceId, children }: { workspaceId: string; chil } function RepoRouteInner({ workspaceId, repoId }: { workspaceId: string; repoId: string }) { - const taskWorkbenchClient = getTaskWorkbenchClient(workspaceId); + const workspaceState = useInterest(interestManager, "workspace", { workspaceId }); useEffect(() => { setFrontendErrorContext({ workspaceId, @@ -333,7 +334,7 @@ function RepoRouteInner({ workspaceId, repoId }: { workspaceId: string; repoId: repoId, }); }, [repoId, workspaceId]); - const activeTaskId = taskWorkbenchClient.getSnapshot().tasks.find((task) => task.repoId === repoId)?.id; + const activeTaskId = workspaceState.data?.taskSummaries.find((task) => task.repoId === repoId)?.id; if (!activeTaskId) { return ; } diff --git a/foundry/packages/frontend/src/components/mock-layout.tsx b/foundry/packages/frontend/src/components/mock-layout.tsx index af8b84e..358d5f1 100644 --- a/foundry/packages/frontend/src/components/mock-layout.tsx +++ b/foundry/packages/frontend/src/components/mock-layout.tsx @@ -1,7 +1,8 @@ -import { memo, useCallback, useEffect, useLayoutEffect, useMemo, useRef, useState, useSyncExternalStore, type PointerEvent as ReactPointerEvent } from "react"; +import { memo, useCallback, useEffect, useLayoutEffect, useMemo, useRef, useState, type PointerEvent as ReactPointerEvent } from "react"; import { useNavigate } from "@tanstack/react-router"; import { useStyletron } from "baseui"; -import { createErrorContext } from "@sandbox-agent/foundry-shared"; +import { createErrorContext, type WorkbenchSessionSummary, type WorkbenchTaskDetail, type WorkbenchTaskSummary } from "@sandbox-agent/foundry-shared"; +import { useInterest } from "@sandbox-agent/foundry-client"; import { PanelLeft, PanelRight } from "lucide-react"; import { useFoundryTokens } from "../app/theme"; @@ -30,7 +31,8 @@ import { type ModelId, } from "./mock-layout/view-model"; import { activeMockOrganization, useMockAppSnapshot } from "../lib/mock-app"; -import { getTaskWorkbenchClient } from "../lib/workbench"; +import { backendClient } from "../lib/backend"; +import { interestManager } from "../lib/interest"; function firstAgentTabId(task: Task): string | null { return task.tabs[0]?.id ?? null; @@ -65,6 +67,81 @@ function sanitizeActiveTabId(task: Task, tabId: string | null | undefined, openD return openDiffs.length > 0 ? diffTabId(openDiffs[openDiffs.length - 1]!) : lastAgentTabId; } +function toLegacyTab( + summary: WorkbenchSessionSummary, + sessionDetail?: { draft: Task["tabs"][number]["draft"]; transcript: Task["tabs"][number]["transcript"] }, +): Task["tabs"][number] { + return { + id: summary.id, + sessionId: summary.sessionId, + sessionName: summary.sessionName, + agent: summary.agent, + model: summary.model, + status: summary.status, + thinkingSinceMs: summary.thinkingSinceMs, + unread: summary.unread, + created: summary.created, + draft: sessionDetail?.draft ?? { + text: "", + attachments: [], + updatedAtMs: null, + }, + transcript: sessionDetail?.transcript ?? [], + }; +} + +function toLegacyTask( + summary: WorkbenchTaskSummary, + detail?: WorkbenchTaskDetail, + sessionCache?: Map, +): Task { + const sessions = detail?.sessionsSummary ?? summary.sessionsSummary; + return { + id: summary.id, + repoId: summary.repoId, + title: detail?.title ?? summary.title, + status: detail?.status ?? summary.status, + repoName: detail?.repoName ?? summary.repoName, + updatedAtMs: detail?.updatedAtMs ?? summary.updatedAtMs, + branch: detail?.branch ?? summary.branch, + pullRequest: detail?.pullRequest ?? summary.pullRequest, + tabs: sessions.map((session) => toLegacyTab(session, sessionCache?.get(session.id))), + fileChanges: detail?.fileChanges ?? [], + diffs: detail?.diffs ?? {}, + fileTree: detail?.fileTree ?? [], + minutesUsed: detail?.minutesUsed ?? 0, + }; +} + +function groupProjects(repos: Array<{ id: string; label: string }>, tasks: Task[]) { + return repos + .map((repo) => ({ + id: repo.id, + label: repo.label, + updatedAtMs: tasks.filter((task) => task.repoId === repo.id).reduce((latest, task) => Math.max(latest, task.updatedAtMs), 0), + tasks: tasks.filter((task) => task.repoId === repo.id).sort((left, right) => right.updatedAtMs - left.updatedAtMs), + })) + .filter((repo) => repo.tasks.length > 0); +} + +interface WorkbenchActions { + createTask(input: { repoId: string; task: string; title?: string; branch?: string; model?: ModelId }): Promise<{ taskId: string; tabId?: string }>; + markTaskUnread(input: { taskId: string }): Promise; + renameTask(input: { taskId: string; value: string }): Promise; + renameBranch(input: { taskId: string; value: string }): Promise; + archiveTask(input: { taskId: string }): Promise; + publishPr(input: { taskId: string }): Promise; + revertFile(input: { taskId: string; path: string }): Promise; + updateDraft(input: { taskId: string; tabId: string; text: string; attachments: LineAttachment[] }): Promise; + sendMessage(input: { taskId: string; tabId: string; text: string; attachments: LineAttachment[] }): Promise; + stopAgent(input: { taskId: string; tabId: string }): Promise; + setSessionUnread(input: { taskId: string; tabId: string; unread: boolean }): Promise; + renameSession(input: { taskId: string; tabId: string; title: string }): Promise; + closeTab(input: { taskId: string; tabId: string }): Promise; + addTab(input: { taskId: string; model?: string }): Promise<{ tabId: string }>; + changeModel(input: { taskId: string; tabId: string; model: ModelId }): Promise; +} + const TranscriptPanel = memo(function TranscriptPanel({ taskWorkbenchClient, task, @@ -83,7 +160,7 @@ const TranscriptPanel = memo(function TranscriptPanel({ onToggleRightSidebar, onNavigateToUsage, }: { - taskWorkbenchClient: ReturnType; + taskWorkbenchClient: WorkbenchActions; task: Task; activeTabId: string | null; lastAgentTabId: string | null; @@ -902,14 +979,82 @@ export function MockLayout({ workspaceId, selectedTaskId, selectedSessionId }: M const [css] = useStyletron(); const t = useFoundryTokens(); const navigate = useNavigate(); - const taskWorkbenchClient = useMemo(() => getTaskWorkbenchClient(workspaceId), [workspaceId]); - const viewModel = useSyncExternalStore( - taskWorkbenchClient.subscribe.bind(taskWorkbenchClient), - taskWorkbenchClient.getSnapshot.bind(taskWorkbenchClient), - taskWorkbenchClient.getSnapshot.bind(taskWorkbenchClient), + const taskWorkbenchClient = useMemo( + () => ({ + createTask: (input) => backendClient.createWorkbenchTask(workspaceId, input), + markTaskUnread: (input) => backendClient.markWorkbenchUnread(workspaceId, input), + renameTask: (input) => backendClient.renameWorkbenchTask(workspaceId, input), + renameBranch: (input) => backendClient.renameWorkbenchBranch(workspaceId, input), + archiveTask: async (input) => backendClient.runAction(workspaceId, input.taskId, "archive"), + publishPr: (input) => backendClient.publishWorkbenchPr(workspaceId, input), + revertFile: (input) => backendClient.revertWorkbenchFile(workspaceId, input), + updateDraft: (input) => backendClient.updateWorkbenchDraft(workspaceId, input), + sendMessage: (input) => backendClient.sendWorkbenchMessage(workspaceId, input), + stopAgent: (input) => backendClient.stopWorkbenchSession(workspaceId, input), + setSessionUnread: (input) => backendClient.setWorkbenchSessionUnread(workspaceId, input), + renameSession: (input) => backendClient.renameWorkbenchSession(workspaceId, input), + closeTab: (input) => backendClient.closeWorkbenchSession(workspaceId, input), + addTab: (input) => backendClient.createWorkbenchSession(workspaceId, input), + changeModel: (input) => backendClient.changeWorkbenchModel(workspaceId, input), + }), + [workspaceId], ); - const tasks = viewModel.tasks ?? []; - const rawProjects = viewModel.projects ?? []; + const workspaceState = useInterest(interestManager, "workspace", { workspaceId }); + const workspaceRepos = workspaceState.data?.repos ?? []; + const taskSummaries = workspaceState.data?.taskSummaries ?? []; + const selectedTaskSummary = useMemo( + () => taskSummaries.find((task) => task.id === selectedTaskId) ?? taskSummaries[0] ?? null, + [selectedTaskId, taskSummaries], + ); + const taskState = useInterest( + interestManager, + "task", + selectedTaskSummary + ? { + workspaceId, + repoId: selectedTaskSummary.repoId, + taskId: selectedTaskSummary.id, + } + : null, + ); + const sessionState = useInterest( + interestManager, + "session", + selectedTaskSummary && selectedSessionId + ? { + workspaceId, + repoId: selectedTaskSummary.repoId, + taskId: selectedTaskSummary.id, + sessionId: selectedSessionId, + } + : null, + ); + const tasks = useMemo(() => { + const sessionCache = new Map(); + if (selectedTaskSummary && taskState.data) { + for (const session of taskState.data.sessionsSummary) { + const cached = + (selectedSessionId && session.id === selectedSessionId ? sessionState.data : undefined) ?? + interestManager.getSnapshot("session", { + workspaceId, + repoId: selectedTaskSummary.repoId, + taskId: selectedTaskSummary.id, + sessionId: session.id, + }); + if (cached) { + sessionCache.set(session.id, { + draft: cached.draft, + transcript: cached.transcript, + }); + } + } + } + + return taskSummaries.map((summary) => + summary.id === selectedTaskSummary?.id ? toLegacyTask(summary, taskState.data, sessionCache) : toLegacyTask(summary), + ); + }, [selectedTaskSummary, selectedSessionId, sessionState.data, taskState.data, taskSummaries, workspaceId]); + const rawProjects = useMemo(() => groupProjects(workspaceRepos, tasks), [tasks, workspaceRepos]); const appSnapshot = useMockAppSnapshot(); const activeOrg = activeMockOrganization(appSnapshot); const navigateToUsage = useCallback(() => { @@ -1084,16 +1229,16 @@ export function MockLayout({ workspaceId, selectedTaskId, selectedSessionId }: M }, [activeTask, lastAgentTabIdByTask, selectedSessionId, syncRouteSession]); useEffect(() => { - if (selectedNewTaskRepoId && viewModel.repos.some((repo) => repo.id === selectedNewTaskRepoId)) { + if (selectedNewTaskRepoId && workspaceRepos.some((repo) => repo.id === selectedNewTaskRepoId)) { return; } const fallbackRepoId = - activeTask?.repoId && viewModel.repos.some((repo) => repo.id === activeTask.repoId) ? activeTask.repoId : (viewModel.repos[0]?.id ?? ""); + activeTask?.repoId && workspaceRepos.some((repo) => repo.id === activeTask.repoId) ? activeTask.repoId : (workspaceRepos[0]?.id ?? ""); if (fallbackRepoId !== selectedNewTaskRepoId) { setSelectedNewTaskRepoId(fallbackRepoId); } - }, [activeTask?.repoId, selectedNewTaskRepoId, viewModel.repos]); + }, [activeTask?.repoId, selectedNewTaskRepoId, workspaceRepos]); useEffect(() => { if (!activeTask) { @@ -1366,7 +1511,7 @@ export function MockLayout({ workspaceId, selectedTaskId, selectedSessionId }: M

Create your first task

- {viewModel.repos.length > 0 + {workspaceRepos.length > 0 ? "Start from the sidebar to create a task on the first available repo." : "No repos are available in this workspace yet."}