feat(foundry): add foundry base sandbox image with sudo, chromium, and dev tooling

Add a custom Docker image (foundry-base.Dockerfile) that builds sandbox-agent
from source and layers sudo, git, neovim, gh, node, bun, chromium, and
agent-browser. Includes publish script for timestamped + latest tags to
rivetdev/sandbox-agent on Docker Hub.

Update local sandbox provider default to use foundry-base-latest and wire
HF_LOCAL_SANDBOX_IMAGE env var through compose.dev.yaml.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Nathan Flurry 2026-03-17 02:09:12 -07:00
parent eafe0f9fe4
commit 3895e34bdb
36 changed files with 800 additions and 1126 deletions

View file

@ -24,6 +24,12 @@ const journal = {
tag: "0003_sync_progress",
breakpoints: true,
},
{
idx: 4,
when: 1773993600000,
tag: "0004_drop_github_branches",
breakpoints: true,
},
],
} as const;
@ -101,6 +107,8 @@ ALTER TABLE \`github_members\` ADD \`sync_generation\` integer NOT NULL DEFAULT
ALTER TABLE \`github_pull_requests\` ADD \`sync_generation\` integer NOT NULL DEFAULT 0;
--> statement-breakpoint
ALTER TABLE \`github_branches\` ADD \`sync_generation\` integer NOT NULL DEFAULT 0;
`,
m0004: `DROP TABLE IF EXISTS \`github_branches\`;
`,
} as const,
};

View file

@ -30,15 +30,6 @@ export const githubRepositories = sqliteTable("github_repositories", {
updatedAt: integer("updated_at").notNull(),
});
export const githubBranches = sqliteTable("github_branches", {
branchId: text("branch_id").notNull().primaryKey(),
repoId: text("repo_id").notNull(),
branchName: text("branch_name").notNull(),
commitSha: text("commit_sha").notNull(),
syncGeneration: integer("sync_generation").notNull(),
updatedAt: integer("updated_at").notNull(),
});
export const githubMembers = sqliteTable("github_members", {
memberId: text("member_id").notNull().primaryKey(),
login: text("login").notNull(),

View file

@ -11,12 +11,12 @@ import { repoIdFromRemote } from "../../services/repo.js";
import { resolveOrganizationGithubAuth } from "../../services/github-auth.js";
import { organizationWorkflowQueueName } from "../organization/queues.js";
import { githubDataDb } from "./db/db.js";
import { githubBranches, githubMembers, githubMeta, githubPullRequests, githubRepositories } from "./db/schema.js";
import { githubMembers, githubMeta, githubPullRequests, githubRepositories } from "./db/schema.js";
const META_ROW_ID = 1;
const SYNC_REPOSITORY_BATCH_SIZE = 10;
type GithubSyncPhase = "discovering_repositories" | "syncing_repositories" | "syncing_branches" | "syncing_members" | "syncing_pull_requests";
type GithubSyncPhase = "discovering_repositories" | "syncing_repositories" | "syncing_members" | "syncing_pull_requests";
interface GithubDataInput {
organizationId: string;
@ -38,12 +38,6 @@ interface GithubRepositoryRecord {
defaultBranch: string;
}
interface GithubBranchRecord {
repoId: string;
branchName: string;
commitSha: string;
}
interface GithubPullRequestRecord {
repoId: string;
repoFullName: string;
@ -81,7 +75,6 @@ export const GITHUB_DATA_QUEUE_NAMES = [
"githubData.command.syncRepos",
"githubData.command.handlePullRequestWebhook",
"githubData.command.clearState",
"githubData.command.reloadRepository",
] as const;
type GithubDataQueueName = (typeof GITHUB_DATA_QUEUE_NAMES)[number];
@ -308,42 +301,6 @@ async function sweepRepositories(c: any, syncGeneration: number) {
}
}
async function upsertBranches(c: any, branches: GithubBranchRecord[], updatedAt: number, syncGeneration: number) {
for (const branch of branches) {
await c.db
.insert(githubBranches)
.values({
branchId: `${branch.repoId}:${branch.branchName}`,
repoId: branch.repoId,
branchName: branch.branchName,
commitSha: branch.commitSha,
syncGeneration,
updatedAt,
})
.onConflictDoUpdate({
target: githubBranches.branchId,
set: {
repoId: branch.repoId,
branchName: branch.branchName,
commitSha: branch.commitSha,
syncGeneration,
updatedAt,
},
})
.run();
}
}
async function sweepBranches(c: any, syncGeneration: number) {
const rows = await c.db.select({ branchId: githubBranches.branchId, syncGeneration: githubBranches.syncGeneration }).from(githubBranches).all();
for (const row of rows) {
if (row.syncGeneration === syncGeneration) {
continue;
}
await c.db.delete(githubBranches).where(eq(githubBranches.branchId, row.branchId)).run();
}
}
async function upsertMembers(c: any, members: GithubMemberRecord[], updatedAt: number, syncGeneration: number) {
for (const member of members) {
await c.db
@ -602,63 +559,6 @@ async function listPullRequestsForRepositories(
}));
}
async function listRepositoryBranchesForContext(
context: Awaited<ReturnType<typeof getOrganizationContext>>,
repository: GithubRepositoryRecord,
): Promise<GithubBranchRecord[]> {
const { appShell } = getActorRuntimeContext();
let branches: Array<{ name: string; commitSha: string }> = [];
if (context.installationId != null) {
try {
branches = await appShell.github.listInstallationRepositoryBranches(context.installationId, repository.fullName);
} catch (error) {
if (!context.accessToken) {
throw error;
}
}
}
if (branches.length === 0 && context.accessToken) {
branches = await appShell.github.listUserRepositoryBranches(context.accessToken, repository.fullName);
}
const repoId = repoIdFromRemote(repository.cloneUrl);
return branches.map((branch) => ({
repoId,
branchName: branch.name,
commitSha: branch.commitSha,
}));
}
async function refreshRepositoryBranches(
c: any,
context: Awaited<ReturnType<typeof getOrganizationContext>>,
repository: GithubRepositoryRecord,
updatedAt: number,
): Promise<void> {
const currentMeta = await readMeta(c);
const nextBranches = await listRepositoryBranchesForContext(context, repository);
await c.db
.delete(githubBranches)
.where(eq(githubBranches.repoId, repoIdFromRemote(repository.cloneUrl)))
.run();
for (const branch of nextBranches) {
await c.db
.insert(githubBranches)
.values({
branchId: `${branch.repoId}:${branch.branchName}`,
repoId: branch.repoId,
branchName: branch.branchName,
commitSha: branch.commitSha,
syncGeneration: currentMeta.syncGeneration,
updatedAt,
})
.run();
}
}
async function readAllPullRequestRows(c: any) {
return await c.db.select().from(githubPullRequests).all();
}
@ -736,41 +636,7 @@ export async function fullSyncSetup(c: any, input: FullSyncInput = {}): Promise<
}
/**
* Phase 2 (per-batch): Fetch and upsert branches for one batch of repos.
* Returns true when all batches have been processed.
*/
export async function fullSyncBranchBatch(c: any, config: FullSyncConfig, batchIndex: number): Promise<boolean> {
const repos = await readRepositoriesFromDb(c);
const batches = chunkItems(repos, SYNC_REPOSITORY_BATCH_SIZE);
if (batchIndex >= batches.length) return true;
const batch = batches[batchIndex]!;
const context = await getOrganizationContext(c, {
connectedAccount: config.connectedAccount,
installationStatus: config.installationStatus as any,
installationId: config.installationId,
});
const batchBranches = (await Promise.all(batch.map((repo) => listRepositoryBranchesForContext(context, repo)))).flat();
await upsertBranches(c, batchBranches, config.startedAt, config.syncGeneration);
const processedCount = Math.min((batchIndex + 1) * SYNC_REPOSITORY_BATCH_SIZE, repos.length);
await publishSyncProgress(c, {
connectedAccount: config.connectedAccount,
installationStatus: config.installationStatus,
installationId: config.installationId,
syncStatus: "syncing",
lastSyncLabel: `Synced branches for ${processedCount} of ${repos.length} repositories`,
syncGeneration: config.syncGeneration,
syncPhase: "syncing_branches",
processedRepositoryCount: processedCount,
totalRepositoryCount: repos.length,
});
return false;
}
/**
* Phase 3: Resolve, upsert, and sweep members.
* Phase 2: Resolve, upsert, and sweep members.
*/
export async function fullSyncMembers(c: any, config: FullSyncConfig): Promise<void> {
await publishSyncProgress(c, {
@ -796,7 +662,7 @@ export async function fullSyncMembers(c: any, config: FullSyncConfig): Promise<v
}
/**
* Phase 4 (per-batch): Fetch and upsert pull requests for one batch of repos.
* Phase 3 (per-batch): Fetch and upsert pull requests for one batch of repos.
* Returns true when all batches have been processed.
*/
export async function fullSyncPullRequestBatch(c: any, config: FullSyncConfig, batchIndex: number): Promise<boolean> {
@ -830,10 +696,9 @@ export async function fullSyncPullRequestBatch(c: any, config: FullSyncConfig, b
}
/**
* Phase 5: Sweep stale data, publish final state, emit PR change events.
* Phase 4: Sweep stale data, publish final state, emit PR change events.
*/
export async function fullSyncFinalize(c: any, config: FullSyncConfig): Promise<void> {
await sweepBranches(c, config.syncGeneration);
await sweepPullRequests(c, config.syncGeneration);
await sweepRepositories(c, config.syncGeneration);
@ -866,12 +731,6 @@ export async function fullSyncFinalize(c: any, config: FullSyncConfig): Promise<
export async function runFullSync(c: any, input: FullSyncInput = {}): Promise<void> {
const config = await fullSyncSetup(c, input);
// Branches — native loop over batches
for (let i = 0; ; i++) {
const done = await fullSyncBranchBatch(c, config, i);
if (done) break;
}
// Members
await fullSyncMembers(c, config);
@ -929,7 +788,6 @@ const GITHUB_DATA_COMMAND_HANDLERS: Record<GithubDataQueueName, GithubDataWorkfl
await clearStateMutation(c, body);
return { ok: true };
},
"githubData.command.reloadRepository": async (c, body) => reloadRepositoryMutation(c, body),
};
async function runGithubDataWorkflow(ctx: any): Promise<void> {
@ -985,13 +843,11 @@ export const githubData = actor({
actions: {
async getSummary(c) {
const repositories = await c.db.select().from(githubRepositories).all();
const branches = await c.db.select().from(githubBranches).all();
const members = await c.db.select().from(githubMembers).all();
const pullRequests = await c.db.select().from(githubPullRequests).all();
return {
...(await readMeta(c)),
repositoryCount: repositories.length,
branchCount: branches.length,
memberCount: members.length,
pullRequestCount: pullRequests.length,
};
@ -1030,88 +886,14 @@ export const githubData = actor({
.all();
return rows.map((row) => pullRequestSummaryFromRow(row));
},
async listBranchesForRepository(c, input: { repoId: string }) {
const rows = await c.db.select().from(githubBranches).where(eq(githubBranches.repoId, input.repoId)).all();
return rows
.map((row) => ({
branchName: row.branchName,
commitSha: row.commitSha,
}))
.sort((left, right) => left.branchName.localeCompare(right.branchName));
},
},
run: workflow(runGithubDataWorkflow),
});
export async function reloadRepositoryMutation(c: any, input: { repoId: string }) {
const context = await getOrganizationContext(c);
const current = await c.db.select().from(githubRepositories).where(eq(githubRepositories.repoId, input.repoId)).get();
if (!current) {
throw new Error(`Unknown GitHub repository: ${input.repoId}`);
}
const { appShell } = getActorRuntimeContext();
const repository =
context.installationId != null
? await appShell.github.getInstallationRepository(context.installationId, current.fullName)
: context.accessToken
? await appShell.github.getUserRepository(context.accessToken, current.fullName)
: null;
if (!repository) {
throw new Error(`Unable to reload repository: ${current.fullName}`);
}
const updatedAt = Date.now();
const currentMeta = await readMeta(c);
await c.db
.insert(githubRepositories)
.values({
repoId: input.repoId,
fullName: repository.fullName,
cloneUrl: repository.cloneUrl,
private: repository.private ? 1 : 0,
defaultBranch: repository.defaultBranch,
syncGeneration: currentMeta.syncGeneration,
updatedAt,
})
.onConflictDoUpdate({
target: githubRepositories.repoId,
set: {
fullName: repository.fullName,
cloneUrl: repository.cloneUrl,
private: repository.private ? 1 : 0,
defaultBranch: repository.defaultBranch,
syncGeneration: currentMeta.syncGeneration,
updatedAt,
},
})
.run();
await refreshRepositoryBranches(
c,
context,
{
fullName: repository.fullName,
cloneUrl: repository.cloneUrl,
private: repository.private,
defaultBranch: repository.defaultBranch,
},
updatedAt,
);
return {
repoId: input.repoId,
fullName: repository.fullName,
cloneUrl: repository.cloneUrl,
private: repository.private,
defaultBranch: repository.defaultBranch,
};
}
export async function clearStateMutation(c: any, input: ClearStateInput) {
const beforeRows = await readAllPullRequestRows(c);
const currentMeta = await readMeta(c);
await c.db.delete(githubPullRequests).run();
await c.db.delete(githubBranches).run();
await c.db.delete(githubRepositories).run();
await c.db.delete(githubMembers).run();
await writeMeta(c, {

View file

@ -9,9 +9,8 @@ async function getIndexModule() {
export const GITHUB_DATA_QUEUE_NAMES = [
"githubData.command.syncRepos",
"githubData.command.reloadRepository",
"githubData.command.clearState",
"githubData.command.handlePullRequestWebhook",
"githubData.command.clearState",
] as const;
export type GithubDataQueueName = (typeof GITHUB_DATA_QUEUE_NAMES)[number];
@ -46,10 +45,10 @@ export async function runGithubDataCommandLoop(c: any): Promise<void> {
continue;
}
if (msg.name === "githubData.command.reloadRepository") {
const { reloadRepositoryMutation } = await getIndexModule();
const result = await reloadRepositoryMutation(c, msg.body);
await msg.complete(result);
if (msg.name === "githubData.command.handlePullRequestWebhook") {
const { handlePullRequestWebhookMutation } = await getIndexModule();
await handlePullRequestWebhookMutation(c, msg.body);
await msg.complete({ ok: true });
continue;
}
@ -60,13 +59,6 @@ export async function runGithubDataCommandLoop(c: any): Promise<void> {
continue;
}
if (msg.name === "githubData.command.handlePullRequestWebhook") {
const { handlePullRequestWebhookMutation } = await getIndexModule();
await handlePullRequestWebhookMutation(c, msg.body);
await msg.complete({ ok: true });
continue;
}
logActorWarning("githubData", "unknown queue message", { queueName: msg.name });
await msg.complete({ error: `Unknown command: ${msg.name}` });
} catch (error) {

View file

@ -18,6 +18,7 @@ import { organizationOnboardingActions } from "./actions/onboarding.js";
import { organizationGithubActions } from "./actions/github.js";
import { organizationShellActions } from "./actions/organization.js";
import { organizationTaskActions } from "./actions/tasks.js";
import { updateOrganizationShellProfileMutation } from "./app-shell.js";
interface OrganizationState {
organizationId: string;
@ -169,6 +170,11 @@ export const organizationActions = {
assertOrganization(c, input.organizationId);
return await getOrganizationSummarySnapshot(c);
},
// updateShellProfile stays as a direct action — called with await from HTTP handler where the user can retry
async updateShellProfile(c: any, input: { displayName?: string; slug?: string; primaryDomain?: string }): Promise<void> {
await updateOrganizationShellProfileMutation(c, input);
},
};
export async function applyGithubSyncProgressMutation(

View file

@ -242,71 +242,49 @@ export async function betterAuthDeleteManyVerificationMutation(c: any, input: {
return rows.length;
}
/**
* Better Auth adapter actions exposed as actions (not queue commands) so they
* execute immediately without competing in the organization workflow queue.
*
* The org actor's workflow queue is shared with GitHub sync, webhook processing,
* task mutations, and billing operations. When the queue is busy, auth operations
* would time out (10s), causing Better Auth's parseState to throw a non-StateError
* which redirects to ?error=please_restart_the_process.
*
* Auth operations are safe to run as actions because they are simple SQLite
* reads/writes scoped to this actor instance with no cross-actor side effects.
*/
// Exception to the CLAUDE.md queue-for-mutations rule: Better Auth adapter operations
// use direct actions even for mutations. Better Auth runs during OAuth callbacks on the
// HTTP request path, not through the normal organization lifecycle. Routing through the
// queue adds multiple sequential round-trips (each with actor wake-up + step overhead)
// that cause 30-second OAuth callbacks and proxy retry storms. These mutations are simple
// SQLite upserts/deletes with no cross-actor coordination or broadcast side effects.
export const organizationBetterAuthActions = {
// --- Mutation actions (formerly queue commands) ---
// --- Mutation actions (called by the Better Auth adapter in better-auth.ts) ---
async betterAuthUpsertSessionIndex(c: any, input: { sessionId: string; sessionToken: string; userId: string }) {
return await betterAuthUpsertSessionIndexMutation(c, input);
},
async betterAuthDeleteSessionIndex(c: any, input: { sessionId?: string; sessionToken?: string }) {
await betterAuthDeleteSessionIndexMutation(c, input);
},
async betterAuthUpsertEmailIndex(c: any, input: { email: string; userId: string }) {
return await betterAuthUpsertEmailIndexMutation(c, input);
},
async betterAuthDeleteEmailIndex(c: any, input: { email: string }) {
await betterAuthDeleteEmailIndexMutation(c, input);
},
async betterAuthUpsertAccountIndex(c: any, input: { id: string; providerId: string; accountId: string; userId: string }) {
return await betterAuthUpsertAccountIndexMutation(c, input);
},
async betterAuthDeleteAccountIndex(c: any, input: { id?: string; providerId?: string; accountId?: string }) {
await betterAuthDeleteAccountIndexMutation(c, input);
},
async betterAuthCreateVerification(c: any, input: { data: Record<string, unknown> }) {
return await betterAuthCreateVerificationMutation(c, input);
},
async betterAuthUpdateVerification(c: any, input: { where: any[]; update: Record<string, unknown> }) {
return await betterAuthUpdateVerificationMutation(c, input);
},
async betterAuthUpdateManyVerification(c: any, input: { where: any[]; update: Record<string, unknown> }) {
return await betterAuthUpdateManyVerificationMutation(c, input);
},
async betterAuthDeleteVerification(c: any, input: { where: any[] }) {
await betterAuthDeleteVerificationMutation(c, input);
return { ok: true };
},
async betterAuthDeleteManyVerification(c: any, input: { where: any[] }) {
return await betterAuthDeleteManyVerificationMutation(c, input);
},
async betterAuthUpsertSessionIndex(c: any, input: { sessionId: string; sessionToken: string; userId: string }) {
return await betterAuthUpsertSessionIndexMutation(c, input);
},
async betterAuthDeleteSessionIndex(c: any, input: { sessionId?: string; sessionToken?: string }) {
await betterAuthDeleteSessionIndexMutation(c, input);
return { ok: true };
},
async betterAuthUpsertEmailIndex(c: any, input: { email: string; userId: string }) {
return await betterAuthUpsertEmailIndexMutation(c, input);
},
async betterAuthDeleteEmailIndex(c: any, input: { email: string }) {
await betterAuthDeleteEmailIndexMutation(c, input);
return { ok: true };
},
async betterAuthUpsertAccountIndex(c: any, input: { id: string; providerId: string; accountId: string; userId: string }) {
return await betterAuthUpsertAccountIndexMutation(c, input);
},
async betterAuthDeleteAccountIndex(c: any, input: { id?: string; providerId?: string; accountId?: string }) {
await betterAuthDeleteAccountIndexMutation(c, input);
return { ok: true };
},
// --- Read actions ---
async betterAuthFindSessionIndex(c: any, input: { sessionId?: string; sessionToken?: string }) {
assertAppOrganization(c);

View file

@ -3,13 +3,7 @@ import type { FoundryAppSnapshot } from "@sandbox-agent/foundry-shared";
import { getOrCreateGithubData, getOrCreateOrganization } from "../../handles.js";
import { githubDataWorkflowQueueName } from "../../github-data/index.js";
import { authSessionIndex } from "../db/schema.js";
import {
assertAppOrganization,
buildAppSnapshot,
requireEligibleOrganization,
requireSignedInSession,
markOrganizationSyncStartedMutation,
} from "../app-shell.js";
import { assertAppOrganization, buildAppSnapshot, requireEligibleOrganization, requireSignedInSession } from "../app-shell.js";
import { getBetterAuthService } from "../../../services/better-auth.js";
import { refreshOrganizationSnapshotMutation } from "../actions.js";
import { organizationWorkflowQueueName } from "../queues.js";
@ -79,8 +73,8 @@ export const organizationGithubActions = {
await githubData.send(githubDataWorkflowQueueName("githubData.command.syncRepos"), { label: "Reloading GitHub organization..." }, { wait: false });
},
async adminReloadGithubRepository(c: any, input: { repoId: string }): Promise<void> {
async adminReloadGithubRepository(c: any, _input: { repoId: string }): Promise<void> {
const githubData = await getOrCreateGithubData(c, c.state.organizationId);
await githubData.send(githubDataWorkflowQueueName("githubData.command.reloadRepository"), input, { wait: false });
await githubData.send(githubDataWorkflowQueueName("githubData.command.syncRepos"), { label: "Reloading repository..." }, { wait: false });
},
};

View file

@ -10,7 +10,6 @@ import {
requireEligibleOrganization,
requireSignedInSession,
} from "../app-shell.js";
import { organizationWorkflowQueueName } from "../queues.js";
export const organizationShellActions = {
async getAppSnapshot(c: any, input: { sessionId: string }): Promise<FoundryAppSnapshot> {
@ -34,15 +33,11 @@ export const organizationShellActions = {
const session = await requireSignedInSession(c, input.sessionId);
requireEligibleOrganization(session, input.organizationId);
const organization = await getOrCreateOrganization(c, input.organizationId);
await organization.send(
organizationWorkflowQueueName("organization.command.shell.profile.update"),
{
displayName: input.displayName,
slug: input.slug,
primaryDomain: input.primaryDomain,
},
{ wait: true, timeout: 10_000 },
);
await organization.updateShellProfile({
displayName: input.displayName,
slug: input.slug,
primaryDomain: input.primaryDomain,
});
return await buildAppSnapshot(c, input.sessionId);
},

View file

@ -35,7 +35,6 @@ interface RegisterTaskBranchCommand {
repoId: string;
taskId: string;
branchName: string;
requireExistingRemote?: boolean;
}
function isStaleTaskReferenceError(error: unknown): boolean {
@ -120,11 +119,6 @@ async function resolveGitHubRepository(c: any, repoId: string) {
return await githubData.getRepository({ repoId }).catch(() => null);
}
async function listGitHubBranches(c: any, repoId: string): Promise<Array<{ branchName: string; commitSha: string }>> {
const githubData = getGithubData(c, c.state.organizationId);
return await githubData.listBranchesForRepository({ repoId }).catch(() => []);
}
async function resolveRepositoryRemoteUrl(c: any, repoId: string): Promise<string> {
const repository = await resolveGitHubRepository(c, repoId);
const remoteUrl = repository?.cloneUrl?.trim();
@ -161,7 +155,6 @@ export async function createTaskMutation(c: any, cmd: CreateTaskCommand): Promis
repoId,
taskId,
branchName: onBranch,
requireExistingRemote: true,
});
} else {
const reservedBranches = await listKnownTaskBranches(c, repoId);
@ -255,7 +248,7 @@ export async function createTaskMutation(c: any, cmd: CreateTaskCommand): Promis
return created;
}
export async function registerTaskBranchMutation(c: any, cmd: RegisterTaskBranchCommand): Promise<{ branchName: string; headSha: string }> {
export async function registerTaskBranchMutation(c: any, cmd: RegisterTaskBranchCommand): Promise<{ branchName: string }> {
const branchName = cmd.branchName.trim();
if (!branchName) {
throw new Error("branchName is required");
@ -284,16 +277,6 @@ export async function registerTaskBranchMutation(c: any, cmd: RegisterTaskBranch
}
}
const branches = await listGitHubBranches(c, cmd.repoId);
const branchMatch = branches.find((branch) => branch.branchName === branchName) ?? null;
if (cmd.requireExistingRemote && !branchMatch) {
throw new Error(`Remote branch not found: ${branchName}`);
}
const repository = await resolveGitHubRepository(c, cmd.repoId);
const defaultBranch = repository?.defaultBranch ?? "main";
const headSha = branchMatch?.commitSha ?? branches.find((branch) => branch.branchName === defaultBranch)?.commitSha ?? "";
const now = Date.now();
await c.db
.insert(taskIndex)
@ -313,7 +296,7 @@ export async function registerTaskBranchMutation(c: any, cmd: RegisterTaskBranch
})
.run();
return { branchName, headSha };
return { branchName };
}
export async function applyTaskSummaryUpdateMutation(c: any, input: { taskSummary: WorkspaceTaskSummary }): Promise<void> {
@ -392,7 +375,7 @@ export async function refreshTaskSummaryForBranchMutation(
// Best-effort notify the task actor if it exists (fire-and-forget)
try {
const task = getTask(c, c.state.organizationId, input.repoId, row.taskId);
void task.send(taskWorkflowQueueName("task.command.pull_request.sync"), { pullRequest }, { wait: false }).catch(() => {});
void task.syncPullRequest({ pullRequest }).catch(() => {});
} catch {
// Task actor doesn't exist yet — that's fine, it's virtual
}
@ -402,34 +385,6 @@ export async function refreshTaskSummaryForBranchMutation(
await refreshOrganizationSnapshotMutation(c);
}
export function sortOverviewBranches(
branches: Array<{
branchName: string;
commitSha: string;
taskId: string | null;
taskTitle: string | null;
taskStatus: TaskRecord["status"] | null;
pullRequest: WorkspacePullRequestSummary | null;
ciStatus: string | null;
updatedAt: number;
}>,
defaultBranch: string | null,
) {
return [...branches].sort((left, right) => {
if (defaultBranch) {
if (left.branchName === defaultBranch && right.branchName !== defaultBranch) return -1;
if (right.branchName === defaultBranch && left.branchName !== defaultBranch) return 1;
}
if (Boolean(left.taskId) !== Boolean(right.taskId)) {
return left.taskId ? -1 : 1;
}
if (left.updatedAt !== right.updatedAt) {
return right.updatedAt - left.updatedAt;
}
return left.branchName.localeCompare(right.branchName);
});
}
export async function listTaskSummariesForRepo(c: any, repoId: string, includeArchived = false): Promise<TaskSummary[]> {
const rows = await c.db.select().from(taskSummaries).where(eq(taskSummaries.repoId, repoId)).orderBy(desc(taskSummaries.updatedAtMs)).all();
return rows
@ -471,56 +426,24 @@ export async function getRepoOverviewFromOrg(c: any, repoId: string): Promise<Re
const now = Date.now();
const repository = await resolveGitHubRepository(c, repoId);
const remoteUrl = await resolveRepositoryRemoteUrl(c, repoId);
const githubBranches = await listGitHubBranches(c, repoId).catch(() => []);
const taskRows = await c.db.select().from(taskSummaries).where(eq(taskSummaries.repoId, repoId)).all();
const taskMetaByBranch = new Map<
string,
{ taskId: string; title: string | null; status: TaskRecord["status"] | null; updatedAt: number; pullRequest: WorkspacePullRequestSummary | null }
>();
for (const row of taskRows) {
if (!row.branch) {
continue;
}
taskMetaByBranch.set(row.branch, {
taskId: row.taskId,
title: row.title ?? null,
status: row.status,
updatedAt: row.updatedAtMs,
pullRequest: parseJsonValue<WorkspacePullRequestSummary | null>(row.pullRequestJson, null),
});
}
const branchMap = new Map<string, { branchName: string; commitSha: string }>();
for (const branch of githubBranches) {
branchMap.set(branch.branchName, branch);
}
for (const branchName of taskMetaByBranch.keys()) {
if (!branchMap.has(branchName)) {
branchMap.set(branchName, { branchName, commitSha: "" });
}
}
if (repository?.defaultBranch && !branchMap.has(repository.defaultBranch)) {
branchMap.set(repository.defaultBranch, { branchName: repository.defaultBranch, commitSha: "" });
}
const branches = sortOverviewBranches(
[...branchMap.values()].map((branch) => {
const taskMeta = taskMetaByBranch.get(branch.branchName);
const pr = taskMeta?.pullRequest ?? null;
const branches = taskRows
.filter((row: any) => row.branch)
.map((row: any) => {
const pr = parseJsonValue<WorkspacePullRequestSummary | null>(row.pullRequestJson, null);
return {
branchName: branch.branchName,
commitSha: branch.commitSha,
taskId: taskMeta?.taskId ?? null,
taskTitle: taskMeta?.title ?? null,
taskStatus: taskMeta?.status ?? null,
branchName: row.branch!,
commitSha: "",
taskId: row.taskId,
taskTitle: row.title ?? null,
taskStatus: row.status ?? null,
pullRequest: pr,
ciStatus: null,
updatedAt: Math.max(taskMeta?.updatedAt ?? 0, pr?.updatedAtMs ?? 0, now),
updatedAt: Math.max(row.updatedAtMs ?? 0, pr?.updatedAtMs ?? 0, now),
};
}),
repository?.defaultBranch ?? null,
);
})
.sort((a: any, b: any) => b.updatedAt - a.updatedAt);
return {
organizationId: c.state.organizationId,

View file

@ -149,12 +149,12 @@ export const organizationTaskActions = {
async markWorkspaceUnread(c: any, input: TaskWorkspaceSelectInput): Promise<void> {
const task = await requireWorkspaceTask(c, input.repoId, input.taskId);
await task.send(taskWorkflowQueueName("task.command.workspace.mark_unread"), { authSessionId: input.authSessionId }, { wait: false });
await task.markUnread({ authSessionId: input.authSessionId });
},
async renameWorkspaceTask(c: any, input: TaskWorkspaceRenameInput): Promise<void> {
const task = await requireWorkspaceTask(c, input.repoId, input.taskId);
await task.send(taskWorkflowQueueName("task.command.workspace.rename_task"), { value: input.value }, { wait: false });
await task.renameTask({ value: input.value });
},
async createWorkspaceSession(c: any, input: TaskWorkspaceSelectInput & { model?: string }): Promise<{ sessionId: string }> {
@ -173,54 +173,34 @@ export const organizationTaskActions = {
async renameWorkspaceSession(c: any, input: TaskWorkspaceRenameSessionInput): Promise<void> {
const task = await requireWorkspaceTask(c, input.repoId, input.taskId);
await task.send(
taskWorkflowQueueName("task.command.workspace.rename_session"),
{ sessionId: input.sessionId, title: input.title, authSessionId: input.authSessionId },
{ wait: false },
);
await task.renameSession({ sessionId: input.sessionId, title: input.title });
},
async selectWorkspaceSession(c: any, input: TaskWorkspaceSessionInput): Promise<void> {
const task = await requireWorkspaceTask(c, input.repoId, input.taskId);
await task.send(
taskWorkflowQueueName("task.command.workspace.select_session"),
{ sessionId: input.sessionId, authSessionId: input.authSessionId },
{ wait: false },
);
await task.selectSession({ sessionId: input.sessionId, authSessionId: input.authSessionId });
},
async setWorkspaceSessionUnread(c: any, input: TaskWorkspaceSetSessionUnreadInput): Promise<void> {
const task = await requireWorkspaceTask(c, input.repoId, input.taskId);
await task.send(
taskWorkflowQueueName("task.command.workspace.set_session_unread"),
{ sessionId: input.sessionId, unread: input.unread, authSessionId: input.authSessionId },
{ wait: false },
);
await task.setSessionUnread({ sessionId: input.sessionId, unread: input.unread, authSessionId: input.authSessionId });
},
async updateWorkspaceDraft(c: any, input: TaskWorkspaceUpdateDraftInput): Promise<void> {
const task = await requireWorkspaceTask(c, input.repoId, input.taskId);
void task
.send(
taskWorkflowQueueName("task.command.workspace.update_draft"),
{
sessionId: input.sessionId,
text: input.text,
attachments: input.attachments,
authSessionId: input.authSessionId,
},
{ wait: false },
)
.updateDraft({
sessionId: input.sessionId,
text: input.text,
attachments: input.attachments,
authSessionId: input.authSessionId,
})
.catch(() => {});
},
async changeWorkspaceModel(c: any, input: TaskWorkspaceChangeModelInput): Promise<void> {
const task = await requireWorkspaceTask(c, input.repoId, input.taskId);
await task.send(
taskWorkflowQueueName("task.command.workspace.change_model"),
{ sessionId: input.sessionId, model: input.model, authSessionId: input.authSessionId },
{ wait: false },
);
await task.changeModel({ sessionId: input.sessionId, model: input.model, authSessionId: input.authSessionId });
},
async sendWorkspaceMessage(c: any, input: TaskWorkspaceSendMessageInput): Promise<void> {

View file

@ -15,7 +15,7 @@ import { getActorRuntimeContext } from "../context.js";
import { getOrCreateGithubData, getOrCreateOrganization, selfOrganization } from "../handles.js";
import { GitHubAppError } from "../../services/app-github.js";
import { getBetterAuthService } from "../../services/better-auth.js";
import { repoIdFromRemote, repoLabelFromRemote } from "../../services/repo.js";
import { repoLabelFromRemote } from "../../services/repo.js";
import { logger } from "../../logging.js";
import { githubDataWorkflowQueueName } from "../github-data/index.js";
import { organizationWorkflowQueueName } from "./queues.js";
@ -685,10 +685,7 @@ async function applySubscriptionState(
): Promise<void> {
await organization.send(
organizationWorkflowQueueName("organization.command.billing.stripe_subscription.apply"),
{
subscription,
fallbackPlanId,
},
{ subscription, fallbackPlanId },
{ wait: true, timeout: 10_000 },
);
}
@ -705,9 +702,7 @@ export const organizationAppActions = {
if (input.planId === "free") {
await organizationHandle.send(
organizationWorkflowQueueName("organization.command.billing.free_plan.apply"),
{
clearSubscription: false,
},
{ clearSubscription: false },
{ wait: true, timeout: 10_000 },
);
return {
@ -730,9 +725,7 @@ export const organizationAppActions = {
).id;
await organizationHandle.send(
organizationWorkflowQueueName("organization.command.billing.stripe_customer.apply"),
{
customerId,
},
{ customerId },
{ wait: true, timeout: 10_000 },
);
await upsertStripeLookupEntries(c, input.organizationId, customerId, null);
@ -764,9 +757,7 @@ export const organizationAppActions = {
if (completion.customerId) {
await organizationHandle.send(
organizationWorkflowQueueName("organization.command.billing.stripe_customer.apply"),
{
customerId: completion.customerId,
},
{ customerId: completion.customerId },
{ wait: true, timeout: 10_000 },
);
}
@ -780,9 +771,7 @@ export const organizationAppActions = {
if (completion.paymentMethodLabel) {
await organizationHandle.send(
organizationWorkflowQueueName("organization.command.billing.payment_method.set"),
{
label: completion.paymentMethodLabel,
},
{ label: completion.paymentMethodLabel },
{ wait: true, timeout: 10_000 },
);
}
@ -824,9 +813,7 @@ export const organizationAppActions = {
} else {
await organizationHandle.send(
organizationWorkflowQueueName("organization.command.billing.status.set"),
{
status: "scheduled_cancel",
},
{ status: "scheduled_cancel" },
{ wait: true, timeout: 10_000 },
);
}
@ -849,9 +836,7 @@ export const organizationAppActions = {
} else {
await organizationHandle.send(
organizationWorkflowQueueName("organization.command.billing.status.set"),
{
status: "active",
},
{ status: "active" },
{ wait: true, timeout: 10_000 },
);
}
@ -866,9 +851,7 @@ export const organizationAppActions = {
const organization = await getOrCreateOrganization(c, input.organizationId);
await organization.send(
organizationWorkflowQueueName("organization.command.billing.seat_usage.record"),
{
email: session.currentUserEmail,
},
{ email: session.currentUserEmail },
{ wait: true, timeout: 10_000 },
);
return await buildAppSnapshot(c, input.sessionId);
@ -893,9 +876,7 @@ export const organizationAppActions = {
if (typeof object.customer === "string") {
await organization.send(
organizationWorkflowQueueName("organization.command.billing.stripe_customer.apply"),
{
customerId: object.customer,
},
{ customerId: object.customer },
{ wait: true, timeout: 10_000 },
);
}
@ -932,9 +913,7 @@ export const organizationAppActions = {
const organization = await getOrCreateOrganization(c, organizationId);
await organization.send(
organizationWorkflowQueueName("organization.command.billing.free_plan.apply"),
{
clearSubscription: true,
},
{ clearSubscription: true },
{ wait: true, timeout: 10_000 },
);
}
@ -990,12 +969,7 @@ export const organizationAppActions = {
const organization = await getOrCreateOrganization(c, organizationId);
await organization.send(
organizationWorkflowQueueName("organization.command.github.webhook_receipt.record"),
{
organizationId: organizationId,
event,
action: body.action ?? null,
receivedAt,
},
{ organizationId, event, action: body.action ?? null, receivedAt },
{ wait: false },
);
const githubData = await getOrCreateGithubData(c, organizationId);
@ -1013,12 +987,7 @@ export const organizationAppActions = {
if (body.action === "deleted") {
await githubData.send(
githubDataWorkflowQueueName("githubData.command.clearState"),
{
connectedAccount: accountLogin,
installationStatus: "install_required",
installationId: null,
label: "GitHub App installation removed",
},
{ connectedAccount: accountLogin, installationStatus: "install_required", installationId: null, label: "GitHub App installation removed" },
{ wait: false },
);
} else if (body.action === "created") {
@ -1147,13 +1116,6 @@ export const organizationAppActions = {
{ wait: false },
);
}
if ((event === "push" || event === "create" || event === "delete") && body.repository?.clone_url) {
const repoId = repoIdFromRemote(body.repository.clone_url);
const knownRepository = await githubData.getRepository({ repoId });
if (knownRepository) {
await githubData.send(githubDataWorkflowQueueName("githubData.command.reloadRepository"), { repoId }, { wait: false });
}
}
}
return { ok: true };
}

View file

@ -1,16 +1,14 @@
export const ORGANIZATION_QUEUE_NAMES = [
"organization.command.createTask",
"organization.command.materializeTask",
"organization.command.registerTaskBranch",
"organization.command.applyTaskSummaryUpdate",
"organization.command.removeTaskSummary",
"organization.command.refreshTaskSummaryForBranch",
"organization.command.snapshot.broadcast",
"organization.command.syncGithubSession",
"organization.command.github.organization_shell.sync_from_github",
"organization.command.github.sync_progress.apply",
"organization.command.github.webhook_receipt.record",
"organization.command.github.organization_shell.sync_from_github",
"organization.command.shell.profile.update",
"organization.command.shell.sync_started.mark",
"organization.command.billing.stripe_customer.apply",
"organization.command.billing.stripe_subscription.apply",

View file

@ -17,7 +17,6 @@ import {
applyTaskSummaryUpdateMutation,
createTaskMutation,
refreshTaskSummaryForBranchMutation,
registerTaskBranchMutation,
removeTaskSummaryMutation,
} from "./actions/task-mutations.js";
import {
@ -29,7 +28,6 @@ import {
setOrganizationBillingPaymentMethodMutation,
setOrganizationBillingStatusMutation,
syncOrganizationShellFromGithubMutation,
updateOrganizationShellProfileMutation,
upsertOrganizationInvoiceMutation,
} from "./app-shell.js";
@ -49,7 +47,6 @@ const COMMAND_HANDLERS: Record<OrganizationQueueName, WorkflowHandler> = {
// Task mutations
"organization.command.createTask": async (c, body) => createTaskMutation(c, body),
"organization.command.materializeTask": async (c, body) => createTaskMutation(c, body),
"organization.command.registerTaskBranch": async (c, body) => registerTaskBranchMutation(c, body),
"organization.command.applyTaskSummaryUpdate": async (c, body) => {
await applyTaskSummaryUpdateMutation(c, body);
return { ok: true };
@ -72,7 +69,10 @@ const COMMAND_HANDLERS: Record<OrganizationQueueName, WorkflowHandler> = {
return { ok: true };
},
// GitHub sync mutations
// GitHub organization shell sync (stays on queue)
"organization.command.github.organization_shell.sync_from_github": async (c, body) => syncOrganizationShellFromGithubMutation(c, body),
// GitHub sync progress + webhook receipt
"organization.command.github.sync_progress.apply": async (c, body) => {
await applyGithubSyncProgressMutation(c, body);
return { ok: true };
@ -81,13 +81,6 @@ const COMMAND_HANDLERS: Record<OrganizationQueueName, WorkflowHandler> = {
await recordGithubWebhookReceiptMutation(c, body);
return { ok: true };
},
"organization.command.github.organization_shell.sync_from_github": async (c, body) => syncOrganizationShellFromGithubMutation(c, body),
// Shell/profile mutations
"organization.command.shell.profile.update": async (c, body) => {
await updateOrganizationShellProfileMutation(c, body);
return { ok: true };
},
"organization.command.shell.sync_started.mark": async (c, body) => {
await markOrganizationSyncStartedMutation(c, body);
return { ok: true };

View file

@ -13,8 +13,8 @@ import { logActorWarning, resolveErrorMessage } from "../logging.js";
import { expectQueueResponse } from "../../services/queue.js";
import { resolveSandboxProviderId } from "../../sandbox-config.js";
const SANDBOX_REPO_CWD = "/home/user/repo";
const DEFAULT_LOCAL_SANDBOX_IMAGE = "rivetdev/sandbox-agent:full";
const SANDBOX_REPO_CWD = "/home/sandbox/repo";
const DEFAULT_LOCAL_SANDBOX_IMAGE = "rivetdev/sandbox-agent:foundry-base-latest";
const DEFAULT_LOCAL_SANDBOX_PORT = 2468;
const dockerClient = new Dockerode({ socketPath: "/var/run/docker.sock" });

View file

@ -3,7 +3,22 @@ import { workflow } from "rivetkit/workflow";
import type { TaskRecord } from "@sandbox-agent/foundry-shared";
import { taskDb } from "./db/db.js";
import { getCurrentRecord } from "./workflow/common.js";
import { getSessionDetail, getTaskDetail, getTaskSummary } from "./workspace.js";
import {
changeWorkspaceModel,
getSessionDetail,
getTaskDetail,
getTaskSummary,
markWorkspaceUnread,
refreshWorkspaceDerivedState,
refreshWorkspaceSessionTranscript,
renameWorkspaceSession,
renameWorkspaceTask,
selectWorkspaceSession,
setWorkspaceSessionUnread,
syncTaskPullRequest,
syncWorkspaceSessionStatus,
updateWorkspaceDraft,
} from "./workspace.js";
import { runTaskWorkflow } from "./workflow/index.js";
import { TASK_QUEUE_NAMES } from "./workflow/queue.js";
@ -42,6 +57,41 @@ export const task = actor({
async getSessionDetail(c, input: { sessionId: string; authSessionId?: string }) {
return await getSessionDetail(c, input.sessionId, input.authSessionId);
},
// Direct actions migrated from queue:
async markUnread(c, input: { authSessionId?: string }) {
await markWorkspaceUnread(c, input?.authSessionId);
},
async renameTask(c, input: { value: string }) {
await renameWorkspaceTask(c, input.value);
},
async renameSession(c, input: { sessionId: string; title: string }) {
await renameWorkspaceSession(c, input.sessionId, input.title);
},
async selectSession(c, input: { sessionId: string; authSessionId?: string }) {
await selectWorkspaceSession(c, input.sessionId, input?.authSessionId);
},
async setSessionUnread(c, input: { sessionId: string; unread: boolean; authSessionId?: string }) {
await setWorkspaceSessionUnread(c, input.sessionId, input.unread, input?.authSessionId);
},
async updateDraft(c, input: { sessionId: string; text: string; attachments: any[]; authSessionId?: string }) {
await updateWorkspaceDraft(c, input.sessionId, input.text, input.attachments, input?.authSessionId);
},
async changeModel(c, input: { sessionId: string; model: string; authSessionId?: string }) {
await changeWorkspaceModel(c, input.sessionId, input.model, input?.authSessionId);
},
async refreshSessionTranscript(c, input: { sessionId: string }) {
await refreshWorkspaceSessionTranscript(c, input.sessionId);
},
async refreshDerived(c) {
await refreshWorkspaceDerivedState(c);
},
async syncSessionStatus(c, input: { sessionId: string; status: "running" | "idle" | "error"; at: number }) {
await syncWorkspaceSessionStatus(c, input.sessionId, input.status, input.at);
},
async syncPullRequest(c, input: { pullRequest: any }) {
await syncTaskPullRequest(c, input?.pullRequest ?? null);
},
},
run: workflow(runTaskWorkflow),
});

View file

@ -16,7 +16,6 @@ import { initBootstrapDbActivity, initCompleteActivity, initEnqueueProvisionActi
import {
handleArchiveActivity,
handleAttachActivity,
handleGetActivity,
handlePushActivity,
handleSimpleCommandActivity,
handleSwitchActivity,
@ -25,24 +24,13 @@ import {
} from "./commands.js";
import {
changeTaskOwnerManually,
changeWorkspaceModel,
closeWorkspaceSession,
createWorkspaceSession,
ensureWorkspaceSession,
refreshWorkspaceDerivedState,
refreshWorkspaceSessionTranscript,
markWorkspaceUnread,
publishWorkspacePr,
renameWorkspaceTask,
renameWorkspaceSession,
selectWorkspaceSession,
revertWorkspaceFile,
sendWorkspaceMessage,
setWorkspaceSessionUnread,
stopWorkspaceSession,
syncTaskPullRequest,
syncWorkspaceSessionStatus,
updateWorkspaceDraft,
} from "../workspace.js";
export { taskWorkflowQueueName } from "./queue.js";
@ -100,25 +88,6 @@ const COMMAND_HANDLERS: Record<TaskQueueName, WorkflowHandler> = {
await killWriteDbActivity(loopCtx, msg);
},
"task.command.get": async (loopCtx, msg) => {
await handleGetActivity(loopCtx, msg);
},
"task.command.pull_request.sync": async (loopCtx, msg) => {
await syncTaskPullRequest(loopCtx, msg.body?.pullRequest ?? null);
await msg.complete({ ok: true });
},
"task.command.workspace.mark_unread": async (loopCtx, msg) => {
await markWorkspaceUnread(loopCtx, msg.body?.authSessionId);
await msg.complete({ ok: true });
},
"task.command.workspace.rename_task": async (loopCtx, msg) => {
await renameWorkspaceTask(loopCtx, msg.body.value);
await msg.complete({ ok: true });
},
"task.command.workspace.create_session": async (loopCtx, msg) => {
const result = await createWorkspaceSession(loopCtx, msg.body?.model, msg.body?.authSessionId);
await msg.complete(result);
@ -141,31 +110,6 @@ const COMMAND_HANDLERS: Record<TaskQueueName, WorkflowHandler> = {
await msg.complete({ ok: true });
},
"task.command.workspace.rename_session": async (loopCtx, msg) => {
await renameWorkspaceSession(loopCtx, msg.body.sessionId, msg.body.title);
await msg.complete({ ok: true });
},
"task.command.workspace.select_session": async (loopCtx, msg) => {
await selectWorkspaceSession(loopCtx, msg.body.sessionId, msg.body?.authSessionId);
await msg.complete({ ok: true });
},
"task.command.workspace.set_session_unread": async (loopCtx, msg) => {
await setWorkspaceSessionUnread(loopCtx, msg.body.sessionId, msg.body.unread, msg.body?.authSessionId);
await msg.complete({ ok: true });
},
"task.command.workspace.update_draft": async (loopCtx, msg) => {
await updateWorkspaceDraft(loopCtx, msg.body.sessionId, msg.body.text, msg.body.attachments, msg.body?.authSessionId);
await msg.complete({ ok: true });
},
"task.command.workspace.change_model": async (loopCtx, msg) => {
await changeWorkspaceModel(loopCtx, msg.body.sessionId, msg.body.model, msg.body?.authSessionId);
await msg.complete({ ok: true });
},
"task.command.workspace.send_message": async (loopCtx, msg) => {
await sendWorkspaceMessage(loopCtx, msg.body.sessionId, msg.body.text, msg.body.attachments, msg.body?.authSessionId);
await msg.complete({ ok: true });
@ -176,21 +120,6 @@ const COMMAND_HANDLERS: Record<TaskQueueName, WorkflowHandler> = {
await msg.complete({ ok: true });
},
"task.command.workspace.sync_session_status": async (loopCtx, msg) => {
await syncWorkspaceSessionStatus(loopCtx, msg.body.sessionId, msg.body.status, msg.body.at);
await msg.complete({ ok: true });
},
"task.command.workspace.refresh_derived": async (loopCtx, msg) => {
await refreshWorkspaceDerivedState(loopCtx);
await msg.complete({ ok: true });
},
"task.command.workspace.refresh_session_transcript": async (loopCtx, msg) => {
await refreshWorkspaceSessionTranscript(loopCtx, msg.body.sessionId);
await msg.complete({ ok: true });
},
"task.command.workspace.close_session": async (loopCtx, msg) => {
await closeWorkspaceSession(loopCtx, msg.body.sessionId, msg.body?.authSessionId);
await msg.complete({ ok: true });

View file

@ -8,23 +8,11 @@ export const TASK_QUEUE_NAMES = [
"task.command.merge",
"task.command.archive",
"task.command.kill",
"task.command.get",
"task.command.pull_request.sync",
"task.command.workspace.mark_unread",
"task.command.workspace.rename_task",
"task.command.workspace.create_session",
"task.command.workspace.create_session_and_send",
"task.command.workspace.ensure_session",
"task.command.workspace.rename_session",
"task.command.workspace.select_session",
"task.command.workspace.set_session_unread",
"task.command.workspace.update_draft",
"task.command.workspace.change_model",
"task.command.workspace.send_message",
"task.command.workspace.stop_session",
"task.command.workspace.sync_session_status",
"task.command.workspace.refresh_derived",
"task.command.workspace.refresh_session_transcript",
"task.command.workspace.close_session",
"task.command.workspace.publish_pr",
"task.command.workspace.revert_file",

View file

@ -17,8 +17,6 @@ import { getBetterAuthService } from "../../services/better-auth.js";
import { resolveOrganizationGithubAuth } from "../../services/github-auth.js";
import { githubRepoFullNameFromRemote } from "../../services/repo.js";
import { taskWorkflowQueueName } from "./workflow/queue.js";
import { expectQueueResponse } from "../../services/queue.js";
import { userWorkflowQueueName } from "../user/workflow.js";
import { organizationWorkflowQueueName } from "../organization/queues.js";
import { task as taskTable, taskOwner, taskRuntime, taskSandboxes, taskWorkspaceSessions } from "./db/schema.js";
@ -185,9 +183,9 @@ async function injectGitCredentials(sandbox: any, login: string, email: string,
"set -euo pipefail",
`git config --global user.name ${JSON.stringify(login)}`,
`git config --global user.email ${JSON.stringify(email)}`,
`git config --global credential.helper 'store --file=/home/user/.git-token'`,
`printf '%s\\n' ${JSON.stringify(`https://${login}:${token}@github.com`)} > /home/user/.git-token`,
`chmod 600 /home/user/.git-token`,
`git config --global credential.helper 'store --file=/home/sandbox/.git-token'`,
`printf '%s\\n' ${JSON.stringify(`https://${login}:${token}@github.com`)} > /home/sandbox/.git-token`,
`chmod 600 /home/sandbox/.git-token`,
];
const result = await sandbox.runProcess({
command: "bash",
@ -427,17 +425,11 @@ async function upsertUserTaskState(c: any, authSessionId: string | null | undefi
}
const user = await getOrCreateUser(c, userId);
expectQueueResponse(
await user.send(
userWorkflowQueueName("user.command.task_state.upsert"),
{
taskId: c.state.taskId,
sessionId,
patch,
},
{ wait: true, timeout: 10_000 },
),
);
await user.upsertTaskState({
taskId: c.state.taskId,
sessionId,
patch,
});
}
async function deleteUserTaskState(c: any, authSessionId: string | null | undefined, sessionId: string): Promise<void> {
@ -452,14 +444,10 @@ async function deleteUserTaskState(c: any, authSessionId: string | null | undefi
}
const user = await getOrCreateUser(c, userId);
await user.send(
userWorkflowQueueName("user.command.task_state.delete"),
{
taskId: c.state.taskId,
sessionId,
},
{ wait: true, timeout: 10_000 },
);
await user.deleteTaskState({
taskId: c.state.taskId,
sessionId,
});
}
async function resolveDefaultModel(c: any, authSessionId?: string | null): Promise<string> {
@ -937,13 +925,14 @@ async function writeSessionTranscript(c: any, sessionId: string, transcript: Arr
});
}
async function enqueueWorkspaceRefresh(
c: any,
command: "task.command.workspace.refresh_derived" | "task.command.workspace.refresh_session_transcript",
body: Record<string, unknown>,
): Promise<void> {
function fireRefreshDerived(c: any): void {
const self = selfTask(c);
await self.send(taskWorkflowQueueName(command as any), body, { wait: false });
void self.refreshDerived({}).catch(() => {});
}
function fireRefreshSessionTranscript(c: any, sessionId: string): void {
const self = selfTask(c);
void self.refreshSessionTranscript({ sessionId }).catch(() => {});
}
async function enqueueWorkspaceEnsureSession(c: any, sessionId: string): Promise<void> {
@ -958,16 +947,14 @@ function pendingWorkspaceSessionStatus(record: any): "pending_provision" | "pend
async function maybeScheduleWorkspaceRefreshes(c: any, record: any, sessions: Array<any>): Promise<void> {
const gitState = await readCachedGitState(c);
if (record.activeSandboxId && !gitState.updatedAt) {
await enqueueWorkspaceRefresh(c, "task.command.workspace.refresh_derived", {});
fireRefreshDerived(c);
}
for (const session of sessions) {
if (session.closed || session.status !== "ready" || !session.sandboxSessionId || session.transcriptUpdatedAt) {
continue;
}
await enqueueWorkspaceRefresh(c, "task.command.workspace.refresh_session_transcript", {
sessionId: session.sandboxSessionId,
});
fireRefreshSessionTranscript(c, session.sandboxSessionId);
}
}
@ -1097,11 +1084,28 @@ export async function buildTaskDetail(c: any, authSessionId?: string | null): Pr
diffs: gitState.diffs,
fileTree: gitState.fileTree,
minutesUsed: 0,
sandboxes: (record.sandboxes ?? []).map((sandbox: any) => ({
sandboxProviderId: sandbox.sandboxProviderId,
sandboxId: sandbox.sandboxId,
cwd: sandbox.cwd ?? null,
})),
sandboxes: await Promise.all(
(record.sandboxes ?? []).map(async (sandbox: any) => {
let url: string | null = null;
if (sandbox.sandboxId) {
try {
const handle = getTaskSandbox(c, c.state.organizationId, sandbox.sandboxId);
const conn = await handle.sandboxAgentConnection();
if (conn?.endpoint && !conn.endpoint.startsWith("mock://")) {
url = conn.endpoint;
}
} catch {
// Sandbox may not be running
}
}
return {
sandboxProviderId: sandbox.sandboxProviderId,
sandboxId: sandbox.sandboxId,
cwd: sandbox.cwd ?? null,
url,
};
}),
),
activeSandboxId: record.activeSandboxId ?? null,
};
}
@ -1267,9 +1271,7 @@ export async function ensureWorkspaceSession(c: any, sessionId: string, model?:
const record = await ensureWorkspaceSeeded(c);
if (meta.sandboxSessionId && meta.status === "ready") {
await enqueueWorkspaceRefresh(c, "task.command.workspace.refresh_session_transcript", {
sessionId: meta.sandboxSessionId,
});
fireRefreshSessionTranscript(c, meta.sandboxSessionId);
await broadcastTaskUpdate(c, { sessionId: sessionId });
return;
}
@ -1299,9 +1301,7 @@ export async function ensureWorkspaceSession(c: any, sessionId: string, model?:
status: "ready",
errorMessage: null,
});
await enqueueWorkspaceRefresh(c, "task.command.workspace.refresh_session_transcript", {
sessionId: meta.sandboxSessionId ?? sessionId,
});
fireRefreshSessionTranscript(c, meta.sandboxSessionId ?? sessionId);
} catch (error) {
await updateSessionMeta(c, sessionId, {
status: "error",
@ -1506,11 +1506,9 @@ export async function syncWorkspaceSessionStatus(c: any, sessionId: string, stat
})
.where(eq(taskTable.id, 1))
.run();
await enqueueWorkspaceRefresh(c, "task.command.workspace.refresh_session_transcript", {
sessionId,
});
fireRefreshSessionTranscript(c, sessionId);
if (status !== "running") {
await enqueueWorkspaceRefresh(c, "task.command.workspace.refresh_derived", {});
fireRefreshDerived(c);
}
await broadcastTaskUpdate(c, { sessionId: meta.sessionId });
}
@ -1608,6 +1606,6 @@ export async function revertWorkspaceFile(c: any, path: string): Promise<void> {
if (result.exitCode !== 0) {
throw new Error(`file revert failed (${result.exitCode}): ${result.result}`);
}
await enqueueWorkspaceRefresh(c, "task.command.workspace.refresh_derived", {});
fireRefreshDerived(c);
await broadcastTaskUpdate(c);
}

View file

@ -1,53 +1,71 @@
import { asc, count as sqlCount, desc } from "drizzle-orm";
import { applyJoinToRow, applyJoinToRows, buildWhere, columnFor, tableFor } from "../query-helpers.js";
import {
createAuthRecordMutation,
updateAuthRecordMutation,
updateManyAuthRecordsMutation,
deleteAuthRecordMutation,
deleteManyAuthRecordsMutation,
} from "../workflow.js";
import { applyJoinToRow, applyJoinToRows, buildWhere, columnFor, materializeRow, persistInput, persistPatch, tableFor } from "../query-helpers.js";
/**
* Better Auth adapter actions exposed as actions (not queue commands) so they
* execute immediately without competing in the user workflow queue.
*
* The user actor's workflow queue is shared with profile upserts, session state,
* and task state operations. When the queue is busy, auth operations would time
* out (10s), causing Better Auth's parseState to throw a non-StateError which
* redirects to ?error=please_restart_the_process.
*
* Auth operations are safe to run as actions because they are simple SQLite
* reads/writes scoped to this actor instance with no cross-actor side effects.
*/
// Exception to the CLAUDE.md queue-for-mutations rule: Better Auth adapter operations
// use direct actions even for mutations. Better Auth runs during OAuth callbacks on the
// HTTP request path, not through the normal organization lifecycle. Routing through the
// queue adds multiple sequential round-trips (each with actor wake-up + step overhead)
// that cause 30-second OAuth callbacks and proxy retry storms. These mutations are simple
// SQLite upserts/deletes with no cross-actor coordination or broadcast side effects.
export const betterAuthActions = {
// --- Mutation actions (formerly queue commands) ---
async betterAuthCreateRecord(c: any, input: { model: string; data: Record<string, unknown> }) {
return await createAuthRecordMutation(c, input);
// --- Mutation actions ---
async betterAuthCreateRecord(c, input: { model: string; data: Record<string, unknown> }) {
const table = tableFor(input.model);
const persisted = persistInput(input.model, input.data);
await c.db
.insert(table)
.values(persisted as any)
.run();
const row = await c.db
.select()
.from(table)
.where(buildWhere(table, [{ field: "id", value: input.data.id }])!)
.get();
return materializeRow(input.model, row);
},
async betterAuthUpdateRecord(c: any, input: { model: string; where: any[]; update: Record<string, unknown> }) {
return await updateAuthRecordMutation(c, input);
async betterAuthUpdateRecord(c, input: { model: string; where: any[]; update: Record<string, unknown> }) {
const table = tableFor(input.model);
const predicate = buildWhere(table, input.where);
if (!predicate) throw new Error("betterAuthUpdateRecord requires a where clause");
await c.db
.update(table)
.set(persistPatch(input.model, input.update) as any)
.where(predicate)
.run();
return materializeRow(input.model, await c.db.select().from(table).where(predicate).get());
},
async betterAuthUpdateManyRecords(c: any, input: { model: string; where: any[]; update: Record<string, unknown> }) {
return await updateManyAuthRecordsMutation(c, input);
async betterAuthUpdateManyRecords(c, input: { model: string; where: any[]; update: Record<string, unknown> }) {
const table = tableFor(input.model);
const predicate = buildWhere(table, input.where);
if (!predicate) throw new Error("betterAuthUpdateManyRecords requires a where clause");
await c.db
.update(table)
.set(persistPatch(input.model, input.update) as any)
.where(predicate)
.run();
const row = await c.db.select({ value: sqlCount() }).from(table).where(predicate).get();
return row?.value ?? 0;
},
async betterAuthDeleteRecord(c: any, input: { model: string; where: any[] }) {
await deleteAuthRecordMutation(c, input);
return { ok: true };
async betterAuthDeleteRecord(c, input: { model: string; where: any[] }) {
const table = tableFor(input.model);
const predicate = buildWhere(table, input.where);
if (!predicate) throw new Error("betterAuthDeleteRecord requires a where clause");
await c.db.delete(table).where(predicate).run();
},
async betterAuthDeleteManyRecords(c: any, input: { model: string; where: any[] }) {
return await deleteManyAuthRecordsMutation(c, input);
async betterAuthDeleteManyRecords(c, input: { model: string; where: any[] }) {
const table = tableFor(input.model);
const predicate = buildWhere(table, input.where);
if (!predicate) throw new Error("betterAuthDeleteManyRecords requires a where clause");
const rows = await c.db.select().from(table).where(predicate).all();
await c.db.delete(table).where(predicate).run();
return rows.length;
},
// --- Read actions ---
// Better Auth adapter action — called by the Better Auth adapter in better-auth.ts.
// Schema and behavior are constrained by Better Auth.
async betterAuthFindOneRecord(c, input: { model: string; where: any[]; join?: any }) {
const table = tableFor(input.model);
const predicate = buildWhere(table, input.where);
@ -55,8 +73,6 @@ export const betterAuthActions = {
return await applyJoinToRow(c, input.model, row ?? null, input.join);
},
// Better Auth adapter action — called by the Better Auth adapter in better-auth.ts.
// Schema and behavior are constrained by Better Auth.
async betterAuthFindManyRecords(c, input: { model: string; where?: any[]; limit?: number; offset?: number; sortBy?: any; join?: any }) {
const table = tableFor(input.model);
const predicate = buildWhere(table, input.where);
@ -78,8 +94,6 @@ export const betterAuthActions = {
return await applyJoinToRows(c, input.model, rows, input.join);
},
// Better Auth adapter action — called by the Better Auth adapter in better-auth.ts.
// Schema and behavior are constrained by Better Auth.
async betterAuthCountRecords(c, input: { model: string; where?: any[] }) {
const table = tableFor(input.model);
const predicate = buildWhere(table, input.where);

View file

@ -1,4 +1,5 @@
import { eq } from "drizzle-orm";
import { eq, and } from "drizzle-orm";
import { DEFAULT_WORKSPACE_MODEL_ID } from "@sandbox-agent/foundry-shared";
import { authAccounts, authSessions, authUsers, sessionState, userProfiles, userTaskState } from "../db/schema.js";
import { materializeRow } from "../query-helpers.js";
@ -41,4 +42,147 @@ export const userActions = {
})),
};
},
// --- Mutation actions (migrated from queue) ---
async upsertProfile(
c,
input: {
userId: string;
patch: {
githubAccountId?: string | null;
githubLogin?: string | null;
roleLabel?: string;
defaultModel?: string;
eligibleOrganizationIdsJson?: string;
starterRepoStatus?: string;
starterRepoStarredAt?: number | null;
starterRepoSkippedAt?: number | null;
};
},
) {
const now = Date.now();
await c.db
.insert(userProfiles)
.values({
id: 1,
userId: input.userId,
githubAccountId: input.patch.githubAccountId ?? null,
githubLogin: input.patch.githubLogin ?? null,
roleLabel: input.patch.roleLabel ?? "GitHub user",
defaultModel: input.patch.defaultModel ?? DEFAULT_WORKSPACE_MODEL_ID,
eligibleOrganizationIdsJson: input.patch.eligibleOrganizationIdsJson ?? "[]",
starterRepoStatus: input.patch.starterRepoStatus ?? "pending",
starterRepoStarredAt: input.patch.starterRepoStarredAt ?? null,
starterRepoSkippedAt: input.patch.starterRepoSkippedAt ?? null,
createdAt: now,
updatedAt: now,
})
.onConflictDoUpdate({
target: userProfiles.userId,
set: {
...(input.patch.githubAccountId !== undefined ? { githubAccountId: input.patch.githubAccountId } : {}),
...(input.patch.githubLogin !== undefined ? { githubLogin: input.patch.githubLogin } : {}),
...(input.patch.roleLabel !== undefined ? { roleLabel: input.patch.roleLabel } : {}),
...(input.patch.defaultModel !== undefined ? { defaultModel: input.patch.defaultModel } : {}),
...(input.patch.eligibleOrganizationIdsJson !== undefined ? { eligibleOrganizationIdsJson: input.patch.eligibleOrganizationIdsJson } : {}),
...(input.patch.starterRepoStatus !== undefined ? { starterRepoStatus: input.patch.starterRepoStatus } : {}),
...(input.patch.starterRepoStarredAt !== undefined ? { starterRepoStarredAt: input.patch.starterRepoStarredAt } : {}),
...(input.patch.starterRepoSkippedAt !== undefined ? { starterRepoSkippedAt: input.patch.starterRepoSkippedAt } : {}),
updatedAt: now,
},
})
.run();
return await c.db.select().from(userProfiles).where(eq(userProfiles.userId, input.userId)).get();
},
async upsertSessionState(c, input: { sessionId: string; activeOrganizationId: string | null }) {
const now = Date.now();
await c.db
.insert(sessionState)
.values({
sessionId: input.sessionId,
activeOrganizationId: input.activeOrganizationId,
createdAt: now,
updatedAt: now,
})
.onConflictDoUpdate({
target: sessionState.sessionId,
set: { activeOrganizationId: input.activeOrganizationId, updatedAt: now },
})
.run();
return await c.db.select().from(sessionState).where(eq(sessionState.sessionId, input.sessionId)).get();
},
async upsertTaskState(
c,
input: {
taskId: string;
sessionId: string;
patch: {
activeSessionId?: string | null;
unread?: boolean;
draftText?: string;
draftAttachmentsJson?: string;
draftUpdatedAt?: number | null;
};
},
) {
const now = Date.now();
const existing = await c.db
.select()
.from(userTaskState)
.where(and(eq(userTaskState.taskId, input.taskId), eq(userTaskState.sessionId, input.sessionId)))
.get();
if (input.patch.activeSessionId !== undefined) {
await c.db
.update(userTaskState)
.set({ activeSessionId: input.patch.activeSessionId, updatedAt: now })
.where(eq(userTaskState.taskId, input.taskId))
.run();
}
await c.db
.insert(userTaskState)
.values({
taskId: input.taskId,
sessionId: input.sessionId,
activeSessionId: input.patch.activeSessionId ?? existing?.activeSessionId ?? null,
unread: input.patch.unread !== undefined ? (input.patch.unread ? 1 : 0) : (existing?.unread ?? 0),
draftText: input.patch.draftText ?? existing?.draftText ?? "",
draftAttachmentsJson: input.patch.draftAttachmentsJson ?? existing?.draftAttachmentsJson ?? "[]",
draftUpdatedAt: input.patch.draftUpdatedAt === undefined ? (existing?.draftUpdatedAt ?? null) : input.patch.draftUpdatedAt,
updatedAt: now,
})
.onConflictDoUpdate({
target: [userTaskState.taskId, userTaskState.sessionId],
set: {
...(input.patch.activeSessionId !== undefined ? { activeSessionId: input.patch.activeSessionId } : {}),
...(input.patch.unread !== undefined ? { unread: input.patch.unread ? 1 : 0 } : {}),
...(input.patch.draftText !== undefined ? { draftText: input.patch.draftText } : {}),
...(input.patch.draftAttachmentsJson !== undefined ? { draftAttachmentsJson: input.patch.draftAttachmentsJson } : {}),
...(input.patch.draftUpdatedAt !== undefined ? { draftUpdatedAt: input.patch.draftUpdatedAt } : {}),
updatedAt: now,
},
})
.run();
return await c.db
.select()
.from(userTaskState)
.where(and(eq(userTaskState.taskId, input.taskId), eq(userTaskState.sessionId, input.sessionId)))
.get();
},
async deleteTaskState(c, input: { taskId: string; sessionId?: string }) {
if (input.sessionId) {
await c.db
.delete(userTaskState)
.where(and(eq(userTaskState.taskId, input.taskId), eq(userTaskState.sessionId, input.sessionId)))
.run();
return;
}
await c.db.delete(userTaskState).where(eq(userTaskState.taskId, input.taskId)).run();
},
};

View file

@ -1,13 +1,10 @@
import { actor, queue } from "rivetkit";
import { workflow } from "rivetkit/workflow";
import { actor } from "rivetkit";
import { userDb } from "./db/db.js";
import { betterAuthActions } from "./actions/better-auth.js";
import { userActions } from "./actions/user.js";
import { USER_QUEUE_NAMES, runUserWorkflow } from "./workflow.js";
export const user = actor({
db: userDb,
queues: Object.fromEntries(USER_QUEUE_NAMES.map((name) => [name, queue()])),
options: {
name: "User",
icon: "shield",
@ -20,5 +17,4 @@ export const user = actor({
...betterAuthActions,
...userActions,
},
run: workflow(runUserWorkflow),
});

View file

@ -1,284 +0,0 @@
// @ts-nocheck
/**
* User workflow queue-based command loop.
*
* Auth mutation commands are dispatched through named queues and processed
* inside the workflow command loop for observability and replay semantics.
*/
import { eq, count as sqlCount, and } from "drizzle-orm";
import { Loop } from "rivetkit/workflow";
import { DEFAULT_WORKSPACE_MODEL_ID } from "@sandbox-agent/foundry-shared";
import { logActorWarning, resolveErrorMessage } from "../logging.js";
import { selfUser } from "../handles.js";
import { expectQueueResponse } from "../../services/queue.js";
import { authUsers, sessionState, userProfiles, userTaskState } from "./db/schema.js";
import { buildWhere, columnFor, materializeRow, persistInput, persistPatch, tableFor } from "./query-helpers.js";
// ---------------------------------------------------------------------------
// Queue names
// ---------------------------------------------------------------------------
export const USER_QUEUE_NAMES = [
"user.command.profile.upsert",
"user.command.session_state.upsert",
"user.command.task_state.upsert",
"user.command.task_state.delete",
] as const;
export type UserQueueName = (typeof USER_QUEUE_NAMES)[number];
export function userWorkflowQueueName(name: UserQueueName): UserQueueName {
return name;
}
// ---------------------------------------------------------------------------
// Mutation functions
// ---------------------------------------------------------------------------
export async function createAuthRecordMutation(c: any, input: { model: string; data: Record<string, unknown> }) {
const table = tableFor(input.model);
const persisted = persistInput(input.model, input.data);
await c.db
.insert(table)
.values(persisted as any)
.run();
const row = await c.db
.select()
.from(table)
.where(eq(columnFor(input.model, table, "id"), input.data.id as any))
.get();
return materializeRow(input.model, row);
}
export async function updateAuthRecordMutation(c: any, input: { model: string; where: any[]; update: Record<string, unknown> }) {
const table = tableFor(input.model);
const predicate = buildWhere(table, input.where);
if (!predicate) throw new Error("updateAuthRecord requires a where clause");
await c.db
.update(table)
.set(persistPatch(input.model, input.update) as any)
.where(predicate)
.run();
return materializeRow(input.model, await c.db.select().from(table).where(predicate).get());
}
export async function updateManyAuthRecordsMutation(c: any, input: { model: string; where: any[]; update: Record<string, unknown> }) {
const table = tableFor(input.model);
const predicate = buildWhere(table, input.where);
if (!predicate) throw new Error("updateManyAuthRecords requires a where clause");
await c.db
.update(table)
.set(persistPatch(input.model, input.update) as any)
.where(predicate)
.run();
const row = await c.db.select({ value: sqlCount() }).from(table).where(predicate).get();
return row?.value ?? 0;
}
export async function deleteAuthRecordMutation(c: any, input: { model: string; where: any[] }) {
const table = tableFor(input.model);
const predicate = buildWhere(table, input.where);
if (!predicate) throw new Error("deleteAuthRecord requires a where clause");
await c.db.delete(table).where(predicate).run();
}
export async function deleteManyAuthRecordsMutation(c: any, input: { model: string; where: any[] }) {
const table = tableFor(input.model);
const predicate = buildWhere(table, input.where);
if (!predicate) throw new Error("deleteManyAuthRecords requires a where clause");
const rows = await c.db.select().from(table).where(predicate).all();
await c.db.delete(table).where(predicate).run();
return rows.length;
}
export async function upsertUserProfileMutation(
c: any,
input: {
userId: string;
patch: {
githubAccountId?: string | null;
githubLogin?: string | null;
roleLabel?: string;
defaultModel?: string;
eligibleOrganizationIdsJson?: string;
starterRepoStatus?: string;
starterRepoStarredAt?: number | null;
starterRepoSkippedAt?: number | null;
};
},
) {
const now = Date.now();
await c.db
.insert(userProfiles)
.values({
id: 1,
userId: input.userId,
githubAccountId: input.patch.githubAccountId ?? null,
githubLogin: input.patch.githubLogin ?? null,
roleLabel: input.patch.roleLabel ?? "GitHub user",
defaultModel: input.patch.defaultModel ?? DEFAULT_WORKSPACE_MODEL_ID,
eligibleOrganizationIdsJson: input.patch.eligibleOrganizationIdsJson ?? "[]",
starterRepoStatus: input.patch.starterRepoStatus ?? "pending",
starterRepoStarredAt: input.patch.starterRepoStarredAt ?? null,
starterRepoSkippedAt: input.patch.starterRepoSkippedAt ?? null,
createdAt: now,
updatedAt: now,
})
.onConflictDoUpdate({
target: userProfiles.userId,
set: {
...(input.patch.githubAccountId !== undefined ? { githubAccountId: input.patch.githubAccountId } : {}),
...(input.patch.githubLogin !== undefined ? { githubLogin: input.patch.githubLogin } : {}),
...(input.patch.roleLabel !== undefined ? { roleLabel: input.patch.roleLabel } : {}),
...(input.patch.defaultModel !== undefined ? { defaultModel: input.patch.defaultModel } : {}),
...(input.patch.eligibleOrganizationIdsJson !== undefined ? { eligibleOrganizationIdsJson: input.patch.eligibleOrganizationIdsJson } : {}),
...(input.patch.starterRepoStatus !== undefined ? { starterRepoStatus: input.patch.starterRepoStatus } : {}),
...(input.patch.starterRepoStarredAt !== undefined ? { starterRepoStarredAt: input.patch.starterRepoStarredAt } : {}),
...(input.patch.starterRepoSkippedAt !== undefined ? { starterRepoSkippedAt: input.patch.starterRepoSkippedAt } : {}),
updatedAt: now,
},
})
.run();
return await c.db.select().from(userProfiles).where(eq(userProfiles.userId, input.userId)).get();
}
export async function upsertSessionStateMutation(c: any, input: { sessionId: string; activeOrganizationId: string | null }) {
const now = Date.now();
await c.db
.insert(sessionState)
.values({
sessionId: input.sessionId,
activeOrganizationId: input.activeOrganizationId,
createdAt: now,
updatedAt: now,
})
.onConflictDoUpdate({
target: sessionState.sessionId,
set: { activeOrganizationId: input.activeOrganizationId, updatedAt: now },
})
.run();
return await c.db.select().from(sessionState).where(eq(sessionState.sessionId, input.sessionId)).get();
}
export async function upsertTaskStateMutation(
c: any,
input: {
taskId: string;
sessionId: string;
patch: {
activeSessionId?: string | null;
unread?: boolean;
draftText?: string;
draftAttachmentsJson?: string;
draftUpdatedAt?: number | null;
};
},
) {
const now = Date.now();
const existing = await c.db
.select()
.from(userTaskState)
.where(and(eq(userTaskState.taskId, input.taskId), eq(userTaskState.sessionId, input.sessionId)))
.get();
if (input.patch.activeSessionId !== undefined) {
await c.db.update(userTaskState).set({ activeSessionId: input.patch.activeSessionId, updatedAt: now }).where(eq(userTaskState.taskId, input.taskId)).run();
}
await c.db
.insert(userTaskState)
.values({
taskId: input.taskId,
sessionId: input.sessionId,
activeSessionId: input.patch.activeSessionId ?? existing?.activeSessionId ?? null,
unread: input.patch.unread !== undefined ? (input.patch.unread ? 1 : 0) : (existing?.unread ?? 0),
draftText: input.patch.draftText ?? existing?.draftText ?? "",
draftAttachmentsJson: input.patch.draftAttachmentsJson ?? existing?.draftAttachmentsJson ?? "[]",
draftUpdatedAt: input.patch.draftUpdatedAt === undefined ? (existing?.draftUpdatedAt ?? null) : input.patch.draftUpdatedAt,
updatedAt: now,
})
.onConflictDoUpdate({
target: [userTaskState.taskId, userTaskState.sessionId],
set: {
...(input.patch.activeSessionId !== undefined ? { activeSessionId: input.patch.activeSessionId } : {}),
...(input.patch.unread !== undefined ? { unread: input.patch.unread ? 1 : 0 } : {}),
...(input.patch.draftText !== undefined ? { draftText: input.patch.draftText } : {}),
...(input.patch.draftAttachmentsJson !== undefined ? { draftAttachmentsJson: input.patch.draftAttachmentsJson } : {}),
...(input.patch.draftUpdatedAt !== undefined ? { draftUpdatedAt: input.patch.draftUpdatedAt } : {}),
updatedAt: now,
},
})
.run();
return await c.db
.select()
.from(userTaskState)
.where(and(eq(userTaskState.taskId, input.taskId), eq(userTaskState.sessionId, input.sessionId)))
.get();
}
export async function deleteTaskStateMutation(c: any, input: { taskId: string; sessionId?: string }) {
if (input.sessionId) {
await c.db
.delete(userTaskState)
.where(and(eq(userTaskState.taskId, input.taskId), eq(userTaskState.sessionId, input.sessionId)))
.run();
return;
}
await c.db.delete(userTaskState).where(eq(userTaskState.taskId, input.taskId)).run();
}
// ---------------------------------------------------------------------------
// Workflow command loop
// ---------------------------------------------------------------------------
type WorkflowHandler = (loopCtx: any, body: any) => Promise<any>;
const COMMAND_HANDLERS: Record<UserQueueName, WorkflowHandler> = {
"user.command.profile.upsert": async (c, body) => upsertUserProfileMutation(c, body),
"user.command.session_state.upsert": async (c, body) => upsertSessionStateMutation(c, body),
"user.command.task_state.upsert": async (c, body) => upsertTaskStateMutation(c, body),
"user.command.task_state.delete": async (c, body) => {
await deleteTaskStateMutation(c, body);
return { ok: true };
},
};
export async function runUserWorkflow(ctx: any): Promise<void> {
await ctx.loop("user-command-loop", async (loopCtx: any) => {
const msg = await loopCtx.queue.next("next-user-command", {
names: [...USER_QUEUE_NAMES],
completable: true,
});
if (!msg) {
return Loop.continue(undefined);
}
const handler = COMMAND_HANDLERS[msg.name as UserQueueName];
if (!handler) {
logActorWarning("user", "unknown user command", { command: msg.name });
await msg.complete({ error: `Unknown command: ${msg.name}` }).catch(() => {});
return Loop.continue(undefined);
}
try {
// Wrap in a step so c.state and c.db are accessible inside mutation functions.
const result = await loopCtx.step({
name: msg.name,
timeout: 60_000,
run: async () => handler(loopCtx, msg.body),
});
await msg.complete(result);
} catch (error) {
const message = resolveErrorMessage(error);
logActorWarning("user", "user workflow command failed", {
command: msg.name,
error: message,
});
await msg.complete({ error: message }).catch(() => {});
}
return Loop.continue(undefined);
});
}

View file

@ -41,11 +41,6 @@ export interface GitHubRepositoryRecord {
defaultBranch: string;
}
export interface GitHubBranchRecord {
name: string;
commitSha: string;
}
export interface GitHubMemberRecord {
id: string;
login: string;
@ -402,15 +397,6 @@ export class GitHubAppClient {
return await this.getUserRepository(accessToken, fullName);
}
async listUserRepositoryBranches(accessToken: string, fullName: string): Promise<GitHubBranchRecord[]> {
return await this.listRepositoryBranches(accessToken, fullName);
}
async listInstallationRepositoryBranches(installationId: number, fullName: string): Promise<GitHubBranchRecord[]> {
const accessToken = await this.createInstallationAccessToken(installationId);
return await this.listRepositoryBranches(accessToken, fullName);
}
async listOrganizationMembers(accessToken: string, organizationLogin: string): Promise<GitHubMemberRecord[]> {
const members = await this.paginate<{
id: number;
@ -708,20 +694,6 @@ export class GitHubAppClient {
nextUrl: parseNextLink(response.headers.get("link")),
};
}
private async listRepositoryBranches(accessToken: string, fullName: string): Promise<GitHubBranchRecord[]> {
const branches = await this.paginate<{
name: string;
commit?: { sha?: string | null } | null;
}>(`/repos/${fullName}/branches?per_page=100`, accessToken);
return branches
.map((branch) => ({
name: branch.name?.trim() ?? "",
commitSha: branch.commit?.sha?.trim() ?? "",
}))
.filter((branch) => branch.name.length > 0 && branch.commitSha.length > 0);
}
}
function parseNextLink(linkHeader: string | null): string | null {

View file

@ -3,8 +3,6 @@ import { createAdapterFactory } from "better-auth/adapters";
import { APP_SHELL_ORGANIZATION_ID } from "../actors/organization/constants.js";
import { organizationKey, userKey } from "../actors/keys.js";
import { logger } from "../logging.js";
import { expectQueueResponse } from "./queue.js";
import { userWorkflowQueueName } from "../actors/user/workflow.js";
const AUTH_BASE_PATH = "/v1/auth";
const SESSION_COOKIE = "better-auth.session_token";
@ -79,33 +77,17 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin
// getOrCreate is intentional here: the adapter runs during Better Auth callbacks
// which can fire before any explicit create path. The app organization and user
// actors must exist by the time the adapter needs them.
//
// Handles are cached to avoid redundant getOrCreate RPCs during a single OAuth
// callback (which calls the adapter 5-10+ times). The RivetKit handle is a
// lightweight proxy; caching it just avoids repeated gateway round-trips.
let cachedAppOrganization: any = null;
const appOrganization = async () => {
if (!cachedAppOrganization) {
cachedAppOrganization = await actorClient.organization.getOrCreate(organizationKey(APP_SHELL_ORGANIZATION_ID), {
createWithInput: APP_SHELL_ORGANIZATION_ID,
});
}
return cachedAppOrganization;
};
const appOrganization = () =>
actorClient.organization.getOrCreate(organizationKey(APP_SHELL_ORGANIZATION_ID), {
createWithInput: APP_SHELL_ORGANIZATION_ID,
});
// getOrCreate is intentional: Better Auth creates user records during OAuth
// callbacks, so the user actor must be lazily provisioned on first access.
const userHandleCache = new Map<string, any>();
const getUser = async (userId: string) => {
let handle = userHandleCache.get(userId);
if (!handle) {
handle = await actorClient.user.getOrCreate(userKey(userId), {
createWithInput: { userId },
});
userHandleCache.set(userId, handle);
}
return handle;
};
const getUser = async (userId: string) =>
await actorClient.user.getOrCreate(userKey(userId), {
createWithInput: { userId },
});
const adapter = createAdapterFactory({
config: {
@ -183,91 +165,51 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin
create: async ({ model, data }) => {
const transformed = await transformInput(data, model, "create", true);
if (model === "verification") {
const start = performance.now();
try {
const organization = await appOrganization();
const result = await organization.betterAuthCreateVerification({ data: transformed });
logger.info(
{ model, identifier: transformed.identifier, durationMs: Math.round((performance.now() - start) * 100) / 100 },
"auth_adapter_create_verification",
);
return result;
} catch (error) {
logger.error(
{ model, identifier: transformed.identifier, durationMs: Math.round((performance.now() - start) * 100) / 100, error: String(error) },
"auth_adapter_create_verification_error",
);
throw error;
}
const organization = await appOrganization();
return await organization.betterAuthCreateVerification({ data: transformed });
}
const createStart = performance.now();
const userId = await resolveUserIdForQuery(model, undefined, transformed);
if (!userId) {
throw new Error(`Unable to resolve auth actor for create(${model})`);
}
try {
const userActor = await getUser(userId);
const created = await userActor.betterAuthCreateRecord({ model, data: transformed });
const organization = await appOrganization();
const userActor = await getUser(userId);
const created = await userActor.betterAuthCreateRecord({ model, data: transformed });
const organization = await appOrganization();
if (model === "user" && typeof transformed.email === "string" && transformed.email.length > 0) {
await organization.betterAuthUpsertEmailIndex({
email: transformed.email.toLowerCase(),
userId,
});
}
if (model === "session") {
await organization.betterAuthUpsertSessionIndex({
sessionId: String(created.id),
sessionToken: String(created.token),
userId,
});
}
if (model === "account") {
await organization.betterAuthUpsertAccountIndex({
id: String(created.id),
providerId: String(created.providerId),
accountId: String(created.accountId),
userId,
});
}
logger.info({ model, userId, durationMs: Math.round((performance.now() - createStart) * 100) / 100 }, "auth_adapter_create_record");
return (await transformOutput(created, model)) as any;
} catch (error) {
logger.error(
{ model, userId, durationMs: Math.round((performance.now() - createStart) * 100) / 100, error: String(error) },
"auth_adapter_create_record_error",
);
throw error;
if (model === "user" && typeof transformed.email === "string" && transformed.email.length > 0) {
await organization.betterAuthUpsertEmailIndex({
email: transformed.email.toLowerCase(),
userId,
});
}
if (model === "session") {
await organization.betterAuthUpsertSessionIndex({
sessionId: String(created.id),
sessionToken: String(created.token),
userId,
});
}
if (model === "account") {
await organization.betterAuthUpsertAccountIndex({
id: String(created.id),
providerId: String(created.providerId),
accountId: String(created.accountId),
userId,
});
}
return (await transformOutput(created, model)) as any;
},
findOne: async ({ model, where, join }) => {
const transformedWhere = transformWhereClause({ model, where, action: "findOne" });
if (model === "verification") {
const start = performance.now();
try {
const organization = await appOrganization();
const result = await organization.betterAuthFindOneVerification({ where: transformedWhere, join });
const identifier = transformedWhere?.find((entry: any) => entry.field === "identifier")?.value;
logger.info(
{ model, identifier, found: !!result, durationMs: Math.round((performance.now() - start) * 100) / 100 },
"auth_adapter_find_verification",
);
return result;
} catch (error) {
const identifier = transformedWhere?.find((entry: any) => entry.field === "identifier")?.value;
logger.error(
{ model, identifier, durationMs: Math.round((performance.now() - start) * 100) / 100, error: String(error) },
"auth_adapter_find_verification_error",
);
throw error;
}
const organization = await appOrganization();
return await organization.betterAuthFindOneVerification({ where: transformedWhere, join });
}
const userId = await resolveUserIdForQuery(model, transformedWhere);
@ -429,8 +371,6 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin
delete: async ({ model, where }) => {
const transformedWhere = transformWhereClause({ model, where, action: "delete" });
if (model === "verification") {
const identifier = transformedWhere?.find((entry: any) => entry.field === "identifier")?.value;
logger.info({ model, identifier }, "auth_adapter_delete_verification");
const organization = await appOrganization();
await organization.betterAuthDeleteVerification({ where: transformedWhere });
return;
@ -527,15 +467,6 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin
secret: requireEnv("BETTER_AUTH_SECRET"),
database: adapter,
trustedOrigins: [stripTrailingSlash(options.appUrl), stripTrailingSlash(options.apiUrl)],
account: {
// Store OAuth state in an encrypted cookie instead of a DB verification record.
// The production proxy chain (Cloudflare -> Fastly -> Railway) retries the OAuth
// callback when it takes >10s, causing a duplicate request. With the "database"
// strategy the first request deletes the verification record, so the retry fails
// with "verification not found" -> ?error=please_restart_the_process.
// Cookie strategy avoids this because the state lives in the request itself.
storeStateStrategy: "cookie",
},
session: {
cookieCache: {
enabled: true,
@ -582,9 +513,7 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin
async upsertUserProfile(userId: string, patch: Record<string, unknown>) {
const userActor = await getUser(userId);
return expectQueueResponse(
await userActor.send(userWorkflowQueueName("user.command.profile.upsert"), { userId, patch }, { wait: true, timeout: 10_000 }),
);
return await userActor.upsertProfile({ userId, patch });
},
async setActiveOrganization(sessionId: string, activeOrganizationId: string | null) {
@ -593,9 +522,7 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin
throw new Error(`Unknown auth session ${sessionId}`);
}
const userActor = await getUser(authState.user.id);
return expectQueueResponse(
await userActor.send(userWorkflowQueueName("user.command.session_state.upsert"), { sessionId, activeOrganizationId }, { wait: true, timeout: 10_000 }),
);
return await userActor.upsertSessionState({ sessionId, activeOrganizationId });
},
async getAccessTokenForSession(sessionId: string) {