WIP: async action fixes and interest manager

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Nathan Flurry 2026-03-13 18:48:07 -07:00
parent 0185130230
commit 2022a6ec18
35 changed files with 2950 additions and 385 deletions

View file

@ -278,10 +278,12 @@ async function getSandboxAgentClient(c: any) {
});
}
function broadcastProcessesUpdated(c: any): void {
async function broadcastProcessesUpdated(c: any): Promise<void> {
const client = await getSandboxAgentClient(c);
const { processes } = await client.listProcesses();
c.broadcast("processesUpdated", {
sandboxId: c.state.sandboxId,
at: Date.now(),
type: "processesUpdated",
processes,
});
}
@ -475,7 +477,7 @@ export const sandboxInstance = actor({
async createProcess(c: any, request: ProcessCreateRequest): Promise<ProcessInfo> {
const client = await getSandboxAgentClient(c);
const created = await client.createProcess(request);
broadcastProcessesUpdated(c);
await broadcastProcessesUpdated(c);
return created;
},
@ -492,21 +494,21 @@ export const sandboxInstance = actor({
async stopProcess(c: any, request: { processId: string; query?: ProcessSignalQuery }): Promise<ProcessInfo> {
const client = await getSandboxAgentClient(c);
const stopped = await client.stopProcess(request.processId, request.query);
broadcastProcessesUpdated(c);
await broadcastProcessesUpdated(c);
return stopped;
},
async killProcess(c: any, request: { processId: string; query?: ProcessSignalQuery }): Promise<ProcessInfo> {
const client = await getSandboxAgentClient(c);
const killed = await client.killProcess(request.processId, request.query);
broadcastProcessesUpdated(c);
await broadcastProcessesUpdated(c);
return killed;
},
async deleteProcess(c: any, request: { processId: string }): Promise<void> {
const client = await getSandboxAgentClient(c);
await client.deleteProcess(request.processId);
broadcastProcessesUpdated(c);
await broadcastProcessesUpdated(c);
},
async providerState(c: any): Promise<{ providerId: ProviderId; sandboxId: string; state: string; at: number }> {

View file

@ -19,7 +19,9 @@ import {
changeWorkbenchModel,
closeWorkbenchSession,
createWorkbenchSession,
getWorkbenchTask,
getSessionDetail,
getTaskDetail,
getTaskSummary,
markWorkbenchUnread,
publishWorkbenchPr,
renameWorkbenchBranch,
@ -228,8 +230,16 @@ export const task = actor({
return await getCurrentRecord({ db: c.db, state: c.state });
},
async getWorkbench(c) {
return await getWorkbenchTask(c);
async getTaskSummary(c) {
return await getTaskSummary(c);
},
async getTaskDetail(c) {
return await getTaskDetail(c);
},
async getSessionDetail(c, input: { sessionId: string }) {
return await getSessionDetail(c, input.sessionId);
},
async markWorkbenchUnread(c): Promise<void> {

View file

@ -286,11 +286,6 @@ async function requireReadySessionMeta(c: any, tabId: string): Promise<any> {
return meta;
}
async function notifyWorkbenchUpdated(c: any): Promise<void> {
const workspace = await getOrCreateWorkspace(c, c.state.workspaceId);
await workspace.notifyWorkbenchUpdated({});
}
function shellFragment(parts: string[]): string {
return parts.join(" && ");
}
@ -600,42 +595,60 @@ export async function ensureWorkbenchSeeded(c: any): Promise<any> {
return record;
}
export async function getWorkbenchTask(c: any): Promise<any> {
function buildSessionSummary(record: any, meta: any): any {
const derivedSandboxSessionId = meta.sandboxSessionId ?? (meta.status === "pending_provision" && record.activeSessionId ? record.activeSessionId : null);
const sessionStatus =
meta.status === "ready" && derivedSandboxSessionId ? activeSessionStatus(record, derivedSandboxSessionId) : meta.status === "error" ? "error" : "idle";
let thinkingSinceMs = meta.thinkingSinceMs ?? null;
let unread = Boolean(meta.unread);
if (thinkingSinceMs && sessionStatus !== "running") {
thinkingSinceMs = null;
unread = true;
}
return {
id: meta.id,
sessionId: derivedSandboxSessionId,
sessionName: meta.sessionName,
agent: agentKindForModel(meta.model),
model: meta.model,
status: sessionStatus,
thinkingSinceMs: sessionStatus === "running" ? thinkingSinceMs : null,
unread,
created: Boolean(meta.created || derivedSandboxSessionId),
};
}
function buildSessionDetailFromMeta(record: any, meta: any): any {
const summary = buildSessionSummary(record, meta);
return {
sessionId: meta.tabId,
tabId: meta.tabId,
sandboxSessionId: summary.sessionId,
sessionName: summary.sessionName,
agent: summary.agent,
model: summary.model,
status: summary.status,
thinkingSinceMs: summary.thinkingSinceMs,
unread: summary.unread,
created: summary.created,
draft: {
text: meta.draftText ?? "",
attachments: Array.isArray(meta.draftAttachments) ? meta.draftAttachments : [],
updatedAtMs: meta.draftUpdatedAtMs ?? null,
},
transcript: meta.transcript ?? [],
};
}
/**
* Builds a WorkbenchTaskSummary from local task actor state. Task actors push
* this to the parent workspace actor so workspace sidebar reads stay local.
*/
export async function buildTaskSummary(c: any): Promise<any> {
const record = await ensureWorkbenchSeeded(c);
const gitState = await readCachedGitState(c);
const sessions = await listSessionMetaRows(c);
await maybeScheduleWorkbenchRefreshes(c, record, sessions);
const tabs = [];
for (const meta of sessions) {
const derivedSandboxSessionId = meta.sandboxSessionId ?? (meta.status === "pending_provision" && record.activeSessionId ? record.activeSessionId : null);
const sessionStatus =
meta.status === "ready" && derivedSandboxSessionId ? activeSessionStatus(record, derivedSandboxSessionId) : meta.status === "error" ? "error" : "idle";
let thinkingSinceMs = meta.thinkingSinceMs ?? null;
let unread = Boolean(meta.unread);
if (thinkingSinceMs && sessionStatus !== "running") {
thinkingSinceMs = null;
unread = true;
}
tabs.push({
id: meta.id,
sessionId: derivedSandboxSessionId,
sessionName: meta.sessionName,
agent: agentKindForModel(meta.model),
model: meta.model,
status: sessionStatus,
thinkingSinceMs: sessionStatus === "running" ? thinkingSinceMs : null,
unread,
created: Boolean(meta.created || derivedSandboxSessionId),
draft: {
text: meta.draftText ?? "",
attachments: Array.isArray(meta.draftAttachments) ? meta.draftAttachments : [],
updatedAtMs: meta.draftUpdatedAtMs ?? null,
},
transcript: meta.transcript ?? [],
});
}
return {
id: c.state.taskId,
@ -646,19 +659,98 @@ export async function getWorkbenchTask(c: any): Promise<any> {
updatedAtMs: record.updatedAt,
branch: record.branchName,
pullRequest: await readPullRequestSummary(c, record.branchName),
tabs,
sessionsSummary: sessions.map((meta) => buildSessionSummary(record, meta)),
};
}
/**
* Builds a WorkbenchTaskDetail from local task actor state for direct task
* subscribers. This is a full replacement payload, not a patch.
*/
export async function buildTaskDetail(c: any): Promise<any> {
const record = await ensureWorkbenchSeeded(c);
const gitState = await readCachedGitState(c);
const sessions = await listSessionMetaRows(c);
await maybeScheduleWorkbenchRefreshes(c, record, sessions);
const summary = await buildTaskSummary(c);
return {
...summary,
task: record.task,
agentType: record.agentType === "claude" || record.agentType === "codex" ? record.agentType : null,
runtimeStatus: record.status,
statusMessage: record.statusMessage ?? null,
activeSessionId: record.activeSessionId ?? null,
diffStat: record.diffStat ?? null,
prUrl: record.prUrl ?? null,
reviewStatus: record.reviewStatus ?? null,
fileChanges: gitState.fileChanges,
diffs: gitState.diffs,
fileTree: gitState.fileTree,
minutesUsed: 0,
sandboxes: (record.sandboxes ?? []).map((sandbox: any) => ({
providerId: sandbox.providerId,
sandboxId: sandbox.sandboxId,
cwd: sandbox.cwd ?? null,
})),
activeSandboxId: record.activeSandboxId ?? null,
};
}
/**
* Builds a WorkbenchSessionDetail for a specific session tab.
*/
export async function buildSessionDetail(c: any, tabId: string): Promise<any> {
const record = await ensureWorkbenchSeeded(c);
const meta = await readSessionMeta(c, tabId);
if (!meta || meta.closed) {
throw new Error(`Unknown workbench session tab: ${tabId}`);
}
return buildSessionDetailFromMeta(record, meta);
}
export async function getTaskSummary(c: any): Promise<any> {
return await buildTaskSummary(c);
}
export async function getTaskDetail(c: any): Promise<any> {
return await buildTaskDetail(c);
}
export async function getSessionDetail(c: any, tabId: string): Promise<any> {
return await buildSessionDetail(c, tabId);
}
/**
* Replaces the old notifyWorkbenchUpdated pattern.
*
* The task actor emits two kinds of updates:
* - Push summary state up to the parent workspace actor so the sidebar
* materialized projection stays current.
* - Broadcast full detail/session payloads down to direct task subscribers.
*/
export async function broadcastTaskUpdate(c: any, options?: { sessionId?: string }): Promise<void> {
const workspace = await getOrCreateWorkspace(c, c.state.workspaceId);
await workspace.applyTaskSummaryUpdate({ taskSummary: await buildTaskSummary(c) });
c.broadcast("taskUpdated", {
type: "taskDetailUpdated",
detail: await buildTaskDetail(c),
});
if (options?.sessionId) {
c.broadcast("sessionUpdated", {
type: "sessionUpdated",
session: await buildSessionDetail(c, options.sessionId),
});
}
}
export async function refreshWorkbenchDerivedState(c: any): Promise<void> {
const record = await ensureWorkbenchSeeded(c);
const gitState = await collectWorkbenchGitState(c, record);
await writeCachedGitState(c, gitState);
await notifyWorkbenchUpdated(c);
await broadcastTaskUpdate(c);
}
export async function refreshWorkbenchSessionTranscript(c: any, sessionId: string): Promise<void> {
@ -670,7 +762,7 @@ export async function refreshWorkbenchSessionTranscript(c: any, sessionId: strin
const transcript = await readSessionTranscript(c, record, meta.sandboxSessionId);
await writeSessionTranscript(c, meta.tabId, transcript);
await notifyWorkbenchUpdated(c);
await broadcastTaskUpdate(c, { sessionId: meta.tabId });
}
export async function renameWorkbenchTask(c: any, value: string): Promise<void> {
@ -688,7 +780,7 @@ export async function renameWorkbenchTask(c: any, value: string): Promise<void>
.where(eq(taskTable.id, 1))
.run();
c.state.title = nextTitle;
await notifyWorkbenchUpdated(c);
await broadcastTaskUpdate(c);
}
export async function renameWorkbenchBranch(c: any, value: string): Promise<void> {
@ -739,7 +831,7 @@ export async function renameWorkbenchBranch(c: any, value: string): Promise<void
taskId: c.state.taskId,
branchName: nextBranch,
});
await notifyWorkbenchUpdated(c);
await broadcastTaskUpdate(c);
}
export async function createWorkbenchSession(c: any, model?: string): Promise<{ tabId: string }> {
@ -755,7 +847,7 @@ export async function createWorkbenchSession(c: any, model?: string): Promise<{
sessionName: "Session 1",
status: "ready",
});
await notifyWorkbenchUpdated(c);
await broadcastTaskUpdate(c, { sessionId: record.activeSessionId });
return { tabId: record.activeSessionId };
}
}
@ -780,7 +872,7 @@ export async function createWorkbenchSession(c: any, model?: string): Promise<{
wait: false,
},
);
await notifyWorkbenchUpdated(c);
await broadcastTaskUpdate(c, { sessionId: tabId });
return { tabId };
}
@ -815,7 +907,7 @@ export async function ensureWorkbenchSession(c: any, tabId: string, model?: stri
await enqueueWorkbenchRefresh(c, "task.command.workbench.refresh_session_transcript", {
sessionId: record.activeSessionId,
});
await notifyWorkbenchUpdated(c);
await broadcastTaskUpdate(c, { sessionId: tabId });
return;
}
@ -827,7 +919,7 @@ export async function ensureWorkbenchSession(c: any, tabId: string, model?: stri
await enqueueWorkbenchRefresh(c, "task.command.workbench.refresh_session_transcript", {
sessionId: meta.sandboxSessionId,
});
await notifyWorkbenchUpdated(c);
await broadcastTaskUpdate(c, { sessionId: tabId });
return;
}
@ -838,7 +930,7 @@ export async function ensureWorkbenchSession(c: any, tabId: string, model?: stri
status: "error",
errorMessage: "cannot create session without a sandbox cwd",
});
await notifyWorkbenchUpdated(c);
await broadcastTaskUpdate(c, { sessionId: tabId });
return;
}
@ -873,7 +965,7 @@ export async function ensureWorkbenchSession(c: any, tabId: string, model?: stri
});
}
await notifyWorkbenchUpdated(c);
await broadcastTaskUpdate(c, { sessionId: tabId });
}
export async function enqueuePendingWorkbenchSessions(c: any): Promise<void> {
@ -904,14 +996,14 @@ export async function renameWorkbenchSession(c: any, sessionId: string, title: s
await updateSessionMeta(c, sessionId, {
sessionName: trimmed,
});
await notifyWorkbenchUpdated(c);
await broadcastTaskUpdate(c, { sessionId });
}
export async function setWorkbenchSessionUnread(c: any, sessionId: string, unread: boolean): Promise<void> {
await updateSessionMeta(c, sessionId, {
unread: unread ? 1 : 0,
});
await notifyWorkbenchUpdated(c);
await broadcastTaskUpdate(c, { sessionId });
}
export async function updateWorkbenchDraft(c: any, sessionId: string, text: string, attachments: Array<any>): Promise<void> {
@ -920,14 +1012,14 @@ export async function updateWorkbenchDraft(c: any, sessionId: string, text: stri
draftAttachmentsJson: JSON.stringify(attachments),
draftUpdatedAt: Date.now(),
});
await notifyWorkbenchUpdated(c);
await broadcastTaskUpdate(c, { sessionId });
}
export async function changeWorkbenchModel(c: any, sessionId: string, model: string): Promise<void> {
await updateSessionMeta(c, sessionId, {
model,
});
await notifyWorkbenchUpdated(c);
await broadcastTaskUpdate(c, { sessionId });
}
export async function sendWorkbenchMessage(c: any, sessionId: string, text: string, attachments: Array<any>): Promise<void> {
@ -984,7 +1076,7 @@ export async function sendWorkbenchMessage(c: any, sessionId: string, text: stri
await enqueueWorkbenchRefresh(c, "task.command.workbench.refresh_session_transcript", {
sessionId: meta.sandboxSessionId,
});
await notifyWorkbenchUpdated(c);
await broadcastTaskUpdate(c, { sessionId });
}
export async function stopWorkbenchSession(c: any, sessionId: string): Promise<void> {
@ -998,7 +1090,7 @@ export async function stopWorkbenchSession(c: any, sessionId: string): Promise<v
await updateSessionMeta(c, sessionId, {
thinkingSinceMs: null,
});
await notifyWorkbenchUpdated(c);
await broadcastTaskUpdate(c, { sessionId });
}
export async function syncWorkbenchSessionStatus(c: any, sessionId: string, status: "running" | "idle" | "error", at: number): Promise<void> {
@ -1063,7 +1155,7 @@ export async function syncWorkbenchSessionStatus(c: any, sessionId: string, stat
});
await enqueueWorkbenchRefresh(c, "task.command.workbench.refresh_derived", {});
}
await notifyWorkbenchUpdated(c);
await broadcastTaskUpdate(c, { sessionId: meta.tabId });
}
}
@ -1096,7 +1188,7 @@ export async function closeWorkbenchSession(c: any, sessionId: string): Promise<
.where(eq(taskRuntime.id, 1))
.run();
}
await notifyWorkbenchUpdated(c);
await broadcastTaskUpdate(c);
}
export async function markWorkbenchUnread(c: any): Promise<void> {
@ -1108,7 +1200,7 @@ export async function markWorkbenchUnread(c: any): Promise<void> {
await updateSessionMeta(c, latest.tabId, {
unread: 1,
});
await notifyWorkbenchUpdated(c);
await broadcastTaskUpdate(c, { sessionId: latest.tabId });
}
export async function publishWorkbenchPr(c: any): Promise<void> {
@ -1129,7 +1221,7 @@ export async function publishWorkbenchPr(c: any): Promise<void> {
})
.where(eq(taskTable.id, 1))
.run();
await notifyWorkbenchUpdated(c);
await broadcastTaskUpdate(c);
}
export async function revertWorkbenchFile(c: any, path: string): Promise<void> {
@ -1152,5 +1244,5 @@ export async function revertWorkbenchFile(c: any, path: string): Promise<void> {
throw new Error(`file revert failed (${result.exitCode}): ${result.result}`);
}
await enqueueWorkbenchRefresh(c, "task.command.workbench.refresh_derived", {});
await notifyWorkbenchUpdated(c);
await broadcastTaskUpdate(c);
}

View file

@ -1,9 +1,9 @@
// @ts-nocheck
import { eq } from "drizzle-orm";
import type { TaskRecord, TaskStatus } from "@sandbox-agent/foundry-shared";
import { getOrCreateWorkspace } from "../../handles.js";
import { task as taskTable, taskRuntime, taskSandboxes } from "../db/schema.js";
import { historyKey } from "../../keys.js";
import { broadcastTaskUpdate } from "../workbench.js";
export const TASK_ROW_ID = 1;
@ -83,8 +83,7 @@ export async function setTaskState(ctx: any, status: TaskStatus, statusMessage?:
.run();
}
const workspace = await getOrCreateWorkspace(ctx, ctx.state.workspaceId);
await workspace.notifyWorkbenchUpdated({});
await broadcastTaskUpdate(ctx);
}
export async function getCurrentRecord(ctx: any): Promise<TaskRecord> {
@ -176,6 +175,5 @@ export async function appendHistory(ctx: any, kind: string, payload: Record<stri
payload,
});
const workspace = await getOrCreateWorkspace(ctx, ctx.state.workspaceId);
await workspace.notifyWorkbenchUpdated({});
await broadcastTaskUpdate(ctx);
}

View file

@ -35,6 +35,13 @@ function debugInit(loopCtx: any, message: string, context?: Record<string, unkno
});
}
async function ensureTaskRuntimeCacheColumns(db: any): Promise<void> {
await db.execute(`ALTER TABLE task_runtime ADD COLUMN git_state_json text`).catch(() => {});
await db.execute(`ALTER TABLE task_runtime ADD COLUMN git_state_updated_at integer`).catch(() => {});
await db.execute(`ALTER TABLE task_runtime ADD COLUMN provision_stage text`).catch(() => {});
await db.execute(`ALTER TABLE task_runtime ADD COLUMN provision_stage_updated_at integer`).catch(() => {});
}
async function withActivityTimeout<T>(timeoutMs: number, label: string, run: () => Promise<T>): Promise<T> {
let timer: ReturnType<typeof setTimeout> | null = null;
try {
@ -61,6 +68,8 @@ export async function initBootstrapDbActivity(loopCtx: any, body: any): Promise<
const initialStatusMessage = loopCtx.state.branchName && loopCtx.state.title ? "provisioning" : "naming";
try {
await ensureTaskRuntimeCacheColumns(db);
await db
.insert(taskTable)
.values({

View file

@ -4,6 +4,17 @@ import { Loop } from "rivetkit/workflow";
import type {
AddRepoInput,
CreateTaskInput,
HistoryEvent,
HistoryQueryInput,
ListTasksInput,
ProviderId,
RepoOverview,
RepoRecord,
RepoStackActionInput,
RepoStackActionResult,
StarSandboxAgentRepoInput,
StarSandboxAgentRepoResult,
SwitchResult,
TaskRecord,
TaskSummary,
TaskWorkbenchChangeModelInput,
@ -14,20 +25,13 @@ import type {
TaskWorkbenchSelectInput,
TaskWorkbenchSetSessionUnreadInput,
TaskWorkbenchSendMessageInput,
TaskWorkbenchSnapshot,
TaskWorkbenchTabInput,
TaskWorkbenchUpdateDraftInput,
HistoryEvent,
HistoryQueryInput,
ListTasksInput,
ProviderId,
RepoOverview,
RepoStackActionInput,
RepoStackActionResult,
RepoRecord,
StarSandboxAgentRepoInput,
StarSandboxAgentRepoResult,
SwitchResult,
WorkbenchRepoSummary,
WorkbenchSessionSummary,
WorkbenchTaskSummary,
WorkspaceEvent,
WorkspaceSummarySnapshot,
WorkspaceUseInput,
} from "@sandbox-agent/foundry-shared";
import { getActorRuntimeContext } from "../context.js";
@ -35,7 +39,7 @@ import { getTask, getOrCreateHistory, getOrCreateProject, selfWorkspace } from "
import { logActorWarning, resolveErrorMessage } from "../logging.js";
import { normalizeRemoteUrl, repoIdFromRemote } from "../../services/repo.js";
import { resolveWorkspaceGithubAuth } from "../../services/github-auth.js";
import { taskLookup, repos, providerProfiles } from "./db/schema.js";
import { taskLookup, repos, providerProfiles, taskSummaries } from "./db/schema.js";
import { agentTypeForModel } from "../task/workbench.js";
import { expectQueueResponse } from "../../services/queue.js";
import { workspaceAppActions } from "./app-shell.js";
@ -109,6 +113,18 @@ async function upsertTaskLookupRow(c: any, taskId: string, repoId: string): Prom
.run();
}
function parseJsonValue<T>(value: string | null | undefined, fallback: T): T {
if (!value) {
return fallback;
}
try {
return JSON.parse(value) as T;
} catch {
return fallback;
}
}
async function collectAllTaskSummaries(c: any): Promise<TaskSummary[]> {
const repoRows = await c.db.select({ repoId: repos.repoId, remoteUrl: repos.remoteUrl }).from(repos).orderBy(desc(repos.updatedAt)).all();
@ -145,17 +161,55 @@ function repoLabelFromRemote(remoteUrl: string): string {
return remoteUrl;
}
async function buildWorkbenchSnapshot(c: any): Promise<TaskWorkbenchSnapshot> {
function buildRepoSummary(repoRow: { repoId: string; remoteUrl: string; updatedAt: number }, taskRows: WorkbenchTaskSummary[]): WorkbenchRepoSummary {
const repoTasks = taskRows.filter((task) => task.repoId === repoRow.repoId);
const latestActivityMs = repoTasks.reduce((latest, task) => Math.max(latest, task.updatedAtMs), repoRow.updatedAt);
return {
id: repoRow.repoId,
label: repoLabelFromRemote(repoRow.remoteUrl),
taskCount: repoTasks.length,
latestActivityMs,
};
}
function taskSummaryRowFromSummary(taskSummary: WorkbenchTaskSummary) {
return {
taskId: taskSummary.id,
repoId: taskSummary.repoId,
title: taskSummary.title,
status: taskSummary.status,
repoName: taskSummary.repoName,
updatedAtMs: taskSummary.updatedAtMs,
branch: taskSummary.branch,
pullRequestJson: JSON.stringify(taskSummary.pullRequest),
sessionsSummaryJson: JSON.stringify(taskSummary.sessionsSummary),
};
}
function taskSummaryFromRow(row: any): WorkbenchTaskSummary {
return {
id: row.taskId,
repoId: row.repoId,
title: row.title,
status: row.status,
repoName: row.repoName,
updatedAtMs: row.updatedAtMs,
branch: row.branch ?? null,
pullRequest: parseJsonValue(row.pullRequestJson, null),
sessionsSummary: parseJsonValue<WorkbenchSessionSummary[]>(row.sessionsSummaryJson, []),
};
}
async function reconcileWorkbenchProjection(c: any): Promise<WorkspaceSummarySnapshot> {
const repoRows = await c.db
.select({ repoId: repos.repoId, remoteUrl: repos.remoteUrl, updatedAt: repos.updatedAt })
.from(repos)
.orderBy(desc(repos.updatedAt))
.all();
const tasks: Array<any> = [];
const projects: Array<any> = [];
const taskRows: WorkbenchTaskSummary[] = [];
for (const row of repoRows) {
const projectTasks: Array<any> = [];
try {
const project = await getOrCreateProject(c, c.state.workspaceId, row.repoId, row.remoteUrl);
const summaries = await project.listTaskSummaries({ includeArchived: true });
@ -163,11 +217,18 @@ async function buildWorkbenchSnapshot(c: any): Promise<TaskWorkbenchSnapshot> {
try {
await upsertTaskLookupRow(c, summary.taskId, row.repoId);
const task = getTask(c, c.state.workspaceId, row.repoId, summary.taskId);
const snapshot = await task.getWorkbench({});
tasks.push(snapshot);
projectTasks.push(snapshot);
const taskSummary = await task.getTaskSummary({});
taskRows.push(taskSummary);
await c.db
.insert(taskSummaries)
.values(taskSummaryRowFromSummary(taskSummary))
.onConflictDoUpdate({
target: taskSummaries.taskId,
set: taskSummaryRowFromSummary(taskSummary),
})
.run();
} catch (error) {
logActorWarning("workspace", "failed collecting workbench task", {
logActorWarning("workspace", "failed collecting task summary during reconciliation", {
workspaceId: c.state.workspaceId,
repoId: row.repoId,
taskId: summary.taskId,
@ -175,17 +236,8 @@ async function buildWorkbenchSnapshot(c: any): Promise<TaskWorkbenchSnapshot> {
});
}
}
if (projectTasks.length > 0) {
projects.push({
id: row.repoId,
label: repoLabelFromRemote(row.remoteUrl),
updatedAtMs: projectTasks[0]?.updatedAtMs ?? row.updatedAt,
tasks: projectTasks.sort((left, right) => right.updatedAtMs - left.updatedAtMs),
});
}
} catch (error) {
logActorWarning("workspace", "failed collecting workbench repo snapshot", {
logActorWarning("workspace", "failed collecting repo during workbench reconciliation", {
workspaceId: c.state.workspaceId,
repoId: row.repoId,
error: resolveErrorMessage(error),
@ -193,16 +245,11 @@ async function buildWorkbenchSnapshot(c: any): Promise<TaskWorkbenchSnapshot> {
}
}
tasks.sort((left, right) => right.updatedAtMs - left.updatedAtMs);
projects.sort((left, right) => right.updatedAtMs - left.updatedAtMs);
taskRows.sort((left, right) => right.updatedAtMs - left.updatedAtMs);
return {
workspaceId: c.state.workspaceId,
repos: repoRows.map((row) => ({
id: row.repoId,
label: repoLabelFromRemote(row.remoteUrl),
})),
projects,
tasks,
repos: repoRows.map((row) => buildRepoSummary(row, taskRows)).sort((left, right) => right.latestActivityMs - left.latestActivityMs),
taskSummaries: taskRows,
};
}
@ -211,6 +258,41 @@ async function requireWorkbenchTask(c: any, taskId: string) {
return getTask(c, c.state.workspaceId, repoId, taskId);
}
/**
* Reads the workspace sidebar snapshot from the workspace actor's local SQLite
* only. Task actors push summary updates into `task_summaries`, so clients do
* not need this action to fan out to every child actor on the hot read path.
*/
async function getWorkspaceSummarySnapshot(c: any): Promise<WorkspaceSummarySnapshot> {
const repoRows = await c.db
.select({
repoId: repos.repoId,
remoteUrl: repos.remoteUrl,
updatedAt: repos.updatedAt,
})
.from(repos)
.orderBy(desc(repos.updatedAt))
.all();
const taskRows = await c.db.select().from(taskSummaries).orderBy(desc(taskSummaries.updatedAtMs)).all();
const summaries = taskRows.map(taskSummaryFromRow);
return {
workspaceId: c.state.workspaceId,
repos: repoRows.map((row) => buildRepoSummary(row, summaries)).sort((left, right) => right.latestActivityMs - left.latestActivityMs),
taskSummaries: summaries,
};
}
async function broadcastRepoSummary(
c: any,
type: "repoAdded" | "repoUpdated",
repoRow: { repoId: string; remoteUrl: string; updatedAt: number },
): Promise<void> {
const matchingTaskRows = await c.db.select().from(taskSummaries).where(eq(taskSummaries.repoId, repoRow.repoId)).all();
const repo = buildRepoSummary(repoRow, matchingTaskRows.map(taskSummaryFromRow));
c.broadcast("workspaceUpdated", { type, repo } satisfies WorkspaceEvent);
}
async function addRepoMutation(c: any, input: AddRepoInput): Promise<RepoRecord> {
assertWorkspace(c, input.workspaceId);
@ -225,6 +307,7 @@ async function addRepoMutation(c: any, input: AddRepoInput): Promise<RepoRecord>
const repoId = repoIdFromRemote(remoteUrl);
const now = Date.now();
const existing = await c.db.select({ repoId: repos.repoId }).from(repos).where(eq(repos.repoId, repoId)).get();
await c.db
.insert(repos)
@ -243,7 +326,11 @@ async function addRepoMutation(c: any, input: AddRepoInput): Promise<RepoRecord>
})
.run();
await workspaceActions.notifyWorkbenchUpdated(c);
await broadcastRepoSummary(c, existing ? "repoUpdated" : "repoAdded", {
repoId,
remoteUrl,
updatedAt: now,
});
return {
workspaceId: c.state.workspaceId,
repoId,
@ -306,7 +393,20 @@ async function createTaskMutation(c: any, input: CreateTaskInput): Promise<TaskR
})
.run();
await workspaceActions.notifyWorkbenchUpdated(c);
try {
const task = getTask(c, c.state.workspaceId, repoId, created.taskId);
await workspaceActions.applyTaskSummaryUpdate(c, {
taskSummary: await task.getTaskSummary({}),
});
} catch (error) {
logActorWarning("workspace", "failed seeding task summary after task creation", {
workspaceId: c.state.workspaceId,
repoId,
taskId: created.taskId,
error: resolveErrorMessage(error),
});
}
return created;
}
@ -462,13 +562,37 @@ export const workspaceActions = {
};
},
async getWorkbench(c: any, input: WorkspaceUseInput): Promise<TaskWorkbenchSnapshot> {
assertWorkspace(c, input.workspaceId);
return await buildWorkbenchSnapshot(c);
/**
* Called by task actors when their summary-level state changes.
* This is the write path for the local materialized projection; clients read
* the projection via `getWorkspaceSummary`, but only task actors should push
* rows into it.
*/
async applyTaskSummaryUpdate(c: any, input: { taskSummary: WorkbenchTaskSummary }): Promise<void> {
await c.db
.insert(taskSummaries)
.values(taskSummaryRowFromSummary(input.taskSummary))
.onConflictDoUpdate({
target: taskSummaries.taskId,
set: taskSummaryRowFromSummary(input.taskSummary),
})
.run();
c.broadcast("workspaceUpdated", { type: "taskSummaryUpdated", taskSummary: input.taskSummary } satisfies WorkspaceEvent);
},
async notifyWorkbenchUpdated(c: any): Promise<void> {
c.broadcast("workbenchUpdated", { at: Date.now() });
async removeTaskSummary(c: any, input: { taskId: string }): Promise<void> {
await c.db.delete(taskSummaries).where(eq(taskSummaries.taskId, input.taskId)).run();
c.broadcast("workspaceUpdated", { type: "taskRemoved", taskId: input.taskId } satisfies WorkspaceEvent);
},
async getWorkspaceSummary(c: any, input: WorkspaceUseInput): Promise<WorkspaceSummarySnapshot> {
assertWorkspace(c, input.workspaceId);
return await getWorkspaceSummarySnapshot(c);
},
async reconcileWorkbenchState(c: any, input: WorkspaceUseInput): Promise<WorkspaceSummarySnapshot> {
assertWorkspace(c, input.workspaceId);
return await reconcileWorkbenchProjection(c);
},
async createWorkbenchTask(c: any, input: TaskWorkbenchCreateTaskInput): Promise<{ taskId: string; tabId?: string }> {

View file

@ -152,6 +152,10 @@ function encodeEligibleOrganizationIds(value: string[]): string {
return JSON.stringify([...new Set(value)]);
}
function errorMessage(error: unknown): string {
return error instanceof Error ? error.message : String(error);
}
function seatsIncludedForPlan(planId: FoundryBillingPlanId): number {
switch (planId) {
case "free":
@ -217,7 +221,76 @@ async function getOrganizationState(workspace: any) {
return await workspace.getOrganizationShellState({});
}
async function buildAppSnapshot(c: any, sessionId: string): Promise<FoundryAppSnapshot> {
async function getOrganizationStateIfInitialized(workspace: any) {
return await workspace.getOrganizationShellStateIfInitialized({});
}
async function listSnapshotOrganizations(c: any, sessionId: string, organizationIds: string[]) {
const results = await Promise.all(
organizationIds.map(async (organizationId) => {
const organizationStartedAt = performance.now();
try {
const workspace = await getOrCreateWorkspace(c, organizationId);
const organizationState = await getOrganizationStateIfInitialized(workspace);
if (!organizationState) {
logger.warn(
{
sessionId,
workspaceId: c.state.workspaceId,
organizationId,
durationMs: roundDurationMs(organizationStartedAt),
},
"build_app_snapshot_organization_uninitialized",
);
return { organizationId, snapshot: null, status: "uninitialized" as const };
}
logger.info(
{
sessionId,
workspaceId: c.state.workspaceId,
organizationId,
durationMs: roundDurationMs(organizationStartedAt),
},
"build_app_snapshot_organization_completed",
);
return { organizationId, snapshot: organizationState.snapshot, status: "ok" as const };
} catch (error) {
const message = errorMessage(error);
if (!message.includes("Actor not found")) {
logger.error(
{
sessionId,
workspaceId: c.state.workspaceId,
organizationId,
durationMs: roundDurationMs(organizationStartedAt),
errorMessage: message,
errorStack: error instanceof Error ? error.stack : undefined,
},
"build_app_snapshot_organization_failed",
);
throw error;
}
logger.info(
{
sessionId,
workspaceId: c.state.workspaceId,
organizationId,
durationMs: roundDurationMs(organizationStartedAt),
},
"build_app_snapshot_organization_missing",
);
return { organizationId, snapshot: null, status: "missing" as const };
}
}),
);
return {
organizations: results.map((result) => result.snapshot).filter((organization): organization is FoundryOrganization => organization !== null),
uninitializedOrganizationIds: results.filter((result) => result.status === "uninitialized").map((result) => result.organizationId),
};
}
async function buildAppSnapshot(c: any, sessionId: string, allowOrganizationRepair = true): Promise<FoundryAppSnapshot> {
assertAppWorkspace(c);
const startedAt = performance.now();
const auth = getBetterAuthService();
@ -252,53 +325,31 @@ async function buildAppSnapshot(c: any, sessionId: string): Promise<FoundryAppSn
"build_app_snapshot_started",
);
const organizations = (
await Promise.all(
eligibleOrganizationIds.map(async (organizationId) => {
const organizationStartedAt = performance.now();
try {
const workspace = await getOrCreateWorkspace(c, organizationId);
const organizationState = await getOrganizationState(workspace);
logger.info(
{
sessionId,
workspaceId: c.state.workspaceId,
organizationId,
durationMs: roundDurationMs(organizationStartedAt),
},
"build_app_snapshot_organization_completed",
);
return organizationState.snapshot;
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
if (!message.includes("Actor not found")) {
logger.error(
{
sessionId,
workspaceId: c.state.workspaceId,
organizationId,
durationMs: roundDurationMs(organizationStartedAt),
errorMessage: message,
errorStack: error instanceof Error ? error.stack : undefined,
},
"build_app_snapshot_organization_failed",
);
throw error;
}
logger.info(
{
sessionId,
workspaceId: c.state.workspaceId,
organizationId,
durationMs: roundDurationMs(organizationStartedAt),
},
"build_app_snapshot_organization_missing",
);
return null;
}
}),
)
).filter((organization): organization is FoundryOrganization => organization !== null);
let { organizations, uninitializedOrganizationIds } = await listSnapshotOrganizations(c, sessionId, eligibleOrganizationIds);
if (allowOrganizationRepair && uninitializedOrganizationIds.length > 0) {
const token = await auth.getAccessTokenForSession(sessionId);
if (token?.accessToken) {
logger.info(
{
sessionId,
workspaceId: c.state.workspaceId,
organizationIds: uninitializedOrganizationIds,
},
"build_app_snapshot_repairing_organizations",
);
await syncGithubOrganizationsInternal(c, { sessionId, accessToken: token.accessToken }, { broadcast: false });
return await buildAppSnapshot(c, sessionId, false);
}
logger.warn(
{
sessionId,
workspaceId: c.state.workspaceId,
organizationIds: uninitializedOrganizationIds,
},
"build_app_snapshot_repair_skipped_no_access_token",
);
}
const currentUser: FoundryUser | null = user
? {
@ -466,6 +517,10 @@ async function safeListInstallations(accessToken: string): Promise<any[]> {
* already returned a redirect to the browser.
*/
export async function syncGithubOrganizations(c: any, input: { sessionId: string; accessToken: string }): Promise<void> {
await syncGithubOrganizationsInternal(c, input, { broadcast: true });
}
async function syncGithubOrganizationsInternal(c: any, input: { sessionId: string; accessToken: string }, options: { broadcast: boolean }): Promise<void> {
assertAppWorkspace(c);
const auth = getBetterAuthService();
const { appShell } = getActorRuntimeContext();
@ -532,7 +587,13 @@ export async function syncGithubOrganizations(c: any, input: { sessionId: string
roleLabel: "GitHub user",
eligibleOrganizationIdsJson: encodeEligibleOrganizationIds(linkedOrganizationIds),
});
c.broadcast("appUpdated", { at: Date.now(), sessionId });
if (!options.broadcast) {
return;
}
c.broadcast("appUpdated", {
type: "appUpdated",
snapshot: await buildAppSnapshot(c, sessionId),
});
}
export async function syncGithubOrganizationRepos(c: any, input: { sessionId: string; organizationId: string }): Promise<void> {
@ -639,6 +700,19 @@ async function listOrganizationRepoCatalog(c: any): Promise<string[]> {
async function buildOrganizationState(c: any) {
const startedAt = performance.now();
const row = await requireOrganizationProfileRow(c);
return await buildOrganizationStateFromRow(c, row, startedAt);
}
async function buildOrganizationStateIfInitialized(c: any) {
const startedAt = performance.now();
const row = await readOrganizationProfileRow(c);
if (!row) {
return null;
}
return await buildOrganizationStateFromRow(c, row, startedAt);
}
async function buildOrganizationStateFromRow(c: any, row: any, startedAt: number) {
const repoCatalog = await listOrganizationRepoCatalog(c);
const members = await listOrganizationMembers(c);
const seatAssignmentEmails = await listOrganizationSeatAssignments(c);
@ -1579,6 +1653,11 @@ export const workspaceAppActions = {
return await buildOrganizationState(c);
},
async getOrganizationShellStateIfInitialized(c: any): Promise<any | null> {
assertOrganizationWorkspace(c);
return await buildOrganizationStateIfInitialized(c);
},
async updateOrganizationShellProfile(c: any, input: Pick<UpdateFoundryOrganizationProfileInput, "displayName" | "slug" | "primaryDomain">): Promise<void> {
assertOrganizationWorkspace(c);
const existing = await requireOrganizationProfileRow(c);

View file

@ -16,6 +16,12 @@ const journal = {
tag: "0001_auth_index_tables",
breakpoints: true,
},
{
idx: 2,
when: 1773720000000,
tag: "0002_task_summaries",
breakpoints: true,
},
],
} as const;
@ -150,6 +156,18 @@ CREATE TABLE IF NOT EXISTS \`auth_verification\` (
\`created_at\` integer NOT NULL,
\`updated_at\` integer NOT NULL
);
`,
m0002: `CREATE TABLE IF NOT EXISTS \`task_summaries\` (
\`task_id\` text PRIMARY KEY NOT NULL,
\`repo_id\` text NOT NULL,
\`title\` text NOT NULL,
\`status\` text NOT NULL,
\`repo_name\` text NOT NULL,
\`updated_at_ms\` integer NOT NULL,
\`branch\` text,
\`pull_request_json\` text,
\`sessions_summary_json\` text DEFAULT '[]' NOT NULL
);
`,
} as const,
};

View file

@ -20,6 +20,23 @@ export const taskLookup = sqliteTable("task_lookup", {
repoId: text("repo_id").notNull(),
});
/**
* Materialized sidebar projection maintained by task actors.
* The source of truth still lives on each task actor; this table exists so
* workspace reads can stay local and avoid fan-out across child actors.
*/
export const taskSummaries = sqliteTable("task_summaries", {
taskId: text("task_id").notNull().primaryKey(),
repoId: text("repo_id").notNull(),
title: text("title").notNull(),
status: text("status").notNull(),
repoName: text("repo_name").notNull(),
updatedAtMs: integer("updated_at_ms").notNull(),
branch: text("branch"),
pullRequestJson: text("pull_request_json"),
sessionsSummaryJson: text("sessions_summary_json").notNull().default("[]"),
});
export const organizationProfile = sqliteTable("organization_profile", {
id: text("id").notNull().primaryKey(),
kind: text("kind").notNull(),