Fix Foundry UI bugs: org names, sessions, and repo selection (#250)

* Fix Foundry auth: migrate to Better Auth adapter, fix access token retrieval

- Remove @ts-nocheck from better-auth.ts, auth-user/index.ts, app-shell.ts
  and fix all type errors
- Fix getAccessTokenForSession: read GitHub token directly from account
  record instead of calling Better Auth's internal /get-access-token
  endpoint which returns 403 on server-side calls
- Re-implement workspaceAuth helper functions (workspaceAuthColumn,
  normalizeAuthValue, workspaceAuthClause, workspaceAuthWhere) that were
  accidentally deleted
- Remove all retry logic (withRetries, isRetryableAppActorError)
- Implement CORS origin allowlist from configured environment
- Document cachedAppWorkspace singleton pattern
- Add inline org sync fallback in buildAppSnapshot for post-OAuth flow
- Add no-retry rule to CLAUDE.md

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* Add Foundry dev panel from fix-git-data branch

Port the dev panel component that was left out when PR #243 was replaced
by PR #247. Adapted to remove runtime/mock-debug references that don't
exist on the current branch.

- Toggle with Shift+D, persists visibility to localStorage
- Shows context, session, GitHub sync status sections
- Dev-only (import.meta.env.DEV)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* Add full Docker image defaults, fix actor deadlocks, and improve dev experience

- Add Dockerfile.full and --all flag to install-agent CLI for pre-built images
- Centralize Docker image constant (FULL_IMAGE) pinned to 0.3.1-full
- Remove examples/shared/Dockerfile{,.dev} and daytona snapshot example
- Expand Docker docs with full runnable Dockerfile
- Fix self-deadlock in createWorkbenchSession (fire-and-forget provisioning)
- Audit and convert 12 task actions from wait:true to wait:false
- Add bun --hot for dev backend hot reload
- Remove --force from pnpm install in dev Dockerfile for faster startup
- Add env_file support to compose.dev.yaml for automatic credential loading
- Add mock frontend compose config and dev panel
- Update CLAUDE.md with wait:true policy and dev environment setup

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* WIP: async action fixes and interest manager

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* Fix Foundry UI bugs: org names, hanging sessions, and wrong repo creation

- Fix org display name using GitHub description instead of name field
- Fix createWorkbenchSession hanging when sandbox is provisioning
- Fix auto-session creation retry storm on errors
- Fix task creation using wrong repo due to React state race conditions
- Remove Bun hot-reload from backend Dockerfile (causes port drift)
- Add GitHub sync/install status to dev panel

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Nathan Flurry 2026-03-13 20:48:22 -07:00 committed by GitHub
parent 58c54156f1
commit d8b8b49f37
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
88 changed files with 9252 additions and 1933 deletions

View file

@ -19,6 +19,7 @@
"@iarna/toml": "^2.2.5",
"@sandbox-agent/foundry-shared": "workspace:*",
"@sandbox-agent/persist-rivet": "workspace:*",
"better-auth": "^1.5.5",
"drizzle-kit": "^0.31.8",
"drizzle-orm": "^0.44.5",
"hono": "^4.11.9",

View file

@ -0,0 +1,5 @@
import { db } from "rivetkit/db/drizzle";
import * as schema from "./schema.js";
import migrations from "./migrations.js";
export const authUserDb = db({ schema, migrations });

View file

@ -0,0 +1,80 @@
// This file is generated by src/actors/_scripts/generate-actor-migrations.ts.
// Source of truth is drizzle-kit output under ./drizzle (meta/_journal.json + *.sql).
// Do not hand-edit this file.
const journal = {
entries: [
{
idx: 0,
when: 1773446400000,
tag: "0000_auth_user",
breakpoints: true,
},
],
} as const;
export default {
journal,
migrations: {
m0000: `CREATE TABLE \`user\` (
\`id\` text PRIMARY KEY NOT NULL,
\`name\` text NOT NULL,
\`email\` text NOT NULL,
\`email_verified\` integer NOT NULL,
\`image\` text,
\`created_at\` integer NOT NULL,
\`updated_at\` integer NOT NULL
);
--> statement-breakpoint
CREATE TABLE \`session\` (
\`id\` text PRIMARY KEY NOT NULL,
\`token\` text NOT NULL,
\`user_id\` text NOT NULL,
\`expires_at\` integer NOT NULL,
\`ip_address\` text,
\`user_agent\` text,
\`created_at\` integer NOT NULL,
\`updated_at\` integer NOT NULL
);
--> statement-breakpoint
CREATE UNIQUE INDEX \`session_token_idx\` ON \`session\` (\`token\`);
--> statement-breakpoint
CREATE TABLE \`account\` (
\`id\` text PRIMARY KEY NOT NULL,
\`account_id\` text NOT NULL,
\`provider_id\` text NOT NULL,
\`user_id\` text NOT NULL,
\`access_token\` text,
\`refresh_token\` text,
\`id_token\` text,
\`access_token_expires_at\` integer,
\`refresh_token_expires_at\` integer,
\`scope\` text,
\`password\` text,
\`created_at\` integer NOT NULL,
\`updated_at\` integer NOT NULL
);
--> statement-breakpoint
CREATE UNIQUE INDEX \`account_provider_account_idx\` ON \`account\` (\`provider_id\`, \`account_id\`);
--> statement-breakpoint
CREATE TABLE \`user_profiles\` (
\`user_id\` text PRIMARY KEY NOT NULL,
\`github_account_id\` text,
\`github_login\` text,
\`role_label\` text NOT NULL,
\`eligible_organization_ids_json\` text NOT NULL,
\`starter_repo_status\` text NOT NULL,
\`starter_repo_starred_at\` integer,
\`starter_repo_skipped_at\` integer,
\`created_at\` integer NOT NULL,
\`updated_at\` integer NOT NULL
);
--> statement-breakpoint
CREATE TABLE \`session_state\` (
\`session_id\` text PRIMARY KEY NOT NULL,
\`active_organization_id\` text,
\`created_at\` integer NOT NULL,
\`updated_at\` integer NOT NULL
);`,
} as const,
};

View file

@ -0,0 +1,70 @@
import { integer, sqliteTable, text, uniqueIndex } from "drizzle-orm/sqlite-core";
export const authUsers = sqliteTable("user", {
id: text("id").notNull().primaryKey(),
name: text("name").notNull(),
email: text("email").notNull(),
emailVerified: integer("email_verified").notNull(),
image: text("image"),
createdAt: integer("created_at").notNull(),
updatedAt: integer("updated_at").notNull(),
});
export const authSessions = sqliteTable(
"session",
{
id: text("id").notNull().primaryKey(),
token: text("token").notNull(),
userId: text("user_id").notNull(),
expiresAt: integer("expires_at").notNull(),
ipAddress: text("ip_address"),
userAgent: text("user_agent"),
createdAt: integer("created_at").notNull(),
updatedAt: integer("updated_at").notNull(),
},
(table) => ({
tokenIdx: uniqueIndex("session_token_idx").on(table.token),
}),
);
export const authAccounts = sqliteTable(
"account",
{
id: text("id").notNull().primaryKey(),
accountId: text("account_id").notNull(),
providerId: text("provider_id").notNull(),
userId: text("user_id").notNull(),
accessToken: text("access_token"),
refreshToken: text("refresh_token"),
idToken: text("id_token"),
accessTokenExpiresAt: integer("access_token_expires_at"),
refreshTokenExpiresAt: integer("refresh_token_expires_at"),
scope: text("scope"),
password: text("password"),
createdAt: integer("created_at").notNull(),
updatedAt: integer("updated_at").notNull(),
},
(table) => ({
providerAccountIdx: uniqueIndex("account_provider_account_idx").on(table.providerId, table.accountId),
}),
);
export const userProfiles = sqliteTable("user_profiles", {
userId: text("user_id").notNull().primaryKey(),
githubAccountId: text("github_account_id"),
githubLogin: text("github_login"),
roleLabel: text("role_label").notNull(),
eligibleOrganizationIdsJson: text("eligible_organization_ids_json").notNull(),
starterRepoStatus: text("starter_repo_status").notNull(),
starterRepoStarredAt: integer("starter_repo_starred_at"),
starterRepoSkippedAt: integer("starter_repo_skipped_at"),
createdAt: integer("created_at").notNull(),
updatedAt: integer("updated_at").notNull(),
});
export const sessionState = sqliteTable("session_state", {
sessionId: text("session_id").notNull().primaryKey(),
activeOrganizationId: text("active_organization_id"),
createdAt: integer("created_at").notNull(),
updatedAt: integer("updated_at").notNull(),
});

View file

@ -0,0 +1,353 @@
import { and, asc, count as sqlCount, desc, eq, gt, gte, inArray, isNotNull, isNull, like, lt, lte, ne, notInArray, or } from "drizzle-orm";
import { actor } from "rivetkit";
import { authUserDb } from "./db/db.js";
import { authAccounts, authSessions, authUsers, sessionState, userProfiles } from "./db/schema.js";
const tables = {
user: authUsers,
session: authSessions,
account: authAccounts,
userProfiles,
sessionState,
} as const;
function tableFor(model: string) {
const table = tables[model as keyof typeof tables];
if (!table) {
throw new Error(`Unsupported auth user model: ${model}`);
}
return table as any;
}
function columnFor(table: any, field: string) {
const column = table[field];
if (!column) {
throw new Error(`Unsupported auth user field: ${field}`);
}
return column;
}
function normalizeValue(value: unknown): unknown {
if (value instanceof Date) {
return value.getTime();
}
if (Array.isArray(value)) {
return value.map((entry) => normalizeValue(entry));
}
return value;
}
function clauseToExpr(table: any, clause: any) {
const column = columnFor(table, clause.field);
const value = normalizeValue(clause.value);
switch (clause.operator) {
case "ne":
return value === null ? isNotNull(column) : ne(column, value as any);
case "lt":
return lt(column, value as any);
case "lte":
return lte(column, value as any);
case "gt":
return gt(column, value as any);
case "gte":
return gte(column, value as any);
case "in":
return inArray(column, Array.isArray(value) ? (value as any[]) : [value as any]);
case "not_in":
return notInArray(column, Array.isArray(value) ? (value as any[]) : [value as any]);
case "contains":
return like(column, `%${String(value ?? "")}%`);
case "starts_with":
return like(column, `${String(value ?? "")}%`);
case "ends_with":
return like(column, `%${String(value ?? "")}`);
case "eq":
default:
return value === null ? isNull(column) : eq(column, value as any);
}
}
function buildWhere(table: any, where: any[] | undefined) {
if (!where || where.length === 0) {
return undefined;
}
let expr = clauseToExpr(table, where[0]);
for (const clause of where.slice(1)) {
const next = clauseToExpr(table, clause);
expr = clause.connector === "OR" ? or(expr, next) : and(expr, next);
}
return expr;
}
function applyJoinToRow(c: any, model: string, row: any, join: any) {
if (!row || !join) {
return row;
}
if (model === "session" && join.user) {
return c.db
.select()
.from(authUsers)
.where(eq(authUsers.id, row.userId))
.get()
.then((user: any) => ({ ...row, user: user ?? null }));
}
if (model === "account" && join.user) {
return c.db
.select()
.from(authUsers)
.where(eq(authUsers.id, row.userId))
.get()
.then((user: any) => ({ ...row, user: user ?? null }));
}
if (model === "user" && join.account) {
return c.db
.select()
.from(authAccounts)
.where(eq(authAccounts.userId, row.id))
.all()
.then((accounts: any[]) => ({ ...row, account: accounts }));
}
return Promise.resolve(row);
}
async function applyJoinToRows(c: any, model: string, rows: any[], join: any) {
if (!join || rows.length === 0) {
return rows;
}
if (model === "session" && join.user) {
const userIds = [...new Set(rows.map((row) => row.userId).filter(Boolean))];
const users = userIds.length > 0 ? await c.db.select().from(authUsers).where(inArray(authUsers.id, userIds)).all() : [];
const userMap = new Map(users.map((user: any) => [user.id, user]));
return rows.map((row) => ({ ...row, user: userMap.get(row.userId) ?? null }));
}
if (model === "account" && join.user) {
const userIds = [...new Set(rows.map((row) => row.userId).filter(Boolean))];
const users = userIds.length > 0 ? await c.db.select().from(authUsers).where(inArray(authUsers.id, userIds)).all() : [];
const userMap = new Map(users.map((user: any) => [user.id, user]));
return rows.map((row) => ({ ...row, user: userMap.get(row.userId) ?? null }));
}
if (model === "user" && join.account) {
const userIds = rows.map((row) => row.id);
const accounts = userIds.length > 0 ? await c.db.select().from(authAccounts).where(inArray(authAccounts.userId, userIds)).all() : [];
const accountsByUserId = new Map<string, any[]>();
for (const account of accounts) {
const entries = accountsByUserId.get(account.userId) ?? [];
entries.push(account);
accountsByUserId.set(account.userId, entries);
}
return rows.map((row) => ({ ...row, account: accountsByUserId.get(row.id) ?? [] }));
}
return rows;
}
export const authUser = actor({
db: authUserDb,
options: {
name: "Auth User",
icon: "shield",
actionTimeout: 60_000,
},
createState: (_c, input: { userId: string }) => ({
userId: input.userId,
}),
actions: {
async createAuthRecord(c, input: { model: string; data: Record<string, unknown> }) {
const table = tableFor(input.model);
await c.db
.insert(table)
.values(input.data as any)
.run();
return await c.db
.select()
.from(table)
.where(eq(columnFor(table, "id"), input.data.id as any))
.get();
},
async findOneAuthRecord(c, input: { model: string; where: any[]; join?: any }) {
const table = tableFor(input.model);
const predicate = buildWhere(table, input.where);
const row = predicate ? await c.db.select().from(table).where(predicate).get() : await c.db.select().from(table).get();
return await applyJoinToRow(c, input.model, row ?? null, input.join);
},
async findManyAuthRecords(c, input: { model: string; where?: any[]; limit?: number; offset?: number; sortBy?: any; join?: any }) {
const table = tableFor(input.model);
const predicate = buildWhere(table, input.where);
let query: any = c.db.select().from(table);
if (predicate) {
query = query.where(predicate);
}
if (input.sortBy?.field) {
const column = columnFor(table, input.sortBy.field);
query = query.orderBy(input.sortBy.direction === "asc" ? asc(column) : desc(column));
}
if (typeof input.limit === "number") {
query = query.limit(input.limit);
}
if (typeof input.offset === "number") {
query = query.offset(input.offset);
}
const rows = await query.all();
return await applyJoinToRows(c, input.model, rows, input.join);
},
async updateAuthRecord(c, input: { model: string; where: any[]; update: Record<string, unknown> }) {
const table = tableFor(input.model);
const predicate = buildWhere(table, input.where);
if (!predicate) {
throw new Error("updateAuthRecord requires a where clause");
}
await c.db
.update(table)
.set(input.update as any)
.where(predicate)
.run();
return await c.db.select().from(table).where(predicate).get();
},
async updateManyAuthRecords(c, input: { model: string; where: any[]; update: Record<string, unknown> }) {
const table = tableFor(input.model);
const predicate = buildWhere(table, input.where);
if (!predicate) {
throw new Error("updateManyAuthRecords requires a where clause");
}
await c.db
.update(table)
.set(input.update as any)
.where(predicate)
.run();
const row = await c.db.select({ value: sqlCount() }).from(table).where(predicate).get();
return row?.value ?? 0;
},
async deleteAuthRecord(c, input: { model: string; where: any[] }) {
const table = tableFor(input.model);
const predicate = buildWhere(table, input.where);
if (!predicate) {
throw new Error("deleteAuthRecord requires a where clause");
}
await c.db.delete(table).where(predicate).run();
},
async deleteManyAuthRecords(c, input: { model: string; where: any[] }) {
const table = tableFor(input.model);
const predicate = buildWhere(table, input.where);
if (!predicate) {
throw new Error("deleteManyAuthRecords requires a where clause");
}
const rows = await c.db.select().from(table).where(predicate).all();
await c.db.delete(table).where(predicate).run();
return rows.length;
},
async countAuthRecords(c, input: { model: string; where?: any[] }) {
const table = tableFor(input.model);
const predicate = buildWhere(table, input.where);
const row = predicate
? await c.db.select({ value: sqlCount() }).from(table).where(predicate).get()
: await c.db.select({ value: sqlCount() }).from(table).get();
return row?.value ?? 0;
},
async getAppAuthState(c, input: { sessionId: string }) {
const session = await c.db.select().from(authSessions).where(eq(authSessions.id, input.sessionId)).get();
if (!session) {
return null;
}
const [user, profile, currentSessionState, accounts] = await Promise.all([
c.db.select().from(authUsers).where(eq(authUsers.id, session.userId)).get(),
c.db.select().from(userProfiles).where(eq(userProfiles.userId, session.userId)).get(),
c.db.select().from(sessionState).where(eq(sessionState.sessionId, input.sessionId)).get(),
c.db.select().from(authAccounts).where(eq(authAccounts.userId, session.userId)).all(),
]);
return {
session,
user,
profile: profile ?? null,
sessionState: currentSessionState ?? null,
accounts,
};
},
async upsertUserProfile(
c,
input: {
userId: string;
patch: {
githubAccountId?: string | null;
githubLogin?: string | null;
roleLabel?: string;
eligibleOrganizationIdsJson?: string;
starterRepoStatus?: string;
starterRepoStarredAt?: number | null;
starterRepoSkippedAt?: number | null;
};
},
) {
const now = Date.now();
await c.db
.insert(userProfiles)
.values({
userId: input.userId,
githubAccountId: input.patch.githubAccountId ?? null,
githubLogin: input.patch.githubLogin ?? null,
roleLabel: input.patch.roleLabel ?? "GitHub user",
eligibleOrganizationIdsJson: input.patch.eligibleOrganizationIdsJson ?? "[]",
starterRepoStatus: input.patch.starterRepoStatus ?? "pending",
starterRepoStarredAt: input.patch.starterRepoStarredAt ?? null,
starterRepoSkippedAt: input.patch.starterRepoSkippedAt ?? null,
createdAt: now,
updatedAt: now,
})
.onConflictDoUpdate({
target: userProfiles.userId,
set: {
...(input.patch.githubAccountId !== undefined ? { githubAccountId: input.patch.githubAccountId } : {}),
...(input.patch.githubLogin !== undefined ? { githubLogin: input.patch.githubLogin } : {}),
...(input.patch.roleLabel !== undefined ? { roleLabel: input.patch.roleLabel } : {}),
...(input.patch.eligibleOrganizationIdsJson !== undefined ? { eligibleOrganizationIdsJson: input.patch.eligibleOrganizationIdsJson } : {}),
...(input.patch.starterRepoStatus !== undefined ? { starterRepoStatus: input.patch.starterRepoStatus } : {}),
...(input.patch.starterRepoStarredAt !== undefined ? { starterRepoStarredAt: input.patch.starterRepoStarredAt } : {}),
...(input.patch.starterRepoSkippedAt !== undefined ? { starterRepoSkippedAt: input.patch.starterRepoSkippedAt } : {}),
updatedAt: now,
},
})
.run();
return await c.db.select().from(userProfiles).where(eq(userProfiles.userId, input.userId)).get();
},
async upsertSessionState(c, input: { sessionId: string; activeOrganizationId: string | null }) {
const now = Date.now();
await c.db
.insert(sessionState)
.values({
sessionId: input.sessionId,
activeOrganizationId: input.activeOrganizationId,
createdAt: now,
updatedAt: now,
})
.onConflictDoUpdate({
target: sessionState.sessionId,
set: {
activeOrganizationId: input.activeOrganizationId,
updatedAt: now,
},
})
.run();
return await c.db.select().from(sessionState).where(eq(sessionState.sessionId, input.sessionId)).get();
},
},
});

View file

@ -1,4 +1,14 @@
import { taskKey, taskStatusSyncKey, historyKey, projectBranchSyncKey, projectKey, projectPrSyncKey, sandboxInstanceKey, workspaceKey } from "./keys.js";
import {
authUserKey,
taskKey,
taskStatusSyncKey,
historyKey,
projectBranchSyncKey,
projectKey,
projectPrSyncKey,
sandboxInstanceKey,
workspaceKey,
} from "./keys.js";
import type { ProviderId } from "@sandbox-agent/foundry-shared";
export function actorClient(c: any) {
@ -11,6 +21,16 @@ export async function getOrCreateWorkspace(c: any, workspaceId: string) {
});
}
export async function getOrCreateAuthUser(c: any, userId: string) {
return await actorClient(c).authUser.getOrCreate(authUserKey(userId), {
createWithInput: { userId },
});
}
export function getAuthUser(c: any, userId: string) {
return actorClient(c).authUser.get(authUserKey(userId));
}
export async function getOrCreateProject(c: any, workspaceId: string, repoId: string, remoteUrl: string) {
return await actorClient(c).project.getOrCreate(projectKey(workspaceId, repoId), {
createWithInput: {
@ -125,3 +145,7 @@ export function selfProject(c: any) {
export function selfSandboxInstance(c: any) {
return actorClient(c).sandboxInstance.getForId(c.actorId);
}
export function selfAuthUser(c: any) {
return actorClient(c).authUser.getForId(c.actorId);
}

View file

@ -1,3 +1,4 @@
import { authUser } from "./auth-user/index.js";
import { setup } from "rivetkit";
import { taskStatusSync } from "./task-status-sync/index.js";
import { task } from "./task/index.js";
@ -22,6 +23,7 @@ export const registry = setup({
baseLogger: logger,
},
use: {
authUser,
workspace,
project,
task,
@ -35,6 +37,7 @@ export const registry = setup({
export * from "./context.js";
export * from "./events.js";
export * from "./auth-user/index.js";
export * from "./task-status-sync/index.js";
export * from "./task/index.js";
export * from "./history/index.js";

View file

@ -4,6 +4,10 @@ export function workspaceKey(workspaceId: string): ActorKey {
return ["ws", workspaceId];
}
export function authUserKey(userId: string): ActorKey {
return ["ws", "app", "user", userId];
}
export function projectKey(workspaceId: string, repoId: string): ActorKey {
return ["ws", workspaceId, "project", repoId];
}

View file

@ -10,7 +10,7 @@ import { foundryRepoClonePath } from "../../services/foundry-paths.js";
import { resolveWorkspaceGithubAuth } from "../../services/github-auth.js";
import { expectQueueResponse } from "../../services/queue.js";
import { withRepoGitLock } from "../../services/repo-git-lock.js";
import { branches, taskIndex, prCache, repoMeta } from "./db/schema.js";
import { branches, taskIndex, prCache, repoActionJobs, repoMeta } from "./db/schema.js";
import { deriveFallbackTitle } from "../../services/create-flow.js";
import { normalizeBaseBranchName } from "../../integrations/git-spice/index.js";
import { sortBranchesForOverview } from "./stack-model.js";
@ -87,6 +87,7 @@ interface BranchSyncResult {
interface RepoOverviewCommand {}
interface RunRepoStackActionCommand {
jobId?: string;
action: RepoStackAction;
branchName?: string;
parentBranch?: string;
@ -133,6 +134,90 @@ async function ensureProjectSyncActors(c: any, localPath: string): Promise<void>
c.state.syncActorsStarted = true;
}
async function ensureRepoActionJobsTable(c: any): Promise<void> {
await c.db.execute(`
CREATE TABLE IF NOT EXISTS repo_action_jobs (
job_id text PRIMARY KEY NOT NULL,
action text NOT NULL,
branch_name text,
parent_branch text,
status text NOT NULL,
message text NOT NULL,
created_at integer NOT NULL,
updated_at integer NOT NULL,
completed_at integer
)
`);
}
async function writeRepoActionJob(
c: any,
input: {
jobId: string;
action: RepoStackAction;
branchName: string | null;
parentBranch: string | null;
status: "queued" | "running" | "completed" | "error";
message: string;
createdAt?: number;
completedAt?: number | null;
},
): Promise<void> {
await ensureRepoActionJobsTable(c);
const now = Date.now();
await c.db
.insert(repoActionJobs)
.values({
jobId: input.jobId,
action: input.action,
branchName: input.branchName,
parentBranch: input.parentBranch,
status: input.status,
message: input.message,
createdAt: input.createdAt ?? now,
updatedAt: now,
completedAt: input.completedAt ?? null,
})
.onConflictDoUpdate({
target: repoActionJobs.jobId,
set: {
status: input.status,
message: input.message,
updatedAt: now,
completedAt: input.completedAt ?? null,
},
})
.run();
}
async function listRepoActionJobRows(c: any): Promise<
Array<{
jobId: string;
action: RepoStackAction;
branchName: string | null;
parentBranch: string | null;
status: "queued" | "running" | "completed" | "error";
message: string;
createdAt: number;
updatedAt: number;
completedAt: number | null;
}>
> {
await ensureRepoActionJobsTable(c);
const rows = await c.db.select().from(repoActionJobs).orderBy(desc(repoActionJobs.updatedAt)).limit(20).all();
return rows.map((row: any) => ({
jobId: row.jobId,
action: row.action,
branchName: row.branchName ?? null,
parentBranch: row.parentBranch ?? null,
status: row.status,
message: row.message,
createdAt: row.createdAt,
updatedAt: row.updatedAt,
completedAt: row.completedAt ?? null,
}));
}
async function deleteStaleTaskIndexRow(c: any, taskId: string): Promise<void> {
try {
await c.db.delete(taskIndex).where(eq(taskIndex.taskId, taskId)).run();
@ -359,8 +444,6 @@ async function createTaskMutation(c: any, cmd: CreateTaskCommand): Promise<TaskR
const taskId = randomUUID();
if (onBranch) {
await forceProjectSync(c, localPath);
const branchRow = await c.db.select({ branchName: branches.branchName }).from(branches).where(eq(branches.branchName, onBranch)).get();
if (!branchRow) {
throw new Error(`Branch not found in repo snapshot: ${onBranch}`);
@ -573,14 +656,37 @@ async function runRepoStackActionMutation(c: any, cmd: RunRepoStackActionCommand
const { driver } = getActorRuntimeContext();
const at = Date.now();
const jobId = cmd.jobId ?? randomUUID();
const action = cmd.action;
const branchName = cmd.branchName?.trim() || null;
const parentBranch = cmd.parentBranch?.trim() || null;
await writeRepoActionJob(c, {
jobId,
action,
branchName,
parentBranch,
status: "running",
message: `Running ${action}`,
createdAt: at,
});
if (!(await driver.stack.available(localPath).catch(() => false))) {
await writeRepoActionJob(c, {
jobId,
action,
branchName,
parentBranch,
status: "error",
message: "git-spice is not available for this repo",
createdAt: at,
completedAt: Date.now(),
});
return {
jobId,
action,
executed: false,
status: "error",
message: "git-spice is not available for this repo",
at,
};
@ -615,48 +721,77 @@ async function runRepoStackActionMutation(c: any, cmd: RunRepoStackActionCommand
}
}
await withRepoGitLock(localPath, async () => {
if (action === "sync_repo") {
await driver.stack.syncRepo(localPath);
} else if (action === "restack_repo") {
await driver.stack.restackRepo(localPath);
} else if (action === "restack_subtree") {
await driver.stack.restackSubtree(localPath, branchName!);
} else if (action === "rebase_branch") {
await driver.stack.rebaseBranch(localPath, branchName!);
} else if (action === "reparent_branch") {
await driver.stack.reparentBranch(localPath, branchName!, parentBranch!);
} else {
throw new Error(`Unsupported repo stack action: ${action}`);
}
});
await forceProjectSync(c, localPath);
try {
const history = await getOrCreateHistory(c, c.state.workspaceId, c.state.repoId);
await history.append({
kind: "repo.stack_action",
branchName: branchName ?? null,
payload: {
action,
await withRepoGitLock(localPath, async () => {
if (action === "sync_repo") {
await driver.stack.syncRepo(localPath);
} else if (action === "restack_repo") {
await driver.stack.restackRepo(localPath);
} else if (action === "restack_subtree") {
await driver.stack.restackSubtree(localPath, branchName!);
} else if (action === "rebase_branch") {
await driver.stack.rebaseBranch(localPath, branchName!);
} else if (action === "reparent_branch") {
await driver.stack.reparentBranch(localPath, branchName!, parentBranch!);
} else {
throw new Error(`Unsupported repo stack action: ${action}`);
}
});
try {
const history = await getOrCreateHistory(c, c.state.workspaceId, c.state.repoId);
await history.append({
kind: "repo.stack_action",
branchName: branchName ?? null,
parentBranch: parentBranch ?? null,
},
payload: {
action,
branchName: branchName ?? null,
parentBranch: parentBranch ?? null,
jobId,
},
});
} catch (error) {
logActorWarning("project", "failed appending repo stack history event", {
workspaceId: c.state.workspaceId,
repoId: c.state.repoId,
action,
error: resolveErrorMessage(error),
});
}
await forceProjectSync(c, localPath);
await writeRepoActionJob(c, {
jobId,
action,
branchName,
parentBranch,
status: "completed",
message: `Completed ${action}`,
createdAt: at,
completedAt: Date.now(),
});
} catch (error) {
logActorWarning("project", "failed appending repo stack history event", {
workspaceId: c.state.workspaceId,
repoId: c.state.repoId,
const message = resolveErrorMessage(error);
await writeRepoActionJob(c, {
jobId,
action,
error: resolveErrorMessage(error),
branchName,
parentBranch,
status: "error",
message,
createdAt: at,
completedAt: Date.now(),
});
throw error;
}
return {
jobId,
action,
executed: true,
message: `stack action executed: ${action}`,
status: "completed",
message: `Completed ${action}`,
at,
};
}
@ -999,7 +1134,6 @@ export const projectActions = {
async getRepoOverview(c: any, _cmd?: RepoOverviewCommand): Promise<RepoOverview> {
const localPath = await ensureProjectReadyForRead(c);
await ensureTaskIndexHydratedForRead(c);
await forceProjectSync(c, localPath);
const { driver } = getActorRuntimeContext();
const now = Date.now();
@ -1118,6 +1252,9 @@ export const projectActions = {
};
});
const latestBranchSync = await c.db.select({ updatedAt: branches.updatedAt }).from(branches).orderBy(desc(branches.updatedAt)).limit(1).get();
const latestPrSync = await c.db.select({ updatedAt: prCache.updatedAt }).from(prCache).orderBy(desc(prCache.updatedAt)).limit(1).get();
return {
workspaceId: c.state.workspaceId,
repoId: c.state.repoId,
@ -1125,6 +1262,11 @@ export const projectActions = {
baseRef,
stackAvailable,
fetchedAt: now,
branchSyncAt: latestBranchSync?.updatedAt ?? null,
prSyncAt: latestPrSync?.updatedAt ?? null,
branchSyncStatus: latestBranchSync ? "synced" : "pending",
prSyncStatus: latestPrSync ? "synced" : "pending",
repoActionJobs: await listRepoActionJobRows(c),
branches: branchRows,
};
},
@ -1156,12 +1298,41 @@ export const projectActions = {
async runRepoStackAction(c: any, cmd: RunRepoStackActionCommand): Promise<RepoStackActionResult> {
const self = selfProject(c);
return expectQueueResponse<RepoStackActionResult>(
await self.send(projectWorkflowQueueName("project.command.runRepoStackAction"), cmd, {
wait: true,
timeout: 12 * 60_000,
}),
const jobId = randomUUID();
const at = Date.now();
const action = cmd.action;
const branchName = cmd.branchName?.trim() || null;
const parentBranch = cmd.parentBranch?.trim() || null;
await writeRepoActionJob(c, {
jobId,
action,
branchName,
parentBranch,
status: "queued",
message: `Queued ${action}`,
createdAt: at,
});
await self.send(
projectWorkflowQueueName("project.command.runRepoStackAction"),
{
...cmd,
jobId,
},
{
wait: false,
},
);
return {
jobId,
action,
executed: true,
status: "queued",
message: `Queued ${action}`,
at,
};
},
async applyPrSyncResult(c: any, body: PrSyncResult): Promise<void> {

View file

@ -42,3 +42,15 @@ export const taskIndex = sqliteTable("task_index", {
createdAt: integer("created_at").notNull(),
updatedAt: integer("updated_at").notNull(),
});
export const repoActionJobs = sqliteTable("repo_action_jobs", {
jobId: text("job_id").notNull().primaryKey(),
action: text("action").notNull(),
branchName: text("branch_name"),
parentBranch: text("parent_branch"),
status: text("status").notNull(),
message: text("message").notNull(),
createdAt: integer("created_at").notNull(),
updatedAt: integer("updated_at").notNull(),
completedAt: integer("completed_at"),
});

View file

@ -278,10 +278,12 @@ async function getSandboxAgentClient(c: any) {
});
}
function broadcastProcessesUpdated(c: any): void {
async function broadcastProcessesUpdated(c: any): Promise<void> {
const client = await getSandboxAgentClient(c);
const { processes } = await client.listProcesses();
c.broadcast("processesUpdated", {
sandboxId: c.state.sandboxId,
at: Date.now(),
type: "processesUpdated",
processes,
});
}
@ -475,7 +477,7 @@ export const sandboxInstance = actor({
async createProcess(c: any, request: ProcessCreateRequest): Promise<ProcessInfo> {
const client = await getSandboxAgentClient(c);
const created = await client.createProcess(request);
broadcastProcessesUpdated(c);
await broadcastProcessesUpdated(c);
return created;
},
@ -492,21 +494,21 @@ export const sandboxInstance = actor({
async stopProcess(c: any, request: { processId: string; query?: ProcessSignalQuery }): Promise<ProcessInfo> {
const client = await getSandboxAgentClient(c);
const stopped = await client.stopProcess(request.processId, request.query);
broadcastProcessesUpdated(c);
await broadcastProcessesUpdated(c);
return stopped;
},
async killProcess(c: any, request: { processId: string; query?: ProcessSignalQuery }): Promise<ProcessInfo> {
const client = await getSandboxAgentClient(c);
const killed = await client.killProcess(request.processId, request.query);
broadcastProcessesUpdated(c);
await broadcastProcessesUpdated(c);
return killed;
},
async deleteProcess(c: any, request: { processId: string }): Promise<void> {
const client = await getSandboxAgentClient(c);
await client.deleteProcess(request.processId);
broadcastProcessesUpdated(c);
await broadcastProcessesUpdated(c);
},
async providerState(c: any): Promise<{ providerId: ProviderId; sandboxId: string; state: string; at: number }> {

View file

@ -28,6 +28,10 @@ export const taskRuntime = sqliteTable(
activeSwitchTarget: text("active_switch_target"),
activeCwd: text("active_cwd"),
statusMessage: text("status_message"),
gitStateJson: text("git_state_json"),
gitStateUpdatedAt: integer("git_state_updated_at"),
provisionStage: text("provision_stage"),
provisionStageUpdatedAt: integer("provision_stage_updated_at"),
updatedAt: integer("updated_at").notNull(),
},
(table) => [check("task_runtime_singleton_id_check", sql`${table.id} = 1`)],
@ -46,8 +50,13 @@ export const taskSandboxes = sqliteTable("task_sandboxes", {
export const taskWorkbenchSessions = sqliteTable("task_workbench_sessions", {
sessionId: text("session_id").notNull().primaryKey(),
sandboxSessionId: text("sandbox_session_id"),
sessionName: text("session_name").notNull(),
model: text("model").notNull(),
status: text("status").notNull().default("ready"),
errorMessage: text("error_message"),
transcriptJson: text("transcript_json").notNull().default("[]"),
transcriptUpdatedAt: integer("transcript_updated_at"),
unread: integer("unread").notNull().default(0),
draftText: text("draft_text").notNull().default(""),
// Structured by the workbench composer attachment payload format.

View file

@ -19,7 +19,9 @@ import {
changeWorkbenchModel,
closeWorkbenchSession,
createWorkbenchSession,
getWorkbenchTask,
getSessionDetail,
getTaskDetail,
getTaskSummary,
markWorkbenchUnread,
publishWorkbenchPr,
renameWorkbenchBranch,
@ -144,14 +146,9 @@ export const task = actor({
async provision(c, cmd: InitializeCommand): Promise<{ ok: true }> {
const self = selfTask(c);
const result = await self.send(taskWorkflowQueueName("task.command.provision"), cmd ?? {}, {
wait: true,
timeout: 30 * 60_000,
await self.send(taskWorkflowQueueName("task.command.provision"), cmd ?? {}, {
wait: false,
});
const response = expectQueueResponse<{ ok: boolean; error?: string }>(result);
if (!response.ok) {
throw new Error(response.error ?? "task provisioning failed");
}
return { ok: true };
},
@ -180,47 +177,35 @@ export const task = actor({
async push(c, cmd?: TaskActionCommand): Promise<void> {
const self = selfTask(c);
await self.send(taskWorkflowQueueName("task.command.push"), cmd ?? {}, {
wait: true,
timeout: 180_000,
wait: false,
});
},
async sync(c, cmd?: TaskActionCommand): Promise<void> {
const self = selfTask(c);
await self.send(taskWorkflowQueueName("task.command.sync"), cmd ?? {}, {
wait: true,
timeout: 30_000,
wait: false,
});
},
async merge(c, cmd?: TaskActionCommand): Promise<void> {
const self = selfTask(c);
await self.send(taskWorkflowQueueName("task.command.merge"), cmd ?? {}, {
wait: true,
timeout: 30_000,
wait: false,
});
},
async archive(c, cmd?: TaskActionCommand): Promise<void> {
const self = selfTask(c);
void self
.send(taskWorkflowQueueName("task.command.archive"), cmd ?? {}, {
wait: true,
timeout: 60_000,
})
.catch((error: unknown) => {
c.log.warn({
msg: "archive command failed",
error: error instanceof Error ? error.message : String(error),
});
});
await self.send(taskWorkflowQueueName("task.command.archive"), cmd ?? {}, {
wait: false,
});
},
async kill(c, cmd?: TaskActionCommand): Promise<void> {
const self = selfTask(c);
await self.send(taskWorkflowQueueName("task.command.kill"), cmd ?? {}, {
wait: true,
timeout: 60_000,
wait: false,
});
},
@ -228,8 +213,16 @@ export const task = actor({
return await getCurrentRecord({ db: c.db, state: c.state });
},
async getWorkbench(c) {
return await getWorkbenchTask(c);
async getTaskSummary(c) {
return await getTaskSummary(c);
},
async getTaskDetail(c) {
return await getTaskDetail(c);
},
async getSessionDetail(c, input: { sessionId: string }) {
return await getSessionDetail(c, input.sessionId);
},
async markWorkbenchUnread(c): Promise<void> {
@ -255,8 +248,7 @@ export const task = actor({
async renameWorkbenchBranch(c, input: TaskWorkbenchRenameInput): Promise<void> {
const self = selfTask(c);
await self.send(taskWorkflowQueueName("task.command.workbench.rename_branch"), { value: input.value } satisfies TaskWorkbenchValueCommand, {
wait: true,
timeout: 5 * 60_000,
wait: false,
});
},
@ -335,8 +327,7 @@ export const task = actor({
attachments: input.attachments,
} satisfies TaskWorkbenchSendMessageCommand,
{
wait: true,
timeout: 10 * 60_000,
wait: false,
},
);
},
@ -344,8 +335,7 @@ export const task = actor({
async stopWorkbenchSession(c, input: TaskTabCommand): Promise<void> {
const self = selfTask(c);
await self.send(taskWorkflowQueueName("task.command.workbench.stop_session"), { sessionId: input.tabId } satisfies TaskWorkbenchSessionCommand, {
wait: true,
timeout: 5 * 60_000,
wait: false,
});
},
@ -360,8 +350,7 @@ export const task = actor({
async closeWorkbenchSession(c, input: TaskTabCommand): Promise<void> {
const self = selfTask(c);
await self.send(taskWorkflowQueueName("task.command.workbench.close_session"), { sessionId: input.tabId } satisfies TaskWorkbenchSessionCommand, {
wait: true,
timeout: 5 * 60_000,
wait: false,
});
},
@ -371,8 +360,7 @@ export const task = actor({
taskWorkflowQueueName("task.command.workbench.publish_pr"),
{},
{
wait: true,
timeout: 10 * 60_000,
wait: false,
},
);
},
@ -380,8 +368,7 @@ export const task = actor({
async revertWorkbenchFile(c, input: { path: string }): Promise<void> {
const self = selfTask(c);
await self.send(taskWorkflowQueueName("task.command.workbench.revert_file"), input, {
wait: true,
timeout: 5 * 60_000,
wait: false,
});
},
},

View file

@ -1,4 +1,5 @@
// @ts-nocheck
import { randomUUID } from "node:crypto";
import { basename } from "node:path";
import { asc, eq } from "drizzle-orm";
import { getActorRuntimeContext } from "../context.js";
@ -6,15 +7,30 @@ import { getOrCreateTaskStatusSync, getOrCreateProject, getOrCreateWorkspace, ge
import { resolveWorkspaceGithubAuth } from "../../services/github-auth.js";
import { task as taskTable, taskRuntime, taskWorkbenchSessions } from "./db/schema.js";
import { getCurrentRecord } from "./workflow/common.js";
import { taskWorkflowQueueName } from "./workflow/queue.js";
const STATUS_SYNC_INTERVAL_MS = 1_000;
function emptyGitState() {
return {
fileChanges: [],
diffs: {},
fileTree: [],
updatedAt: null as number | null,
};
}
async function ensureWorkbenchSessionTable(c: any): Promise<void> {
await c.db.execute(`
CREATE TABLE IF NOT EXISTS task_workbench_sessions (
session_id text PRIMARY KEY NOT NULL,
sandbox_session_id text,
session_name text NOT NULL,
model text NOT NULL,
status text DEFAULT 'ready' NOT NULL,
error_message text,
transcript_json text DEFAULT '[]' NOT NULL,
transcript_updated_at integer,
unread integer DEFAULT 0 NOT NULL,
draft_text text DEFAULT '' NOT NULL,
draft_attachments_json text DEFAULT '[]' NOT NULL,
@ -26,6 +42,18 @@ async function ensureWorkbenchSessionTable(c: any): Promise<void> {
updated_at integer NOT NULL
)
`);
await c.db.execute(`ALTER TABLE task_workbench_sessions ADD COLUMN sandbox_session_id text`).catch(() => {});
await c.db.execute(`ALTER TABLE task_workbench_sessions ADD COLUMN status text DEFAULT 'ready' NOT NULL`).catch(() => {});
await c.db.execute(`ALTER TABLE task_workbench_sessions ADD COLUMN error_message text`).catch(() => {});
await c.db.execute(`ALTER TABLE task_workbench_sessions ADD COLUMN transcript_json text DEFAULT '[]' NOT NULL`).catch(() => {});
await c.db.execute(`ALTER TABLE task_workbench_sessions ADD COLUMN transcript_updated_at integer`).catch(() => {});
}
async function ensureTaskRuntimeCacheColumns(c: any): Promise<void> {
await c.db.execute(`ALTER TABLE task_runtime ADD COLUMN git_state_json text`).catch(() => {});
await c.db.execute(`ALTER TABLE task_runtime ADD COLUMN git_state_updated_at integer`).catch(() => {});
await c.db.execute(`ALTER TABLE task_runtime ADD COLUMN provision_stage text`).catch(() => {});
await c.db.execute(`ALTER TABLE task_runtime ADD COLUMN provision_stage_updated_at integer`).catch(() => {});
}
function defaultModelForAgent(agentType: string | null | undefined) {
@ -74,6 +102,40 @@ function parseDraftAttachments(value: string | null | undefined): Array<any> {
}
}
function parseTranscript(value: string | null | undefined): Array<any> {
if (!value) {
return [];
}
try {
const parsed = JSON.parse(value) as unknown;
return Array.isArray(parsed) ? parsed : [];
} catch {
return [];
}
}
function parseGitState(value: string | null | undefined): { fileChanges: Array<any>; diffs: Record<string, string>; fileTree: Array<any> } {
if (!value) {
return emptyGitState();
}
try {
const parsed = JSON.parse(value) as {
fileChanges?: unknown;
diffs?: unknown;
fileTree?: unknown;
};
return {
fileChanges: Array.isArray(parsed.fileChanges) ? parsed.fileChanges : [],
diffs: parsed.diffs && typeof parsed.diffs === "object" ? (parsed.diffs as Record<string, string>) : {},
fileTree: Array.isArray(parsed.fileTree) ? parsed.fileTree : [],
};
} catch {
return emptyGitState();
}
}
export function shouldMarkSessionUnreadForStatus(meta: { thinkingSinceMs?: number | null }, status: "running" | "idle" | "error"): boolean {
if (status === "running") {
return false;
@ -90,7 +152,13 @@ async function listSessionMetaRows(c: any, options?: { includeClosed?: boolean }
const mapped = rows.map((row: any) => ({
...row,
id: row.sessionId,
sessionId: row.sessionId,
sessionId: row.sandboxSessionId ?? null,
tabId: row.sessionId,
sandboxSessionId: row.sandboxSessionId ?? null,
status: row.status ?? "ready",
errorMessage: row.errorMessage ?? null,
transcript: parseTranscript(row.transcriptJson),
transcriptUpdatedAt: row.transcriptUpdatedAt ?? null,
draftAttachments: parseDraftAttachments(row.draftAttachmentsJson),
draftUpdatedAtMs: row.draftUpdatedAt ?? null,
unread: row.unread === 1,
@ -121,7 +189,13 @@ async function readSessionMeta(c: any, sessionId: string): Promise<any | null> {
return {
...row,
id: row.sessionId,
sessionId: row.sessionId,
sessionId: row.sandboxSessionId ?? null,
tabId: row.sessionId,
sandboxSessionId: row.sandboxSessionId ?? null,
status: row.status ?? "ready",
errorMessage: row.errorMessage ?? null,
transcript: parseTranscript(row.transcriptJson),
transcriptUpdatedAt: row.transcriptUpdatedAt ?? null,
draftAttachments: parseDraftAttachments(row.draftAttachmentsJson),
draftUpdatedAtMs: row.draftUpdatedAt ?? null,
unread: row.unread === 1,
@ -133,14 +207,18 @@ async function readSessionMeta(c: any, sessionId: string): Promise<any | null> {
async function ensureSessionMeta(
c: any,
params: {
sessionId: string;
tabId: string;
sandboxSessionId?: string | null;
model?: string;
sessionName?: string;
unread?: boolean;
created?: boolean;
status?: "pending_provision" | "pending_session_create" | "ready" | "error";
errorMessage?: string | null;
},
): Promise<any> {
await ensureWorkbenchSessionTable(c);
const existing = await readSessionMeta(c, params.sessionId);
const existing = await readSessionMeta(c, params.tabId);
if (existing) {
return existing;
}
@ -153,14 +231,19 @@ async function ensureSessionMeta(
await c.db
.insert(taskWorkbenchSessions)
.values({
sessionId: params.sessionId,
sessionId: params.tabId,
sandboxSessionId: params.sandboxSessionId ?? null,
sessionName,
model,
status: params.status ?? "ready",
errorMessage: params.errorMessage ?? null,
transcriptJson: "[]",
transcriptUpdatedAt: null,
unread: unread ? 1 : 0,
draftText: "",
draftAttachmentsJson: "[]",
draftUpdatedAt: null,
created: 1,
created: params.created === false ? 0 : 1,
closed: 0,
thinkingSinceMs: null,
createdAt: now,
@ -168,25 +251,40 @@ async function ensureSessionMeta(
})
.run();
return await readSessionMeta(c, params.sessionId);
return await readSessionMeta(c, params.tabId);
}
async function updateSessionMeta(c: any, sessionId: string, values: Record<string, unknown>): Promise<any> {
await ensureSessionMeta(c, { sessionId });
async function updateSessionMeta(c: any, tabId: string, values: Record<string, unknown>): Promise<any> {
await ensureSessionMeta(c, { tabId });
await c.db
.update(taskWorkbenchSessions)
.set({
...values,
updatedAt: Date.now(),
})
.where(eq(taskWorkbenchSessions.sessionId, sessionId))
.where(eq(taskWorkbenchSessions.sessionId, tabId))
.run();
return await readSessionMeta(c, sessionId);
return await readSessionMeta(c, tabId);
}
async function notifyWorkbenchUpdated(c: any): Promise<void> {
const workspace = await getOrCreateWorkspace(c, c.state.workspaceId);
await workspace.notifyWorkbenchUpdated({});
async function readSessionMetaBySandboxSessionId(c: any, sandboxSessionId: string): Promise<any | null> {
await ensureWorkbenchSessionTable(c);
const row = await c.db.select().from(taskWorkbenchSessions).where(eq(taskWorkbenchSessions.sandboxSessionId, sandboxSessionId)).get();
if (!row) {
return null;
}
return await readSessionMeta(c, row.sessionId);
}
async function requireReadySessionMeta(c: any, tabId: string): Promise<any> {
const meta = await readSessionMeta(c, tabId);
if (!meta) {
throw new Error(`Unknown workbench tab: ${tabId}`);
}
if (meta.status !== "ready" || !meta.sandboxSessionId) {
throw new Error(meta.errorMessage ?? "This workbench tab is still preparing");
}
return meta;
}
function shellFragment(parts: string[]): string {
@ -333,17 +431,6 @@ async function collectWorkbenchGitState(c: any, record: any) {
label: "git diff numstat",
});
const numstat = parseNumstat(numstatResult.result);
const diffs: Record<string, string> = {};
for (const row of statusRows) {
const diffResult = await executeInSandbox(c, {
sandboxId: activeSandboxId,
cwd,
command: `if git ls-files --error-unmatch -- ${JSON.stringify(row.path)} >/dev/null 2>&1; then git diff -- ${JSON.stringify(row.path)}; else git diff --no-index -- /dev/null ${JSON.stringify(row.path)} || true; fi`,
label: `git diff ${row.path}`,
});
diffs[row.path] = diffResult.result;
}
const filesResult = await executeInSandbox(c, {
sandboxId: activeSandboxId,
@ -356,6 +443,17 @@ async function collectWorkbenchGitState(c: any, record: any) {
.map((line) => line.trim())
.filter(Boolean);
const diffs: Record<string, string> = {};
for (const row of statusRows) {
const diffResult = await executeInSandbox(c, {
sandboxId: activeSandboxId,
cwd,
command: `git diff -- ${JSON.stringify(row.path)}`,
label: `git diff ${row.path}`,
});
diffs[row.path] = diffResult.exitCode === 0 ? diffResult.result : "";
}
return {
fileChanges: statusRows.map((row) => {
const counts = numstat.get(row.path) ?? { added: 0, removed: 0 };
@ -371,6 +469,37 @@ async function collectWorkbenchGitState(c: any, record: any) {
};
}
async function readCachedGitState(c: any): Promise<{ fileChanges: Array<any>; diffs: Record<string, string>; fileTree: Array<any>; updatedAt: number | null }> {
await ensureTaskRuntimeCacheColumns(c);
const row = await c.db
.select({
gitStateJson: taskRuntime.gitStateJson,
gitStateUpdatedAt: taskRuntime.gitStateUpdatedAt,
})
.from(taskRuntime)
.where(eq(taskRuntime.id, 1))
.get();
const parsed = parseGitState(row?.gitStateJson);
return {
...parsed,
updatedAt: row?.gitStateUpdatedAt ?? null,
};
}
async function writeCachedGitState(c: any, gitState: { fileChanges: Array<any>; diffs: Record<string, string>; fileTree: Array<any> }): Promise<void> {
await ensureTaskRuntimeCacheColumns(c);
const now = Date.now();
await c.db
.update(taskRuntime)
.set({
gitStateJson: JSON.stringify(gitState),
gitStateUpdatedAt: now,
updatedAt: now,
})
.where(eq(taskRuntime.id, 1))
.run();
}
async function readSessionTranscript(c: any, record: any, sessionId: string) {
const sandboxId = record.activeSandboxId ?? record.sandboxes?.[0]?.sandboxId ?? null;
if (!sandboxId) {
@ -380,7 +509,7 @@ async function readSessionTranscript(c: any, record: any, sessionId: string) {
const sandbox = getSandboxInstance(c, c.state.workspaceId, c.state.providerId, sandboxId);
const page = await sandbox.listSessionEvents({
sessionId,
limit: 500,
limit: 100,
});
return page.items.map((event: any) => ({
id: event.id,
@ -393,14 +522,50 @@ async function readSessionTranscript(c: any, record: any, sessionId: string) {
}));
}
async function activeSessionStatus(c: any, record: any, sessionId: string) {
if (record.activeSessionId !== sessionId || !record.activeSandboxId) {
async function writeSessionTranscript(c: any, tabId: string, transcript: Array<any>): Promise<void> {
await updateSessionMeta(c, tabId, {
transcriptJson: JSON.stringify(transcript),
transcriptUpdatedAt: Date.now(),
});
}
async function enqueueWorkbenchRefresh(
c: any,
command: "task.command.workbench.refresh_derived" | "task.command.workbench.refresh_session_transcript",
body: Record<string, unknown>,
): Promise<void> {
const self = selfTask(c);
await self.send(command, body, { wait: false });
}
async function maybeScheduleWorkbenchRefreshes(c: any, record: any, sessions: Array<any>): Promise<void> {
const gitState = await readCachedGitState(c);
if (record.activeSandboxId && !gitState.updatedAt) {
await enqueueWorkbenchRefresh(c, "task.command.workbench.refresh_derived", {});
}
for (const session of sessions) {
if (session.closed || session.status !== "ready" || !session.sandboxSessionId || session.transcriptUpdatedAt) {
continue;
}
await enqueueWorkbenchRefresh(c, "task.command.workbench.refresh_session_transcript", {
sessionId: session.sandboxSessionId,
});
}
}
function activeSessionStatus(record: any, sessionId: string) {
if (record.activeSessionId !== sessionId) {
return "idle";
}
const sandbox = getSandboxInstance(c, c.state.workspaceId, c.state.providerId, record.activeSandboxId);
const status = await sandbox.sessionStatus({ sessionId });
return status.status;
if (record.status === "running") {
return "running";
}
if (record.status === "error") {
return "error";
}
return "idle";
}
async function readPullRequestSummary(c: any, branchName: string | null) {
@ -417,51 +582,75 @@ async function readPullRequestSummary(c: any, branchName: string | null) {
}
export async function ensureWorkbenchSeeded(c: any): Promise<any> {
await ensureTaskRuntimeCacheColumns(c);
const record = await getCurrentRecord({ db: c.db, state: c.state });
if (record.activeSessionId) {
await ensureSessionMeta(c, {
sessionId: record.activeSessionId,
tabId: record.activeSessionId,
sandboxSessionId: record.activeSessionId,
model: defaultModelForAgent(record.agentType),
sessionName: "Session 1",
status: "ready",
});
}
return record;
}
export async function getWorkbenchTask(c: any): Promise<any> {
const record = await ensureWorkbenchSeeded(c);
const gitState = await collectWorkbenchGitState(c, record);
const sessions = await listSessionMetaRows(c);
const tabs = [];
for (const meta of sessions) {
const status = await activeSessionStatus(c, record, meta.sessionId);
let thinkingSinceMs = meta.thinkingSinceMs ?? null;
let unread = Boolean(meta.unread);
if (thinkingSinceMs && status !== "running") {
thinkingSinceMs = null;
unread = true;
}
tabs.push({
id: meta.id,
sessionId: meta.sessionId,
sessionName: meta.sessionName,
agent: agentKindForModel(meta.model),
model: meta.model,
status,
thinkingSinceMs: status === "running" ? thinkingSinceMs : null,
unread,
created: Boolean(meta.created),
draft: {
text: meta.draftText ?? "",
attachments: Array.isArray(meta.draftAttachments) ? meta.draftAttachments : [],
updatedAtMs: meta.draftUpdatedAtMs ?? null,
},
transcript: await readSessionTranscript(c, record, meta.sessionId),
});
function buildSessionSummary(record: any, meta: any): any {
const derivedSandboxSessionId = meta.sandboxSessionId ?? (meta.status === "pending_provision" && record.activeSessionId ? record.activeSessionId : null);
const sessionStatus =
meta.status === "ready" && derivedSandboxSessionId ? activeSessionStatus(record, derivedSandboxSessionId) : meta.status === "error" ? "error" : "idle";
let thinkingSinceMs = meta.thinkingSinceMs ?? null;
let unread = Boolean(meta.unread);
if (thinkingSinceMs && sessionStatus !== "running") {
thinkingSinceMs = null;
unread = true;
}
return {
id: meta.id,
sessionId: derivedSandboxSessionId,
sessionName: meta.sessionName,
agent: agentKindForModel(meta.model),
model: meta.model,
status: sessionStatus,
thinkingSinceMs: sessionStatus === "running" ? thinkingSinceMs : null,
unread,
created: Boolean(meta.created || derivedSandboxSessionId),
};
}
function buildSessionDetailFromMeta(record: any, meta: any): any {
const summary = buildSessionSummary(record, meta);
return {
sessionId: meta.tabId,
tabId: meta.tabId,
sandboxSessionId: summary.sessionId,
sessionName: summary.sessionName,
agent: summary.agent,
model: summary.model,
status: summary.status,
thinkingSinceMs: summary.thinkingSinceMs,
unread: summary.unread,
created: summary.created,
draft: {
text: meta.draftText ?? "",
attachments: Array.isArray(meta.draftAttachments) ? meta.draftAttachments : [],
updatedAtMs: meta.draftUpdatedAtMs ?? null,
},
transcript: meta.transcript ?? [],
};
}
/**
* Builds a WorkbenchTaskSummary from local task actor state. Task actors push
* this to the parent workspace actor so workspace sidebar reads stay local.
*/
export async function buildTaskSummary(c: any): Promise<any> {
const record = await ensureWorkbenchSeeded(c);
const sessions = await listSessionMetaRows(c);
await maybeScheduleWorkbenchRefreshes(c, record, sessions);
return {
id: c.state.taskId,
repoId: c.state.repoId,
@ -471,14 +660,112 @@ export async function getWorkbenchTask(c: any): Promise<any> {
updatedAtMs: record.updatedAt,
branch: record.branchName,
pullRequest: await readPullRequestSummary(c, record.branchName),
tabs,
sessionsSummary: sessions.map((meta) => buildSessionSummary(record, meta)),
};
}
/**
* Builds a WorkbenchTaskDetail from local task actor state for direct task
* subscribers. This is a full replacement payload, not a patch.
*/
export async function buildTaskDetail(c: any): Promise<any> {
const record = await ensureWorkbenchSeeded(c);
const gitState = await readCachedGitState(c);
const sessions = await listSessionMetaRows(c);
await maybeScheduleWorkbenchRefreshes(c, record, sessions);
const summary = await buildTaskSummary(c);
return {
...summary,
task: record.task,
agentType: record.agentType === "claude" || record.agentType === "codex" ? record.agentType : null,
runtimeStatus: record.status,
statusMessage: record.statusMessage ?? null,
activeSessionId: record.activeSessionId ?? null,
diffStat: record.diffStat ?? null,
prUrl: record.prUrl ?? null,
reviewStatus: record.reviewStatus ?? null,
fileChanges: gitState.fileChanges,
diffs: gitState.diffs,
fileTree: gitState.fileTree,
minutesUsed: 0,
sandboxes: (record.sandboxes ?? []).map((sandbox: any) => ({
providerId: sandbox.providerId,
sandboxId: sandbox.sandboxId,
cwd: sandbox.cwd ?? null,
})),
activeSandboxId: record.activeSandboxId ?? null,
};
}
/**
* Builds a WorkbenchSessionDetail for a specific session tab.
*/
export async function buildSessionDetail(c: any, tabId: string): Promise<any> {
const record = await ensureWorkbenchSeeded(c);
const meta = await readSessionMeta(c, tabId);
if (!meta || meta.closed) {
throw new Error(`Unknown workbench session tab: ${tabId}`);
}
return buildSessionDetailFromMeta(record, meta);
}
export async function getTaskSummary(c: any): Promise<any> {
return await buildTaskSummary(c);
}
export async function getTaskDetail(c: any): Promise<any> {
return await buildTaskDetail(c);
}
export async function getSessionDetail(c: any, tabId: string): Promise<any> {
return await buildSessionDetail(c, tabId);
}
/**
* Replaces the old notifyWorkbenchUpdated pattern.
*
* The task actor emits two kinds of updates:
* - Push summary state up to the parent workspace actor so the sidebar
* materialized projection stays current.
* - Broadcast full detail/session payloads down to direct task subscribers.
*/
export async function broadcastTaskUpdate(c: any, options?: { sessionId?: string }): Promise<void> {
const workspace = await getOrCreateWorkspace(c, c.state.workspaceId);
await workspace.applyTaskSummaryUpdate({ taskSummary: await buildTaskSummary(c) });
c.broadcast("taskUpdated", {
type: "taskDetailUpdated",
detail: await buildTaskDetail(c),
});
if (options?.sessionId) {
c.broadcast("sessionUpdated", {
type: "sessionUpdated",
session: await buildSessionDetail(c, options.sessionId),
});
}
}
export async function refreshWorkbenchDerivedState(c: any): Promise<void> {
const record = await ensureWorkbenchSeeded(c);
const gitState = await collectWorkbenchGitState(c, record);
await writeCachedGitState(c, gitState);
await broadcastTaskUpdate(c);
}
export async function refreshWorkbenchSessionTranscript(c: any, sessionId: string): Promise<void> {
const record = await ensureWorkbenchSeeded(c);
const meta = (await readSessionMetaBySandboxSessionId(c, sessionId)) ?? (await readSessionMeta(c, sessionId));
if (!meta?.sandboxSessionId) {
return;
}
const transcript = await readSessionTranscript(c, record, meta.sandboxSessionId);
await writeSessionTranscript(c, meta.tabId, transcript);
await broadcastTaskUpdate(c, { sessionId: meta.tabId });
}
export async function renameWorkbenchTask(c: any, value: string): Promise<void> {
const nextTitle = value.trim();
if (!nextTitle) {
@ -494,7 +781,7 @@ export async function renameWorkbenchTask(c: any, value: string): Promise<void>
.where(eq(taskTable.id, 1))
.run();
c.state.title = nextTitle;
await notifyWorkbenchUpdated(c);
await broadcastTaskUpdate(c);
}
export async function renameWorkbenchBranch(c: any, value: string): Promise<void> {
@ -545,55 +832,168 @@ export async function renameWorkbenchBranch(c: any, value: string): Promise<void
taskId: c.state.taskId,
branchName: nextBranch,
});
await notifyWorkbenchUpdated(c);
await broadcastTaskUpdate(c);
}
export async function createWorkbenchSession(c: any, model?: string): Promise<{ tabId: string }> {
let record = await ensureWorkbenchSeeded(c);
if (!record.activeSandboxId) {
// Fire-and-forget: enqueue provisioning without waiting to avoid self-deadlock
// (this handler already runs inside the task workflow loop, so wait:true would deadlock).
const providerId = record.providerId ?? c.state.providerId ?? getActorRuntimeContext().providers.defaultProviderId();
await selfTask(c).provision({ providerId });
record = await ensureWorkbenchSeeded(c);
await selfTask(c).send(taskWorkflowQueueName("task.command.provision"), { providerId }, { wait: false });
throw new Error("sandbox is provisioning — retry shortly");
}
if (record.activeSessionId) {
const existingSessions = await listSessionMetaRows(c);
if (existingSessions.length === 0) {
await ensureSessionMeta(c, {
sessionId: record.activeSessionId,
tabId: record.activeSessionId,
sandboxSessionId: record.activeSessionId,
model: model ?? defaultModelForAgent(record.agentType),
sessionName: "Session 1",
status: "ready",
});
await notifyWorkbenchUpdated(c);
await broadcastTaskUpdate(c, { sessionId: record.activeSessionId });
return { tabId: record.activeSessionId };
}
}
if (!record.activeSandboxId) {
throw new Error("cannot create session without an active sandbox");
const tabId = `tab-${randomUUID()}`;
await ensureSessionMeta(c, {
tabId,
model: model ?? defaultModelForAgent(record.agentType),
status: record.activeSandboxId ? "pending_session_create" : "pending_provision",
created: false,
});
const providerId = record.providerId ?? c.state.providerId ?? getActorRuntimeContext().providers.defaultProviderId();
const self = selfTask(c);
if (!record.activeSandboxId && !String(record.status ?? "").startsWith("init_")) {
await self.send("task.command.provision", { providerId }, { wait: false });
}
await self.send(
"task.command.workbench.ensure_session",
{ tabId, ...(model ? { model } : {}) },
{
wait: false,
},
);
await broadcastTaskUpdate(c, { sessionId: tabId });
return { tabId };
}
export async function ensureWorkbenchSession(c: any, tabId: string, model?: string): Promise<void> {
const meta = await readSessionMeta(c, tabId);
if (!meta || meta.closed) {
return;
}
const record = await ensureWorkbenchSeeded(c);
if (!record.activeSandboxId) {
await updateSessionMeta(c, tabId, {
status: "pending_provision",
errorMessage: null,
});
return;
}
if (!meta.sandboxSessionId && record.activeSessionId && meta.status === "pending_provision") {
const existingTabForActiveSession = await readSessionMetaBySandboxSessionId(c, record.activeSessionId);
if (existingTabForActiveSession && existingTabForActiveSession.tabId !== tabId) {
await updateSessionMeta(c, existingTabForActiveSession.tabId, {
closed: 1,
});
}
await updateSessionMeta(c, tabId, {
sandboxSessionId: record.activeSessionId,
status: "ready",
errorMessage: null,
created: 1,
});
await enqueueWorkbenchRefresh(c, "task.command.workbench.refresh_session_transcript", {
sessionId: record.activeSessionId,
});
await broadcastTaskUpdate(c, { sessionId: tabId });
return;
}
if (meta.sandboxSessionId) {
await updateSessionMeta(c, tabId, {
status: "ready",
errorMessage: null,
});
await enqueueWorkbenchRefresh(c, "task.command.workbench.refresh_session_transcript", {
sessionId: meta.sandboxSessionId,
});
await broadcastTaskUpdate(c, { sessionId: tabId });
return;
}
const activeSandbox = (record.sandboxes ?? []).find((candidate: any) => candidate.sandboxId === record.activeSandboxId) ?? null;
const cwd = activeSandbox?.cwd ?? record.sandboxes?.[0]?.cwd ?? null;
if (!cwd) {
throw new Error("cannot create session without a sandbox cwd");
await updateSessionMeta(c, tabId, {
status: "error",
errorMessage: "cannot create session without a sandbox cwd",
});
await broadcastTaskUpdate(c, { sessionId: tabId });
return;
}
const sandbox = getSandboxInstance(c, c.state.workspaceId, c.state.providerId, record.activeSandboxId);
const created = await sandbox.createSession({
prompt: "",
cwd,
agent: agentTypeForModel(model ?? defaultModelForAgent(record.agentType)),
await updateSessionMeta(c, tabId, {
status: "pending_session_create",
errorMessage: null,
});
if (!created.id) {
throw new Error(created.error ?? "sandbox-agent session creation failed");
try {
const sandbox = getSandboxInstance(c, c.state.workspaceId, c.state.providerId, record.activeSandboxId);
const created = await sandbox.createSession({
prompt: "",
cwd,
agent: agentTypeForModel(model ?? meta.model ?? defaultModelForAgent(record.agentType)),
});
if (!created.id) {
throw new Error(created.error ?? "sandbox-agent session creation failed");
}
await updateSessionMeta(c, tabId, {
sandboxSessionId: created.id,
status: "ready",
errorMessage: null,
});
await enqueueWorkbenchRefresh(c, "task.command.workbench.refresh_session_transcript", {
sessionId: created.id,
});
} catch (error) {
await updateSessionMeta(c, tabId, {
status: "error",
errorMessage: error instanceof Error ? error.message : String(error),
});
}
await ensureSessionMeta(c, {
sessionId: created.id,
model: model ?? defaultModelForAgent(record.agentType),
});
await notifyWorkbenchUpdated(c);
return { tabId: created.id };
await broadcastTaskUpdate(c, { sessionId: tabId });
}
export async function enqueuePendingWorkbenchSessions(c: any): Promise<void> {
const self = selfTask(c);
const pending = (await listSessionMetaRows(c, { includeClosed: true })).filter(
(row) => row.closed !== true && row.status !== "ready" && row.status !== "error",
);
for (const row of pending) {
await self.send(
"task.command.workbench.ensure_session",
{
tabId: row.tabId,
model: row.model,
},
{
wait: false,
},
);
}
}
export async function renameWorkbenchSession(c: any, sessionId: string, title: string): Promise<void> {
@ -604,14 +1004,14 @@ export async function renameWorkbenchSession(c: any, sessionId: string, title: s
await updateSessionMeta(c, sessionId, {
sessionName: trimmed,
});
await notifyWorkbenchUpdated(c);
await broadcastTaskUpdate(c, { sessionId });
}
export async function setWorkbenchSessionUnread(c: any, sessionId: string, unread: boolean): Promise<void> {
await updateSessionMeta(c, sessionId, {
unread: unread ? 1 : 0,
});
await notifyWorkbenchUpdated(c);
await broadcastTaskUpdate(c, { sessionId });
}
export async function updateWorkbenchDraft(c: any, sessionId: string, text: string, attachments: Array<any>): Promise<void> {
@ -620,14 +1020,14 @@ export async function updateWorkbenchDraft(c: any, sessionId: string, text: stri
draftAttachmentsJson: JSON.stringify(attachments),
draftUpdatedAt: Date.now(),
});
await notifyWorkbenchUpdated(c);
await broadcastTaskUpdate(c, { sessionId });
}
export async function changeWorkbenchModel(c: any, sessionId: string, model: string): Promise<void> {
await updateSessionMeta(c, sessionId, {
model,
});
await notifyWorkbenchUpdated(c);
await broadcastTaskUpdate(c, { sessionId });
}
export async function sendWorkbenchMessage(c: any, sessionId: string, text: string, attachments: Array<any>): Promise<void> {
@ -636,7 +1036,7 @@ export async function sendWorkbenchMessage(c: any, sessionId: string, text: stri
throw new Error("cannot send message without an active sandbox");
}
await ensureSessionMeta(c, { sessionId });
const meta = await requireReadySessionMeta(c, sessionId);
const sandbox = getSandboxInstance(c, c.state.workspaceId, c.state.providerId, record.activeSandboxId);
const prompt = [text.trim(), ...attachments.map((attachment: any) => `@ ${attachment.filePath}:${attachment.lineNumber}\n${attachment.lineContent}`)]
.filter(Boolean)
@ -646,7 +1046,7 @@ export async function sendWorkbenchMessage(c: any, sessionId: string, text: stri
}
await sandbox.sendPrompt({
sessionId,
sessionId: meta.sandboxSessionId,
prompt,
notification: true,
});
@ -663,25 +1063,28 @@ export async function sendWorkbenchMessage(c: any, sessionId: string, text: stri
await c.db
.update(taskRuntime)
.set({
activeSessionId: sessionId,
activeSessionId: meta.sandboxSessionId,
updatedAt: Date.now(),
})
.where(eq(taskRuntime.id, 1))
.run();
const sync = await getOrCreateTaskStatusSync(c, c.state.workspaceId, c.state.repoId, c.state.taskId, record.activeSandboxId, sessionId, {
const sync = await getOrCreateTaskStatusSync(c, c.state.workspaceId, c.state.repoId, c.state.taskId, record.activeSandboxId, meta.sandboxSessionId, {
workspaceId: c.state.workspaceId,
repoId: c.state.repoId,
taskId: c.state.taskId,
providerId: c.state.providerId,
sandboxId: record.activeSandboxId,
sessionId,
sessionId: meta.sandboxSessionId,
intervalMs: STATUS_SYNC_INTERVAL_MS,
});
await sync.setIntervalMs({ intervalMs: STATUS_SYNC_INTERVAL_MS });
await sync.start();
await sync.force();
await notifyWorkbenchUpdated(c);
await enqueueWorkbenchRefresh(c, "task.command.workbench.refresh_session_transcript", {
sessionId: meta.sandboxSessionId,
});
await broadcastTaskUpdate(c, { sessionId });
}
export async function stopWorkbenchSession(c: any, sessionId: string): Promise<void> {
@ -689,20 +1092,21 @@ export async function stopWorkbenchSession(c: any, sessionId: string): Promise<v
if (!record.activeSandboxId) {
return;
}
const meta = await requireReadySessionMeta(c, sessionId);
const sandbox = getSandboxInstance(c, c.state.workspaceId, c.state.providerId, record.activeSandboxId);
await sandbox.cancelSession({ sessionId });
await sandbox.cancelSession({ sessionId: meta.sandboxSessionId });
await updateSessionMeta(c, sessionId, {
thinkingSinceMs: null,
});
await notifyWorkbenchUpdated(c);
await broadcastTaskUpdate(c, { sessionId });
}
export async function syncWorkbenchSessionStatus(c: any, sessionId: string, status: "running" | "idle" | "error", at: number): Promise<void> {
const record = await ensureWorkbenchSeeded(c);
const meta = await ensureSessionMeta(c, { sessionId });
const meta = (await readSessionMetaBySandboxSessionId(c, sessionId)) ?? (await ensureSessionMeta(c, { tabId: sessionId, sandboxSessionId: sessionId }));
let changed = false;
if (record.activeSessionId === sessionId) {
if (record.activeSessionId === sessionId || record.activeSessionId === meta.sandboxSessionId) {
const mappedStatus = status === "running" ? "running" : status === "error" ? "error" : "idle";
if (record.status !== mappedStatus) {
await c.db
@ -753,27 +1157,36 @@ export async function syncWorkbenchSessionStatus(c: any, sessionId: string, stat
}
if (changed) {
await notifyWorkbenchUpdated(c);
if (status !== "running") {
await enqueueWorkbenchRefresh(c, "task.command.workbench.refresh_session_transcript", {
sessionId,
});
await enqueueWorkbenchRefresh(c, "task.command.workbench.refresh_derived", {});
}
await broadcastTaskUpdate(c, { sessionId: meta.tabId });
}
}
export async function closeWorkbenchSession(c: any, sessionId: string): Promise<void> {
const record = await ensureWorkbenchSeeded(c);
if (!record.activeSandboxId) {
return;
}
const sessions = await listSessionMetaRows(c);
if (sessions.filter((candidate) => candidate.closed !== true).length <= 1) {
return;
}
const sandbox = getSandboxInstance(c, c.state.workspaceId, c.state.providerId, record.activeSandboxId);
await sandbox.destroySession({ sessionId });
const meta = await readSessionMeta(c, sessionId);
if (!meta) {
return;
}
if (record.activeSandboxId && meta.sandboxSessionId) {
const sandbox = getSandboxInstance(c, c.state.workspaceId, c.state.providerId, record.activeSandboxId);
await sandbox.destroySession({ sessionId: meta.sandboxSessionId });
}
await updateSessionMeta(c, sessionId, {
closed: 1,
thinkingSinceMs: null,
});
if (record.activeSessionId === sessionId) {
if (record.activeSessionId === sessionId || record.activeSessionId === meta.sandboxSessionId) {
await c.db
.update(taskRuntime)
.set({
@ -783,7 +1196,7 @@ export async function closeWorkbenchSession(c: any, sessionId: string): Promise<
.where(eq(taskRuntime.id, 1))
.run();
}
await notifyWorkbenchUpdated(c);
await broadcastTaskUpdate(c);
}
export async function markWorkbenchUnread(c: any): Promise<void> {
@ -792,10 +1205,10 @@ export async function markWorkbenchUnread(c: any): Promise<void> {
if (!latest) {
return;
}
await updateSessionMeta(c, latest.sessionId, {
await updateSessionMeta(c, latest.tabId, {
unread: 1,
});
await notifyWorkbenchUpdated(c);
await broadcastTaskUpdate(c, { sessionId: latest.tabId });
}
export async function publishWorkbenchPr(c: any): Promise<void> {
@ -816,7 +1229,7 @@ export async function publishWorkbenchPr(c: any): Promise<void> {
})
.where(eq(taskTable.id, 1))
.run();
await notifyWorkbenchUpdated(c);
await broadcastTaskUpdate(c);
}
export async function revertWorkbenchFile(c: any, path: string): Promise<void> {
@ -838,5 +1251,6 @@ export async function revertWorkbenchFile(c: any, path: string): Promise<void> {
if (result.exitCode !== 0) {
throw new Error(`file revert failed (${result.exitCode}): ${result.result}`);
}
await notifyWorkbenchUpdated(c);
await enqueueWorkbenchRefresh(c, "task.command.workbench.refresh_derived", {});
await broadcastTaskUpdate(c);
}

View file

@ -1,9 +1,9 @@
// @ts-nocheck
import { eq } from "drizzle-orm";
import type { TaskRecord, TaskStatus } from "@sandbox-agent/foundry-shared";
import { getOrCreateWorkspace } from "../../handles.js";
import { task as taskTable, taskRuntime, taskSandboxes } from "../db/schema.js";
import { historyKey } from "../../keys.js";
import { broadcastTaskUpdate } from "../workbench.js";
export const TASK_ROW_ID = 1;
@ -83,8 +83,7 @@ export async function setTaskState(ctx: any, status: TaskStatus, statusMessage?:
.run();
}
const workspace = await getOrCreateWorkspace(ctx, ctx.state.workspaceId);
await workspace.notifyWorkbenchUpdated({});
await broadcastTaskUpdate(ctx);
}
export async function getCurrentRecord(ctx: any): Promise<TaskRecord> {
@ -176,6 +175,5 @@ export async function appendHistory(ctx: any, kind: string, payload: Record<stri
payload,
});
const workspace = await getOrCreateWorkspace(ctx, ctx.state.workspaceId);
await workspace.notifyWorkbenchUpdated({});
await broadcastTaskUpdate(ctx);
}

View file

@ -8,6 +8,7 @@ import {
initCompleteActivity,
initCreateSandboxActivity,
initCreateSessionActivity,
initEnqueueProvisionActivity,
initEnsureAgentActivity,
initEnsureNameActivity,
initExposeSandboxActivity,
@ -32,6 +33,9 @@ import {
changeWorkbenchModel,
closeWorkbenchSession,
createWorkbenchSession,
ensureWorkbenchSession,
refreshWorkbenchDerivedState,
refreshWorkbenchSessionTranscript,
markWorkbenchUnread,
publishWorkbenchPr,
renameWorkbenchBranch,
@ -56,7 +60,7 @@ const commandHandlers: Record<TaskQueueName, WorkflowHandler> = {
const body = msg.body;
await loopCtx.step("init-bootstrap-db", async () => initBootstrapDbActivity(loopCtx, body));
await loopCtx.removed("init-enqueue-provision", "step");
await loopCtx.step("init-enqueue-provision", async () => initEnqueueProvisionActivity(loopCtx, body));
await loopCtx.removed("init-dispatch-provision-v2", "step");
const currentRecord = await loopCtx.step("init-read-current-record", async () => getCurrentRecord(loopCtx));
@ -164,12 +168,25 @@ const commandHandlers: Record<TaskQueueName, WorkflowHandler> = {
},
"task.command.workbench.create_session": async (loopCtx, msg) => {
const created = await loopCtx.step({
name: "workbench-create-session",
try {
const created = await loopCtx.step({
name: "workbench-create-session",
timeout: 30_000,
run: async () => createWorkbenchSession(loopCtx, msg.body?.model),
});
await msg.complete(created);
} catch (error) {
await msg.complete({ error: resolveErrorMessage(error) });
}
},
"task.command.workbench.ensure_session": async (loopCtx, msg) => {
await loopCtx.step({
name: "workbench-ensure-session",
timeout: 5 * 60_000,
run: async () => createWorkbenchSession(loopCtx, msg.body?.model),
run: async () => ensureWorkbenchSession(loopCtx, msg.body.tabId, msg.body?.model),
});
await msg.complete(created);
await msg.complete({ ok: true });
},
"task.command.workbench.rename_session": async (loopCtx, msg) => {
@ -215,6 +232,24 @@ const commandHandlers: Record<TaskQueueName, WorkflowHandler> = {
await msg.complete({ ok: true });
},
"task.command.workbench.refresh_derived": async (loopCtx, msg) => {
await loopCtx.step({
name: "workbench-refresh-derived",
timeout: 5 * 60_000,
run: async () => refreshWorkbenchDerivedState(loopCtx),
});
await msg.complete({ ok: true });
},
"task.command.workbench.refresh_session_transcript": async (loopCtx, msg) => {
await loopCtx.step({
name: "workbench-refresh-session-transcript",
timeout: 60_000,
run: async () => refreshWorkbenchSessionTranscript(loopCtx, msg.body.sessionId),
});
await msg.complete({ ok: true });
},
"task.command.workbench.close_session": async (loopCtx, msg) => {
await loopCtx.step({
name: "workbench-close-session",

View file

@ -8,6 +8,7 @@ import { logActorWarning, resolveErrorMessage } from "../../logging.js";
import { task as taskTable, taskRuntime, taskSandboxes } from "../db/schema.js";
import { TASK_ROW_ID, appendHistory, buildAgentPrompt, collectErrorMessages, resolveErrorDetail, setTaskState } from "./common.js";
import { taskWorkflowQueueName } from "./queue.js";
import { enqueuePendingWorkbenchSessions } from "../workbench.js";
const DEFAULT_INIT_CREATE_SANDBOX_ACTIVITY_TIMEOUT_MS = 180_000;
@ -34,6 +35,13 @@ function debugInit(loopCtx: any, message: string, context?: Record<string, unkno
});
}
async function ensureTaskRuntimeCacheColumns(db: any): Promise<void> {
await db.execute(`ALTER TABLE task_runtime ADD COLUMN git_state_json text`).catch(() => {});
await db.execute(`ALTER TABLE task_runtime ADD COLUMN git_state_updated_at integer`).catch(() => {});
await db.execute(`ALTER TABLE task_runtime ADD COLUMN provision_stage text`).catch(() => {});
await db.execute(`ALTER TABLE task_runtime ADD COLUMN provision_stage_updated_at integer`).catch(() => {});
}
async function withActivityTimeout<T>(timeoutMs: number, label: string, run: () => Promise<T>): Promise<T> {
let timer: ReturnType<typeof setTimeout> | null = null;
try {
@ -60,6 +68,8 @@ export async function initBootstrapDbActivity(loopCtx: any, body: any): Promise<
const initialStatusMessage = loopCtx.state.branchName && loopCtx.state.title ? "provisioning" : "naming";
try {
await ensureTaskRuntimeCacheColumns(db);
await db
.insert(taskTable)
.values({
@ -96,6 +106,10 @@ export async function initBootstrapDbActivity(loopCtx: any, body: any): Promise<
activeSwitchTarget: null,
activeCwd: null,
statusMessage: initialStatusMessage,
gitStateJson: null,
gitStateUpdatedAt: null,
provisionStage: "queued",
provisionStageUpdatedAt: now,
updatedAt: now,
})
.onConflictDoUpdate({
@ -106,6 +120,8 @@ export async function initBootstrapDbActivity(loopCtx: any, body: any): Promise<
activeSwitchTarget: null,
activeCwd: null,
statusMessage: initialStatusMessage,
provisionStage: "queued",
provisionStageUpdatedAt: now,
updatedAt: now,
},
})
@ -118,19 +134,29 @@ export async function initBootstrapDbActivity(loopCtx: any, body: any): Promise<
export async function initEnqueueProvisionActivity(loopCtx: any, body: any): Promise<void> {
await setTaskState(loopCtx, "init_enqueue_provision", "provision queued");
const self = selfTask(loopCtx);
void self
.send(taskWorkflowQueueName("task.command.provision"), body, {
wait: false,
await loopCtx.db
.update(taskRuntime)
.set({
provisionStage: "queued",
provisionStageUpdatedAt: Date.now(),
updatedAt: Date.now(),
})
.catch((error: unknown) => {
logActorWarning("task.init", "background provision command failed", {
workspaceId: loopCtx.state.workspaceId,
repoId: loopCtx.state.repoId,
taskId: loopCtx.state.taskId,
error: resolveErrorMessage(error),
});
.where(eq(taskRuntime.id, TASK_ROW_ID))
.run();
const self = selfTask(loopCtx);
try {
await self.send(taskWorkflowQueueName("task.command.provision"), body, {
wait: false,
});
} catch (error: unknown) {
logActorWarning("task.init", "background provision command failed", {
workspaceId: loopCtx.state.workspaceId,
repoId: loopCtx.state.repoId,
taskId: loopCtx.state.taskId,
error: resolveErrorMessage(error),
});
throw error;
}
}
export async function initEnsureNameActivity(loopCtx: any): Promise<void> {
@ -197,6 +223,8 @@ export async function initEnsureNameActivity(loopCtx: any): Promise<void> {
.update(taskRuntime)
.set({
statusMessage: "provisioning",
provisionStage: "repo_prepared",
provisionStageUpdatedAt: now,
updatedAt: now,
})
.where(eq(taskRuntime.id, TASK_ROW_ID))
@ -222,6 +250,15 @@ export async function initAssertNameActivity(loopCtx: any): Promise<void> {
export async function initCreateSandboxActivity(loopCtx: any, body: any): Promise<any> {
await setTaskState(loopCtx, "init_create_sandbox", "creating sandbox");
await loopCtx.db
.update(taskRuntime)
.set({
provisionStage: "sandbox_allocated",
provisionStageUpdatedAt: Date.now(),
updatedAt: Date.now(),
})
.where(eq(taskRuntime.id, TASK_ROW_ID))
.run();
const { providers } = getActorRuntimeContext();
const providerId = body?.providerId ?? loopCtx.state.providerId;
const provider = providers.get(providerId);
@ -307,6 +344,15 @@ export async function initCreateSandboxActivity(loopCtx: any, body: any): Promis
export async function initEnsureAgentActivity(loopCtx: any, body: any, sandbox: any): Promise<any> {
await setTaskState(loopCtx, "init_ensure_agent", "ensuring sandbox agent");
await loopCtx.db
.update(taskRuntime)
.set({
provisionStage: "agent_installing",
provisionStageUpdatedAt: Date.now(),
updatedAt: Date.now(),
})
.where(eq(taskRuntime.id, TASK_ROW_ID))
.run();
const { providers } = getActorRuntimeContext();
const providerId = body?.providerId ?? loopCtx.state.providerId;
const provider = providers.get(providerId);
@ -318,6 +364,15 @@ export async function initEnsureAgentActivity(loopCtx: any, body: any, sandbox:
export async function initStartSandboxInstanceActivity(loopCtx: any, body: any, sandbox: any, agent: any): Promise<any> {
await setTaskState(loopCtx, "init_start_sandbox_instance", "starting sandbox runtime");
await loopCtx.db
.update(taskRuntime)
.set({
provisionStage: "agent_starting",
provisionStageUpdatedAt: Date.now(),
updatedAt: Date.now(),
})
.where(eq(taskRuntime.id, TASK_ROW_ID))
.run();
try {
const providerId = body?.providerId ?? loopCtx.state.providerId;
const sandboxInstance = await getOrCreateSandboxInstance(loopCtx, loopCtx.state.workspaceId, providerId, sandbox.sandboxId, {
@ -350,6 +405,15 @@ export async function initStartSandboxInstanceActivity(loopCtx: any, body: any,
export async function initCreateSessionActivity(loopCtx: any, body: any, sandbox: any, sandboxInstanceReady: any): Promise<any> {
await setTaskState(loopCtx, "init_create_session", "creating agent session");
await loopCtx.db
.update(taskRuntime)
.set({
provisionStage: "session_creating",
provisionStageUpdatedAt: Date.now(),
updatedAt: Date.now(),
})
.where(eq(taskRuntime.id, TASK_ROW_ID))
.run();
if (!sandboxInstanceReady.ok) {
return {
id: null,
@ -481,6 +545,8 @@ export async function initWriteDbActivity(
activeSwitchTarget: sandbox.switchTarget,
activeCwd,
statusMessage,
provisionStage: sessionHealthy ? "ready" : "error",
provisionStageUpdatedAt: now,
updatedAt: now,
})
.onConflictDoUpdate({
@ -491,6 +557,8 @@ export async function initWriteDbActivity(
activeSwitchTarget: sandbox.switchTarget,
activeCwd,
statusMessage,
provisionStage: sessionHealthy ? "ready" : "error",
provisionStageUpdatedAt: now,
updatedAt: now,
},
})
@ -535,6 +603,12 @@ export async function initCompleteActivity(loopCtx: any, body: any, sandbox: any
});
loopCtx.state.initialized = true;
await enqueuePendingWorkbenchSessions(loopCtx);
const self = selfTask(loopCtx);
await self.send(taskWorkflowQueueName("task.command.workbench.refresh_derived"), {}, { wait: false });
if (sessionId) {
await self.send(taskWorkflowQueueName("task.command.workbench.refresh_session_transcript"), { sessionId }, { wait: false });
}
return;
}
@ -591,6 +665,8 @@ export async function initFailedActivity(loopCtx: any, error: unknown): Promise<
activeSwitchTarget: null,
activeCwd: null,
statusMessage: detail,
provisionStage: "error",
provisionStageUpdatedAt: now,
updatedAt: now,
})
.onConflictDoUpdate({
@ -601,6 +677,8 @@ export async function initFailedActivity(loopCtx: any, error: unknown): Promise<
activeSwitchTarget: null,
activeCwd: null,
statusMessage: detail,
provisionStage: "error",
provisionStageUpdatedAt: now,
updatedAt: now,
},
})

View file

@ -13,6 +13,7 @@ export const TASK_QUEUE_NAMES = [
"task.command.workbench.rename_task",
"task.command.workbench.rename_branch",
"task.command.workbench.create_session",
"task.command.workbench.ensure_session",
"task.command.workbench.rename_session",
"task.command.workbench.set_session_unread",
"task.command.workbench.update_draft",
@ -20,6 +21,8 @@ export const TASK_QUEUE_NAMES = [
"task.command.workbench.send_message",
"task.command.workbench.stop_session",
"task.command.workbench.sync_session_status",
"task.command.workbench.refresh_derived",
"task.command.workbench.refresh_session_transcript",
"task.command.workbench.close_session",
"task.command.workbench.publish_pr",
"task.command.workbench.revert_file",

View file

@ -4,6 +4,17 @@ import { Loop } from "rivetkit/workflow";
import type {
AddRepoInput,
CreateTaskInput,
HistoryEvent,
HistoryQueryInput,
ListTasksInput,
ProviderId,
RepoOverview,
RepoRecord,
RepoStackActionInput,
RepoStackActionResult,
StarSandboxAgentRepoInput,
StarSandboxAgentRepoResult,
SwitchResult,
TaskRecord,
TaskSummary,
TaskWorkbenchChangeModelInput,
@ -14,20 +25,13 @@ import type {
TaskWorkbenchSelectInput,
TaskWorkbenchSetSessionUnreadInput,
TaskWorkbenchSendMessageInput,
TaskWorkbenchSnapshot,
TaskWorkbenchTabInput,
TaskWorkbenchUpdateDraftInput,
HistoryEvent,
HistoryQueryInput,
ListTasksInput,
ProviderId,
RepoOverview,
RepoStackActionInput,
RepoStackActionResult,
RepoRecord,
StarSandboxAgentRepoInput,
StarSandboxAgentRepoResult,
SwitchResult,
WorkbenchRepoSummary,
WorkbenchSessionSummary,
WorkbenchTaskSummary,
WorkspaceEvent,
WorkspaceSummarySnapshot,
WorkspaceUseInput,
} from "@sandbox-agent/foundry-shared";
import { getActorRuntimeContext } from "../context.js";
@ -35,7 +39,7 @@ import { getTask, getOrCreateHistory, getOrCreateProject, selfWorkspace } from "
import { logActorWarning, resolveErrorMessage } from "../logging.js";
import { normalizeRemoteUrl, repoIdFromRemote } from "../../services/repo.js";
import { resolveWorkspaceGithubAuth } from "../../services/github-auth.js";
import { taskLookup, repos, providerProfiles } from "./db/schema.js";
import { taskLookup, repos, providerProfiles, taskSummaries } from "./db/schema.js";
import { agentTypeForModel } from "../task/workbench.js";
import { expectQueueResponse } from "../../services/queue.js";
import { workspaceAppActions } from "./app-shell.js";
@ -109,6 +113,18 @@ async function upsertTaskLookupRow(c: any, taskId: string, repoId: string): Prom
.run();
}
function parseJsonValue<T>(value: string | null | undefined, fallback: T): T {
if (!value) {
return fallback;
}
try {
return JSON.parse(value) as T;
} catch {
return fallback;
}
}
async function collectAllTaskSummaries(c: any): Promise<TaskSummary[]> {
const repoRows = await c.db.select({ repoId: repos.repoId, remoteUrl: repos.remoteUrl }).from(repos).orderBy(desc(repos.updatedAt)).all();
@ -145,17 +161,55 @@ function repoLabelFromRemote(remoteUrl: string): string {
return remoteUrl;
}
async function buildWorkbenchSnapshot(c: any): Promise<TaskWorkbenchSnapshot> {
function buildRepoSummary(repoRow: { repoId: string; remoteUrl: string; updatedAt: number }, taskRows: WorkbenchTaskSummary[]): WorkbenchRepoSummary {
const repoTasks = taskRows.filter((task) => task.repoId === repoRow.repoId);
const latestActivityMs = repoTasks.reduce((latest, task) => Math.max(latest, task.updatedAtMs), repoRow.updatedAt);
return {
id: repoRow.repoId,
label: repoLabelFromRemote(repoRow.remoteUrl),
taskCount: repoTasks.length,
latestActivityMs,
};
}
function taskSummaryRowFromSummary(taskSummary: WorkbenchTaskSummary) {
return {
taskId: taskSummary.id,
repoId: taskSummary.repoId,
title: taskSummary.title,
status: taskSummary.status,
repoName: taskSummary.repoName,
updatedAtMs: taskSummary.updatedAtMs,
branch: taskSummary.branch,
pullRequestJson: JSON.stringify(taskSummary.pullRequest),
sessionsSummaryJson: JSON.stringify(taskSummary.sessionsSummary),
};
}
function taskSummaryFromRow(row: any): WorkbenchTaskSummary {
return {
id: row.taskId,
repoId: row.repoId,
title: row.title,
status: row.status,
repoName: row.repoName,
updatedAtMs: row.updatedAtMs,
branch: row.branch ?? null,
pullRequest: parseJsonValue(row.pullRequestJson, null),
sessionsSummary: parseJsonValue<WorkbenchSessionSummary[]>(row.sessionsSummaryJson, []),
};
}
async function reconcileWorkbenchProjection(c: any): Promise<WorkspaceSummarySnapshot> {
const repoRows = await c.db
.select({ repoId: repos.repoId, remoteUrl: repos.remoteUrl, updatedAt: repos.updatedAt })
.from(repos)
.orderBy(desc(repos.updatedAt))
.all();
const tasks: Array<any> = [];
const projects: Array<any> = [];
const taskRows: WorkbenchTaskSummary[] = [];
for (const row of repoRows) {
const projectTasks: Array<any> = [];
try {
const project = await getOrCreateProject(c, c.state.workspaceId, row.repoId, row.remoteUrl);
const summaries = await project.listTaskSummaries({ includeArchived: true });
@ -163,11 +217,18 @@ async function buildWorkbenchSnapshot(c: any): Promise<TaskWorkbenchSnapshot> {
try {
await upsertTaskLookupRow(c, summary.taskId, row.repoId);
const task = getTask(c, c.state.workspaceId, row.repoId, summary.taskId);
const snapshot = await task.getWorkbench({});
tasks.push(snapshot);
projectTasks.push(snapshot);
const taskSummary = await task.getTaskSummary({});
taskRows.push(taskSummary);
await c.db
.insert(taskSummaries)
.values(taskSummaryRowFromSummary(taskSummary))
.onConflictDoUpdate({
target: taskSummaries.taskId,
set: taskSummaryRowFromSummary(taskSummary),
})
.run();
} catch (error) {
logActorWarning("workspace", "failed collecting workbench task", {
logActorWarning("workspace", "failed collecting task summary during reconciliation", {
workspaceId: c.state.workspaceId,
repoId: row.repoId,
taskId: summary.taskId,
@ -175,17 +236,8 @@ async function buildWorkbenchSnapshot(c: any): Promise<TaskWorkbenchSnapshot> {
});
}
}
if (projectTasks.length > 0) {
projects.push({
id: row.repoId,
label: repoLabelFromRemote(row.remoteUrl),
updatedAtMs: projectTasks[0]?.updatedAtMs ?? row.updatedAt,
tasks: projectTasks.sort((left, right) => right.updatedAtMs - left.updatedAtMs),
});
}
} catch (error) {
logActorWarning("workspace", "failed collecting workbench repo snapshot", {
logActorWarning("workspace", "failed collecting repo during workbench reconciliation", {
workspaceId: c.state.workspaceId,
repoId: row.repoId,
error: resolveErrorMessage(error),
@ -193,16 +245,11 @@ async function buildWorkbenchSnapshot(c: any): Promise<TaskWorkbenchSnapshot> {
}
}
tasks.sort((left, right) => right.updatedAtMs - left.updatedAtMs);
projects.sort((left, right) => right.updatedAtMs - left.updatedAtMs);
taskRows.sort((left, right) => right.updatedAtMs - left.updatedAtMs);
return {
workspaceId: c.state.workspaceId,
repos: repoRows.map((row) => ({
id: row.repoId,
label: repoLabelFromRemote(row.remoteUrl),
})),
projects,
tasks,
repos: repoRows.map((row) => buildRepoSummary(row, taskRows)).sort((left, right) => right.latestActivityMs - left.latestActivityMs),
taskSummaries: taskRows,
};
}
@ -211,6 +258,41 @@ async function requireWorkbenchTask(c: any, taskId: string) {
return getTask(c, c.state.workspaceId, repoId, taskId);
}
/**
* Reads the workspace sidebar snapshot from the workspace actor's local SQLite
* only. Task actors push summary updates into `task_summaries`, so clients do
* not need this action to fan out to every child actor on the hot read path.
*/
async function getWorkspaceSummarySnapshot(c: any): Promise<WorkspaceSummarySnapshot> {
const repoRows = await c.db
.select({
repoId: repos.repoId,
remoteUrl: repos.remoteUrl,
updatedAt: repos.updatedAt,
})
.from(repos)
.orderBy(desc(repos.updatedAt))
.all();
const taskRows = await c.db.select().from(taskSummaries).orderBy(desc(taskSummaries.updatedAtMs)).all();
const summaries = taskRows.map(taskSummaryFromRow);
return {
workspaceId: c.state.workspaceId,
repos: repoRows.map((row) => buildRepoSummary(row, summaries)).sort((left, right) => right.latestActivityMs - left.latestActivityMs),
taskSummaries: summaries,
};
}
async function broadcastRepoSummary(
c: any,
type: "repoAdded" | "repoUpdated",
repoRow: { repoId: string; remoteUrl: string; updatedAt: number },
): Promise<void> {
const matchingTaskRows = await c.db.select().from(taskSummaries).where(eq(taskSummaries.repoId, repoRow.repoId)).all();
const repo = buildRepoSummary(repoRow, matchingTaskRows.map(taskSummaryFromRow));
c.broadcast("workspaceUpdated", { type, repo } satisfies WorkspaceEvent);
}
async function addRepoMutation(c: any, input: AddRepoInput): Promise<RepoRecord> {
assertWorkspace(c, input.workspaceId);
@ -225,6 +307,7 @@ async function addRepoMutation(c: any, input: AddRepoInput): Promise<RepoRecord>
const repoId = repoIdFromRemote(remoteUrl);
const now = Date.now();
const existing = await c.db.select({ repoId: repos.repoId }).from(repos).where(eq(repos.repoId, repoId)).get();
await c.db
.insert(repos)
@ -243,7 +326,11 @@ async function addRepoMutation(c: any, input: AddRepoInput): Promise<RepoRecord>
})
.run();
await workspaceActions.notifyWorkbenchUpdated(c);
await broadcastRepoSummary(c, existing ? "repoUpdated" : "repoAdded", {
repoId,
remoteUrl,
updatedAt: now,
});
return {
workspaceId: c.state.workspaceId,
repoId,
@ -306,10 +393,20 @@ async function createTaskMutation(c: any, input: CreateTaskInput): Promise<TaskR
})
.run();
const task = getTask(c, c.state.workspaceId, repoId, created.taskId);
await task.provision({ providerId });
try {
const task = getTask(c, c.state.workspaceId, repoId, created.taskId);
await workspaceActions.applyTaskSummaryUpdate(c, {
taskSummary: await task.getTaskSummary({}),
});
} catch (error) {
logActorWarning("workspace", "failed seeding task summary after task creation", {
workspaceId: c.state.workspaceId,
repoId,
taskId: created.taskId,
error: resolveErrorMessage(error),
});
}
await workspaceActions.notifyWorkbenchUpdated(c);
return created;
}
@ -465,13 +562,37 @@ export const workspaceActions = {
};
},
async getWorkbench(c: any, input: WorkspaceUseInput): Promise<TaskWorkbenchSnapshot> {
assertWorkspace(c, input.workspaceId);
return await buildWorkbenchSnapshot(c);
/**
* Called by task actors when their summary-level state changes.
* This is the write path for the local materialized projection; clients read
* the projection via `getWorkspaceSummary`, but only task actors should push
* rows into it.
*/
async applyTaskSummaryUpdate(c: any, input: { taskSummary: WorkbenchTaskSummary }): Promise<void> {
await c.db
.insert(taskSummaries)
.values(taskSummaryRowFromSummary(input.taskSummary))
.onConflictDoUpdate({
target: taskSummaries.taskId,
set: taskSummaryRowFromSummary(input.taskSummary),
})
.run();
c.broadcast("workspaceUpdated", { type: "taskSummaryUpdated", taskSummary: input.taskSummary } satisfies WorkspaceEvent);
},
async notifyWorkbenchUpdated(c: any): Promise<void> {
c.broadcast("workbenchUpdated", { at: Date.now() });
async removeTaskSummary(c: any, input: { taskId: string }): Promise<void> {
await c.db.delete(taskSummaries).where(eq(taskSummaries.taskId, input.taskId)).run();
c.broadcast("workspaceUpdated", { type: "taskRemoved", taskId: input.taskId } satisfies WorkspaceEvent);
},
async getWorkspaceSummary(c: any, input: WorkspaceUseInput): Promise<WorkspaceSummarySnapshot> {
assertWorkspace(c, input.workspaceId);
return await getWorkspaceSummarySnapshot(c);
},
async reconcileWorkbenchState(c: any, input: WorkspaceUseInput): Promise<WorkspaceSummarySnapshot> {
assertWorkspace(c, input.workspaceId);
return await reconcileWorkbenchProjection(c);
},
async createWorkbenchTask(c: any, input: TaskWorkbenchCreateTaskInput): Promise<{ taskId: string; tabId?: string }> {
@ -483,11 +604,8 @@ export const workspaceActions = {
...(input.branch ? { explicitBranchName: input.branch } : {}),
...(input.model ? { agentType: agentTypeForModel(input.model) } : {}),
});
const task = await requireWorkbenchTask(c, created.taskId);
const snapshot = await task.getWorkbench({});
return {
taskId: created.taskId,
tabId: snapshot.tabs[0]?.id,
};
},

File diff suppressed because it is too large Load diff

View file

@ -10,6 +10,18 @@ const journal = {
tag: "0000_melted_viper",
breakpoints: true,
},
{
idx: 1,
when: 1773638400000,
tag: "0001_auth_index_tables",
breakpoints: true,
},
{
idx: 2,
when: 1773720000000,
tag: "0002_task_summaries",
breakpoints: true,
},
],
} as const;
@ -113,6 +125,49 @@ CREATE TABLE \`task_lookup\` (
\`task_id\` text PRIMARY KEY NOT NULL,
\`repo_id\` text NOT NULL
);
`,
m0001: `CREATE TABLE IF NOT EXISTS \`auth_session_index\` (
\`session_id\` text PRIMARY KEY NOT NULL,
\`session_token\` text NOT NULL,
\`user_id\` text NOT NULL,
\`created_at\` integer NOT NULL,
\`updated_at\` integer NOT NULL
);
--> statement-breakpoint
CREATE TABLE IF NOT EXISTS \`auth_email_index\` (
\`email\` text PRIMARY KEY NOT NULL,
\`user_id\` text NOT NULL,
\`updated_at\` integer NOT NULL
);
--> statement-breakpoint
CREATE TABLE IF NOT EXISTS \`auth_account_index\` (
\`id\` text PRIMARY KEY NOT NULL,
\`provider_id\` text NOT NULL,
\`account_id\` text NOT NULL,
\`user_id\` text NOT NULL,
\`updated_at\` integer NOT NULL
);
--> statement-breakpoint
CREATE TABLE IF NOT EXISTS \`auth_verification\` (
\`id\` text PRIMARY KEY NOT NULL,
\`identifier\` text NOT NULL,
\`value\` text NOT NULL,
\`expires_at\` integer NOT NULL,
\`created_at\` integer NOT NULL,
\`updated_at\` integer NOT NULL
);
`,
m0002: `CREATE TABLE IF NOT EXISTS \`task_summaries\` (
\`task_id\` text PRIMARY KEY NOT NULL,
\`repo_id\` text NOT NULL,
\`title\` text NOT NULL,
\`status\` text NOT NULL,
\`repo_name\` text NOT NULL,
\`updated_at_ms\` integer NOT NULL,
\`branch\` text,
\`pull_request_json\` text,
\`sessions_summary_json\` text DEFAULT '[]' NOT NULL
);
`,
} as const,
};

View file

@ -20,6 +20,23 @@ export const taskLookup = sqliteTable("task_lookup", {
repoId: text("repo_id").notNull(),
});
/**
* Materialized sidebar projection maintained by task actors.
* The source of truth still lives on each task actor; this table exists so
* workspace reads can stay local and avoid fan-out across child actors.
*/
export const taskSummaries = sqliteTable("task_summaries", {
taskId: text("task_id").notNull().primaryKey(),
repoId: text("repo_id").notNull(),
title: text("title").notNull(),
status: text("status").notNull(),
repoName: text("repo_name").notNull(),
updatedAtMs: integer("updated_at_ms").notNull(),
branch: text("branch"),
pullRequestJson: text("pull_request_json"),
sessionsSummaryJson: text("sessions_summary_json").notNull().default("[]"),
});
export const organizationProfile = sqliteTable("organization_profile", {
id: text("id").notNull().primaryKey(),
kind: text("kind").notNull(),
@ -74,23 +91,33 @@ export const invoices = sqliteTable("invoices", {
createdAt: integer("created_at").notNull(),
});
export const appSessions = sqliteTable("app_sessions", {
export const authSessionIndex = sqliteTable("auth_session_index", {
sessionId: text("session_id").notNull().primaryKey(),
sessionToken: text("session_token").notNull(),
userId: text("user_id").notNull(),
createdAt: integer("created_at").notNull(),
updatedAt: integer("updated_at").notNull(),
});
export const authEmailIndex = sqliteTable("auth_email_index", {
email: text("email").notNull().primaryKey(),
userId: text("user_id").notNull(),
updatedAt: integer("updated_at").notNull(),
});
export const authAccountIndex = sqliteTable("auth_account_index", {
id: text("id").notNull().primaryKey(),
currentUserId: text("current_user_id"),
currentUserName: text("current_user_name"),
currentUserEmail: text("current_user_email"),
currentUserGithubLogin: text("current_user_github_login"),
currentUserRoleLabel: text("current_user_role_label"),
// Structured as a JSON array of eligible organization ids for the session.
eligibleOrganizationIdsJson: text("eligible_organization_ids_json").notNull(),
activeOrganizationId: text("active_organization_id"),
githubAccessToken: text("github_access_token"),
githubScope: text("github_scope").notNull(),
starterRepoStatus: text("starter_repo_status").notNull(),
starterRepoStarredAt: integer("starter_repo_starred_at"),
starterRepoSkippedAt: integer("starter_repo_skipped_at"),
oauthState: text("oauth_state"),
oauthStateExpiresAt: integer("oauth_state_expires_at"),
providerId: text("provider_id").notNull(),
accountId: text("account_id").notNull(),
userId: text("user_id").notNull(),
updatedAt: integer("updated_at").notNull(),
});
export const authVerification = sqliteTable("auth_verification", {
id: text("id").notNull().primaryKey(),
identifier: text("identifier").notNull(),
value: text("value").notNull(),
expiresAt: integer("expires_at").notNull(),
createdAt: integer("created_at").notNull(),
updatedAt: integer("updated_at").notNull(),
});

View file

@ -10,6 +10,7 @@ import { createDefaultDriver } from "./driver.js";
import { createProviderRegistry } from "./providers/index.js";
import { createClient } from "rivetkit/client";
import type { FoundryBillingPlanId } from "@sandbox-agent/foundry-shared";
import { initBetterAuthService } from "./services/better-auth.js";
import { createDefaultAppShellServices } from "./services/app-shell-runtime.js";
import { APP_SHELL_WORKSPACE_ID } from "./actors/workspace/app-shell.js";
import { logger } from "./logging.js";
@ -39,33 +40,15 @@ interface AppWorkspaceLogContext {
xRealIp?: string;
}
function stripTrailingSlash(value: string): string {
return value.replace(/\/$/, "");
}
function isRivetRequest(request: Request): boolean {
const { pathname } = new URL(request.url);
return pathname === "/v1/rivet" || pathname.startsWith("/v1/rivet/");
}
function isRetryableAppActorError(error: unknown): boolean {
const message = error instanceof Error ? error.message : String(error);
return message.includes("Actor not ready") || message.includes("socket connection was closed unexpectedly");
}
async function withRetries<T>(run: () => Promise<T>, attempts = 20, delayMs = 250): Promise<T> {
let lastError: unknown;
for (let attempt = 1; attempt <= attempts; attempt += 1) {
try {
return await run();
} catch (error) {
lastError = error;
if (!isRetryableAppActorError(error) || attempt === attempts) {
throw error;
}
await new Promise((resolve) => setTimeout(resolve, delayMs));
}
}
throw lastError instanceof Error ? lastError : new Error(String(lastError));
}
export async function startBackend(options: BackendStartOptions = {}): Promise<void> {
// sandbox-agent agent plugins vary on which env var they read for OpenAI/Codex auth.
// Normalize to keep local dev + docker-compose simple.
@ -94,11 +77,16 @@ export async function startBackend(options: BackendStartOptions = {}): Promise<v
const providers = createProviderRegistry(config, driver);
const backends = await createBackends(config.notify);
const notifications = createNotificationService(backends);
initActorRuntimeContext(config, providers, notifications, driver, createDefaultAppShellServices());
const appShellServices = createDefaultAppShellServices();
initActorRuntimeContext(config, providers, notifications, driver, appShellServices);
const actorClient = createClient({
endpoint: `http://127.0.0.1:${config.backend.port}/v1/rivet`,
}) as any;
const betterAuth = initBetterAuthService(actorClient, {
apiUrl: appShellServices.apiUrl,
appUrl: appShellServices.appUrl,
});
const requestHeaderContext = (c: any): AppWorkspaceLogContext => ({
cfConnectingIp: c.req.header("cf-connecting-ip") ?? undefined,
@ -131,29 +119,18 @@ export async function startBackend(options: BackendStartOptions = {}): Promise<v
"x-rivet-total-slots",
"x-rivet-runner-name",
"x-rivet-namespace-name",
"x-foundry-session",
];
const exposeHeaders = ["Content-Type", "x-foundry-session", "x-rivet-ray-id"];
app.use(
"/v1/*",
cors({
origin: (origin) => origin ?? "*",
credentials: true,
allowHeaders,
allowMethods: ["GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS"],
exposeHeaders,
}),
);
app.use(
"/v1",
cors({
origin: (origin) => origin ?? "*",
credentials: true,
allowHeaders,
allowMethods: ["GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS"],
exposeHeaders,
}),
);
const exposeHeaders = ["Content-Type", "x-rivet-ray-id"];
const allowedOrigins = new Set([stripTrailingSlash(appShellServices.appUrl), stripTrailingSlash(appShellServices.apiUrl)]);
const corsConfig = {
origin: (origin: string) => (allowedOrigins.has(origin) ? origin : null) as string | undefined | null,
credentials: true,
allowHeaders,
allowMethods: ["GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS"],
exposeHeaders,
};
app.use("/v1/*", cors(corsConfig));
app.use("/v1", cors(corsConfig));
app.use("*", async (c, next) => {
const requestId = c.req.header("x-request-id")?.trim() || randomUUID();
const start = performance.now();
@ -190,6 +167,9 @@ export async function startBackend(options: BackendStartOptions = {}): Promise<v
);
});
// Cache the app workspace actor handle for the lifetime of this backend process.
// The "app" workspace is a singleton coordinator for auth indexes, org state, and
// billing. Caching avoids repeated getOrCreate round-trips on every HTTP request.
let cachedAppWorkspace: any | null = null;
const appWorkspace = async (context: AppWorkspaceLogContext = {}) => {
@ -197,12 +177,9 @@ export async function startBackend(options: BackendStartOptions = {}): Promise<v
const start = performance.now();
try {
const handle = await withRetries(
async () =>
await actorClient.workspace.getOrCreate(workspaceKey(APP_SHELL_WORKSPACE_ID), {
createWithInput: APP_SHELL_WORKSPACE_ID,
}),
);
const handle = await actorClient.workspace.getOrCreate(workspaceKey(APP_SHELL_WORKSPACE_ID), {
createWithInput: APP_SHELL_WORKSPACE_ID,
});
cachedAppWorkspace = handle;
logger.info(
{
@ -253,68 +230,70 @@ export async function startBackend(options: BackendStartOptions = {}): Promise<v
sessionId,
});
const resolveSessionId = async (c: any): Promise<string> => {
const requested = c.req.header("x-foundry-session");
const { sessionId } = await appWorkspaceAction(
"ensureAppSession",
async (workspace) => await workspace.ensureAppSession(requested && requested.trim().length > 0 ? { requestedSessionId: requested } : {}),
requestLogContext(c),
);
c.header("x-foundry-session", sessionId);
return sessionId;
const resolveSessionId = async (c: any): Promise<string | null> => {
const session = await betterAuth.resolveSession(c.req.raw.headers);
return session?.session?.id ?? null;
};
app.get("/v1/app/snapshot", async (c) => {
const sessionId = await resolveSessionId(c);
if (!sessionId) {
return c.json({
auth: { status: "signed_out", currentUserId: null },
activeOrganizationId: null,
onboarding: {
starterRepo: {
repoFullName: "rivet-dev/sandbox-agent",
repoUrl: "https://github.com/rivet-dev/sandbox-agent",
status: "pending",
starredAt: null,
skippedAt: null,
},
},
users: [],
organizations: [],
});
}
return c.json(
await appWorkspaceAction("getAppSnapshot", async (workspace) => await workspace.getAppSnapshot({ sessionId }), requestLogContext(c, sessionId)),
);
});
app.get("/v1/auth/github/start", async (c) => {
const sessionId = await resolveSessionId(c);
const result = await appWorkspaceAction(
"startAppGithubAuth",
async (workspace) => await workspace.startAppGithubAuth({ sessionId }),
requestLogContext(c, sessionId),
);
return Response.redirect(result.url, 302);
app.all("/v1/auth/*", async (c) => {
return await betterAuth.auth.handler(c.req.raw);
});
const handleGithubAuthCallback = async (c: any) => {
// TEMPORARY: dump all request headers to diagnose duplicate callback requests
// (Railway nginx proxy_next_upstream? Cloudflare retry? browser?)
// Remove once root cause is identified.
const allHeaders: Record<string, string> = {};
c.req.raw.headers.forEach((value: string, key: string) => {
allHeaders[key] = value;
});
logger.info({ headers: allHeaders, url: c.req.url }, "github_callback_headers");
const code = c.req.query("code");
const state = c.req.query("state");
if (!code || !state) {
return c.text("Missing GitHub OAuth callback parameters", 400);
}
const result = await appWorkspaceAction(
"completeAppGithubAuth",
async (workspace) => await workspace.completeAppGithubAuth({ code, state }),
requestLogContext(c),
);
c.header("x-foundry-session", result.sessionId);
return Response.redirect(result.redirectTo, 302);
};
app.get("/v1/auth/github/callback", handleGithubAuthCallback);
app.get("/api/auth/callback/github", handleGithubAuthCallback);
app.post("/v1/app/sign-out", async (c) => {
const sessionId = await resolveSessionId(c);
return c.json(await appWorkspaceAction("signOutApp", async (workspace) => await workspace.signOutApp({ sessionId }), requestLogContext(c, sessionId)));
if (sessionId) {
const signOutResponse = await betterAuth.signOut(c.req.raw.headers);
const setCookie = signOutResponse.headers.get("set-cookie");
if (setCookie) {
c.header("set-cookie", setCookie);
}
}
return c.json({
auth: { status: "signed_out", currentUserId: null },
activeOrganizationId: null,
onboarding: {
starterRepo: {
repoFullName: "rivet-dev/sandbox-agent",
repoUrl: "https://github.com/rivet-dev/sandbox-agent",
status: "pending",
starredAt: null,
skippedAt: null,
},
},
users: [],
organizations: [],
});
});
app.post("/v1/app/onboarding/starter-repo/skip", async (c) => {
const sessionId = await resolveSessionId(c);
if (!sessionId) {
return c.text("Unauthorized", 401);
}
return c.json(
await appWorkspaceAction("skipAppStarterRepo", async (workspace) => await workspace.skipAppStarterRepo({ sessionId }), requestLogContext(c, sessionId)),
);
@ -322,6 +301,9 @@ export async function startBackend(options: BackendStartOptions = {}): Promise<v
app.post("/v1/app/organizations/:organizationId/starter-repo/star", async (c) => {
const sessionId = await resolveSessionId(c);
if (!sessionId) {
return c.text("Unauthorized", 401);
}
return c.json(
await appWorkspaceAction(
"starAppStarterRepo",
@ -337,6 +319,9 @@ export async function startBackend(options: BackendStartOptions = {}): Promise<v
app.post("/v1/app/organizations/:organizationId/select", async (c) => {
const sessionId = await resolveSessionId(c);
if (!sessionId) {
return c.text("Unauthorized", 401);
}
return c.json(
await appWorkspaceAction(
"selectAppOrganization",
@ -352,6 +337,9 @@ export async function startBackend(options: BackendStartOptions = {}): Promise<v
app.patch("/v1/app/organizations/:organizationId/profile", async (c) => {
const sessionId = await resolveSessionId(c);
if (!sessionId) {
return c.text("Unauthorized", 401);
}
const body = await c.req.json();
return c.json(
await appWorkspaceAction(
@ -371,6 +359,9 @@ export async function startBackend(options: BackendStartOptions = {}): Promise<v
app.post("/v1/app/organizations/:organizationId/import", async (c) => {
const sessionId = await resolveSessionId(c);
if (!sessionId) {
return c.text("Unauthorized", 401);
}
return c.json(
await appWorkspaceAction(
"triggerAppRepoImport",
@ -386,6 +377,9 @@ export async function startBackend(options: BackendStartOptions = {}): Promise<v
app.post("/v1/app/organizations/:organizationId/reconnect", async (c) => {
const sessionId = await resolveSessionId(c);
if (!sessionId) {
return c.text("Unauthorized", 401);
}
return c.json(
await appWorkspaceAction(
"beginAppGithubInstall",
@ -401,6 +395,9 @@ export async function startBackend(options: BackendStartOptions = {}): Promise<v
app.post("/v1/app/organizations/:organizationId/billing/checkout", async (c) => {
const sessionId = await resolveSessionId(c);
if (!sessionId) {
return c.text("Unauthorized", 401);
}
const body = await c.req.json().catch(() => ({}));
const planId = body?.planId === "free" || body?.planId === "team" ? (body.planId as FoundryBillingPlanId) : "team";
return c.json(
@ -414,11 +411,14 @@ export async function startBackend(options: BackendStartOptions = {}): Promise<v
app.get("/v1/billing/checkout/complete", async (c) => {
const organizationId = c.req.query("organizationId");
const sessionId = c.req.query("foundrySession");
const checkoutSessionId = c.req.query("session_id");
if (!organizationId || !sessionId || !checkoutSessionId) {
if (!organizationId || !checkoutSessionId) {
return c.text("Missing Stripe checkout completion parameters", 400);
}
const sessionId = await resolveSessionId(c);
if (!sessionId) {
return c.text("Unauthorized", 401);
}
const result = await (await appWorkspace(requestLogContext(c, sessionId))).finalizeAppCheckoutSession({
organizationId,
sessionId,
@ -429,6 +429,9 @@ export async function startBackend(options: BackendStartOptions = {}): Promise<v
app.post("/v1/app/organizations/:organizationId/billing/portal", async (c) => {
const sessionId = await resolveSessionId(c);
if (!sessionId) {
return c.text("Unauthorized", 401);
}
return c.json(
await (await appWorkspace(requestLogContext(c, sessionId))).createAppBillingPortalSession({
sessionId,
@ -439,6 +442,9 @@ export async function startBackend(options: BackendStartOptions = {}): Promise<v
app.post("/v1/app/organizations/:organizationId/billing/cancel", async (c) => {
const sessionId = await resolveSessionId(c);
if (!sessionId) {
return c.text("Unauthorized", 401);
}
return c.json(
await (await appWorkspace(requestLogContext(c, sessionId))).cancelAppScheduledRenewal({
sessionId,
@ -449,6 +455,9 @@ export async function startBackend(options: BackendStartOptions = {}): Promise<v
app.post("/v1/app/organizations/:organizationId/billing/resume", async (c) => {
const sessionId = await resolveSessionId(c);
if (!sessionId) {
return c.text("Unauthorized", 401);
}
return c.json(
await (await appWorkspace(requestLogContext(c, sessionId))).resumeAppSubscription({
sessionId,
@ -459,6 +468,9 @@ export async function startBackend(options: BackendStartOptions = {}): Promise<v
app.post("/v1/app/workspaces/:workspaceId/seat-usage", async (c) => {
const sessionId = await resolveSessionId(c);
if (!sessionId) {
return c.text("Unauthorized", 401);
}
return c.json(
await (await appWorkspace(requestLogContext(c, sessionId))).recordAppSeatUsage({
sessionId,

View file

@ -2,4 +2,5 @@ import { createFoundryLogger } from "@sandbox-agent/foundry-shared";
export const logger = createFoundryLogger({
service: "foundry-backend",
format: "logfmt",
});

View file

@ -262,11 +262,11 @@ export class GitHubAppClient {
}
async listOrganizations(accessToken: string): Promise<GitHubOrgIdentity[]> {
const organizations = await this.paginate<{ id: number; login: string; description?: string | null }>("/user/orgs?per_page=100", accessToken);
const organizations = await this.paginate<{ id: number; login: string; name?: string | null }>("/user/orgs?per_page=100", accessToken);
return organizations.map((organization) => ({
id: String(organization.id),
login: organization.login,
name: organization.description?.trim() || organization.login,
name: organization.name?.trim() || organization.login,
}));
}

View file

@ -0,0 +1,533 @@
import { betterAuth } from "better-auth";
import { createAdapterFactory } from "better-auth/adapters";
import { APP_SHELL_WORKSPACE_ID } from "../actors/workspace/app-shell.js";
import { authUserKey, workspaceKey } from "../actors/keys.js";
import { logger } from "../logging.js";
const AUTH_BASE_PATH = "/v1/auth";
const SESSION_COOKIE = "better-auth.session_token";
let betterAuthService: BetterAuthService | null = null;
function requireEnv(name: string): string {
const value = process.env[name]?.trim();
if (!value) {
throw new Error(`${name} is required`);
}
return value;
}
function stripTrailingSlash(value: string): string {
return value.replace(/\/$/, "");
}
function buildCookieHeaders(sessionToken: string): Headers {
return new Headers({
cookie: `${SESSION_COOKIE}=${encodeURIComponent(sessionToken)}`,
});
}
async function readJsonSafe(response: Response): Promise<any> {
const text = await response.text();
if (!text) {
return null;
}
try {
return JSON.parse(text);
} catch {
return text;
}
}
async function callAuthEndpoint(auth: any, url: string, init?: RequestInit): Promise<Response> {
return await auth.handler(new Request(url, init));
}
function resolveRouteUserId(workspace: any, resolved: any): string | null {
if (!resolved) {
return null;
}
if (typeof resolved === "string") {
return resolved;
}
if (typeof resolved.userId === "string" && resolved.userId.length > 0) {
return resolved.userId;
}
if (typeof resolved.id === "string" && resolved.id.length > 0) {
return resolved.id;
}
return null;
}
export interface BetterAuthService {
auth: any;
resolveSession(headers: Headers): Promise<{ session: any; user: any } | null>;
signOut(headers: Headers): Promise<Response>;
getAuthState(sessionId: string): Promise<any | null>;
upsertUserProfile(userId: string, patch: Record<string, unknown>): Promise<any>;
setActiveOrganization(sessionId: string, activeOrganizationId: string | null): Promise<any>;
getAccessTokenForSession(sessionId: string): Promise<{ accessToken: string; scopes: string[] } | null>;
}
export function initBetterAuthService(actorClient: any, options: { apiUrl: string; appUrl: string }): BetterAuthService {
if (betterAuthService) {
return betterAuthService;
}
// getOrCreate is intentional here: the adapter runs during Better Auth callbacks
// which can fire before any explicit create path. The app workspace and auth user
// actors must exist by the time the adapter needs them.
const appWorkspace = () =>
actorClient.workspace.getOrCreate(workspaceKey(APP_SHELL_WORKSPACE_ID), {
createWithInput: APP_SHELL_WORKSPACE_ID,
});
// getOrCreate is intentional: Better Auth creates user records during OAuth
// callbacks, so the auth-user actor must be lazily provisioned on first access.
const getAuthUser = async (userId: string) =>
await actorClient.authUser.getOrCreate(authUserKey(userId), {
createWithInput: { userId },
});
const adapter = createAdapterFactory({
config: {
adapterId: "rivetkit-actor",
adapterName: "RivetKit Actor Adapter",
supportsBooleans: false,
supportsDates: false,
supportsJSON: false,
},
adapter: ({ transformInput, transformOutput, transformWhereClause }) => {
const resolveUserIdForQuery = async (model: string, where?: any[], data?: Record<string, unknown>): Promise<string | null> => {
const clauses = where ?? [];
const direct = (field: string) => clauses.find((entry) => entry.field === field)?.value;
if (model === "user") {
const fromId = direct("id") ?? data?.id;
if (typeof fromId === "string" && fromId.length > 0) {
return fromId;
}
const email = direct("email");
if (typeof email === "string" && email.length > 0) {
const workspace = await appWorkspace();
const resolved = await workspace.authFindEmailIndex({ email: email.toLowerCase() });
return resolveRouteUserId(workspace, resolved);
}
return null;
}
if (model === "session") {
const fromUserId = direct("userId") ?? data?.userId;
if (typeof fromUserId === "string" && fromUserId.length > 0) {
return fromUserId;
}
const sessionId = direct("id") ?? data?.id;
const sessionToken = direct("token") ?? data?.token;
if (typeof sessionId === "string" || typeof sessionToken === "string") {
const workspace = await appWorkspace();
const resolved = await workspace.authFindSessionIndex({
...(typeof sessionId === "string" ? { sessionId } : {}),
...(typeof sessionToken === "string" ? { sessionToken } : {}),
});
return resolveRouteUserId(workspace, resolved);
}
return null;
}
if (model === "account") {
const fromUserId = direct("userId") ?? data?.userId;
if (typeof fromUserId === "string" && fromUserId.length > 0) {
return fromUserId;
}
const accountRecordId = direct("id") ?? data?.id;
const providerId = direct("providerId") ?? data?.providerId;
const accountId = direct("accountId") ?? data?.accountId;
const workspace = await appWorkspace();
if (typeof accountRecordId === "string" && accountRecordId.length > 0) {
const resolved = await workspace.authFindAccountIndex({ id: accountRecordId });
return resolveRouteUserId(workspace, resolved);
}
if (typeof providerId === "string" && providerId.length > 0 && typeof accountId === "string" && accountId.length > 0) {
const resolved = await workspace.authFindAccountIndex({ providerId, accountId });
return resolveRouteUserId(workspace, resolved);
}
return null;
}
return null;
};
const ensureWorkspaceVerification = async (method: string, payload: Record<string, unknown>) => {
const workspace = await appWorkspace();
return await workspace[method](payload);
};
return {
options: {
useDatabaseGeneratedIds: false,
},
create: async ({ model, data }) => {
const transformed = await transformInput(data, model, "create", true);
if (model === "verification") {
return await ensureWorkspaceVerification("authCreateVerification", { data: transformed });
}
const userId = await resolveUserIdForQuery(model, undefined, transformed);
if (!userId) {
throw new Error(`Unable to resolve auth actor for create(${model})`);
}
const userActor = await getAuthUser(userId);
const created = await userActor.createAuthRecord({ model, data: transformed });
const workspace = await appWorkspace();
if (model === "user" && typeof transformed.email === "string" && transformed.email.length > 0) {
await workspace.authUpsertEmailIndex({
email: transformed.email.toLowerCase(),
userId,
});
}
if (model === "session") {
await workspace.authUpsertSessionIndex({
sessionId: String(created.id),
sessionToken: String(created.token),
userId,
});
}
if (model === "account") {
await workspace.authUpsertAccountIndex({
id: String(created.id),
providerId: String(created.providerId),
accountId: String(created.accountId),
userId,
});
}
return (await transformOutput(created, model)) as any;
},
findOne: async ({ model, where, join }) => {
const transformedWhere = transformWhereClause({ model, where, action: "findOne" });
if (model === "verification") {
return await ensureWorkspaceVerification("authFindOneVerification", { where: transformedWhere, join });
}
const userId = await resolveUserIdForQuery(model, transformedWhere);
if (!userId) {
return null;
}
const userActor = await getAuthUser(userId);
const found = await userActor.findOneAuthRecord({ model, where: transformedWhere, join });
return found ? ((await transformOutput(found, model, undefined, join)) as any) : null;
},
findMany: async ({ model, where, limit, sortBy, offset, join }) => {
const transformedWhere = transformWhereClause({ model, where, action: "findMany" });
if (model === "verification") {
return await ensureWorkspaceVerification("authFindManyVerification", {
where: transformedWhere,
limit,
sortBy,
offset,
join,
});
}
if (model === "session") {
const tokenClause = transformedWhere?.find((entry: any) => entry.field === "token" && entry.operator === "in");
if (tokenClause && Array.isArray(tokenClause.value)) {
const workspace = await appWorkspace();
const resolved = await Promise.all(
(tokenClause.value as string[]).map(async (sessionToken: string) => ({
sessionToken,
route: await workspace.authFindSessionIndex({ sessionToken }),
})),
);
const byUser = new Map<string, string[]>();
for (const item of resolved) {
if (!item.route?.userId) {
continue;
}
const tokens = byUser.get(item.route.userId) ?? [];
tokens.push(item.sessionToken);
byUser.set(item.route.userId, tokens);
}
const rows = [];
for (const [userId, tokens] of byUser) {
const userActor = await getAuthUser(userId);
const scopedWhere = transformedWhere.map((entry: any) =>
entry.field === "token" && entry.operator === "in" ? { ...entry, value: tokens } : entry,
);
const found = await userActor.findManyAuthRecords({ model, where: scopedWhere, limit, sortBy, offset, join });
rows.push(...found);
}
return await Promise.all(rows.map(async (row: any) => await transformOutput(row, model, undefined, join)));
}
}
const userId = await resolveUserIdForQuery(model, transformedWhere);
if (!userId) {
return [];
}
const userActor = await getAuthUser(userId);
const found = await userActor.findManyAuthRecords({ model, where: transformedWhere, limit, sortBy, offset, join });
return await Promise.all(found.map(async (row: any) => await transformOutput(row, model, undefined, join)));
},
update: async ({ model, where, update }) => {
const transformedWhere = transformWhereClause({ model, where, action: "update" });
const transformedUpdate = (await transformInput(update as Record<string, unknown>, model, "update", true)) as Record<string, unknown>;
if (model === "verification") {
return await ensureWorkspaceVerification("authUpdateVerification", { where: transformedWhere, update: transformedUpdate });
}
const userId = await resolveUserIdForQuery(model, transformedWhere, transformedUpdate);
if (!userId) {
return null;
}
const userActor = await getAuthUser(userId);
const before =
model === "user"
? await userActor.findOneAuthRecord({ model, where: transformedWhere })
: model === "account"
? await userActor.findOneAuthRecord({ model, where: transformedWhere })
: model === "session"
? await userActor.findOneAuthRecord({ model, where: transformedWhere })
: null;
const updated = await userActor.updateAuthRecord({ model, where: transformedWhere, update: transformedUpdate });
const workspace = await appWorkspace();
if (model === "user" && updated) {
if (before?.email && before.email !== updated.email) {
await workspace.authDeleteEmailIndex({ email: before.email.toLowerCase() });
}
if (updated.email) {
await workspace.authUpsertEmailIndex({ email: updated.email.toLowerCase(), userId });
}
}
if (model === "session" && updated) {
await workspace.authUpsertSessionIndex({
sessionId: String(updated.id),
sessionToken: String(updated.token),
userId,
});
}
if (model === "account" && updated) {
await workspace.authUpsertAccountIndex({
id: String(updated.id),
providerId: String(updated.providerId),
accountId: String(updated.accountId),
userId,
});
}
return updated ? ((await transformOutput(updated, model)) as any) : null;
},
updateMany: async ({ model, where, update }) => {
const transformedWhere = transformWhereClause({ model, where, action: "updateMany" });
const transformedUpdate = (await transformInput(update as Record<string, unknown>, model, "update", true)) as Record<string, unknown>;
if (model === "verification") {
return await ensureWorkspaceVerification("authUpdateManyVerification", { where: transformedWhere, update: transformedUpdate });
}
const userId = await resolveUserIdForQuery(model, transformedWhere, transformedUpdate);
if (!userId) {
return 0;
}
const userActor = await getAuthUser(userId);
return await userActor.updateManyAuthRecords({ model, where: transformedWhere, update: transformedUpdate });
},
delete: async ({ model, where }) => {
const transformedWhere = transformWhereClause({ model, where, action: "delete" });
if (model === "verification") {
await ensureWorkspaceVerification("authDeleteVerification", { where: transformedWhere });
return;
}
const userId = await resolveUserIdForQuery(model, transformedWhere);
if (!userId) {
return;
}
const userActor = await getAuthUser(userId);
const workspace = await appWorkspace();
const before = await userActor.findOneAuthRecord({ model, where: transformedWhere });
await userActor.deleteAuthRecord({ model, where: transformedWhere });
if (model === "session" && before) {
await workspace.authDeleteSessionIndex({
sessionId: before.id,
sessionToken: before.token,
});
}
if (model === "account" && before) {
await workspace.authDeleteAccountIndex({
id: before.id,
providerId: before.providerId,
accountId: before.accountId,
});
}
if (model === "user" && before?.email) {
await workspace.authDeleteEmailIndex({ email: before.email.toLowerCase() });
}
},
deleteMany: async ({ model, where }) => {
const transformedWhere = transformWhereClause({ model, where, action: "deleteMany" });
if (model === "verification") {
return await ensureWorkspaceVerification("authDeleteManyVerification", { where: transformedWhere });
}
if (model === "session") {
const userId = await resolveUserIdForQuery(model, transformedWhere);
if (!userId) {
return 0;
}
const userActor = await getAuthUser(userId);
const workspace = await appWorkspace();
const sessions = await userActor.findManyAuthRecords({ model, where: transformedWhere, limit: 5000 });
const deleted = await userActor.deleteManyAuthRecords({ model, where: transformedWhere });
for (const session of sessions) {
await workspace.authDeleteSessionIndex({
sessionId: session.id,
sessionToken: session.token,
});
}
return deleted;
}
const userId = await resolveUserIdForQuery(model, transformedWhere);
if (!userId) {
return 0;
}
const userActor = await getAuthUser(userId);
const deleted = await userActor.deleteManyAuthRecords({ model, where: transformedWhere });
return deleted;
},
count: async ({ model, where }) => {
const transformedWhere = transformWhereClause({ model, where, action: "count" });
if (model === "verification") {
return await ensureWorkspaceVerification("authCountVerification", { where: transformedWhere });
}
const userId = await resolveUserIdForQuery(model, transformedWhere);
if (!userId) {
return 0;
}
const userActor = await getAuthUser(userId);
return await userActor.countAuthRecords({ model, where: transformedWhere });
},
};
},
});
const auth = betterAuth({
baseURL: stripTrailingSlash(process.env.BETTER_AUTH_URL ?? options.apiUrl),
basePath: AUTH_BASE_PATH,
secret: requireEnv("BETTER_AUTH_SECRET"),
database: adapter,
trustedOrigins: [stripTrailingSlash(options.appUrl), stripTrailingSlash(options.apiUrl)],
session: {
cookieCache: {
enabled: true,
maxAge: 5 * 60,
strategy: "compact",
},
},
socialProviders: {
github: {
clientId: requireEnv("GITHUB_CLIENT_ID"),
clientSecret: requireEnv("GITHUB_CLIENT_SECRET"),
scope: ["read:org", "repo"],
redirectURI: process.env.GITHUB_REDIRECT_URI || undefined,
},
},
});
betterAuthService = {
auth,
async resolveSession(headers: Headers) {
return (await auth.api.getSession({ headers })) ?? null;
},
async signOut(headers: Headers) {
return await callAuthEndpoint(auth, `${stripTrailingSlash(process.env.BETTER_AUTH_URL ?? options.apiUrl)}${AUTH_BASE_PATH}/sign-out`, {
method: "POST",
headers,
});
},
async getAuthState(sessionId: string) {
const workspace = await appWorkspace();
const route = await workspace.authFindSessionIndex({ sessionId });
if (!route?.userId) {
return null;
}
const userActor = await getAuthUser(route.userId);
return await userActor.getAppAuthState({ sessionId });
},
async upsertUserProfile(userId: string, patch: Record<string, unknown>) {
const userActor = await getAuthUser(userId);
return await userActor.upsertUserProfile({ userId, patch });
},
async setActiveOrganization(sessionId: string, activeOrganizationId: string | null) {
const authState = await this.getAuthState(sessionId);
if (!authState?.user?.id) {
throw new Error(`Unknown auth session ${sessionId}`);
}
const userActor = await getAuthUser(authState.user.id);
return await userActor.upsertSessionState({ sessionId, activeOrganizationId });
},
async getAccessTokenForSession(sessionId: string) {
// Read the GitHub access token directly from the account record stored in the
// auth user actor. Better Auth's internal /get-access-token endpoint requires
// session middleware resolution which fails for server-side internal calls (403),
// so we bypass it and read the stored token from our adapter layer directly.
const authState = await this.getAuthState(sessionId);
if (!authState?.user?.id || !authState?.accounts) {
return null;
}
const githubAccount = authState.accounts.find((account: any) => account.providerId === "github");
if (!githubAccount?.accessToken) {
logger.warn({ sessionId, userId: authState.user.id }, "get_access_token_no_github_account");
return null;
}
return {
accessToken: githubAccount.accessToken,
scopes: githubAccount.scope ? githubAccount.scope.split(/[, ]+/) : [],
};
},
};
return betterAuthService;
}
export function getBetterAuthService(): BetterAuthService {
if (!betterAuthService) {
throw new Error("BetterAuth service is not initialized");
}
return betterAuthService;
}