wip: convert all actors from workflow to plain run handlers

Workaround for RivetKit bug where c.queue.iter() never yields messages
for actors created via getOrCreate from another actor's context. The
queue accepts messages (visible in inspector) but the iterator hangs.
Sleep/wake fixes it, but actors with active connections never sleep.

Converted organization, github-data, task, and user actors from
run: workflow(...) to plain run: async (c) => { for await ... }.

Also fixes:
- Missing auth tables in org migration (auth_verification etc)
- default_model NOT NULL constraint on org profile upsert
- Nested workflow step in github-data (HistoryDivergedError)
- Removed --force from frontend Dockerfile pnpm install

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Nathan Flurry 2026-03-16 02:00:31 -07:00
parent b372383cfd
commit 29e5821fef
23 changed files with 490 additions and 930 deletions

View file

@ -56,6 +56,8 @@ Use `pnpm` workspaces and Turborepo.
- mock frontend changes: `just foundry-mock` or restart with `just foundry-mock-down && just foundry-mock` - mock frontend changes: `just foundry-mock` or restart with `just foundry-mock-down && just foundry-mock`
- local frontend-only work outside Docker: restart `pnpm --filter @sandbox-agent/foundry-frontend dev` or `just foundry-dev-mock` as appropriate - local frontend-only work outside Docker: restart `pnpm --filter @sandbox-agent/foundry-frontend dev` or `just foundry-dev-mock` as appropriate
- The backend does **not** hot reload. Bun's `--hot` flag causes the server to re-bind on a different port (e.g. 6421 instead of 6420), breaking all client connections while the container still exposes the original port. After backend code changes, restart the backend container: `just foundry-dev-down && just foundry-dev`. - The backend does **not** hot reload. Bun's `--hot` flag causes the server to re-bind on a different port (e.g. 6421 instead of 6420), breaking all client connections while the container still exposes the original port. After backend code changes, restart the backend container: `just foundry-dev-down && just foundry-dev`.
- The dev server has debug logging enabled by default (`RIVET_LOG_LEVEL=debug`, `FOUNDRY_LOG_LEVEL=debug`) via `compose.dev.yaml`. Error stacks and timestamps are also enabled.
- The frontend client uses JSON encoding for RivetKit in development (`import.meta.env.DEV`) for easier debugging. Production uses the default encoding.
## Railway Logs ## Railway Logs
@ -77,9 +79,10 @@ Use `pnpm` workspaces and Turborepo.
- Keep frontend route/state coverage current in code and tests; there is no separate page-inventory doc to maintain. - Keep frontend route/state coverage current in code and tests; there is no separate page-inventory doc to maintain.
- If Foundry uses a shared component from `@sandbox-agent/react`, make changes in `sdks/react` instead of copying or forking that component into Foundry. - If Foundry uses a shared component from `@sandbox-agent/react`, make changes in `sdks/react` instead of copying or forking that component into Foundry.
- When changing shared React components in `sdks/react` for Foundry, verify they still work in the Sandbox Agent Inspector before finishing. - When changing shared React components in `sdks/react` for Foundry, verify they still work in the Sandbox Agent Inspector before finishing.
- When making UI changes, verify the live flow with `agent-browser`, take screenshots of the updated UI, and offer to open those screenshots in Preview when you finish. - When making UI changes, verify the live flow with the Chrome DevTools MCP or `agent-browser`, take screenshots of the updated UI, and offer to open those screenshots in Preview when you finish.
- When asked for screenshots, capture all relevant affected screens and modal states, not just a single viewport. Include empty, populated, success, and blocked/error states when they are part of the changed flow. - When asked for screenshots, capture all relevant affected screens and modal states, not just a single viewport. Include empty, populated, success, and blocked/error states when they are part of the changed flow.
- If a screenshot catches a transition frame, blank modal, or otherwise misleading state, retake it before reporting it. - If a screenshot catches a transition frame, blank modal, or otherwise misleading state, retake it before reporting it.
- When verifying UI in the browser, attempt to sign in by navigating to `/signin` and clicking "Continue with GitHub". If the browser lands on the GitHub login page (github.com/login) and you don't have credentials, stop and ask the user to complete the sign-in. Do not assume the session is invalid just because you see the Foundry sign-in page — always attempt the OAuth flow first.
## Realtime Data Architecture ## Realtime Data Architecture

View file

@ -29,7 +29,6 @@ Children push updates **up** to their direct coordinator only. Coordinators broa
OrganizationActor (coordinator for tasks + auth users) OrganizationActor (coordinator for tasks + auth users)
│ Index tables: │ Index tables:
│ ├─ repos → Repository catalog (GitHub sync)
│ ├─ taskIndex → TaskActor index (taskId → repoId + branchName) │ ├─ taskIndex → TaskActor index (taskId → repoId + branchName)
│ ├─ taskSummaries → TaskActor materialized sidebar projection │ ├─ taskSummaries → TaskActor materialized sidebar projection
│ ├─ authSessionIndex → UserActor index (session token → userId) │ ├─ authSessionIndex → UserActor index (session token → userId)

View file

@ -1,11 +1,10 @@
// @ts-nocheck // @ts-nocheck
import { and, desc, eq } from "drizzle-orm"; import { and, desc, eq } from "drizzle-orm";
import { actor, queue } from "rivetkit"; import { actor, queue } from "rivetkit";
import { workflow } from "rivetkit/workflow";
import type { AuditLogEvent } from "@sandbox-agent/foundry-shared"; import type { AuditLogEvent } from "@sandbox-agent/foundry-shared";
import { auditLogDb } from "./db/db.js"; import { auditLogDb } from "./db/db.js";
import { events } from "./db/schema.js"; import { events } from "./db/schema.js";
import { AUDIT_LOG_QUEUE_NAMES, runAuditLogWorkflow } from "./workflow.js"; import { AUDIT_LOG_QUEUE_NAMES, runAuditLogCommandLoop } from "./workflow.js";
export interface AuditLogInput { export interface AuditLogInput {
organizationId: string; organizationId: string;
@ -82,5 +81,5 @@ export const auditLog = actor({
})); }));
}, },
}, },
run: workflow(runAuditLogWorkflow), run: runAuditLogCommandLoop,
}); });

View file

@ -1,13 +1,13 @@
// @ts-nocheck // @ts-nocheck
import { Loop } from "rivetkit/workflow"; import { logActorWarning, resolveErrorMessage } from "../logging.js";
import { events } from "./db/schema.js"; import { events } from "./db/schema.js";
import type { AppendAuditLogCommand } from "./index.js"; import type { AppendAuditLogCommand } from "./index.js";
export const AUDIT_LOG_QUEUE_NAMES = ["auditLog.command.append"] as const; export const AUDIT_LOG_QUEUE_NAMES = ["auditLog.command.append"] as const;
async function appendAuditLogRow(loopCtx: any, body: AppendAuditLogCommand): Promise<void> { async function appendAuditLogRow(c: any, body: AppendAuditLogCommand): Promise<void> {
const now = Date.now(); const now = Date.now();
await loopCtx.db await c.db
.insert(events) .insert(events)
.values({ .values({
repoId: body.repoId ?? null, repoId: body.repoId ?? null,
@ -20,21 +20,19 @@ async function appendAuditLogRow(loopCtx: any, body: AppendAuditLogCommand): Pro
.run(); .run();
} }
export async function runAuditLogWorkflow(ctx: any): Promise<void> { export async function runAuditLogCommandLoop(c: any): Promise<void> {
await ctx.loop("audit-log-command-loop", async (loopCtx: any) => { for await (const msg of c.queue.iter({ names: [...AUDIT_LOG_QUEUE_NAMES], completable: true })) {
const msg = await loopCtx.queue.next("next-audit-log-command", { try {
names: [...AUDIT_LOG_QUEUE_NAMES], if (msg.name === "auditLog.command.append") {
completable: true, await appendAuditLogRow(c, msg.body as AppendAuditLogCommand);
}); await msg.complete({ ok: true });
if (!msg) { continue;
return Loop.continue(undefined); }
await msg.complete({ error: `Unknown command: ${msg.name}` });
} catch (error) {
const message = resolveErrorMessage(error);
logActorWarning("auditLog", "audit-log command failed", { queueName: msg.name, error: message });
await msg.complete({ error: message }).catch(() => {});
} }
}
if (msg.name === "auditLog.command.append") {
await loopCtx.step("append-audit-log-row", async () => appendAuditLogRow(loopCtx, msg.body as AppendAuditLogCommand));
await msg.complete({ ok: true });
}
return Loop.continue(undefined);
});
} }

View file

@ -1,7 +1,6 @@
// @ts-nocheck // @ts-nocheck
import { eq } from "drizzle-orm"; import { eq, inArray } from "drizzle-orm";
import { actor, queue } from "rivetkit"; import { actor, queue } from "rivetkit";
import { workflow } from "rivetkit/workflow";
import type { FoundryOrganization } from "@sandbox-agent/foundry-shared"; import type { FoundryOrganization } from "@sandbox-agent/foundry-shared";
import { getActorRuntimeContext } from "../context.js"; import { getActorRuntimeContext } from "../context.js";
import { getOrCreateOrganization, getTask } from "../handles.js"; import { getOrCreateOrganization, getTask } from "../handles.js";
@ -12,7 +11,7 @@ import { organizationWorkflowQueueName } from "../organization/queues.js";
import { taskWorkflowQueueName } from "../task/workflow/index.js"; import { taskWorkflowQueueName } from "../task/workflow/index.js";
import { githubDataDb } from "./db/db.js"; import { githubDataDb } from "./db/db.js";
import { githubBranches, githubMembers, githubMeta, githubPullRequests, githubRepositories } from "./db/schema.js"; import { githubBranches, githubMembers, githubMeta, githubPullRequests, githubRepositories } from "./db/schema.js";
import { GITHUB_DATA_QUEUE_NAMES, runGithubDataWorkflow } from "./workflow.js"; import { GITHUB_DATA_QUEUE_NAMES, runGithubDataCommandLoop } from "./workflow.js";
const META_ROW_ID = 1; const META_ROW_ID = 1;
const SYNC_REPOSITORY_BATCH_SIZE = 10; const SYNC_REPOSITORY_BATCH_SIZE = 10;
@ -701,21 +700,6 @@ export async function fullSyncSetup(c: any, input: FullSyncInput = {}): Promise<
await upsertRepositories(c, repositories, startedAt, syncGeneration); await upsertRepositories(c, repositories, startedAt, syncGeneration);
const organization = await getOrCreateOrganization(c, c.state.organizationId);
await sendOrganizationCommand(organization, "organization.command.github.data_projection.apply", {
connectedAccount: context.connectedAccount,
installationStatus: context.installationStatus,
installationId: context.installationId,
syncStatus: "syncing",
lastSyncLabel: totalRepositoryCount > 0 ? `Imported ${totalRepositoryCount} repositories` : "No repositories available",
lastSyncAt: currentMeta.lastSyncAt,
syncGeneration,
syncPhase: totalRepositoryCount > 0 ? "syncing_branches" : null,
processedRepositoryCount: 0,
totalRepositoryCount,
repositories,
});
return { return {
syncGeneration, syncGeneration,
startedAt, startedAt,
@ -784,7 +768,7 @@ export async function fullSyncMembers(c: any, config: FullSyncConfig): Promise<v
* Returns true when all batches have been processed. * Returns true when all batches have been processed.
*/ */
export async function fullSyncPullRequestBatch(c: any, config: FullSyncConfig, batchIndex: number): Promise<boolean> { export async function fullSyncPullRequestBatch(c: any, config: FullSyncConfig, batchIndex: number): Promise<boolean> {
const repos = readRepositoriesFromDb(c); const repos = await readRepositoriesFromDb(c);
const batches = chunkItems(repos, SYNC_REPOSITORY_BATCH_SIZE); const batches = chunkItems(repos, SYNC_REPOSITORY_BATCH_SIZE);
if (batchIndex >= batches.length) return true; if (batchIndex >= batches.length) return true;
@ -817,22 +801,6 @@ export async function fullSyncFinalize(c: any, config: FullSyncConfig): Promise<
await sweepPullRequests(c, config.syncGeneration); await sweepPullRequests(c, config.syncGeneration);
await sweepRepositories(c, config.syncGeneration); await sweepRepositories(c, config.syncGeneration);
const repos = readRepositoriesFromDb(c);
const organization = await getOrCreateOrganization(c, c.state.organizationId);
await sendOrganizationCommand(organization, "organization.command.github.data_projection.apply", {
connectedAccount: config.connectedAccount,
installationStatus: config.installationStatus,
installationId: config.installationId,
syncStatus: "synced",
lastSyncLabel: config.totalRepositoryCount > 0 ? `Synced ${config.totalRepositoryCount} repositories` : "No repositories available",
lastSyncAt: config.startedAt,
syncGeneration: config.syncGeneration,
syncPhase: null,
processedRepositoryCount: config.totalRepositoryCount,
totalRepositoryCount: config.totalRepositoryCount,
repositories: repos,
});
await writeMeta(c, { await writeMeta(c, {
connectedAccount: config.connectedAccount, connectedAccount: config.connectedAccount,
installationStatus: config.installationStatus, installationStatus: config.installationStatus,
@ -908,7 +876,7 @@ export const githubData = actor({
createState: (_c, input: GithubDataInput) => ({ createState: (_c, input: GithubDataInput) => ({
organizationId: input.organizationId, organizationId: input.organizationId,
}), }),
run: workflow(runGithubDataWorkflow), run: runGithubDataCommandLoop,
actions: { actions: {
async getSummary(c) { async getSummary(c) {
const repositories = await c.db.select().from(githubRepositories).all(); const repositories = await c.db.select().from(githubRepositories).all();
@ -949,6 +917,15 @@ export const githubData = actor({
}; };
}, },
async listOpenPullRequests(c) {
const rows = await c.db
.select()
.from(githubPullRequests)
.where(inArray(githubPullRequests.state, ["OPEN", "DRAFT"]))
.all();
return rows.map((row) => pullRequestSummaryFromRow(row));
},
async listBranchesForRepository(c, input: { repoId: string }) { async listBranchesForRepository(c, input: { repoId: string }) {
const rows = await c.db.select().from(githubBranches).where(eq(githubBranches.repoId, input.repoId)).all(); const rows = await c.db.select().from(githubBranches).where(eq(githubBranches.repoId, input.repoId)).all();
return rows return rows
@ -1015,11 +992,6 @@ export async function reloadRepositoryMutation(c: any, input: { repoId: string }
updatedAt, updatedAt,
); );
const organization = await getOrCreateOrganization(c, c.state.organizationId);
await sendOrganizationCommand(organization, "organization.command.github.repository_projection.apply", {
repoId: input.repoId,
remoteUrl: repository.cloneUrl,
});
return { return {
repoId: input.repoId, repoId: input.repoId,
fullName: repository.fullName, fullName: repository.fullName,
@ -1049,20 +1021,6 @@ export async function clearStateMutation(c: any, input: ClearStateInput) {
totalRepositoryCount: 0, totalRepositoryCount: 0,
}); });
const organization = await getOrCreateOrganization(c, c.state.organizationId);
await sendOrganizationCommand(organization, "organization.command.github.data_projection.apply", {
connectedAccount: input.connectedAccount,
installationStatus: input.installationStatus,
installationId: input.installationId,
syncStatus: "pending",
lastSyncLabel: input.label,
lastSyncAt: null,
syncGeneration: currentMeta.syncGeneration,
syncPhase: null,
processedRepositoryCount: 0,
totalRepositoryCount: 0,
repositories: [],
});
await emitPullRequestChangeEvents(c, beforeRows, []); await emitPullRequestChangeEvents(c, beforeRows, []);
} }
@ -1150,12 +1108,6 @@ export async function handlePullRequestWebhookMutation(c: any, input: PullReques
totalRepositoryCount: 0, totalRepositoryCount: 0,
}); });
const organization = await getOrCreateOrganization(c, c.state.organizationId);
await sendOrganizationCommand(organization, "organization.command.github.repository_projection.apply", {
repoId,
remoteUrl: input.repository.cloneUrl,
});
const afterRows = await readAllPullRequestRows(c); const afterRows = await readAllPullRequestRows(c);
await emitPullRequestChangeEvents(c, beforeRows, afterRows); await emitPullRequestChangeEvents(c, beforeRows, afterRows);
if (state === "CLOSED" || state === "MERGED") { if (state === "CLOSED" || state === "MERGED") {

View file

@ -1,6 +1,11 @@
// @ts-nocheck // @ts-nocheck
import { Loop } from "rivetkit/workflow"; import { logActorWarning, resolveErrorMessage } from "../logging.js";
import { clearStateMutation, handlePullRequestWebhookMutation, reloadRepositoryMutation, runFullSync, fullSyncError } from "./index.js";
// Dynamic imports to break circular dependency: index.ts imports workflow.ts,
// and workflow.ts needs functions from index.ts.
async function getIndexModule() {
return await import("./index.js");
}
export const GITHUB_DATA_QUEUE_NAMES = [ export const GITHUB_DATA_QUEUE_NAMES = [
"githubData.command.syncRepos", "githubData.command.syncRepos",
@ -15,78 +20,62 @@ export function githubDataWorkflowQueueName(name: GithubDataQueueName): GithubDa
return name; return name;
} }
export async function runGithubDataWorkflow(ctx: any): Promise<void> { /**
// The org actor sends a "githubData.command.syncRepos" queue message when it * Plain run handler (no workflow engine). Drains the queue using `c.queue.iter()`
// creates this actor, so the command loop below handles the initial sync. * with completable messages. This avoids the RivetKit bug where actors created
// * from another actor's workflow context never start their `run: workflow(...)`.
// IMPORTANT: Do NOT use workflow sub-loops (ctx.loop) inside command handlers. */
// RivetKit workflow sub-loops inside a parent loop cause HistoryDivergedError export async function runGithubDataCommandLoop(c: any): Promise<void> {
// on the second iteration because entries from the first iteration's sub-loop for await (const msg of c.queue.iter({ names: [...GITHUB_DATA_QUEUE_NAMES], completable: true })) {
// are still in history but not visited during replay of iteration 2. Use native
// JS loops inside a single step instead. See .context/rivetkit-subloop-bug.md.
await ctx.loop("github-data-command-loop", async (loopCtx: any) => {
const msg = await loopCtx.queue.next("next-github-data-command", {
names: [...GITHUB_DATA_QUEUE_NAMES],
completable: true,
});
if (!msg) {
return Loop.continue(undefined);
}
try { try {
if (msg.name === "githubData.command.syncRepos") { if (msg.name === "githubData.command.syncRepos") {
try { try {
// Single opaque step for the entire sync. Do NOT decompose into const { runFullSync } = await getIndexModule();
// sub-loops/sub-steps — see comment at top of function. await runFullSync(c, msg.body);
await loopCtx.step({
name: "github-data-sync-repos",
timeout: 5 * 60_000,
run: async () => runFullSync(loopCtx, msg.body),
});
await msg.complete({ ok: true }); await msg.complete({ ok: true });
} catch (error) { } catch (error) {
await loopCtx.step("sync-repos-error", async () => fullSyncError(loopCtx, error)); const { fullSyncError } = await getIndexModule();
try {
await fullSyncError(c, error);
} catch {
/* best effort */
}
const message = error instanceof Error ? error.message : String(error); const message = error instanceof Error ? error.message : String(error);
await msg.complete({ error: message }).catch(() => {}); await msg.complete({ error: message }).catch(() => {});
} }
return Loop.continue(undefined); continue;
} }
if (msg.name === "githubData.command.reloadRepository") { if (msg.name === "githubData.command.reloadRepository") {
const result = await loopCtx.step({ const { reloadRepositoryMutation } = await getIndexModule();
name: "github-data-reload-repository", const result = await reloadRepositoryMutation(c, msg.body);
timeout: 5 * 60_000,
run: async () => reloadRepositoryMutation(loopCtx, msg.body),
});
await msg.complete(result); await msg.complete(result);
return Loop.continue(undefined); continue;
} }
if (msg.name === "githubData.command.clearState") { if (msg.name === "githubData.command.clearState") {
await loopCtx.step({ const { clearStateMutation } = await getIndexModule();
name: "github-data-clear-state", await clearStateMutation(c, msg.body);
timeout: 60_000,
run: async () => clearStateMutation(loopCtx, msg.body),
});
await msg.complete({ ok: true }); await msg.complete({ ok: true });
return Loop.continue(undefined); continue;
} }
if (msg.name === "githubData.command.handlePullRequestWebhook") { if (msg.name === "githubData.command.handlePullRequestWebhook") {
await loopCtx.step({ const { handlePullRequestWebhookMutation } = await getIndexModule();
name: "github-data-handle-pull-request-webhook", await handlePullRequestWebhookMutation(c, msg.body);
timeout: 60_000,
run: async () => handlePullRequestWebhookMutation(loopCtx, msg.body),
});
await msg.complete({ ok: true }); await msg.complete({ ok: true });
return Loop.continue(undefined); continue;
} }
logActorWarning("githubData", "unknown queue message", { queueName: msg.name });
await msg.complete({ error: `Unknown command: ${msg.name}` });
} catch (error) { } catch (error) {
const message = error instanceof Error ? error.message : String(error); const message = resolveErrorMessage(error);
logActorWarning("githubData", "github-data command failed", {
queueName: msg.name,
error: message,
});
await msg.complete({ error: message }).catch(() => {}); await msg.complete({ error: message }).catch(() => {});
} }
}
return Loop.continue(undefined);
});
} }

View file

@ -10,8 +10,8 @@ import type {
OrganizationUseInput, OrganizationUseInput,
} from "@sandbox-agent/foundry-shared"; } from "@sandbox-agent/foundry-shared";
import { logActorWarning, resolveErrorMessage } from "../logging.js"; import { logActorWarning, resolveErrorMessage } from "../logging.js";
import { repoIdFromRemote } from "../../services/repo.js"; import { getOrCreateGithubData } from "../handles.js";
import { organizationProfile, repos, taskSummaries } from "./db/schema.js"; import { organizationProfile, taskSummaries } from "./db/schema.js";
import { organizationAppActions } from "./actions/app.js"; import { organizationAppActions } from "./actions/app.js";
import { organizationBetterAuthActions } from "./actions/better-auth.js"; import { organizationBetterAuthActions } from "./actions/better-auth.js";
import { organizationOnboardingActions } from "./actions/onboarding.js"; import { organizationOnboardingActions } from "./actions/onboarding.js";
@ -45,18 +45,6 @@ function repoLabelFromRemote(remoteUrl: string): string {
return remoteUrl; return remoteUrl;
} }
function buildRepoSummary(repoRow: { repoId: string; remoteUrl: string; updatedAt: number }, taskRows: WorkspaceTaskSummary[]): WorkspaceRepositorySummary {
const repoTasks = taskRows.filter((task) => task.repoId === repoRow.repoId);
const latestActivityMs = repoTasks.reduce((latest, task) => Math.max(latest, task.updatedAtMs), repoRow.updatedAt);
return {
id: repoRow.repoId,
label: repoLabelFromRemote(repoRow.remoteUrl),
taskCount: repoTasks.length,
latestActivityMs,
};
}
function buildGithubSummary(profile: any, importedRepoCount: number): OrganizationGithubSummary { function buildGithubSummary(profile: any, importedRepoCount: number): OrganizationGithubSummary {
return { return {
connectedAccount: profile?.githubConnectedAccount ?? "", connectedAccount: profile?.githubConnectedAccount ?? "",
@ -81,18 +69,19 @@ function buildGithubSummary(profile: any, importedRepoCount: number): Organizati
*/ */
async function getOrganizationSummarySnapshot(c: any): Promise<OrganizationSummarySnapshot> { async function getOrganizationSummarySnapshot(c: any): Promise<OrganizationSummarySnapshot> {
const profile = await c.db.select().from(organizationProfile).where(eq(organizationProfile.id, ORGANIZATION_PROFILE_ROW_ID)).get(); const profile = await c.db.select().from(organizationProfile).where(eq(organizationProfile.id, ORGANIZATION_PROFILE_ROW_ID)).get();
const repoRows = await c.db
.select({ // Fetch repos + open PRs from github-data actor (single actor, not fan-out)
repoId: repos.repoId, let repoRows: Array<{ repoId: string; fullName: string; cloneUrl: string; private: boolean; defaultBranch: string }> = [];
remoteUrl: repos.remoteUrl, let openPullRequests: any[] = [];
updatedAt: repos.updatedAt, try {
}) const githubData = await getOrCreateGithubData(c, c.state.organizationId);
.from(repos) [repoRows, openPullRequests] = await Promise.all([githubData.listRepositories({}), githubData.listOpenPullRequests({})]);
.orderBy(desc(repos.updatedAt)) } catch {
.all(); // github-data actor may not exist yet
}
const summaryRows = await c.db.select().from(taskSummaries).orderBy(desc(taskSummaries.updatedAtMs)).all(); const summaryRows = await c.db.select().from(taskSummaries).orderBy(desc(taskSummaries.updatedAtMs)).all();
const summaries: WorkspaceTaskSummary[] = summaryRows.map((row) => ({ const summaries = summaryRows.map((row) => ({
id: row.taskId, id: row.taskId,
repoId: row.repoId, repoId: row.repoId,
title: row.title, title: row.title,
@ -123,8 +112,20 @@ async function getOrganizationSummarySnapshot(c: any): Promise<OrganizationSumma
return { return {
organizationId: c.state.organizationId, organizationId: c.state.organizationId,
github: buildGithubSummary(profile, repoRows.length), github: buildGithubSummary(profile, repoRows.length),
repos: repoRows.map((row) => buildRepoSummary(row, summaries)).sort((left, right) => right.latestActivityMs - left.latestActivityMs), repos: repoRows
.map((repo) => {
const repoTasks = summaries.filter((t) => t.repoId === repo.repoId);
const latestTaskMs = repoTasks.reduce((latest, t) => Math.max(latest, t.updatedAtMs), 0);
return {
id: repo.repoId,
label: repoLabelFromRemote(repo.cloneUrl),
taskCount: repoTasks.length,
latestActivityMs: latestTaskMs || Date.now(),
};
})
.sort((a, b) => b.latestActivityMs - a.latestActivityMs),
taskSummaries: summaries, taskSummaries: summaries,
openPullRequests,
}; };
} }
@ -149,25 +150,19 @@ export const organizationActions = {
async listRepos(c: any, input: OrganizationUseInput): Promise<RepoRecord[]> { async listRepos(c: any, input: OrganizationUseInput): Promise<RepoRecord[]> {
assertOrganization(c, input.organizationId); assertOrganization(c, input.organizationId);
try {
const rows = await c.db const githubData = await getOrCreateGithubData(c, c.state.organizationId);
.select({ const rows = await githubData.listRepositories({});
repoId: repos.repoId, return rows.map((row: any) => ({
remoteUrl: repos.remoteUrl, organizationId: c.state.organizationId,
createdAt: repos.createdAt, repoId: row.repoId,
updatedAt: repos.updatedAt, remoteUrl: row.cloneUrl,
}) createdAt: row.updatedAt ?? Date.now(),
.from(repos) updatedAt: row.updatedAt ?? Date.now(),
.orderBy(desc(repos.updatedAt)) }));
.all(); } catch {
return [];
return rows.map((row) => ({ }
organizationId: c.state.organizationId,
repoId: row.repoId,
remoteUrl: row.remoteUrl,
createdAt: row.createdAt,
updatedAt: row.updatedAt,
}));
}, },
async getOrganizationSummary(c: any, input: OrganizationUseInput): Promise<OrganizationSummarySnapshot> { async getOrganizationSummary(c: any, input: OrganizationUseInput): Promise<OrganizationSummarySnapshot> {
@ -176,103 +171,6 @@ export const organizationActions = {
}, },
}; };
export async function applyGithubRepositoryProjectionMutation(c: any, input: { repoId: string; remoteUrl: string }): Promise<void> {
const now = Date.now();
await c.db
.insert(repos)
.values({
repoId: input.repoId,
remoteUrl: input.remoteUrl,
createdAt: now,
updatedAt: now,
})
.onConflictDoUpdate({
target: repos.repoId,
set: {
remoteUrl: input.remoteUrl,
updatedAt: now,
},
})
.run();
await refreshOrganizationSnapshotMutation(c);
}
export async function applyGithubDataProjectionMutation(
c: any,
input: {
connectedAccount: string;
installationStatus: string;
installationId: number | null;
syncStatus: string;
lastSyncLabel: string;
lastSyncAt: number | null;
syncGeneration: number;
syncPhase: string | null;
processedRepositoryCount: number;
totalRepositoryCount: number;
repositories: Array<{ fullName: string; cloneUrl: string; private: boolean }>;
},
): Promise<void> {
const existingRepos = await c.db.select({ repoId: repos.repoId }).from(repos).all();
const nextRepoIds = new Set<string>();
const now = Date.now();
const profile = await c.db
.select({ id: organizationProfile.id })
.from(organizationProfile)
.where(eq(organizationProfile.id, ORGANIZATION_PROFILE_ROW_ID))
.get();
if (profile) {
await c.db
.update(organizationProfile)
.set({
githubConnectedAccount: input.connectedAccount,
githubInstallationStatus: input.installationStatus,
githubSyncStatus: input.syncStatus,
githubInstallationId: input.installationId,
githubLastSyncLabel: input.lastSyncLabel,
githubLastSyncAt: input.lastSyncAt,
githubSyncGeneration: input.syncGeneration,
githubSyncPhase: input.syncPhase,
githubProcessedRepositoryCount: input.processedRepositoryCount,
githubTotalRepositoryCount: input.totalRepositoryCount,
updatedAt: now,
})
.where(eq(organizationProfile.id, ORGANIZATION_PROFILE_ROW_ID))
.run();
}
for (const repository of input.repositories) {
const repoId = repoIdFromRemote(repository.cloneUrl);
nextRepoIds.add(repoId);
await c.db
.insert(repos)
.values({
repoId,
remoteUrl: repository.cloneUrl,
createdAt: now,
updatedAt: now,
})
.onConflictDoUpdate({
target: repos.repoId,
set: {
remoteUrl: repository.cloneUrl,
updatedAt: now,
},
})
.run();
}
for (const repo of existingRepos) {
if (nextRepoIds.has(repo.repoId)) {
continue;
}
await c.db.delete(repos).where(eq(repos.repoId, repo.repoId)).run();
}
await refreshOrganizationSnapshotMutation(c);
}
export async function applyGithubSyncProgressMutation( export async function applyGithubSyncProgressMutation(
c: any, c: any,
input: { input: {

View file

@ -17,7 +17,7 @@ import { deriveFallbackTitle, resolveCreateFlowDecision } from "../../../service
import { expectQueueResponse } from "../../../services/queue.js"; import { expectQueueResponse } from "../../../services/queue.js";
import { isActorNotFoundError, logActorWarning, resolveErrorMessage } from "../../logging.js"; import { isActorNotFoundError, logActorWarning, resolveErrorMessage } from "../../logging.js";
import { defaultSandboxProviderId } from "../../../sandbox-config.js"; import { defaultSandboxProviderId } from "../../../sandbox-config.js";
import { taskIndex, taskSummaries, repos } from "../db/schema.js"; import { taskIndex, taskSummaries } from "../db/schema.js";
import { refreshOrganizationSnapshotMutation } from "../actions.js"; import { refreshOrganizationSnapshotMutation } from "../actions.js";
interface CreateTaskCommand { interface CreateTaskCommand {
@ -120,11 +120,6 @@ async function listGitHubBranches(c: any, repoId: string): Promise<Array<{ branc
} }
async function resolveRepositoryRemoteUrl(c: any, repoId: string): Promise<string> { async function resolveRepositoryRemoteUrl(c: any, repoId: string): Promise<string> {
const repoRow = await c.db.select({ remoteUrl: repos.remoteUrl }).from(repos).where(eq(repos.repoId, repoId)).get();
if (repoRow?.remoteUrl) {
return repoRow.remoteUrl;
}
const repository = await resolveGitHubRepository(c, repoId); const repository = await resolveGitHubRepository(c, repoId);
const remoteUrl = repository?.cloneUrl?.trim(); const remoteUrl = repository?.cloneUrl?.trim();
if (!remoteUrl) { if (!remoteUrl) {

View file

@ -21,12 +21,11 @@ import type {
TaskWorkspaceUpdateDraftInput, TaskWorkspaceUpdateDraftInput,
} from "@sandbox-agent/foundry-shared"; } from "@sandbox-agent/foundry-shared";
import { getActorRuntimeContext } from "../../context.js"; import { getActorRuntimeContext } from "../../context.js";
import { getOrCreateAuditLog, getTask as getTaskHandle, selfOrganization } from "../../handles.js"; import { getOrCreateAuditLog, getOrCreateGithubData, getTask as getTaskHandle, selfOrganization } from "../../handles.js";
import { defaultSandboxProviderId } from "../../../sandbox-config.js"; import { defaultSandboxProviderId } from "../../../sandbox-config.js";
import { expectQueueResponse } from "../../../services/queue.js"; import { expectQueueResponse } from "../../../services/queue.js";
import { logActorWarning, resolveErrorMessage } from "../../logging.js"; import { logActorWarning, resolveErrorMessage } from "../../logging.js";
import { taskWorkflowQueueName } from "../../task/workflow/index.js"; import { taskWorkflowQueueName } from "../../task/workflow/index.js";
import { repos } from "../db/schema.js";
import { organizationWorkflowQueueName } from "../queues.js"; import { organizationWorkflowQueueName } from "../queues.js";
import { import {
createTaskMutation, createTaskMutation,
@ -44,8 +43,9 @@ function assertOrganization(c: { state: { organizationId: string } }, organizati
} }
async function requireRepoExists(c: any, repoId: string): Promise<void> { async function requireRepoExists(c: any, repoId: string): Promise<void> {
const repoRow = await c.db.select({ repoId: repos.repoId }).from(repos).where(eq(repos.repoId, repoId)).get(); const githubData = await getOrCreateGithubData(c, c.state.organizationId);
if (!repoRow) { const repo = await githubData.getRepository({ repoId });
if (!repo) {
throw new Error(`Unknown repo: ${repoId}`); throw new Error(`Unknown repo: ${repoId}`);
} }
} }

View file

@ -19,7 +19,7 @@ import { getBetterAuthService } from "../../services/better-auth.js";
import { expectQueueResponse } from "../../services/queue.js"; import { expectQueueResponse } from "../../services/queue.js";
import { repoIdFromRemote, repoLabelFromRemote } from "../../services/repo.js"; import { repoIdFromRemote, repoLabelFromRemote } from "../../services/repo.js";
import { logger } from "../../logging.js"; import { logger } from "../../logging.js";
import { invoices, organizationMembers, organizationProfile, repos, seatAssignments, stripeLookup } from "./db/schema.js"; import { invoices, organizationMembers, organizationProfile, seatAssignments, stripeLookup } from "./db/schema.js";
import { APP_SHELL_ORGANIZATION_ID } from "./constants.js"; import { APP_SHELL_ORGANIZATION_ID } from "./constants.js";
import { organizationWorkflowQueueName } from "./queues.js"; import { organizationWorkflowQueueName } from "./queues.js";
@ -575,8 +575,13 @@ async function listOrganizationInvoices(c: any): Promise<FoundryBillingState["in
async function listOrganizationRepoCatalog(c: any): Promise<string[]> { async function listOrganizationRepoCatalog(c: any): Promise<string[]> {
assertOrganizationShell(c); assertOrganizationShell(c);
const rows = await c.db.select({ remoteUrl: repos.remoteUrl }).from(repos).orderBy(desc(repos.updatedAt)).all(); try {
return rows.map((row) => repoLabelFromRemote(row.remoteUrl)).sort((left, right) => left.localeCompare(right)); const githubData = await getOrCreateGithubData(c, c.state.organizationId);
const rows = await githubData.listRepositories({});
return rows.map((row: any) => repoLabelFromRemote(row.cloneUrl)).sort((a: string, b: string) => a.localeCompare(b));
} catch {
return [];
}
} }
export async function buildOrganizationState(c: any) { export async function buildOrganizationState(c: any) {

View file

@ -4,17 +4,6 @@ import { DEFAULT_WORKSPACE_MODEL_ID } from "@sandbox-agent/foundry-shared";
// SQLite is per organization actor instance, so no organizationId column needed. // SQLite is per organization actor instance, so no organizationId column needed.
/**
* Repository catalog. Rows are created/removed when repos are added/removed
* from the organization via GitHub sync.
*/
export const repos = sqliteTable("repos", {
repoId: text("repo_id").notNull().primaryKey(),
remoteUrl: text("remote_url").notNull(),
createdAt: integer("created_at").notNull(),
updatedAt: integer("updated_at").notNull(),
});
/** /**
* Coordinator index of TaskActor instances. * Coordinator index of TaskActor instances.
* The organization actor is the direct coordinator for tasks (not a per-repo * The organization actor is the direct coordinator for tasks (not a per-repo

View file

@ -1,9 +1,8 @@
import { actor, queue } from "rivetkit"; import { actor, queue } from "rivetkit";
import { workflow } from "rivetkit/workflow";
import { organizationDb } from "./db/db.js"; import { organizationDb } from "./db/db.js";
import { organizationActions } from "./actions.js"; import { organizationActions } from "./actions.js";
import { ORGANIZATION_QUEUE_NAMES } from "./queues.js"; import { ORGANIZATION_QUEUE_NAMES } from "./queues.js";
import { runOrganizationWorkflow } from "./workflow.js"; import { runOrganizationCommandLoop } from "./workflow.js";
export const organization = actor({ export const organization = actor({
db: organizationDb, db: organizationDb,
@ -17,5 +16,5 @@ export const organization = actor({
organizationId, organizationId,
}), }),
actions: organizationActions, actions: organizationActions,
run: workflow(runOrganizationWorkflow), run: runOrganizationCommandLoop,
}); });

View file

@ -18,8 +18,6 @@ export const ORGANIZATION_QUEUE_NAMES = [
"organization.command.better_auth.verification.update_many", "organization.command.better_auth.verification.update_many",
"organization.command.better_auth.verification.delete", "organization.command.better_auth.verification.delete",
"organization.command.better_auth.verification.delete_many", "organization.command.better_auth.verification.delete_many",
"organization.command.github.repository_projection.apply",
"organization.command.github.data_projection.apply",
"organization.command.github.sync_progress.apply", "organization.command.github.sync_progress.apply",
"organization.command.github.webhook_receipt.record", "organization.command.github.webhook_receipt.record",
"organization.command.github.organization_shell.sync_from_github", "organization.command.github.organization_shell.sync_from_github",

View file

@ -1,13 +1,6 @@
// @ts-nocheck // @ts-nocheck
import { Loop } from "rivetkit/workflow";
import { logActorWarning, resolveErrorMessage } from "../logging.js"; import { logActorWarning, resolveErrorMessage } from "../logging.js";
import { import { applyGithubSyncProgressMutation, recordGithubWebhookReceiptMutation, refreshOrganizationSnapshotMutation } from "./actions.js";
applyGithubDataProjectionMutation,
applyGithubRepositoryProjectionMutation,
applyGithubSyncProgressMutation,
recordGithubWebhookReceiptMutation,
refreshOrganizationSnapshotMutation,
} from "./actions.js";
import { import {
applyTaskSummaryUpdateMutation, applyTaskSummaryUpdateMutation,
createTaskMutation, createTaskMutation,
@ -42,363 +35,134 @@ import {
} from "./app-shell.js"; } from "./app-shell.js";
import { ORGANIZATION_QUEUE_NAMES } from "./queues.js"; import { ORGANIZATION_QUEUE_NAMES } from "./queues.js";
export async function runOrganizationWorkflow(ctx: any): Promise<void> { // Command handler dispatch table — maps queue name to handler function.
await ctx.loop("organization-command-loop", async (loopCtx: any) => { const COMMAND_HANDLERS: Record<string, (c: any, body: any) => Promise<any>> = {
const msg = await loopCtx.queue.next("next-organization-command", { "organization.command.createTask": (c, body) => createTaskMutation(c, body),
names: [...ORGANIZATION_QUEUE_NAMES], "organization.command.materializeTask": (c, body) => createTaskMutation(c, body),
completable: true, "organization.command.registerTaskBranch": (c, body) => registerTaskBranchMutation(c, body),
}); "organization.command.applyTaskSummaryUpdate": async (c, body) => {
if (!msg) { await applyTaskSummaryUpdateMutation(c, body);
return Loop.continue(undefined); return { ok: true };
} },
"organization.command.removeTaskSummary": async (c, body) => {
await removeTaskSummaryMutation(c, body);
return { ok: true };
},
"organization.command.refreshTaskSummaryForBranch": async (c, body) => {
await refreshTaskSummaryForBranchMutation(c, body);
return { ok: true };
},
"organization.command.snapshot.broadcast": async (c, _body) => {
await refreshOrganizationSnapshotMutation(c);
return { ok: true };
},
"organization.command.syncGithubSession": async (c, body) => {
const { syncGithubOrganizations } = await import("./app-shell.js");
await syncGithubOrganizations(c, body);
return { ok: true };
},
"organization.command.better_auth.session_index.upsert": (c, body) => betterAuthUpsertSessionIndexMutation(c, body),
"organization.command.better_auth.session_index.delete": async (c, body) => {
await betterAuthDeleteSessionIndexMutation(c, body);
return { ok: true };
},
"organization.command.better_auth.email_index.upsert": (c, body) => betterAuthUpsertEmailIndexMutation(c, body),
"organization.command.better_auth.email_index.delete": async (c, body) => {
await betterAuthDeleteEmailIndexMutation(c, body);
return { ok: true };
},
"organization.command.better_auth.account_index.upsert": (c, body) => betterAuthUpsertAccountIndexMutation(c, body),
"organization.command.better_auth.account_index.delete": async (c, body) => {
await betterAuthDeleteAccountIndexMutation(c, body);
return { ok: true };
},
"organization.command.better_auth.verification.create": (c, body) => betterAuthCreateVerificationMutation(c, body),
"organization.command.better_auth.verification.update": (c, body) => betterAuthUpdateVerificationMutation(c, body),
"organization.command.better_auth.verification.update_many": (c, body) => betterAuthUpdateManyVerificationMutation(c, body),
"organization.command.better_auth.verification.delete": async (c, body) => {
await betterAuthDeleteVerificationMutation(c, body);
return { ok: true };
},
"organization.command.better_auth.verification.delete_many": (c, body) => betterAuthDeleteManyVerificationMutation(c, body),
"organization.command.github.sync_progress.apply": async (c, body) => {
await applyGithubSyncProgressMutation(c, body);
return { ok: true };
},
"organization.command.github.webhook_receipt.record": async (c, body) => {
await recordGithubWebhookReceiptMutation(c, body);
return { ok: true };
},
"organization.command.github.organization_shell.sync_from_github": (c, body) => syncOrganizationShellFromGithubMutation(c, body),
"organization.command.shell.profile.update": async (c, body) => {
await updateOrganizationShellProfileMutation(c, body);
return { ok: true };
},
"organization.command.shell.sync_started.mark": async (c, body) => {
await markOrganizationSyncStartedMutation(c, body);
return { ok: true };
},
"organization.command.billing.stripe_customer.apply": async (c, body) => {
await applyOrganizationStripeCustomerMutation(c, body);
return { ok: true };
},
"organization.command.billing.stripe_subscription.apply": async (c, body) => {
await applyOrganizationStripeSubscriptionMutation(c, body);
return { ok: true };
},
"organization.command.billing.free_plan.apply": async (c, body) => {
await applyOrganizationFreePlanMutation(c, body);
return { ok: true };
},
"organization.command.billing.payment_method.set": async (c, body) => {
await setOrganizationBillingPaymentMethodMutation(c, body);
return { ok: true };
},
"organization.command.billing.status.set": async (c, body) => {
await setOrganizationBillingStatusMutation(c, body);
return { ok: true };
},
"organization.command.billing.invoice.upsert": async (c, body) => {
await upsertOrganizationInvoiceMutation(c, body);
return { ok: true };
},
"organization.command.billing.seat_usage.record": async (c, body) => {
await recordOrganizationSeatUsageMutation(c, body);
return { ok: true };
},
};
/**
* Plain run handler (no workflow engine). Drains the queue using `c.queue.iter()`
* with completable messages. This avoids the RivetKit bug where actors created
* from another actor's workflow context never start their `run: workflow(...)`.
*
* The queue is still durable messages survive restarts. Only in-flight processing
* of a single message is lost on crash (the message is retried). All mutations are
* idempotent, so this is safe.
*/
export async function runOrganizationCommandLoop(c: any): Promise<void> {
for await (const msg of c.queue.iter({ names: [...ORGANIZATION_QUEUE_NAMES], completable: true })) {
try { try {
if (msg.name === "organization.command.createTask") { const handler = COMMAND_HANDLERS[msg.name];
const result = await loopCtx.step({ if (handler) {
name: "organization-create-task", const result = await handler(c, msg.body);
timeout: 5 * 60_000,
run: async () => createTaskMutation(loopCtx, msg.body),
});
await msg.complete(result); await msg.complete(result);
return Loop.continue(undefined); } else {
} logActorWarning("organization", "unknown queue message", { queueName: msg.name });
await msg.complete({ error: `Unknown command: ${msg.name}` });
if (msg.name === "organization.command.materializeTask") {
const result = await loopCtx.step({
name: "organization-materialize-task",
timeout: 5 * 60_000,
run: async () => createTaskMutation(loopCtx, msg.body),
});
await msg.complete(result);
return Loop.continue(undefined);
}
if (msg.name === "organization.command.registerTaskBranch") {
const result = await loopCtx.step({
name: "organization-register-task-branch",
timeout: 60_000,
run: async () => registerTaskBranchMutation(loopCtx, msg.body),
});
await msg.complete(result);
return Loop.continue(undefined);
}
if (msg.name === "organization.command.applyTaskSummaryUpdate") {
await loopCtx.step({
name: "organization-apply-task-summary-update",
timeout: 30_000,
run: async () => applyTaskSummaryUpdateMutation(loopCtx, msg.body),
});
await msg.complete({ ok: true });
return Loop.continue(undefined);
}
if (msg.name === "organization.command.removeTaskSummary") {
await loopCtx.step({
name: "organization-remove-task-summary",
timeout: 30_000,
run: async () => removeTaskSummaryMutation(loopCtx, msg.body),
});
await msg.complete({ ok: true });
return Loop.continue(undefined);
}
if (msg.name === "organization.command.refreshTaskSummaryForBranch") {
await loopCtx.step({
name: "organization-refresh-task-summary-for-branch",
timeout: 60_000,
run: async () => refreshTaskSummaryForBranchMutation(loopCtx, msg.body),
});
await msg.complete({ ok: true });
return Loop.continue(undefined);
}
if (msg.name === "organization.command.snapshot.broadcast") {
await loopCtx.step({
name: "organization-snapshot-broadcast",
timeout: 60_000,
run: async () => refreshOrganizationSnapshotMutation(loopCtx),
});
await msg.complete({ ok: true });
return Loop.continue(undefined);
}
if (msg.name === "organization.command.syncGithubSession") {
await loopCtx.step({
name: "organization-sync-github-session",
timeout: 60_000,
run: async () => {
const { syncGithubOrganizations } = await import("./app-shell.js");
await syncGithubOrganizations(loopCtx, msg.body as { sessionId: string; accessToken: string });
},
});
await msg.complete({ ok: true });
return Loop.continue(undefined);
}
if (msg.name === "organization.command.better_auth.session_index.upsert") {
const result = await loopCtx.step({
name: "organization-better-auth-session-index-upsert",
timeout: 60_000,
run: async () => betterAuthUpsertSessionIndexMutation(loopCtx, msg.body),
});
await msg.complete(result);
return Loop.continue(undefined);
}
if (msg.name === "organization.command.better_auth.session_index.delete") {
await loopCtx.step({
name: "organization-better-auth-session-index-delete",
timeout: 60_000,
run: async () => betterAuthDeleteSessionIndexMutation(loopCtx, msg.body),
});
await msg.complete({ ok: true });
return Loop.continue(undefined);
}
if (msg.name === "organization.command.better_auth.email_index.upsert") {
const result = await loopCtx.step({
name: "organization-better-auth-email-index-upsert",
timeout: 60_000,
run: async () => betterAuthUpsertEmailIndexMutation(loopCtx, msg.body),
});
await msg.complete(result);
return Loop.continue(undefined);
}
if (msg.name === "organization.command.better_auth.email_index.delete") {
await loopCtx.step({
name: "organization-better-auth-email-index-delete",
timeout: 60_000,
run: async () => betterAuthDeleteEmailIndexMutation(loopCtx, msg.body),
});
await msg.complete({ ok: true });
return Loop.continue(undefined);
}
if (msg.name === "organization.command.better_auth.account_index.upsert") {
const result = await loopCtx.step({
name: "organization-better-auth-account-index-upsert",
timeout: 60_000,
run: async () => betterAuthUpsertAccountIndexMutation(loopCtx, msg.body),
});
await msg.complete(result);
return Loop.continue(undefined);
}
if (msg.name === "organization.command.better_auth.account_index.delete") {
await loopCtx.step({
name: "organization-better-auth-account-index-delete",
timeout: 60_000,
run: async () => betterAuthDeleteAccountIndexMutation(loopCtx, msg.body),
});
await msg.complete({ ok: true });
return Loop.continue(undefined);
}
if (msg.name === "organization.command.better_auth.verification.create") {
const result = await loopCtx.step({
name: "organization-better-auth-verification-create",
timeout: 60_000,
run: async () => betterAuthCreateVerificationMutation(loopCtx, msg.body),
});
await msg.complete(result);
return Loop.continue(undefined);
}
if (msg.name === "organization.command.better_auth.verification.update") {
const result = await loopCtx.step({
name: "organization-better-auth-verification-update",
timeout: 60_000,
run: async () => betterAuthUpdateVerificationMutation(loopCtx, msg.body),
});
await msg.complete(result);
return Loop.continue(undefined);
}
if (msg.name === "organization.command.better_auth.verification.update_many") {
const result = await loopCtx.step({
name: "organization-better-auth-verification-update-many",
timeout: 60_000,
run: async () => betterAuthUpdateManyVerificationMutation(loopCtx, msg.body),
});
await msg.complete(result);
return Loop.continue(undefined);
}
if (msg.name === "organization.command.better_auth.verification.delete") {
await loopCtx.step({
name: "organization-better-auth-verification-delete",
timeout: 60_000,
run: async () => betterAuthDeleteVerificationMutation(loopCtx, msg.body),
});
await msg.complete({ ok: true });
return Loop.continue(undefined);
}
if (msg.name === "organization.command.better_auth.verification.delete_many") {
const result = await loopCtx.step({
name: "organization-better-auth-verification-delete-many",
timeout: 60_000,
run: async () => betterAuthDeleteManyVerificationMutation(loopCtx, msg.body),
});
await msg.complete(result);
return Loop.continue(undefined);
}
if (msg.name === "organization.command.github.repository_projection.apply") {
await loopCtx.step({
name: "organization-github-repository-projection-apply",
timeout: 60_000,
run: async () => applyGithubRepositoryProjectionMutation(loopCtx, msg.body),
});
await msg.complete({ ok: true });
return Loop.continue(undefined);
}
if (msg.name === "organization.command.github.data_projection.apply") {
await loopCtx.step({
name: "organization-github-data-projection-apply",
timeout: 60_000,
run: async () => applyGithubDataProjectionMutation(loopCtx, msg.body),
});
await msg.complete({ ok: true });
return Loop.continue(undefined);
}
if (msg.name === "organization.command.github.sync_progress.apply") {
await loopCtx.step({
name: "organization-github-sync-progress-apply",
timeout: 60_000,
run: async () => applyGithubSyncProgressMutation(loopCtx, msg.body),
});
await msg.complete({ ok: true });
return Loop.continue(undefined);
}
if (msg.name === "organization.command.github.webhook_receipt.record") {
await loopCtx.step({
name: "organization-github-webhook-receipt-record",
timeout: 60_000,
run: async () => recordGithubWebhookReceiptMutation(loopCtx, msg.body),
});
await msg.complete({ ok: true });
return Loop.continue(undefined);
}
if (msg.name === "organization.command.github.organization_shell.sync_from_github") {
const result = await loopCtx.step({
name: "organization-github-organization-shell-sync-from-github",
timeout: 60_000,
run: async () => syncOrganizationShellFromGithubMutation(loopCtx, msg.body),
});
await msg.complete(result);
return Loop.continue(undefined);
}
if (msg.name === "organization.command.shell.profile.update") {
await loopCtx.step({
name: "organization-shell-profile-update",
timeout: 60_000,
run: async () => updateOrganizationShellProfileMutation(loopCtx, msg.body),
});
await msg.complete({ ok: true });
return Loop.continue(undefined);
}
if (msg.name === "organization.command.shell.sync_started.mark") {
await loopCtx.step({
name: "organization-shell-sync-started-mark",
timeout: 60_000,
run: async () => markOrganizationSyncStartedMutation(loopCtx, msg.body),
});
await msg.complete({ ok: true });
return Loop.continue(undefined);
}
if (msg.name === "organization.command.billing.stripe_customer.apply") {
await loopCtx.step({
name: "organization-billing-stripe-customer-apply",
timeout: 60_000,
run: async () => applyOrganizationStripeCustomerMutation(loopCtx, msg.body),
});
await msg.complete({ ok: true });
return Loop.continue(undefined);
}
if (msg.name === "organization.command.billing.stripe_subscription.apply") {
await loopCtx.step({
name: "organization-billing-stripe-subscription-apply",
timeout: 60_000,
run: async () => applyOrganizationStripeSubscriptionMutation(loopCtx, msg.body),
});
await msg.complete({ ok: true });
return Loop.continue(undefined);
}
if (msg.name === "organization.command.billing.free_plan.apply") {
await loopCtx.step({
name: "organization-billing-free-plan-apply",
timeout: 60_000,
run: async () => applyOrganizationFreePlanMutation(loopCtx, msg.body),
});
await msg.complete({ ok: true });
return Loop.continue(undefined);
}
if (msg.name === "organization.command.billing.payment_method.set") {
await loopCtx.step({
name: "organization-billing-payment-method-set",
timeout: 60_000,
run: async () => setOrganizationBillingPaymentMethodMutation(loopCtx, msg.body),
});
await msg.complete({ ok: true });
return Loop.continue(undefined);
}
if (msg.name === "organization.command.billing.status.set") {
await loopCtx.step({
name: "organization-billing-status-set",
timeout: 60_000,
run: async () => setOrganizationBillingStatusMutation(loopCtx, msg.body),
});
await msg.complete({ ok: true });
return Loop.continue(undefined);
}
if (msg.name === "organization.command.billing.invoice.upsert") {
await loopCtx.step({
name: "organization-billing-invoice-upsert",
timeout: 60_000,
run: async () => upsertOrganizationInvoiceMutation(loopCtx, msg.body),
});
await msg.complete({ ok: true });
return Loop.continue(undefined);
}
if (msg.name === "organization.command.billing.seat_usage.record") {
await loopCtx.step({
name: "organization-billing-seat-usage-record",
timeout: 60_000,
run: async () => recordOrganizationSeatUsageMutation(loopCtx, msg.body),
});
await msg.complete({ ok: true });
return Loop.continue(undefined);
} }
} catch (error) { } catch (error) {
const message = resolveErrorMessage(error); const message = resolveErrorMessage(error);
logActorWarning("organization", "organization workflow command failed", { logActorWarning("organization", "organization command failed", {
queueName: msg.name, queueName: msg.name,
error: message, error: message,
}); });
await msg.complete({ error: message }).catch((completeError: unknown) => { await msg.complete({ error: message }).catch((completeError: unknown) => {
logActorWarning("organization", "organization workflow failed completing error response", { logActorWarning("organization", "organization command failed completing error response", {
queueName: msg.name, queueName: msg.name,
error: resolveErrorMessage(completeError), error: resolveErrorMessage(completeError),
}); });
}); });
} }
}
return Loop.continue(undefined);
});
} }

View file

@ -1,10 +1,9 @@
import { actor, queue } from "rivetkit"; import { actor, queue } from "rivetkit";
import { workflow } from "rivetkit/workflow";
import type { TaskRecord } from "@sandbox-agent/foundry-shared"; import type { TaskRecord } from "@sandbox-agent/foundry-shared";
import { taskDb } from "./db/db.js"; import { taskDb } from "./db/db.js";
import { getCurrentRecord } from "./workflow/common.js"; import { getCurrentRecord } from "./workflow/common.js";
import { getSessionDetail, getTaskDetail, getTaskSummary } from "./workspace.js"; import { getSessionDetail, getTaskDetail, getTaskSummary } from "./workspace.js";
import { TASK_QUEUE_NAMES, runTaskWorkflow } from "./workflow/index.js"; import { TASK_QUEUE_NAMES, runTaskCommandLoop } from "./workflow/index.js";
export interface TaskInput { export interface TaskInput {
organizationId: string; organizationId: string;
@ -42,7 +41,7 @@ export const task = actor({
return await getSessionDetail(c, input.sessionId, input.authSessionId); return await getSessionDetail(c, input.sessionId, input.authSessionId);
}, },
}, },
run: workflow(runTaskWorkflow), run: runTaskCommandLoop,
}); });
export { TASK_QUEUE_NAMES }; export { TASK_QUEUE_NAMES };

View file

@ -1,4 +1,3 @@
import { Loop } from "rivetkit/workflow";
import { logActorWarning, resolveErrorMessage } from "../../logging.js"; import { logActorWarning, resolveErrorMessage } from "../../logging.js";
import { getCurrentRecord } from "./common.js"; import { getCurrentRecord } from "./common.js";
import { initBootstrapDbActivity, initCompleteActivity, initEnqueueProvisionActivity, initFailedActivity } from "./init.js"; import { initBootstrapDbActivity, initCompleteActivity, initEnqueueProvisionActivity, initFailedActivity } from "./init.js";
@ -38,16 +37,14 @@ export { TASK_QUEUE_NAMES, taskWorkflowQueueName } from "./queue.js";
type TaskQueueName = (typeof TASK_QUEUE_NAMES)[number]; type TaskQueueName = (typeof TASK_QUEUE_NAMES)[number];
type WorkflowHandler = (loopCtx: any, msg: { name: TaskQueueName; body: any; complete: (response: unknown) => Promise<void> }) => Promise<void>; type CommandHandler = (c: any, msg: { name: TaskQueueName; body: any; complete: (response: unknown) => Promise<void> }) => Promise<void>;
const commandHandlers: Record<TaskQueueName, WorkflowHandler> = { const commandHandlers: Record<TaskQueueName, CommandHandler> = {
"task.command.initialize": async (loopCtx, msg) => { "task.command.initialize": async (c, msg) => {
const body = msg.body; const body = msg.body;
await initBootstrapDbActivity(c, body);
await loopCtx.step("init-bootstrap-db", async () => initBootstrapDbActivity(loopCtx, body)); await initEnqueueProvisionActivity(c, body);
await loopCtx.step("init-enqueue-provision", async () => initEnqueueProvisionActivity(loopCtx, body)); const currentRecord = await getCurrentRecord(c);
await loopCtx.removed("init-dispatch-provision-v2", "step");
const currentRecord = await loopCtx.step("init-read-current-record", async () => getCurrentRecord(loopCtx));
try { try {
await msg.complete(currentRecord); await msg.complete(currentRecord);
} catch (error) { } catch (error) {
@ -57,23 +54,12 @@ const commandHandlers: Record<TaskQueueName, WorkflowHandler> = {
} }
}, },
"task.command.provision": async (loopCtx, msg) => { "task.command.provision": async (c, msg) => {
await loopCtx.removed("init-failed", "step");
await loopCtx.removed("init-failed-v2", "step");
try { try {
await loopCtx.removed("init-ensure-name", "step"); await initCompleteActivity(c, msg.body);
await loopCtx.removed("init-assert-name", "step");
await loopCtx.removed("init-create-sandbox", "step");
await loopCtx.removed("init-ensure-agent", "step");
await loopCtx.removed("init-start-sandbox-instance", "step");
await loopCtx.removed("init-expose-sandbox", "step");
await loopCtx.removed("init-create-session", "step");
await loopCtx.removed("init-write-db", "step");
await loopCtx.removed("init-start-status-sync", "step");
await loopCtx.step("init-complete", async () => initCompleteActivity(loopCtx, msg.body));
await msg.complete({ ok: true }); await msg.complete({ ok: true });
} catch (error) { } catch (error) {
await loopCtx.step("init-failed-v3", async () => initFailedActivity(loopCtx, error, msg.body)); await initFailedActivity(c, error, msg.body);
await msg.complete({ await msg.complete({
ok: false, ok: false,
error: resolveErrorMessage(error), error: resolveErrorMessage(error),
@ -81,79 +67,67 @@ const commandHandlers: Record<TaskQueueName, WorkflowHandler> = {
} }
}, },
"task.command.attach": async (loopCtx, msg) => { "task.command.attach": async (c, msg) => {
await loopCtx.step("handle-attach", async () => handleAttachActivity(loopCtx, msg)); await handleAttachActivity(c, msg);
}, },
"task.command.switch": async (loopCtx, msg) => { "task.command.switch": async (c, msg) => {
await loopCtx.step("handle-switch", async () => handleSwitchActivity(loopCtx, msg)); await handleSwitchActivity(c, msg);
}, },
"task.command.push": async (loopCtx, msg) => { "task.command.push": async (c, msg) => {
await loopCtx.step("handle-push", async () => handlePushActivity(loopCtx, msg)); await handlePushActivity(c, msg);
}, },
"task.command.sync": async (loopCtx, msg) => { "task.command.sync": async (c, msg) => {
await loopCtx.step("handle-sync", async () => handleSimpleCommandActivity(loopCtx, msg, "task.sync")); await handleSimpleCommandActivity(c, msg, "task.sync");
}, },
"task.command.merge": async (loopCtx, msg) => { "task.command.merge": async (c, msg) => {
await loopCtx.step("handle-merge", async () => handleSimpleCommandActivity(loopCtx, msg, "task.merge")); await handleSimpleCommandActivity(c, msg, "task.merge");
}, },
"task.command.archive": async (loopCtx, msg) => { "task.command.archive": async (c, msg) => {
await loopCtx.step("handle-archive", async () => handleArchiveActivity(loopCtx, msg)); await handleArchiveActivity(c, msg);
}, },
"task.command.kill": async (loopCtx, msg) => { "task.command.kill": async (c, msg) => {
await loopCtx.step("kill-destroy-sandbox", async () => killDestroySandboxActivity(loopCtx)); await killDestroySandboxActivity(c);
await loopCtx.step("kill-write-db", async () => killWriteDbActivity(loopCtx, msg)); await killWriteDbActivity(c, msg);
}, },
"task.command.get": async (loopCtx, msg) => { "task.command.get": async (c, msg) => {
await loopCtx.step("handle-get", async () => handleGetActivity(loopCtx, msg)); await handleGetActivity(c, msg);
}, },
"task.command.pull_request.sync": async (loopCtx, msg) => { "task.command.pull_request.sync": async (c, msg) => {
await loopCtx.step("task-pull-request-sync", async () => syncTaskPullRequest(loopCtx, msg.body?.pullRequest ?? null)); await syncTaskPullRequest(c, msg.body?.pullRequest ?? null);
await msg.complete({ ok: true }); await msg.complete({ ok: true });
}, },
"task.command.workspace.mark_unread": async (loopCtx, msg) => { "task.command.workspace.mark_unread": async (c, msg) => {
await loopCtx.step("workspace-mark-unread", async () => markWorkspaceUnread(loopCtx, msg.body?.authSessionId)); await markWorkspaceUnread(c, msg.body?.authSessionId);
await msg.complete({ ok: true }); await msg.complete({ ok: true });
}, },
"task.command.workspace.rename_task": async (loopCtx, msg) => { "task.command.workspace.rename_task": async (c, msg) => {
await loopCtx.step("workspace-rename-task", async () => renameWorkspaceTask(loopCtx, msg.body.value)); await renameWorkspaceTask(c, msg.body.value);
await msg.complete({ ok: true }); await msg.complete({ ok: true });
}, },
"task.command.workspace.create_session": async (loopCtx, msg) => { "task.command.workspace.create_session": async (c, msg) => {
try { try {
const created = await loopCtx.step({ const created = await createWorkspaceSession(c, msg.body?.model, msg.body?.authSessionId);
name: "workspace-create-session",
timeout: 5 * 60_000,
run: async () => createWorkspaceSession(loopCtx, msg.body?.model, msg.body?.authSessionId),
});
await msg.complete(created); await msg.complete(created);
} catch (error) { } catch (error) {
await msg.complete({ error: resolveErrorMessage(error) }); await msg.complete({ error: resolveErrorMessage(error) });
} }
}, },
"task.command.workspace.create_session_and_send": async (loopCtx, msg) => { "task.command.workspace.create_session_and_send": async (c, msg) => {
try { try {
const created = await loopCtx.step({ const created = await createWorkspaceSession(c, msg.body?.model, msg.body?.authSessionId);
name: "workspace-create-session-for-send", await sendWorkspaceMessage(c, created.sessionId, msg.body.text, [], msg.body?.authSessionId);
timeout: 5 * 60_000,
run: async () => createWorkspaceSession(loopCtx, msg.body?.model, msg.body?.authSessionId),
});
await loopCtx.step({
name: "workspace-send-initial-message",
timeout: 5 * 60_000,
run: async () => sendWorkspaceMessage(loopCtx, created.sessionId, msg.body.text, [], msg.body?.authSessionId),
});
} catch (error) { } catch (error) {
logActorWarning("task.workflow", "create_session_and_send failed", { logActorWarning("task.workflow", "create_session_and_send failed", {
error: resolveErrorMessage(error), error: resolveErrorMessage(error),
@ -162,135 +136,102 @@ const commandHandlers: Record<TaskQueueName, WorkflowHandler> = {
await msg.complete({ ok: true }); await msg.complete({ ok: true });
}, },
"task.command.workspace.ensure_session": async (loopCtx, msg) => { "task.command.workspace.ensure_session": async (c, msg) => {
await loopCtx.step({ await ensureWorkspaceSession(c, msg.body.sessionId, msg.body?.model, msg.body?.authSessionId);
name: "workspace-ensure-session",
timeout: 5 * 60_000,
run: async () => ensureWorkspaceSession(loopCtx, msg.body.sessionId, msg.body?.model, msg.body?.authSessionId),
});
await msg.complete({ ok: true }); await msg.complete({ ok: true });
}, },
"task.command.workspace.rename_session": async (loopCtx, msg) => { "task.command.workspace.rename_session": async (c, msg) => {
await loopCtx.step("workspace-rename-session", async () => renameWorkspaceSession(loopCtx, msg.body.sessionId, msg.body.title)); await renameWorkspaceSession(c, msg.body.sessionId, msg.body.title);
await msg.complete({ ok: true }); await msg.complete({ ok: true });
}, },
"task.command.workspace.select_session": async (loopCtx, msg) => { "task.command.workspace.select_session": async (c, msg) => {
await loopCtx.step("workspace-select-session", async () => selectWorkspaceSession(loopCtx, msg.body.sessionId, msg.body?.authSessionId)); await selectWorkspaceSession(c, msg.body.sessionId, msg.body?.authSessionId);
await msg.complete({ ok: true }); await msg.complete({ ok: true });
}, },
"task.command.workspace.set_session_unread": async (loopCtx, msg) => { "task.command.workspace.set_session_unread": async (c, msg) => {
await loopCtx.step("workspace-set-session-unread", async () => setWorkspaceSessionUnread(loopCtx, msg.body.sessionId, msg.body.unread, msg.body?.authSessionId)); await setWorkspaceSessionUnread(c, msg.body.sessionId, msg.body.unread, msg.body?.authSessionId);
await msg.complete({ ok: true }); await msg.complete({ ok: true });
}, },
"task.command.workspace.update_draft": async (loopCtx, msg) => { "task.command.workspace.update_draft": async (c, msg) => {
await loopCtx.step("workspace-update-draft", async () => updateWorkspaceDraft(loopCtx, msg.body.sessionId, msg.body.text, msg.body.attachments, msg.body?.authSessionId)); await updateWorkspaceDraft(c, msg.body.sessionId, msg.body.text, msg.body.attachments, msg.body?.authSessionId);
await msg.complete({ ok: true }); await msg.complete({ ok: true });
}, },
"task.command.workspace.change_model": async (loopCtx, msg) => { "task.command.workspace.change_model": async (c, msg) => {
await loopCtx.step("workspace-change-model", async () => changeWorkspaceModel(loopCtx, msg.body.sessionId, msg.body.model, msg.body?.authSessionId)); await changeWorkspaceModel(c, msg.body.sessionId, msg.body.model, msg.body?.authSessionId);
await msg.complete({ ok: true }); await msg.complete({ ok: true });
}, },
"task.command.workspace.send_message": async (loopCtx, msg) => { "task.command.workspace.send_message": async (c, msg) => {
try { try {
await loopCtx.step({ await sendWorkspaceMessage(c, msg.body.sessionId, msg.body.text, msg.body.attachments, msg.body?.authSessionId);
name: "workspace-send-message",
timeout: 10 * 60_000,
run: async () => sendWorkspaceMessage(loopCtx, msg.body.sessionId, msg.body.text, msg.body.attachments, msg.body?.authSessionId),
});
await msg.complete({ ok: true }); await msg.complete({ ok: true });
} catch (error) { } catch (error) {
await msg.complete({ error: resolveErrorMessage(error) }); await msg.complete({ error: resolveErrorMessage(error) });
} }
}, },
"task.command.workspace.stop_session": async (loopCtx, msg) => { "task.command.workspace.stop_session": async (c, msg) => {
await loopCtx.step({ await stopWorkspaceSession(c, msg.body.sessionId);
name: "workspace-stop-session",
timeout: 5 * 60_000,
run: async () => stopWorkspaceSession(loopCtx, msg.body.sessionId),
});
await msg.complete({ ok: true }); await msg.complete({ ok: true });
}, },
"task.command.workspace.sync_session_status": async (loopCtx, msg) => { "task.command.workspace.sync_session_status": async (c, msg) => {
await loopCtx.step("workspace-sync-session-status", async () => syncWorkspaceSessionStatus(loopCtx, msg.body.sessionId, msg.body.status, msg.body.at)); await syncWorkspaceSessionStatus(c, msg.body.sessionId, msg.body.status, msg.body.at);
await msg.complete({ ok: true }); await msg.complete({ ok: true });
}, },
"task.command.workspace.refresh_derived": async (loopCtx, msg) => { "task.command.workspace.refresh_derived": async (c, msg) => {
await loopCtx.step({ await refreshWorkspaceDerivedState(c);
name: "workspace-refresh-derived",
timeout: 5 * 60_000,
run: async () => refreshWorkspaceDerivedState(loopCtx),
});
await msg.complete({ ok: true }); await msg.complete({ ok: true });
}, },
"task.command.workspace.refresh_session_transcript": async (loopCtx, msg) => { "task.command.workspace.refresh_session_transcript": async (c, msg) => {
await loopCtx.step({ await refreshWorkspaceSessionTranscript(c, msg.body.sessionId);
name: "workspace-refresh-session-transcript",
timeout: 60_000,
run: async () => refreshWorkspaceSessionTranscript(loopCtx, msg.body.sessionId),
});
await msg.complete({ ok: true }); await msg.complete({ ok: true });
}, },
"task.command.workspace.close_session": async (loopCtx, msg) => { "task.command.workspace.close_session": async (c, msg) => {
await loopCtx.step({ await closeWorkspaceSession(c, msg.body.sessionId, msg.body?.authSessionId);
name: "workspace-close-session",
timeout: 5 * 60_000,
run: async () => closeWorkspaceSession(loopCtx, msg.body.sessionId, msg.body?.authSessionId),
});
await msg.complete({ ok: true }); await msg.complete({ ok: true });
}, },
"task.command.workspace.publish_pr": async (loopCtx, msg) => { "task.command.workspace.publish_pr": async (c, msg) => {
await loopCtx.step({ await publishWorkspacePr(c);
name: "workspace-publish-pr",
timeout: 10 * 60_000,
run: async () => publishWorkspacePr(loopCtx),
});
await msg.complete({ ok: true }); await msg.complete({ ok: true });
}, },
"task.command.workspace.revert_file": async (loopCtx, msg) => { "task.command.workspace.revert_file": async (c, msg) => {
await loopCtx.step({ await revertWorkspaceFile(c, msg.body.path);
name: "workspace-revert-file",
timeout: 5 * 60_000,
run: async () => revertWorkspaceFile(loopCtx, msg.body.path),
});
await msg.complete({ ok: true }); await msg.complete({ ok: true });
}, },
}; };
export async function runTaskWorkflow(ctx: any): Promise<void> { /**
await ctx.loop("task-command-loop", async (loopCtx: any) => { * Plain run handler (no workflow engine). Drains the queue using `c.queue.iter()`
const msg = await loopCtx.queue.next("next-command", { * with completable messages.
names: [...TASK_QUEUE_NAMES], */
completable: true, export async function runTaskCommandLoop(c: any): Promise<void> {
}); for await (const msg of c.queue.iter({ names: [...TASK_QUEUE_NAMES], completable: true })) {
if (!msg) {
return Loop.continue(undefined);
}
const handler = commandHandlers[msg.name as TaskQueueName]; const handler = commandHandlers[msg.name as TaskQueueName];
if (handler) { if (handler) {
try { try {
await handler(loopCtx, msg); await handler(c, msg);
} catch (error) { } catch (error) {
const message = resolveErrorMessage(error); const message = resolveErrorMessage(error);
logActorWarning("task.workflow", "task workflow command failed", { logActorWarning("task.workflow", "task command failed", {
queueName: msg.name, queueName: msg.name,
error: message, error: message,
}); });
await msg.complete({ error: message }).catch(() => {}); await msg.complete({ error: message }).catch(() => {});
} }
} else {
logActorWarning("task.workflow", "unknown queue message", { queueName: msg.name });
await msg.complete({ error: `Unknown command: ${msg.name}` });
} }
return Loop.continue(undefined); }
});
} }

View file

@ -1,9 +1,8 @@
import { actor, queue } from "rivetkit"; import { actor, queue } from "rivetkit";
import { workflow } from "rivetkit/workflow";
import { userDb } from "./db/db.js"; import { userDb } from "./db/db.js";
import { betterAuthActions } from "./actions/better-auth.js"; import { betterAuthActions } from "./actions/better-auth.js";
import { userActions } from "./actions/user.js"; import { userActions } from "./actions/user.js";
import { USER_QUEUE_NAMES, runUserWorkflow } from "./workflow.js"; import { USER_QUEUE_NAMES, runUserCommandLoop } from "./workflow.js";
export const user = actor({ export const user = actor({
db: userDb, db: userDb,
@ -20,5 +19,5 @@ export const user = actor({
...betterAuthActions, ...betterAuthActions,
...userActions, ...userActions,
}, },
run: workflow(runUserWorkflow), run: runUserCommandLoop,
}); });

View file

@ -1,5 +1,4 @@
import { eq, count as sqlCount, and } from "drizzle-orm"; import { eq, count as sqlCount, and } from "drizzle-orm";
import { Loop } from "rivetkit/workflow";
import { DEFAULT_WORKSPACE_MODEL_ID } from "@sandbox-agent/foundry-shared"; import { DEFAULT_WORKSPACE_MODEL_ID } from "@sandbox-agent/foundry-shared";
import { logActorWarning, resolveErrorMessage } from "../logging.js"; import { logActorWarning, resolveErrorMessage } from "../logging.js";
import { authUsers, sessionState, userProfiles, userTaskState } from "./db/schema.js"; import { authUsers, sessionState, userProfiles, userTaskState } from "./db/schema.js";
@ -26,8 +25,15 @@ export function userWorkflowQueueName(name: UserQueueName): UserQueueName {
async function createAuthRecordMutation(c: any, input: { model: string; data: Record<string, unknown> }) { async function createAuthRecordMutation(c: any, input: { model: string; data: Record<string, unknown> }) {
const table = tableFor(input.model); const table = tableFor(input.model);
const persisted = persistInput(input.model, input.data); const persisted = persistInput(input.model, input.data);
await c.db.insert(table).values(persisted as any).run(); await c.db
const row = await c.db.select().from(table).where(eq(columnFor(input.model, table, "id"), input.data.id as any)).get(); .insert(table)
.values(persisted as any)
.run();
const row = await c.db
.select()
.from(table)
.where(eq(columnFor(input.model, table, "id"), input.data.id as any))
.get();
return materializeRow(input.model, row); return materializeRow(input.model, row);
} }
@ -37,7 +43,11 @@ async function updateAuthRecordMutation(c: any, input: { model: string; where: a
if (!predicate) { if (!predicate) {
throw new Error("updateAuthRecord requires a where clause"); throw new Error("updateAuthRecord requires a where clause");
} }
await c.db.update(table).set(persistPatch(input.model, input.update) as any).where(predicate).run(); await c.db
.update(table)
.set(persistPatch(input.model, input.update) as any)
.where(predicate)
.run();
return materializeRow(input.model, await c.db.select().from(table).where(predicate).get()); return materializeRow(input.model, await c.db.select().from(table).where(predicate).get());
} }
@ -47,7 +57,11 @@ async function updateManyAuthRecordsMutation(c: any, input: { model: string; whe
if (!predicate) { if (!predicate) {
throw new Error("updateManyAuthRecords requires a where clause"); throw new Error("updateManyAuthRecords requires a where clause");
} }
await c.db.update(table).set(persistPatch(input.model, input.update) as any).where(predicate).run(); await c.db
.update(table)
.set(persistPatch(input.model, input.update) as any)
.where(predicate)
.run();
const row = await c.db.select({ value: sqlCount() }).from(table).where(predicate).get(); const row = await c.db.select({ value: sqlCount() }).from(table).where(predicate).get();
return row?.value ?? 0; return row?.value ?? 0;
} }
@ -222,60 +236,46 @@ async function deleteTaskStateMutation(c: any, input: { taskId: string; sessionI
await c.db.delete(userTaskState).where(eq(userTaskState.taskId, input.taskId)).run(); await c.db.delete(userTaskState).where(eq(userTaskState.taskId, input.taskId)).run();
} }
export async function runUserWorkflow(ctx: any): Promise<void> { const COMMAND_HANDLERS: Record<string, (c: any, body: any) => Promise<any>> = {
await ctx.loop("user-command-loop", async (loopCtx: any) => { "user.command.auth.create": (c, body) => createAuthRecordMutation(c, body),
const msg = await loopCtx.queue.next("next-user-command", { "user.command.auth.update": (c, body) => updateAuthRecordMutation(c, body),
names: [...USER_QUEUE_NAMES], "user.command.auth.update_many": (c, body) => updateManyAuthRecordsMutation(c, body),
completable: true, "user.command.auth.delete": async (c, body) => {
}); await deleteAuthRecordMutation(c, body);
if (!msg) { return { ok: true };
return Loop.continue(undefined); },
} "user.command.auth.delete_many": (c, body) => deleteManyAuthRecordsMutation(c, body),
"user.command.profile.upsert": (c, body) => upsertUserProfileMutation(c, body),
"user.command.session_state.upsert": (c, body) => upsertSessionStateMutation(c, body),
"user.command.task_state.upsert": (c, body) => upsertTaskStateMutation(c, body),
"user.command.task_state.delete": async (c, body) => {
await deleteTaskStateMutation(c, body);
return { ok: true };
},
};
/**
* Plain run handler (no workflow engine). Drains the queue using `c.queue.iter()`
* with completable messages.
*/
export async function runUserCommandLoop(c: any): Promise<void> {
for await (const msg of c.queue.iter({ names: [...USER_QUEUE_NAMES], completable: true })) {
try { try {
let result: unknown; const handler = COMMAND_HANDLERS[msg.name];
switch (msg.name) { if (handler) {
case "user.command.auth.create": const result = await handler(c, msg.body);
result = await loopCtx.step({ name: "user-auth-create", timeout: 60_000, run: async () => createAuthRecordMutation(loopCtx, msg.body) }); await msg.complete(result);
break; } else {
case "user.command.auth.update": logActorWarning("user", "unknown queue message", { queueName: msg.name });
result = await loopCtx.step({ name: "user-auth-update", timeout: 60_000, run: async () => updateAuthRecordMutation(loopCtx, msg.body) }); await msg.complete({ error: `Unknown command: ${msg.name}` });
break;
case "user.command.auth.update_many":
result = await loopCtx.step({ name: "user-auth-update-many", timeout: 60_000, run: async () => updateManyAuthRecordsMutation(loopCtx, msg.body) });
break;
case "user.command.auth.delete":
result = await loopCtx.step({ name: "user-auth-delete", timeout: 60_000, run: async () => deleteAuthRecordMutation(loopCtx, msg.body) });
break;
case "user.command.auth.delete_many":
result = await loopCtx.step({ name: "user-auth-delete-many", timeout: 60_000, run: async () => deleteManyAuthRecordsMutation(loopCtx, msg.body) });
break;
case "user.command.profile.upsert":
result = await loopCtx.step({ name: "user-profile-upsert", timeout: 60_000, run: async () => upsertUserProfileMutation(loopCtx, msg.body) });
break;
case "user.command.session_state.upsert":
result = await loopCtx.step({ name: "user-session-state-upsert", timeout: 60_000, run: async () => upsertSessionStateMutation(loopCtx, msg.body) });
break;
case "user.command.task_state.upsert":
result = await loopCtx.step({ name: "user-task-state-upsert", timeout: 60_000, run: async () => upsertTaskStateMutation(loopCtx, msg.body) });
break;
case "user.command.task_state.delete":
result = await loopCtx.step({ name: "user-task-state-delete", timeout: 60_000, run: async () => deleteTaskStateMutation(loopCtx, msg.body) });
break;
default:
return Loop.continue(undefined);
} }
await msg.complete(result);
} catch (error) { } catch (error) {
const message = resolveErrorMessage(error); const message = resolveErrorMessage(error);
logActorWarning("user", "user workflow command failed", { logActorWarning("user", "user command failed", {
queueName: msg.name, queueName: msg.name,
error: message, error: message,
}); });
await msg.complete({ error: message }).catch(() => {}); await msg.complete({ error: message }).catch(() => {});
} }
}
return Loop.continue(undefined);
});
} }

View file

@ -183,6 +183,7 @@ export interface BackendClientOptions {
endpoint: string; endpoint: string;
defaultOrganizationId?: string; defaultOrganizationId?: string;
mode?: "remote" | "mock"; mode?: "remote" | "mock";
encoding?: "json" | "cbor" | "bare";
} }
export interface BackendClient { export interface BackendClient {
@ -413,7 +414,7 @@ export function createBackendClient(options: BackendClientOptions): BackendClien
const endpoints = deriveBackendEndpoints(options.endpoint); const endpoints = deriveBackendEndpoints(options.endpoint);
const rivetApiEndpoint = endpoints.rivetEndpoint; const rivetApiEndpoint = endpoints.rivetEndpoint;
const appApiEndpoint = endpoints.appEndpoint; const appApiEndpoint = endpoints.appEndpoint;
const client = createClient({ endpoint: rivetApiEndpoint }) as unknown as RivetClient; const client = createClient({ endpoint: rivetApiEndpoint, encoding: options.encoding }) as unknown as RivetClient;
const workspaceSubscriptions = new Map< const workspaceSubscriptions = new Map<
string, string,
{ {
@ -514,10 +515,7 @@ export function createBackendClient(options: BackendClientOptions): BackendClien
const sandboxes = detail.sandboxes as Array<(typeof detail.sandboxes)[number] & { sandboxActorId?: string }>; const sandboxes = detail.sandboxes as Array<(typeof detail.sandboxes)[number] & { sandboxActorId?: string }>;
const sandbox = sandboxes.find( const sandbox = sandboxes.find(
(sb) => (sb) =>
sb.sandboxId === sandboxId && sb.sandboxId === sandboxId && sb.sandboxProviderId === sandboxProviderId && typeof sb.sandboxActorId === "string" && sb.sandboxActorId.length > 0,
sb.sandboxProviderId === sandboxProviderId &&
typeof sb.sandboxActorId === "string" &&
sb.sandboxActorId.length > 0,
); );
if (sandbox?.sandboxActorId) { if (sandbox?.sandboxActorId) {
return (client as any).taskSandbox.getForId(sandbox.sandboxActorId); return (client as any).taskSandbox.getForId(sandbox.sandboxActorId);
@ -582,12 +580,7 @@ export function createBackendClient(options: BackendClientOptions): BackendClien
return (await task(organizationId, repoId, taskIdValue)).getTaskDetail(await getAuthSessionInput()); return (await task(organizationId, repoId, taskIdValue)).getTaskDetail(await getAuthSessionInput());
}; };
const getSessionDetailWithAuth = async ( const getSessionDetailWithAuth = async (organizationId: string, repoId: string, taskIdValue: string, sessionId: string): Promise<WorkspaceSessionDetail> => {
organizationId: string,
repoId: string,
taskIdValue: string,
sessionId: string,
): Promise<WorkspaceSessionDetail> => {
return (await task(organizationId, repoId, taskIdValue)).getSessionDetail(await withAuthSessionInput({ sessionId })); return (await task(organizationId, repoId, taskIdValue)).getSessionDetail(await withAuthSessionInput({ sessionId }));
}; };
@ -596,67 +589,67 @@ export function createBackendClient(options: BackendClientOptions): BackendClien
const summary = await (await organization(organizationId)).getOrganizationSummary({ organizationId }); const summary = await (await organization(organizationId)).getOrganizationSummary({ organizationId });
const resolvedTasks = await Promise.all( const resolvedTasks = await Promise.all(
summary.taskSummaries.map(async (taskSummary) => { summary.taskSummaries.map(async (taskSummary) => {
let detail; let detail;
try { try {
const taskHandle = await task(organizationId, taskSummary.repoId, taskSummary.id); const taskHandle = await task(organizationId, taskSummary.repoId, taskSummary.id);
detail = await taskHandle.getTaskDetail(authSessionInput); detail = await taskHandle.getTaskDetail(authSessionInput);
} catch (error) { } catch (error) {
if (isActorNotFoundError(error)) { if (isActorNotFoundError(error)) {
return null; return null;
}
throw error;
} }
const sessionDetails = await Promise.all( throw error;
detail.sessionsSummary.map(async (session) => { }
try { const sessionDetails = await Promise.all(
const full = await (await task(organizationId, detail.repoId, detail.id)).getSessionDetail({ detail.sessionsSummary.map(async (session) => {
sessionId: session.id, try {
...(authSessionInput ?? {}), const full = await (await task(organizationId, detail.repoId, detail.id)).getSessionDetail({
}); sessionId: session.id,
return [session.id, full] as const; ...(authSessionInput ?? {}),
} catch (error) { });
if (isActorNotFoundError(error)) { return [session.id, full] as const;
return null; } catch (error) {
} if (isActorNotFoundError(error)) {
throw error; return null;
} }
}), throw error;
); }
const sessionDetailsById = new Map(sessionDetails.filter((entry): entry is readonly [string, WorkspaceSessionDetail] => entry !== null)); }),
return { );
id: detail.id, const sessionDetailsById = new Map(sessionDetails.filter((entry): entry is readonly [string, WorkspaceSessionDetail] => entry !== null));
repoId: detail.repoId, return {
title: detail.title, id: detail.id,
status: detail.status, repoId: detail.repoId,
repoName: detail.repoName, title: detail.title,
updatedAtMs: detail.updatedAtMs, status: detail.status,
branch: detail.branch, repoName: detail.repoName,
pullRequest: detail.pullRequest, updatedAtMs: detail.updatedAtMs,
activeSessionId: detail.activeSessionId ?? null, branch: detail.branch,
sessions: detail.sessionsSummary.map((session) => { pullRequest: detail.pullRequest,
const full = sessionDetailsById.get(session.id); activeSessionId: detail.activeSessionId ?? null,
return { sessions: detail.sessionsSummary.map((session) => {
id: session.id, const full = sessionDetailsById.get(session.id);
sessionId: session.sessionId, return {
sessionName: session.sessionName, id: session.id,
agent: session.agent, sessionId: session.sessionId,
model: session.model, sessionName: session.sessionName,
status: session.status, agent: session.agent,
thinkingSinceMs: session.thinkingSinceMs, model: session.model,
unread: session.unread, status: session.status,
created: session.created, thinkingSinceMs: session.thinkingSinceMs,
draft: full?.draft ?? { text: "", attachments: [], updatedAtMs: null }, unread: session.unread,
transcript: full?.transcript ?? [], created: session.created,
}; draft: full?.draft ?? { text: "", attachments: [], updatedAtMs: null },
}), transcript: full?.transcript ?? [],
fileChanges: detail.fileChanges, };
diffs: detail.diffs, }),
fileTree: detail.fileTree, fileChanges: detail.fileChanges,
minutesUsed: detail.minutesUsed, diffs: detail.diffs,
activeSandboxId: detail.activeSandboxId ?? null, fileTree: detail.fileTree,
}; minutesUsed: detail.minutesUsed,
}), activeSandboxId: detail.activeSandboxId ?? null,
); };
}),
);
const tasks = resolvedTasks.filter((task): task is Exclude<(typeof resolvedTasks)[number], null> => task !== null); const tasks = resolvedTasks.filter((task): task is Exclude<(typeof resolvedTasks)[number], null> => task !== null);
const repositories = summary.repos const repositories = summary.repos
@ -1205,11 +1198,7 @@ export function createBackendClient(options: BackendClientOptions): BackendClien
return await withSandboxHandle(organizationId, sandboxProviderId, sandboxId, async (handle) => handle.sandboxAgentConnection()); return await withSandboxHandle(organizationId, sandboxProviderId, sandboxId, async (handle) => handle.sandboxAgentConnection());
}, },
async getSandboxWorkspaceModelGroups( async getSandboxWorkspaceModelGroups(organizationId: string, sandboxProviderId: SandboxProviderId, sandboxId: string): Promise<WorkspaceModelGroup[]> {
organizationId: string,
sandboxProviderId: SandboxProviderId,
sandboxId: string,
): Promise<WorkspaceModelGroup[]> {
return await withSandboxHandle(organizationId, sandboxProviderId, sandboxId, async (handle) => handle.listWorkspaceModelGroups()); return await withSandboxHandle(organizationId, sandboxProviderId, sandboxId, async (handle) => handle.listWorkspaceModelGroups());
}, },

View file

@ -207,15 +207,38 @@ function sessionStateMessage(tab: Task["sessions"][number] | null | undefined):
return null; return null;
} }
function groupRepositories(repos: Array<{ id: string; label: string }>, tasks: Task[]) { function groupRepositories(
repos: Array<{ id: string; label: string }>,
tasks: Task[],
openPullRequests?: Array<{
repoId: string;
repoFullName: string;
number: number;
title: string;
state: string;
url: string;
headRefName: string;
authorLogin: string | null;
isDraft: boolean;
}>,
) {
return repos return repos
.map((repo) => ({ .map((repo) => ({
id: repo.id, id: repo.id,
label: repo.label, label: repo.label,
updatedAtMs: tasks.filter((task) => task.repoId === repo.id).reduce((latest, task) => Math.max(latest, task.updatedAtMs), 0), updatedAtMs: tasks.filter((task) => task.repoId === repo.id).reduce((latest, task) => Math.max(latest, task.updatedAtMs), 0),
tasks: tasks.filter((task) => task.repoId === repo.id).sort((left, right) => right.updatedAtMs - left.updatedAtMs), tasks: tasks.filter((task) => task.repoId === repo.id).sort((left, right) => right.updatedAtMs - left.updatedAtMs),
pullRequests: (openPullRequests ?? []).filter((pr) => pr.repoId === repo.id),
})) }))
.filter((repo) => repo.tasks.length > 0); .sort((a, b) => {
// Repos with tasks first, then repos with PRs, then alphabetical
const aHasActivity = a.tasks.length > 0 || a.pullRequests.length > 0;
const bHasActivity = b.tasks.length > 0 || b.pullRequests.length > 0;
if (aHasActivity && !bHasActivity) return -1;
if (!aHasActivity && bHasActivity) return 1;
if (a.updatedAtMs !== b.updatedAtMs) return b.updatedAtMs - a.updatedAtMs;
return a.label.localeCompare(b.label);
});
} }
interface WorkspaceActions { interface WorkspaceActions {
@ -1378,7 +1401,8 @@ export function MockLayout({ organizationId, selectedTaskId, selectedSessionId }
); );
return hydratedTasks.sort((left, right) => right.updatedAtMs - left.updatedAtMs); return hydratedTasks.sort((left, right) => right.updatedAtMs - left.updatedAtMs);
}, [selectedTaskSummary, selectedSessionId, sessionState.data, taskState.data, taskSummaries, organizationId]); }, [selectedTaskSummary, selectedSessionId, sessionState.data, taskState.data, taskSummaries, organizationId]);
const rawRepositories = useMemo(() => groupRepositories(organizationRepos, tasks), [tasks, organizationRepos]); const openPullRequests = organizationState.data?.openPullRequests ?? [];
const rawRepositories = useMemo(() => groupRepositories(organizationRepos, tasks, openPullRequests), [tasks, organizationRepos, openPullRequests]);
const appSnapshot = useMockAppSnapshot(); const appSnapshot = useMockAppSnapshot();
const currentUser = activeMockUser(appSnapshot); const currentUser = activeMockUser(appSnapshot);
const activeOrg = activeMockOrganization(appSnapshot); const activeOrg = activeMockOrganization(appSnapshot);

View file

@ -506,6 +506,7 @@ export const Sidebar = memo(function Sidebar({
return ( return (
<div <div
key={item.key} key={item.key}
data-index={virtualItem.index}
data-repository-idx={repositoryIndex} data-repository-idx={repositoryIndex}
ref={(node) => { ref={(node) => {
if (node) { if (node) {
@ -663,6 +664,7 @@ export const Sidebar = memo(function Sidebar({
return ( return (
<div <div
key={item.key} key={item.key}
data-index={virtualItem.index}
data-task-idx={taskIndex} data-task-idx={taskIndex}
data-task-repository-id={repository.id} data-task-repository-id={repository.id}
ref={(node) => { ref={(node) => {
@ -775,6 +777,7 @@ export const Sidebar = memo(function Sidebar({
return ( return (
<div <div
key={item.key} key={item.key}
data-index={virtualItem.index}
data-task-idx={taskCount} data-task-idx={taskCount}
data-task-repository-id={repository.id} data-task-repository-id={repository.id}
ref={(node) => { ref={(node) => {
@ -812,6 +815,7 @@ export const Sidebar = memo(function Sidebar({
return ( return (
<div <div
key={item.key} key={item.key}
data-index={virtualItem.index}
data-repository-idx={item.repositoryCount} data-repository-idx={item.repositoryCount}
ref={(node) => { ref={(node) => {
if (node) { if (node) {

View file

@ -5,4 +5,5 @@ export const backendClient = createBackendClient({
endpoint: backendEndpoint, endpoint: backendEndpoint,
defaultOrganizationId, defaultOrganizationId,
mode: frontendClientMode, mode: frontendClientMode,
encoding: import.meta.env.DEV ? "json" : undefined,
}); });

View file

@ -174,12 +174,27 @@ export interface OrganizationGithubSummary {
totalRepositoryCount: number; totalRepositoryCount: number;
} }
export interface WorkspaceOpenPullRequest {
repoId: string;
repoFullName: string;
number: number;
title: string;
status: string;
state: string;
url: string;
headRefName: string;
baseRefName: string;
authorLogin: string | null;
isDraft: boolean;
}
/** Organization-level snapshot — initial fetch for the organization topic. */ /** Organization-level snapshot — initial fetch for the organization topic. */
export interface OrganizationSummarySnapshot { export interface OrganizationSummarySnapshot {
organizationId: string; organizationId: string;
github: OrganizationGithubSummary; github: OrganizationGithubSummary;
repos: WorkspaceRepositorySummary[]; repos: WorkspaceRepositorySummary[];
taskSummaries: WorkspaceTaskSummary[]; taskSummaries: WorkspaceTaskSummary[];
openPullRequests?: WorkspaceOpenPullRequest[];
} }
export interface WorkspaceSession extends WorkspaceSessionSummary { export interface WorkspaceSession extends WorkspaceSessionSummary {