From 29e5821fef8d910e8b486f0162799749177a9149 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Mon, 16 Mar 2026 02:00:31 -0700 Subject: [PATCH] wip: convert all actors from workflow to plain run handlers Workaround for RivetKit bug where c.queue.iter() never yields messages for actors created via getOrCreate from another actor's context. The queue accepts messages (visible in inspector) but the iterator hangs. Sleep/wake fixes it, but actors with active connections never sleep. Converted organization, github-data, task, and user actors from run: workflow(...) to plain run: async (c) => { for await ... }. Also fixes: - Missing auth tables in org migration (auth_verification etc) - default_model NOT NULL constraint on org profile upsert - Nested workflow step in github-data (HistoryDivergedError) - Removed --force from frontend Dockerfile pnpm install Co-Authored-By: Claude Opus 4.6 (1M context) --- foundry/CLAUDE.md | 5 +- foundry/packages/backend/CLAUDE.md | 1 - .../backend/src/actors/audit-log/index.ts | 5 +- .../backend/src/actors/audit-log/workflow.ts | 36 +- .../backend/src/actors/github-data/index.ts | 74 +-- .../src/actors/github-data/workflow.ts | 93 ++-- .../src/actors/organization/actions.ts | 180 ++----- .../organization/actions/task-mutations.ts | 7 +- .../src/actors/organization/actions/tasks.ts | 8 +- .../src/actors/organization/app-shell.ts | 11 +- .../src/actors/organization/db/schema.ts | 11 - .../backend/src/actors/organization/index.ts | 5 +- .../backend/src/actors/organization/queues.ts | 2 - .../src/actors/organization/workflow.ts | 468 +++++------------- .../packages/backend/src/actors/task/index.ts | 5 +- .../backend/src/actors/task/workflow/index.ts | 213 +++----- .../packages/backend/src/actors/user/index.ts | 5 +- .../backend/src/actors/user/workflow.ts | 102 ++-- foundry/packages/client/src/backend-client.ts | 139 +++--- .../frontend/src/components/mock-layout.tsx | 30 +- .../src/components/mock-layout/sidebar.tsx | 4 + foundry/packages/frontend/src/lib/backend.ts | 1 + foundry/packages/shared/src/workspace.ts | 15 + 23 files changed, 490 insertions(+), 930 deletions(-) diff --git a/foundry/CLAUDE.md b/foundry/CLAUDE.md index ab51fc6..3cb5ee4 100644 --- a/foundry/CLAUDE.md +++ b/foundry/CLAUDE.md @@ -56,6 +56,8 @@ Use `pnpm` workspaces and Turborepo. - mock frontend changes: `just foundry-mock` or restart with `just foundry-mock-down && just foundry-mock` - local frontend-only work outside Docker: restart `pnpm --filter @sandbox-agent/foundry-frontend dev` or `just foundry-dev-mock` as appropriate - The backend does **not** hot reload. Bun's `--hot` flag causes the server to re-bind on a different port (e.g. 6421 instead of 6420), breaking all client connections while the container still exposes the original port. After backend code changes, restart the backend container: `just foundry-dev-down && just foundry-dev`. +- The dev server has debug logging enabled by default (`RIVET_LOG_LEVEL=debug`, `FOUNDRY_LOG_LEVEL=debug`) via `compose.dev.yaml`. Error stacks and timestamps are also enabled. +- The frontend client uses JSON encoding for RivetKit in development (`import.meta.env.DEV`) for easier debugging. Production uses the default encoding. ## Railway Logs @@ -77,9 +79,10 @@ Use `pnpm` workspaces and Turborepo. - Keep frontend route/state coverage current in code and tests; there is no separate page-inventory doc to maintain. - If Foundry uses a shared component from `@sandbox-agent/react`, make changes in `sdks/react` instead of copying or forking that component into Foundry. - When changing shared React components in `sdks/react` for Foundry, verify they still work in the Sandbox Agent Inspector before finishing. -- When making UI changes, verify the live flow with `agent-browser`, take screenshots of the updated UI, and offer to open those screenshots in Preview when you finish. +- When making UI changes, verify the live flow with the Chrome DevTools MCP or `agent-browser`, take screenshots of the updated UI, and offer to open those screenshots in Preview when you finish. - When asked for screenshots, capture all relevant affected screens and modal states, not just a single viewport. Include empty, populated, success, and blocked/error states when they are part of the changed flow. - If a screenshot catches a transition frame, blank modal, or otherwise misleading state, retake it before reporting it. +- When verifying UI in the browser, attempt to sign in by navigating to `/signin` and clicking "Continue with GitHub". If the browser lands on the GitHub login page (github.com/login) and you don't have credentials, stop and ask the user to complete the sign-in. Do not assume the session is invalid just because you see the Foundry sign-in page — always attempt the OAuth flow first. ## Realtime Data Architecture diff --git a/foundry/packages/backend/CLAUDE.md b/foundry/packages/backend/CLAUDE.md index 7018642..2d980d4 100644 --- a/foundry/packages/backend/CLAUDE.md +++ b/foundry/packages/backend/CLAUDE.md @@ -29,7 +29,6 @@ Children push updates **up** to their direct coordinator only. Coordinators broa OrganizationActor (coordinator for tasks + auth users) │ │ Index tables: -│ ├─ repos → Repository catalog (GitHub sync) │ ├─ taskIndex → TaskActor index (taskId → repoId + branchName) │ ├─ taskSummaries → TaskActor materialized sidebar projection │ ├─ authSessionIndex → UserActor index (session token → userId) diff --git a/foundry/packages/backend/src/actors/audit-log/index.ts b/foundry/packages/backend/src/actors/audit-log/index.ts index ce0a654..317974d 100644 --- a/foundry/packages/backend/src/actors/audit-log/index.ts +++ b/foundry/packages/backend/src/actors/audit-log/index.ts @@ -1,11 +1,10 @@ // @ts-nocheck import { and, desc, eq } from "drizzle-orm"; import { actor, queue } from "rivetkit"; -import { workflow } from "rivetkit/workflow"; import type { AuditLogEvent } from "@sandbox-agent/foundry-shared"; import { auditLogDb } from "./db/db.js"; import { events } from "./db/schema.js"; -import { AUDIT_LOG_QUEUE_NAMES, runAuditLogWorkflow } from "./workflow.js"; +import { AUDIT_LOG_QUEUE_NAMES, runAuditLogCommandLoop } from "./workflow.js"; export interface AuditLogInput { organizationId: string; @@ -82,5 +81,5 @@ export const auditLog = actor({ })); }, }, - run: workflow(runAuditLogWorkflow), + run: runAuditLogCommandLoop, }); diff --git a/foundry/packages/backend/src/actors/audit-log/workflow.ts b/foundry/packages/backend/src/actors/audit-log/workflow.ts index 3c2437a..6c48074 100644 --- a/foundry/packages/backend/src/actors/audit-log/workflow.ts +++ b/foundry/packages/backend/src/actors/audit-log/workflow.ts @@ -1,13 +1,13 @@ // @ts-nocheck -import { Loop } from "rivetkit/workflow"; +import { logActorWarning, resolveErrorMessage } from "../logging.js"; import { events } from "./db/schema.js"; import type { AppendAuditLogCommand } from "./index.js"; export const AUDIT_LOG_QUEUE_NAMES = ["auditLog.command.append"] as const; -async function appendAuditLogRow(loopCtx: any, body: AppendAuditLogCommand): Promise { +async function appendAuditLogRow(c: any, body: AppendAuditLogCommand): Promise { const now = Date.now(); - await loopCtx.db + await c.db .insert(events) .values({ repoId: body.repoId ?? null, @@ -20,21 +20,19 @@ async function appendAuditLogRow(loopCtx: any, body: AppendAuditLogCommand): Pro .run(); } -export async function runAuditLogWorkflow(ctx: any): Promise { - await ctx.loop("audit-log-command-loop", async (loopCtx: any) => { - const msg = await loopCtx.queue.next("next-audit-log-command", { - names: [...AUDIT_LOG_QUEUE_NAMES], - completable: true, - }); - if (!msg) { - return Loop.continue(undefined); +export async function runAuditLogCommandLoop(c: any): Promise { + for await (const msg of c.queue.iter({ names: [...AUDIT_LOG_QUEUE_NAMES], completable: true })) { + try { + if (msg.name === "auditLog.command.append") { + await appendAuditLogRow(c, msg.body as AppendAuditLogCommand); + await msg.complete({ ok: true }); + continue; + } + await msg.complete({ error: `Unknown command: ${msg.name}` }); + } catch (error) { + const message = resolveErrorMessage(error); + logActorWarning("auditLog", "audit-log command failed", { queueName: msg.name, error: message }); + await msg.complete({ error: message }).catch(() => {}); } - - if (msg.name === "auditLog.command.append") { - await loopCtx.step("append-audit-log-row", async () => appendAuditLogRow(loopCtx, msg.body as AppendAuditLogCommand)); - await msg.complete({ ok: true }); - } - - return Loop.continue(undefined); - }); + } } diff --git a/foundry/packages/backend/src/actors/github-data/index.ts b/foundry/packages/backend/src/actors/github-data/index.ts index d477c81..e18f02f 100644 --- a/foundry/packages/backend/src/actors/github-data/index.ts +++ b/foundry/packages/backend/src/actors/github-data/index.ts @@ -1,7 +1,6 @@ // @ts-nocheck -import { eq } from "drizzle-orm"; +import { eq, inArray } from "drizzle-orm"; import { actor, queue } from "rivetkit"; -import { workflow } from "rivetkit/workflow"; import type { FoundryOrganization } from "@sandbox-agent/foundry-shared"; import { getActorRuntimeContext } from "../context.js"; import { getOrCreateOrganization, getTask } from "../handles.js"; @@ -12,7 +11,7 @@ import { organizationWorkflowQueueName } from "../organization/queues.js"; import { taskWorkflowQueueName } from "../task/workflow/index.js"; import { githubDataDb } from "./db/db.js"; import { githubBranches, githubMembers, githubMeta, githubPullRequests, githubRepositories } from "./db/schema.js"; -import { GITHUB_DATA_QUEUE_NAMES, runGithubDataWorkflow } from "./workflow.js"; +import { GITHUB_DATA_QUEUE_NAMES, runGithubDataCommandLoop } from "./workflow.js"; const META_ROW_ID = 1; const SYNC_REPOSITORY_BATCH_SIZE = 10; @@ -701,21 +700,6 @@ export async function fullSyncSetup(c: any, input: FullSyncInput = {}): Promise< await upsertRepositories(c, repositories, startedAt, syncGeneration); - const organization = await getOrCreateOrganization(c, c.state.organizationId); - await sendOrganizationCommand(organization, "organization.command.github.data_projection.apply", { - connectedAccount: context.connectedAccount, - installationStatus: context.installationStatus, - installationId: context.installationId, - syncStatus: "syncing", - lastSyncLabel: totalRepositoryCount > 0 ? `Imported ${totalRepositoryCount} repositories` : "No repositories available", - lastSyncAt: currentMeta.lastSyncAt, - syncGeneration, - syncPhase: totalRepositoryCount > 0 ? "syncing_branches" : null, - processedRepositoryCount: 0, - totalRepositoryCount, - repositories, - }); - return { syncGeneration, startedAt, @@ -784,7 +768,7 @@ export async function fullSyncMembers(c: any, config: FullSyncConfig): Promise { - const repos = readRepositoriesFromDb(c); + const repos = await readRepositoriesFromDb(c); const batches = chunkItems(repos, SYNC_REPOSITORY_BATCH_SIZE); if (batchIndex >= batches.length) return true; @@ -817,22 +801,6 @@ export async function fullSyncFinalize(c: any, config: FullSyncConfig): Promise< await sweepPullRequests(c, config.syncGeneration); await sweepRepositories(c, config.syncGeneration); - const repos = readRepositoriesFromDb(c); - const organization = await getOrCreateOrganization(c, c.state.organizationId); - await sendOrganizationCommand(organization, "organization.command.github.data_projection.apply", { - connectedAccount: config.connectedAccount, - installationStatus: config.installationStatus, - installationId: config.installationId, - syncStatus: "synced", - lastSyncLabel: config.totalRepositoryCount > 0 ? `Synced ${config.totalRepositoryCount} repositories` : "No repositories available", - lastSyncAt: config.startedAt, - syncGeneration: config.syncGeneration, - syncPhase: null, - processedRepositoryCount: config.totalRepositoryCount, - totalRepositoryCount: config.totalRepositoryCount, - repositories: repos, - }); - await writeMeta(c, { connectedAccount: config.connectedAccount, installationStatus: config.installationStatus, @@ -908,7 +876,7 @@ export const githubData = actor({ createState: (_c, input: GithubDataInput) => ({ organizationId: input.organizationId, }), - run: workflow(runGithubDataWorkflow), + run: runGithubDataCommandLoop, actions: { async getSummary(c) { const repositories = await c.db.select().from(githubRepositories).all(); @@ -949,6 +917,15 @@ export const githubData = actor({ }; }, + async listOpenPullRequests(c) { + const rows = await c.db + .select() + .from(githubPullRequests) + .where(inArray(githubPullRequests.state, ["OPEN", "DRAFT"])) + .all(); + return rows.map((row) => pullRequestSummaryFromRow(row)); + }, + async listBranchesForRepository(c, input: { repoId: string }) { const rows = await c.db.select().from(githubBranches).where(eq(githubBranches.repoId, input.repoId)).all(); return rows @@ -1015,11 +992,6 @@ export async function reloadRepositoryMutation(c: any, input: { repoId: string } updatedAt, ); - const organization = await getOrCreateOrganization(c, c.state.organizationId); - await sendOrganizationCommand(organization, "organization.command.github.repository_projection.apply", { - repoId: input.repoId, - remoteUrl: repository.cloneUrl, - }); return { repoId: input.repoId, fullName: repository.fullName, @@ -1049,20 +1021,6 @@ export async function clearStateMutation(c: any, input: ClearStateInput) { totalRepositoryCount: 0, }); - const organization = await getOrCreateOrganization(c, c.state.organizationId); - await sendOrganizationCommand(organization, "organization.command.github.data_projection.apply", { - connectedAccount: input.connectedAccount, - installationStatus: input.installationStatus, - installationId: input.installationId, - syncStatus: "pending", - lastSyncLabel: input.label, - lastSyncAt: null, - syncGeneration: currentMeta.syncGeneration, - syncPhase: null, - processedRepositoryCount: 0, - totalRepositoryCount: 0, - repositories: [], - }); await emitPullRequestChangeEvents(c, beforeRows, []); } @@ -1150,12 +1108,6 @@ export async function handlePullRequestWebhookMutation(c: any, input: PullReques totalRepositoryCount: 0, }); - const organization = await getOrCreateOrganization(c, c.state.organizationId); - await sendOrganizationCommand(organization, "organization.command.github.repository_projection.apply", { - repoId, - remoteUrl: input.repository.cloneUrl, - }); - const afterRows = await readAllPullRequestRows(c); await emitPullRequestChangeEvents(c, beforeRows, afterRows); if (state === "CLOSED" || state === "MERGED") { diff --git a/foundry/packages/backend/src/actors/github-data/workflow.ts b/foundry/packages/backend/src/actors/github-data/workflow.ts index 5e7c2cb..3497381 100644 --- a/foundry/packages/backend/src/actors/github-data/workflow.ts +++ b/foundry/packages/backend/src/actors/github-data/workflow.ts @@ -1,6 +1,11 @@ // @ts-nocheck -import { Loop } from "rivetkit/workflow"; -import { clearStateMutation, handlePullRequestWebhookMutation, reloadRepositoryMutation, runFullSync, fullSyncError } from "./index.js"; +import { logActorWarning, resolveErrorMessage } from "../logging.js"; + +// Dynamic imports to break circular dependency: index.ts imports workflow.ts, +// and workflow.ts needs functions from index.ts. +async function getIndexModule() { + return await import("./index.js"); +} export const GITHUB_DATA_QUEUE_NAMES = [ "githubData.command.syncRepos", @@ -15,78 +20,62 @@ export function githubDataWorkflowQueueName(name: GithubDataQueueName): GithubDa return name; } -export async function runGithubDataWorkflow(ctx: any): Promise { - // The org actor sends a "githubData.command.syncRepos" queue message when it - // creates this actor, so the command loop below handles the initial sync. - // - // IMPORTANT: Do NOT use workflow sub-loops (ctx.loop) inside command handlers. - // RivetKit workflow sub-loops inside a parent loop cause HistoryDivergedError - // on the second iteration because entries from the first iteration's sub-loop - // are still in history but not visited during replay of iteration 2. Use native - // JS loops inside a single step instead. See .context/rivetkit-subloop-bug.md. - - await ctx.loop("github-data-command-loop", async (loopCtx: any) => { - const msg = await loopCtx.queue.next("next-github-data-command", { - names: [...GITHUB_DATA_QUEUE_NAMES], - completable: true, - }); - if (!msg) { - return Loop.continue(undefined); - } - +/** + * Plain run handler (no workflow engine). Drains the queue using `c.queue.iter()` + * with completable messages. This avoids the RivetKit bug where actors created + * from another actor's workflow context never start their `run: workflow(...)`. + */ +export async function runGithubDataCommandLoop(c: any): Promise { + for await (const msg of c.queue.iter({ names: [...GITHUB_DATA_QUEUE_NAMES], completable: true })) { try { if (msg.name === "githubData.command.syncRepos") { try { - // Single opaque step for the entire sync. Do NOT decompose into - // sub-loops/sub-steps — see comment at top of function. - await loopCtx.step({ - name: "github-data-sync-repos", - timeout: 5 * 60_000, - run: async () => runFullSync(loopCtx, msg.body), - }); + const { runFullSync } = await getIndexModule(); + await runFullSync(c, msg.body); await msg.complete({ ok: true }); } catch (error) { - await loopCtx.step("sync-repos-error", async () => fullSyncError(loopCtx, error)); + const { fullSyncError } = await getIndexModule(); + try { + await fullSyncError(c, error); + } catch { + /* best effort */ + } const message = error instanceof Error ? error.message : String(error); await msg.complete({ error: message }).catch(() => {}); } - return Loop.continue(undefined); + continue; } if (msg.name === "githubData.command.reloadRepository") { - const result = await loopCtx.step({ - name: "github-data-reload-repository", - timeout: 5 * 60_000, - run: async () => reloadRepositoryMutation(loopCtx, msg.body), - }); + const { reloadRepositoryMutation } = await getIndexModule(); + const result = await reloadRepositoryMutation(c, msg.body); await msg.complete(result); - return Loop.continue(undefined); + continue; } if (msg.name === "githubData.command.clearState") { - await loopCtx.step({ - name: "github-data-clear-state", - timeout: 60_000, - run: async () => clearStateMutation(loopCtx, msg.body), - }); + const { clearStateMutation } = await getIndexModule(); + await clearStateMutation(c, msg.body); await msg.complete({ ok: true }); - return Loop.continue(undefined); + continue; } if (msg.name === "githubData.command.handlePullRequestWebhook") { - await loopCtx.step({ - name: "github-data-handle-pull-request-webhook", - timeout: 60_000, - run: async () => handlePullRequestWebhookMutation(loopCtx, msg.body), - }); + const { handlePullRequestWebhookMutation } = await getIndexModule(); + await handlePullRequestWebhookMutation(c, msg.body); await msg.complete({ ok: true }); - return Loop.continue(undefined); + continue; } + + logActorWarning("githubData", "unknown queue message", { queueName: msg.name }); + await msg.complete({ error: `Unknown command: ${msg.name}` }); } catch (error) { - const message = error instanceof Error ? error.message : String(error); + const message = resolveErrorMessage(error); + logActorWarning("githubData", "github-data command failed", { + queueName: msg.name, + error: message, + }); await msg.complete({ error: message }).catch(() => {}); } - - return Loop.continue(undefined); - }); + } } diff --git a/foundry/packages/backend/src/actors/organization/actions.ts b/foundry/packages/backend/src/actors/organization/actions.ts index 860fdc9..436765c 100644 --- a/foundry/packages/backend/src/actors/organization/actions.ts +++ b/foundry/packages/backend/src/actors/organization/actions.ts @@ -10,8 +10,8 @@ import type { OrganizationUseInput, } from "@sandbox-agent/foundry-shared"; import { logActorWarning, resolveErrorMessage } from "../logging.js"; -import { repoIdFromRemote } from "../../services/repo.js"; -import { organizationProfile, repos, taskSummaries } from "./db/schema.js"; +import { getOrCreateGithubData } from "../handles.js"; +import { organizationProfile, taskSummaries } from "./db/schema.js"; import { organizationAppActions } from "./actions/app.js"; import { organizationBetterAuthActions } from "./actions/better-auth.js"; import { organizationOnboardingActions } from "./actions/onboarding.js"; @@ -45,18 +45,6 @@ function repoLabelFromRemote(remoteUrl: string): string { return remoteUrl; } -function buildRepoSummary(repoRow: { repoId: string; remoteUrl: string; updatedAt: number }, taskRows: WorkspaceTaskSummary[]): WorkspaceRepositorySummary { - const repoTasks = taskRows.filter((task) => task.repoId === repoRow.repoId); - const latestActivityMs = repoTasks.reduce((latest, task) => Math.max(latest, task.updatedAtMs), repoRow.updatedAt); - - return { - id: repoRow.repoId, - label: repoLabelFromRemote(repoRow.remoteUrl), - taskCount: repoTasks.length, - latestActivityMs, - }; -} - function buildGithubSummary(profile: any, importedRepoCount: number): OrganizationGithubSummary { return { connectedAccount: profile?.githubConnectedAccount ?? "", @@ -81,18 +69,19 @@ function buildGithubSummary(profile: any, importedRepoCount: number): Organizati */ async function getOrganizationSummarySnapshot(c: any): Promise { const profile = await c.db.select().from(organizationProfile).where(eq(organizationProfile.id, ORGANIZATION_PROFILE_ROW_ID)).get(); - const repoRows = await c.db - .select({ - repoId: repos.repoId, - remoteUrl: repos.remoteUrl, - updatedAt: repos.updatedAt, - }) - .from(repos) - .orderBy(desc(repos.updatedAt)) - .all(); + + // Fetch repos + open PRs from github-data actor (single actor, not fan-out) + let repoRows: Array<{ repoId: string; fullName: string; cloneUrl: string; private: boolean; defaultBranch: string }> = []; + let openPullRequests: any[] = []; + try { + const githubData = await getOrCreateGithubData(c, c.state.organizationId); + [repoRows, openPullRequests] = await Promise.all([githubData.listRepositories({}), githubData.listOpenPullRequests({})]); + } catch { + // github-data actor may not exist yet + } const summaryRows = await c.db.select().from(taskSummaries).orderBy(desc(taskSummaries.updatedAtMs)).all(); - const summaries: WorkspaceTaskSummary[] = summaryRows.map((row) => ({ + const summaries = summaryRows.map((row) => ({ id: row.taskId, repoId: row.repoId, title: row.title, @@ -123,8 +112,20 @@ async function getOrganizationSummarySnapshot(c: any): Promise buildRepoSummary(row, summaries)).sort((left, right) => right.latestActivityMs - left.latestActivityMs), + repos: repoRows + .map((repo) => { + const repoTasks = summaries.filter((t) => t.repoId === repo.repoId); + const latestTaskMs = repoTasks.reduce((latest, t) => Math.max(latest, t.updatedAtMs), 0); + return { + id: repo.repoId, + label: repoLabelFromRemote(repo.cloneUrl), + taskCount: repoTasks.length, + latestActivityMs: latestTaskMs || Date.now(), + }; + }) + .sort((a, b) => b.latestActivityMs - a.latestActivityMs), taskSummaries: summaries, + openPullRequests, }; } @@ -149,25 +150,19 @@ export const organizationActions = { async listRepos(c: any, input: OrganizationUseInput): Promise { assertOrganization(c, input.organizationId); - - const rows = await c.db - .select({ - repoId: repos.repoId, - remoteUrl: repos.remoteUrl, - createdAt: repos.createdAt, - updatedAt: repos.updatedAt, - }) - .from(repos) - .orderBy(desc(repos.updatedAt)) - .all(); - - return rows.map((row) => ({ - organizationId: c.state.organizationId, - repoId: row.repoId, - remoteUrl: row.remoteUrl, - createdAt: row.createdAt, - updatedAt: row.updatedAt, - })); + try { + const githubData = await getOrCreateGithubData(c, c.state.organizationId); + const rows = await githubData.listRepositories({}); + return rows.map((row: any) => ({ + organizationId: c.state.organizationId, + repoId: row.repoId, + remoteUrl: row.cloneUrl, + createdAt: row.updatedAt ?? Date.now(), + updatedAt: row.updatedAt ?? Date.now(), + })); + } catch { + return []; + } }, async getOrganizationSummary(c: any, input: OrganizationUseInput): Promise { @@ -176,103 +171,6 @@ export const organizationActions = { }, }; -export async function applyGithubRepositoryProjectionMutation(c: any, input: { repoId: string; remoteUrl: string }): Promise { - const now = Date.now(); - await c.db - .insert(repos) - .values({ - repoId: input.repoId, - remoteUrl: input.remoteUrl, - createdAt: now, - updatedAt: now, - }) - .onConflictDoUpdate({ - target: repos.repoId, - set: { - remoteUrl: input.remoteUrl, - updatedAt: now, - }, - }) - .run(); - await refreshOrganizationSnapshotMutation(c); -} - -export async function applyGithubDataProjectionMutation( - c: any, - input: { - connectedAccount: string; - installationStatus: string; - installationId: number | null; - syncStatus: string; - lastSyncLabel: string; - lastSyncAt: number | null; - syncGeneration: number; - syncPhase: string | null; - processedRepositoryCount: number; - totalRepositoryCount: number; - repositories: Array<{ fullName: string; cloneUrl: string; private: boolean }>; - }, -): Promise { - const existingRepos = await c.db.select({ repoId: repos.repoId }).from(repos).all(); - const nextRepoIds = new Set(); - const now = Date.now(); - - const profile = await c.db - .select({ id: organizationProfile.id }) - .from(organizationProfile) - .where(eq(organizationProfile.id, ORGANIZATION_PROFILE_ROW_ID)) - .get(); - if (profile) { - await c.db - .update(organizationProfile) - .set({ - githubConnectedAccount: input.connectedAccount, - githubInstallationStatus: input.installationStatus, - githubSyncStatus: input.syncStatus, - githubInstallationId: input.installationId, - githubLastSyncLabel: input.lastSyncLabel, - githubLastSyncAt: input.lastSyncAt, - githubSyncGeneration: input.syncGeneration, - githubSyncPhase: input.syncPhase, - githubProcessedRepositoryCount: input.processedRepositoryCount, - githubTotalRepositoryCount: input.totalRepositoryCount, - updatedAt: now, - }) - .where(eq(organizationProfile.id, ORGANIZATION_PROFILE_ROW_ID)) - .run(); - } - - for (const repository of input.repositories) { - const repoId = repoIdFromRemote(repository.cloneUrl); - nextRepoIds.add(repoId); - await c.db - .insert(repos) - .values({ - repoId, - remoteUrl: repository.cloneUrl, - createdAt: now, - updatedAt: now, - }) - .onConflictDoUpdate({ - target: repos.repoId, - set: { - remoteUrl: repository.cloneUrl, - updatedAt: now, - }, - }) - .run(); - } - - for (const repo of existingRepos) { - if (nextRepoIds.has(repo.repoId)) { - continue; - } - await c.db.delete(repos).where(eq(repos.repoId, repo.repoId)).run(); - } - - await refreshOrganizationSnapshotMutation(c); -} - export async function applyGithubSyncProgressMutation( c: any, input: { diff --git a/foundry/packages/backend/src/actors/organization/actions/task-mutations.ts b/foundry/packages/backend/src/actors/organization/actions/task-mutations.ts index 72edf92..c6b0c02 100644 --- a/foundry/packages/backend/src/actors/organization/actions/task-mutations.ts +++ b/foundry/packages/backend/src/actors/organization/actions/task-mutations.ts @@ -17,7 +17,7 @@ import { deriveFallbackTitle, resolveCreateFlowDecision } from "../../../service import { expectQueueResponse } from "../../../services/queue.js"; import { isActorNotFoundError, logActorWarning, resolveErrorMessage } from "../../logging.js"; import { defaultSandboxProviderId } from "../../../sandbox-config.js"; -import { taskIndex, taskSummaries, repos } from "../db/schema.js"; +import { taskIndex, taskSummaries } from "../db/schema.js"; import { refreshOrganizationSnapshotMutation } from "../actions.js"; interface CreateTaskCommand { @@ -120,11 +120,6 @@ async function listGitHubBranches(c: any, repoId: string): Promise { - const repoRow = await c.db.select({ remoteUrl: repos.remoteUrl }).from(repos).where(eq(repos.repoId, repoId)).get(); - if (repoRow?.remoteUrl) { - return repoRow.remoteUrl; - } - const repository = await resolveGitHubRepository(c, repoId); const remoteUrl = repository?.cloneUrl?.trim(); if (!remoteUrl) { diff --git a/foundry/packages/backend/src/actors/organization/actions/tasks.ts b/foundry/packages/backend/src/actors/organization/actions/tasks.ts index be2cd62..b29baa1 100644 --- a/foundry/packages/backend/src/actors/organization/actions/tasks.ts +++ b/foundry/packages/backend/src/actors/organization/actions/tasks.ts @@ -21,12 +21,11 @@ import type { TaskWorkspaceUpdateDraftInput, } from "@sandbox-agent/foundry-shared"; import { getActorRuntimeContext } from "../../context.js"; -import { getOrCreateAuditLog, getTask as getTaskHandle, selfOrganization } from "../../handles.js"; +import { getOrCreateAuditLog, getOrCreateGithubData, getTask as getTaskHandle, selfOrganization } from "../../handles.js"; import { defaultSandboxProviderId } from "../../../sandbox-config.js"; import { expectQueueResponse } from "../../../services/queue.js"; import { logActorWarning, resolveErrorMessage } from "../../logging.js"; import { taskWorkflowQueueName } from "../../task/workflow/index.js"; -import { repos } from "../db/schema.js"; import { organizationWorkflowQueueName } from "../queues.js"; import { createTaskMutation, @@ -44,8 +43,9 @@ function assertOrganization(c: { state: { organizationId: string } }, organizati } async function requireRepoExists(c: any, repoId: string): Promise { - const repoRow = await c.db.select({ repoId: repos.repoId }).from(repos).where(eq(repos.repoId, repoId)).get(); - if (!repoRow) { + const githubData = await getOrCreateGithubData(c, c.state.organizationId); + const repo = await githubData.getRepository({ repoId }); + if (!repo) { throw new Error(`Unknown repo: ${repoId}`); } } diff --git a/foundry/packages/backend/src/actors/organization/app-shell.ts b/foundry/packages/backend/src/actors/organization/app-shell.ts index 4a95681..dddca16 100644 --- a/foundry/packages/backend/src/actors/organization/app-shell.ts +++ b/foundry/packages/backend/src/actors/organization/app-shell.ts @@ -19,7 +19,7 @@ import { getBetterAuthService } from "../../services/better-auth.js"; import { expectQueueResponse } from "../../services/queue.js"; import { repoIdFromRemote, repoLabelFromRemote } from "../../services/repo.js"; import { logger } from "../../logging.js"; -import { invoices, organizationMembers, organizationProfile, repos, seatAssignments, stripeLookup } from "./db/schema.js"; +import { invoices, organizationMembers, organizationProfile, seatAssignments, stripeLookup } from "./db/schema.js"; import { APP_SHELL_ORGANIZATION_ID } from "./constants.js"; import { organizationWorkflowQueueName } from "./queues.js"; @@ -575,8 +575,13 @@ async function listOrganizationInvoices(c: any): Promise { assertOrganizationShell(c); - const rows = await c.db.select({ remoteUrl: repos.remoteUrl }).from(repos).orderBy(desc(repos.updatedAt)).all(); - return rows.map((row) => repoLabelFromRemote(row.remoteUrl)).sort((left, right) => left.localeCompare(right)); + try { + const githubData = await getOrCreateGithubData(c, c.state.organizationId); + const rows = await githubData.listRepositories({}); + return rows.map((row: any) => repoLabelFromRemote(row.cloneUrl)).sort((a: string, b: string) => a.localeCompare(b)); + } catch { + return []; + } } export async function buildOrganizationState(c: any) { diff --git a/foundry/packages/backend/src/actors/organization/db/schema.ts b/foundry/packages/backend/src/actors/organization/db/schema.ts index 2f19fe5..5071a25 100644 --- a/foundry/packages/backend/src/actors/organization/db/schema.ts +++ b/foundry/packages/backend/src/actors/organization/db/schema.ts @@ -4,17 +4,6 @@ import { DEFAULT_WORKSPACE_MODEL_ID } from "@sandbox-agent/foundry-shared"; // SQLite is per organization actor instance, so no organizationId column needed. -/** - * Repository catalog. Rows are created/removed when repos are added/removed - * from the organization via GitHub sync. - */ -export const repos = sqliteTable("repos", { - repoId: text("repo_id").notNull().primaryKey(), - remoteUrl: text("remote_url").notNull(), - createdAt: integer("created_at").notNull(), - updatedAt: integer("updated_at").notNull(), -}); - /** * Coordinator index of TaskActor instances. * The organization actor is the direct coordinator for tasks (not a per-repo diff --git a/foundry/packages/backend/src/actors/organization/index.ts b/foundry/packages/backend/src/actors/organization/index.ts index 2fff662..08f979c 100644 --- a/foundry/packages/backend/src/actors/organization/index.ts +++ b/foundry/packages/backend/src/actors/organization/index.ts @@ -1,9 +1,8 @@ import { actor, queue } from "rivetkit"; -import { workflow } from "rivetkit/workflow"; import { organizationDb } from "./db/db.js"; import { organizationActions } from "./actions.js"; import { ORGANIZATION_QUEUE_NAMES } from "./queues.js"; -import { runOrganizationWorkflow } from "./workflow.js"; +import { runOrganizationCommandLoop } from "./workflow.js"; export const organization = actor({ db: organizationDb, @@ -17,5 +16,5 @@ export const organization = actor({ organizationId, }), actions: organizationActions, - run: workflow(runOrganizationWorkflow), + run: runOrganizationCommandLoop, }); diff --git a/foundry/packages/backend/src/actors/organization/queues.ts b/foundry/packages/backend/src/actors/organization/queues.ts index 4fbfe0a..f84e818 100644 --- a/foundry/packages/backend/src/actors/organization/queues.ts +++ b/foundry/packages/backend/src/actors/organization/queues.ts @@ -18,8 +18,6 @@ export const ORGANIZATION_QUEUE_NAMES = [ "organization.command.better_auth.verification.update_many", "organization.command.better_auth.verification.delete", "organization.command.better_auth.verification.delete_many", - "organization.command.github.repository_projection.apply", - "organization.command.github.data_projection.apply", "organization.command.github.sync_progress.apply", "organization.command.github.webhook_receipt.record", "organization.command.github.organization_shell.sync_from_github", diff --git a/foundry/packages/backend/src/actors/organization/workflow.ts b/foundry/packages/backend/src/actors/organization/workflow.ts index 5e10c99..bd7a205 100644 --- a/foundry/packages/backend/src/actors/organization/workflow.ts +++ b/foundry/packages/backend/src/actors/organization/workflow.ts @@ -1,13 +1,6 @@ // @ts-nocheck -import { Loop } from "rivetkit/workflow"; import { logActorWarning, resolveErrorMessage } from "../logging.js"; -import { - applyGithubDataProjectionMutation, - applyGithubRepositoryProjectionMutation, - applyGithubSyncProgressMutation, - recordGithubWebhookReceiptMutation, - refreshOrganizationSnapshotMutation, -} from "./actions.js"; +import { applyGithubSyncProgressMutation, recordGithubWebhookReceiptMutation, refreshOrganizationSnapshotMutation } from "./actions.js"; import { applyTaskSummaryUpdateMutation, createTaskMutation, @@ -42,363 +35,134 @@ import { } from "./app-shell.js"; import { ORGANIZATION_QUEUE_NAMES } from "./queues.js"; -export async function runOrganizationWorkflow(ctx: any): Promise { - await ctx.loop("organization-command-loop", async (loopCtx: any) => { - const msg = await loopCtx.queue.next("next-organization-command", { - names: [...ORGANIZATION_QUEUE_NAMES], - completable: true, - }); - if (!msg) { - return Loop.continue(undefined); - } +// Command handler dispatch table — maps queue name to handler function. +const COMMAND_HANDLERS: Record Promise> = { + "organization.command.createTask": (c, body) => createTaskMutation(c, body), + "organization.command.materializeTask": (c, body) => createTaskMutation(c, body), + "organization.command.registerTaskBranch": (c, body) => registerTaskBranchMutation(c, body), + "organization.command.applyTaskSummaryUpdate": async (c, body) => { + await applyTaskSummaryUpdateMutation(c, body); + return { ok: true }; + }, + "organization.command.removeTaskSummary": async (c, body) => { + await removeTaskSummaryMutation(c, body); + return { ok: true }; + }, + "organization.command.refreshTaskSummaryForBranch": async (c, body) => { + await refreshTaskSummaryForBranchMutation(c, body); + return { ok: true }; + }, + "organization.command.snapshot.broadcast": async (c, _body) => { + await refreshOrganizationSnapshotMutation(c); + return { ok: true }; + }, + "organization.command.syncGithubSession": async (c, body) => { + const { syncGithubOrganizations } = await import("./app-shell.js"); + await syncGithubOrganizations(c, body); + return { ok: true }; + }, + "organization.command.better_auth.session_index.upsert": (c, body) => betterAuthUpsertSessionIndexMutation(c, body), + "organization.command.better_auth.session_index.delete": async (c, body) => { + await betterAuthDeleteSessionIndexMutation(c, body); + return { ok: true }; + }, + "organization.command.better_auth.email_index.upsert": (c, body) => betterAuthUpsertEmailIndexMutation(c, body), + "organization.command.better_auth.email_index.delete": async (c, body) => { + await betterAuthDeleteEmailIndexMutation(c, body); + return { ok: true }; + }, + "organization.command.better_auth.account_index.upsert": (c, body) => betterAuthUpsertAccountIndexMutation(c, body), + "organization.command.better_auth.account_index.delete": async (c, body) => { + await betterAuthDeleteAccountIndexMutation(c, body); + return { ok: true }; + }, + "organization.command.better_auth.verification.create": (c, body) => betterAuthCreateVerificationMutation(c, body), + "organization.command.better_auth.verification.update": (c, body) => betterAuthUpdateVerificationMutation(c, body), + "organization.command.better_auth.verification.update_many": (c, body) => betterAuthUpdateManyVerificationMutation(c, body), + "organization.command.better_auth.verification.delete": async (c, body) => { + await betterAuthDeleteVerificationMutation(c, body); + return { ok: true }; + }, + "organization.command.better_auth.verification.delete_many": (c, body) => betterAuthDeleteManyVerificationMutation(c, body), + "organization.command.github.sync_progress.apply": async (c, body) => { + await applyGithubSyncProgressMutation(c, body); + return { ok: true }; + }, + "organization.command.github.webhook_receipt.record": async (c, body) => { + await recordGithubWebhookReceiptMutation(c, body); + return { ok: true }; + }, + "organization.command.github.organization_shell.sync_from_github": (c, body) => syncOrganizationShellFromGithubMutation(c, body), + "organization.command.shell.profile.update": async (c, body) => { + await updateOrganizationShellProfileMutation(c, body); + return { ok: true }; + }, + "organization.command.shell.sync_started.mark": async (c, body) => { + await markOrganizationSyncStartedMutation(c, body); + return { ok: true }; + }, + "organization.command.billing.stripe_customer.apply": async (c, body) => { + await applyOrganizationStripeCustomerMutation(c, body); + return { ok: true }; + }, + "organization.command.billing.stripe_subscription.apply": async (c, body) => { + await applyOrganizationStripeSubscriptionMutation(c, body); + return { ok: true }; + }, + "organization.command.billing.free_plan.apply": async (c, body) => { + await applyOrganizationFreePlanMutation(c, body); + return { ok: true }; + }, + "organization.command.billing.payment_method.set": async (c, body) => { + await setOrganizationBillingPaymentMethodMutation(c, body); + return { ok: true }; + }, + "organization.command.billing.status.set": async (c, body) => { + await setOrganizationBillingStatusMutation(c, body); + return { ok: true }; + }, + "organization.command.billing.invoice.upsert": async (c, body) => { + await upsertOrganizationInvoiceMutation(c, body); + return { ok: true }; + }, + "organization.command.billing.seat_usage.record": async (c, body) => { + await recordOrganizationSeatUsageMutation(c, body); + return { ok: true }; + }, +}; +/** + * Plain run handler (no workflow engine). Drains the queue using `c.queue.iter()` + * with completable messages. This avoids the RivetKit bug where actors created + * from another actor's workflow context never start their `run: workflow(...)`. + * + * The queue is still durable — messages survive restarts. Only in-flight processing + * of a single message is lost on crash (the message is retried). All mutations are + * idempotent, so this is safe. + */ +export async function runOrganizationCommandLoop(c: any): Promise { + for await (const msg of c.queue.iter({ names: [...ORGANIZATION_QUEUE_NAMES], completable: true })) { try { - if (msg.name === "organization.command.createTask") { - const result = await loopCtx.step({ - name: "organization-create-task", - timeout: 5 * 60_000, - run: async () => createTaskMutation(loopCtx, msg.body), - }); + const handler = COMMAND_HANDLERS[msg.name]; + if (handler) { + const result = await handler(c, msg.body); await msg.complete(result); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.materializeTask") { - const result = await loopCtx.step({ - name: "organization-materialize-task", - timeout: 5 * 60_000, - run: async () => createTaskMutation(loopCtx, msg.body), - }); - await msg.complete(result); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.registerTaskBranch") { - const result = await loopCtx.step({ - name: "organization-register-task-branch", - timeout: 60_000, - run: async () => registerTaskBranchMutation(loopCtx, msg.body), - }); - await msg.complete(result); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.applyTaskSummaryUpdate") { - await loopCtx.step({ - name: "organization-apply-task-summary-update", - timeout: 30_000, - run: async () => applyTaskSummaryUpdateMutation(loopCtx, msg.body), - }); - await msg.complete({ ok: true }); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.removeTaskSummary") { - await loopCtx.step({ - name: "organization-remove-task-summary", - timeout: 30_000, - run: async () => removeTaskSummaryMutation(loopCtx, msg.body), - }); - await msg.complete({ ok: true }); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.refreshTaskSummaryForBranch") { - await loopCtx.step({ - name: "organization-refresh-task-summary-for-branch", - timeout: 60_000, - run: async () => refreshTaskSummaryForBranchMutation(loopCtx, msg.body), - }); - await msg.complete({ ok: true }); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.snapshot.broadcast") { - await loopCtx.step({ - name: "organization-snapshot-broadcast", - timeout: 60_000, - run: async () => refreshOrganizationSnapshotMutation(loopCtx), - }); - await msg.complete({ ok: true }); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.syncGithubSession") { - await loopCtx.step({ - name: "organization-sync-github-session", - timeout: 60_000, - run: async () => { - const { syncGithubOrganizations } = await import("./app-shell.js"); - await syncGithubOrganizations(loopCtx, msg.body as { sessionId: string; accessToken: string }); - }, - }); - await msg.complete({ ok: true }); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.better_auth.session_index.upsert") { - const result = await loopCtx.step({ - name: "organization-better-auth-session-index-upsert", - timeout: 60_000, - run: async () => betterAuthUpsertSessionIndexMutation(loopCtx, msg.body), - }); - await msg.complete(result); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.better_auth.session_index.delete") { - await loopCtx.step({ - name: "organization-better-auth-session-index-delete", - timeout: 60_000, - run: async () => betterAuthDeleteSessionIndexMutation(loopCtx, msg.body), - }); - await msg.complete({ ok: true }); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.better_auth.email_index.upsert") { - const result = await loopCtx.step({ - name: "organization-better-auth-email-index-upsert", - timeout: 60_000, - run: async () => betterAuthUpsertEmailIndexMutation(loopCtx, msg.body), - }); - await msg.complete(result); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.better_auth.email_index.delete") { - await loopCtx.step({ - name: "organization-better-auth-email-index-delete", - timeout: 60_000, - run: async () => betterAuthDeleteEmailIndexMutation(loopCtx, msg.body), - }); - await msg.complete({ ok: true }); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.better_auth.account_index.upsert") { - const result = await loopCtx.step({ - name: "organization-better-auth-account-index-upsert", - timeout: 60_000, - run: async () => betterAuthUpsertAccountIndexMutation(loopCtx, msg.body), - }); - await msg.complete(result); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.better_auth.account_index.delete") { - await loopCtx.step({ - name: "organization-better-auth-account-index-delete", - timeout: 60_000, - run: async () => betterAuthDeleteAccountIndexMutation(loopCtx, msg.body), - }); - await msg.complete({ ok: true }); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.better_auth.verification.create") { - const result = await loopCtx.step({ - name: "organization-better-auth-verification-create", - timeout: 60_000, - run: async () => betterAuthCreateVerificationMutation(loopCtx, msg.body), - }); - await msg.complete(result); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.better_auth.verification.update") { - const result = await loopCtx.step({ - name: "organization-better-auth-verification-update", - timeout: 60_000, - run: async () => betterAuthUpdateVerificationMutation(loopCtx, msg.body), - }); - await msg.complete(result); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.better_auth.verification.update_many") { - const result = await loopCtx.step({ - name: "organization-better-auth-verification-update-many", - timeout: 60_000, - run: async () => betterAuthUpdateManyVerificationMutation(loopCtx, msg.body), - }); - await msg.complete(result); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.better_auth.verification.delete") { - await loopCtx.step({ - name: "organization-better-auth-verification-delete", - timeout: 60_000, - run: async () => betterAuthDeleteVerificationMutation(loopCtx, msg.body), - }); - await msg.complete({ ok: true }); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.better_auth.verification.delete_many") { - const result = await loopCtx.step({ - name: "organization-better-auth-verification-delete-many", - timeout: 60_000, - run: async () => betterAuthDeleteManyVerificationMutation(loopCtx, msg.body), - }); - await msg.complete(result); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.github.repository_projection.apply") { - await loopCtx.step({ - name: "organization-github-repository-projection-apply", - timeout: 60_000, - run: async () => applyGithubRepositoryProjectionMutation(loopCtx, msg.body), - }); - await msg.complete({ ok: true }); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.github.data_projection.apply") { - await loopCtx.step({ - name: "organization-github-data-projection-apply", - timeout: 60_000, - run: async () => applyGithubDataProjectionMutation(loopCtx, msg.body), - }); - await msg.complete({ ok: true }); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.github.sync_progress.apply") { - await loopCtx.step({ - name: "organization-github-sync-progress-apply", - timeout: 60_000, - run: async () => applyGithubSyncProgressMutation(loopCtx, msg.body), - }); - await msg.complete({ ok: true }); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.github.webhook_receipt.record") { - await loopCtx.step({ - name: "organization-github-webhook-receipt-record", - timeout: 60_000, - run: async () => recordGithubWebhookReceiptMutation(loopCtx, msg.body), - }); - await msg.complete({ ok: true }); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.github.organization_shell.sync_from_github") { - const result = await loopCtx.step({ - name: "organization-github-organization-shell-sync-from-github", - timeout: 60_000, - run: async () => syncOrganizationShellFromGithubMutation(loopCtx, msg.body), - }); - await msg.complete(result); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.shell.profile.update") { - await loopCtx.step({ - name: "organization-shell-profile-update", - timeout: 60_000, - run: async () => updateOrganizationShellProfileMutation(loopCtx, msg.body), - }); - await msg.complete({ ok: true }); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.shell.sync_started.mark") { - await loopCtx.step({ - name: "organization-shell-sync-started-mark", - timeout: 60_000, - run: async () => markOrganizationSyncStartedMutation(loopCtx, msg.body), - }); - await msg.complete({ ok: true }); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.billing.stripe_customer.apply") { - await loopCtx.step({ - name: "organization-billing-stripe-customer-apply", - timeout: 60_000, - run: async () => applyOrganizationStripeCustomerMutation(loopCtx, msg.body), - }); - await msg.complete({ ok: true }); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.billing.stripe_subscription.apply") { - await loopCtx.step({ - name: "organization-billing-stripe-subscription-apply", - timeout: 60_000, - run: async () => applyOrganizationStripeSubscriptionMutation(loopCtx, msg.body), - }); - await msg.complete({ ok: true }); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.billing.free_plan.apply") { - await loopCtx.step({ - name: "organization-billing-free-plan-apply", - timeout: 60_000, - run: async () => applyOrganizationFreePlanMutation(loopCtx, msg.body), - }); - await msg.complete({ ok: true }); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.billing.payment_method.set") { - await loopCtx.step({ - name: "organization-billing-payment-method-set", - timeout: 60_000, - run: async () => setOrganizationBillingPaymentMethodMutation(loopCtx, msg.body), - }); - await msg.complete({ ok: true }); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.billing.status.set") { - await loopCtx.step({ - name: "organization-billing-status-set", - timeout: 60_000, - run: async () => setOrganizationBillingStatusMutation(loopCtx, msg.body), - }); - await msg.complete({ ok: true }); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.billing.invoice.upsert") { - await loopCtx.step({ - name: "organization-billing-invoice-upsert", - timeout: 60_000, - run: async () => upsertOrganizationInvoiceMutation(loopCtx, msg.body), - }); - await msg.complete({ ok: true }); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.billing.seat_usage.record") { - await loopCtx.step({ - name: "organization-billing-seat-usage-record", - timeout: 60_000, - run: async () => recordOrganizationSeatUsageMutation(loopCtx, msg.body), - }); - await msg.complete({ ok: true }); - return Loop.continue(undefined); + } else { + logActorWarning("organization", "unknown queue message", { queueName: msg.name }); + await msg.complete({ error: `Unknown command: ${msg.name}` }); } } catch (error) { const message = resolveErrorMessage(error); - logActorWarning("organization", "organization workflow command failed", { + logActorWarning("organization", "organization command failed", { queueName: msg.name, error: message, }); await msg.complete({ error: message }).catch((completeError: unknown) => { - logActorWarning("organization", "organization workflow failed completing error response", { + logActorWarning("organization", "organization command failed completing error response", { queueName: msg.name, error: resolveErrorMessage(completeError), }); }); } - - return Loop.continue(undefined); - }); + } } diff --git a/foundry/packages/backend/src/actors/task/index.ts b/foundry/packages/backend/src/actors/task/index.ts index ad66f14..30f6836 100644 --- a/foundry/packages/backend/src/actors/task/index.ts +++ b/foundry/packages/backend/src/actors/task/index.ts @@ -1,10 +1,9 @@ import { actor, queue } from "rivetkit"; -import { workflow } from "rivetkit/workflow"; import type { TaskRecord } from "@sandbox-agent/foundry-shared"; import { taskDb } from "./db/db.js"; import { getCurrentRecord } from "./workflow/common.js"; import { getSessionDetail, getTaskDetail, getTaskSummary } from "./workspace.js"; -import { TASK_QUEUE_NAMES, runTaskWorkflow } from "./workflow/index.js"; +import { TASK_QUEUE_NAMES, runTaskCommandLoop } from "./workflow/index.js"; export interface TaskInput { organizationId: string; @@ -42,7 +41,7 @@ export const task = actor({ return await getSessionDetail(c, input.sessionId, input.authSessionId); }, }, - run: workflow(runTaskWorkflow), + run: runTaskCommandLoop, }); export { TASK_QUEUE_NAMES }; diff --git a/foundry/packages/backend/src/actors/task/workflow/index.ts b/foundry/packages/backend/src/actors/task/workflow/index.ts index 9b18d20..f23871a 100644 --- a/foundry/packages/backend/src/actors/task/workflow/index.ts +++ b/foundry/packages/backend/src/actors/task/workflow/index.ts @@ -1,4 +1,3 @@ -import { Loop } from "rivetkit/workflow"; import { logActorWarning, resolveErrorMessage } from "../../logging.js"; import { getCurrentRecord } from "./common.js"; import { initBootstrapDbActivity, initCompleteActivity, initEnqueueProvisionActivity, initFailedActivity } from "./init.js"; @@ -38,16 +37,14 @@ export { TASK_QUEUE_NAMES, taskWorkflowQueueName } from "./queue.js"; type TaskQueueName = (typeof TASK_QUEUE_NAMES)[number]; -type WorkflowHandler = (loopCtx: any, msg: { name: TaskQueueName; body: any; complete: (response: unknown) => Promise }) => Promise; +type CommandHandler = (c: any, msg: { name: TaskQueueName; body: any; complete: (response: unknown) => Promise }) => Promise; -const commandHandlers: Record = { - "task.command.initialize": async (loopCtx, msg) => { +const commandHandlers: Record = { + "task.command.initialize": async (c, msg) => { const body = msg.body; - - await loopCtx.step("init-bootstrap-db", async () => initBootstrapDbActivity(loopCtx, body)); - await loopCtx.step("init-enqueue-provision", async () => initEnqueueProvisionActivity(loopCtx, body)); - await loopCtx.removed("init-dispatch-provision-v2", "step"); - const currentRecord = await loopCtx.step("init-read-current-record", async () => getCurrentRecord(loopCtx)); + await initBootstrapDbActivity(c, body); + await initEnqueueProvisionActivity(c, body); + const currentRecord = await getCurrentRecord(c); try { await msg.complete(currentRecord); } catch (error) { @@ -57,23 +54,12 @@ const commandHandlers: Record = { } }, - "task.command.provision": async (loopCtx, msg) => { - await loopCtx.removed("init-failed", "step"); - await loopCtx.removed("init-failed-v2", "step"); + "task.command.provision": async (c, msg) => { try { - await loopCtx.removed("init-ensure-name", "step"); - await loopCtx.removed("init-assert-name", "step"); - await loopCtx.removed("init-create-sandbox", "step"); - await loopCtx.removed("init-ensure-agent", "step"); - await loopCtx.removed("init-start-sandbox-instance", "step"); - await loopCtx.removed("init-expose-sandbox", "step"); - await loopCtx.removed("init-create-session", "step"); - await loopCtx.removed("init-write-db", "step"); - await loopCtx.removed("init-start-status-sync", "step"); - await loopCtx.step("init-complete", async () => initCompleteActivity(loopCtx, msg.body)); + await initCompleteActivity(c, msg.body); await msg.complete({ ok: true }); } catch (error) { - await loopCtx.step("init-failed-v3", async () => initFailedActivity(loopCtx, error, msg.body)); + await initFailedActivity(c, error, msg.body); await msg.complete({ ok: false, error: resolveErrorMessage(error), @@ -81,79 +67,67 @@ const commandHandlers: Record = { } }, - "task.command.attach": async (loopCtx, msg) => { - await loopCtx.step("handle-attach", async () => handleAttachActivity(loopCtx, msg)); + "task.command.attach": async (c, msg) => { + await handleAttachActivity(c, msg); }, - "task.command.switch": async (loopCtx, msg) => { - await loopCtx.step("handle-switch", async () => handleSwitchActivity(loopCtx, msg)); + "task.command.switch": async (c, msg) => { + await handleSwitchActivity(c, msg); }, - "task.command.push": async (loopCtx, msg) => { - await loopCtx.step("handle-push", async () => handlePushActivity(loopCtx, msg)); + "task.command.push": async (c, msg) => { + await handlePushActivity(c, msg); }, - "task.command.sync": async (loopCtx, msg) => { - await loopCtx.step("handle-sync", async () => handleSimpleCommandActivity(loopCtx, msg, "task.sync")); + "task.command.sync": async (c, msg) => { + await handleSimpleCommandActivity(c, msg, "task.sync"); }, - "task.command.merge": async (loopCtx, msg) => { - await loopCtx.step("handle-merge", async () => handleSimpleCommandActivity(loopCtx, msg, "task.merge")); + "task.command.merge": async (c, msg) => { + await handleSimpleCommandActivity(c, msg, "task.merge"); }, - "task.command.archive": async (loopCtx, msg) => { - await loopCtx.step("handle-archive", async () => handleArchiveActivity(loopCtx, msg)); + "task.command.archive": async (c, msg) => { + await handleArchiveActivity(c, msg); }, - "task.command.kill": async (loopCtx, msg) => { - await loopCtx.step("kill-destroy-sandbox", async () => killDestroySandboxActivity(loopCtx)); - await loopCtx.step("kill-write-db", async () => killWriteDbActivity(loopCtx, msg)); + "task.command.kill": async (c, msg) => { + await killDestroySandboxActivity(c); + await killWriteDbActivity(c, msg); }, - "task.command.get": async (loopCtx, msg) => { - await loopCtx.step("handle-get", async () => handleGetActivity(loopCtx, msg)); + "task.command.get": async (c, msg) => { + await handleGetActivity(c, msg); }, - "task.command.pull_request.sync": async (loopCtx, msg) => { - await loopCtx.step("task-pull-request-sync", async () => syncTaskPullRequest(loopCtx, msg.body?.pullRequest ?? null)); + "task.command.pull_request.sync": async (c, msg) => { + await syncTaskPullRequest(c, msg.body?.pullRequest ?? null); await msg.complete({ ok: true }); }, - "task.command.workspace.mark_unread": async (loopCtx, msg) => { - await loopCtx.step("workspace-mark-unread", async () => markWorkspaceUnread(loopCtx, msg.body?.authSessionId)); + "task.command.workspace.mark_unread": async (c, msg) => { + await markWorkspaceUnread(c, msg.body?.authSessionId); await msg.complete({ ok: true }); }, - "task.command.workspace.rename_task": async (loopCtx, msg) => { - await loopCtx.step("workspace-rename-task", async () => renameWorkspaceTask(loopCtx, msg.body.value)); + "task.command.workspace.rename_task": async (c, msg) => { + await renameWorkspaceTask(c, msg.body.value); await msg.complete({ ok: true }); }, - "task.command.workspace.create_session": async (loopCtx, msg) => { + "task.command.workspace.create_session": async (c, msg) => { try { - const created = await loopCtx.step({ - name: "workspace-create-session", - timeout: 5 * 60_000, - run: async () => createWorkspaceSession(loopCtx, msg.body?.model, msg.body?.authSessionId), - }); + const created = await createWorkspaceSession(c, msg.body?.model, msg.body?.authSessionId); await msg.complete(created); } catch (error) { await msg.complete({ error: resolveErrorMessage(error) }); } }, - "task.command.workspace.create_session_and_send": async (loopCtx, msg) => { + "task.command.workspace.create_session_and_send": async (c, msg) => { try { - const created = await loopCtx.step({ - name: "workspace-create-session-for-send", - timeout: 5 * 60_000, - run: async () => createWorkspaceSession(loopCtx, msg.body?.model, msg.body?.authSessionId), - }); - await loopCtx.step({ - name: "workspace-send-initial-message", - timeout: 5 * 60_000, - run: async () => sendWorkspaceMessage(loopCtx, created.sessionId, msg.body.text, [], msg.body?.authSessionId), - }); + const created = await createWorkspaceSession(c, msg.body?.model, msg.body?.authSessionId); + await sendWorkspaceMessage(c, created.sessionId, msg.body.text, [], msg.body?.authSessionId); } catch (error) { logActorWarning("task.workflow", "create_session_and_send failed", { error: resolveErrorMessage(error), @@ -162,135 +136,102 @@ const commandHandlers: Record = { await msg.complete({ ok: true }); }, - "task.command.workspace.ensure_session": async (loopCtx, msg) => { - await loopCtx.step({ - name: "workspace-ensure-session", - timeout: 5 * 60_000, - run: async () => ensureWorkspaceSession(loopCtx, msg.body.sessionId, msg.body?.model, msg.body?.authSessionId), - }); + "task.command.workspace.ensure_session": async (c, msg) => { + await ensureWorkspaceSession(c, msg.body.sessionId, msg.body?.model, msg.body?.authSessionId); await msg.complete({ ok: true }); }, - "task.command.workspace.rename_session": async (loopCtx, msg) => { - await loopCtx.step("workspace-rename-session", async () => renameWorkspaceSession(loopCtx, msg.body.sessionId, msg.body.title)); + "task.command.workspace.rename_session": async (c, msg) => { + await renameWorkspaceSession(c, msg.body.sessionId, msg.body.title); await msg.complete({ ok: true }); }, - "task.command.workspace.select_session": async (loopCtx, msg) => { - await loopCtx.step("workspace-select-session", async () => selectWorkspaceSession(loopCtx, msg.body.sessionId, msg.body?.authSessionId)); + "task.command.workspace.select_session": async (c, msg) => { + await selectWorkspaceSession(c, msg.body.sessionId, msg.body?.authSessionId); await msg.complete({ ok: true }); }, - "task.command.workspace.set_session_unread": async (loopCtx, msg) => { - await loopCtx.step("workspace-set-session-unread", async () => setWorkspaceSessionUnread(loopCtx, msg.body.sessionId, msg.body.unread, msg.body?.authSessionId)); + "task.command.workspace.set_session_unread": async (c, msg) => { + await setWorkspaceSessionUnread(c, msg.body.sessionId, msg.body.unread, msg.body?.authSessionId); await msg.complete({ ok: true }); }, - "task.command.workspace.update_draft": async (loopCtx, msg) => { - await loopCtx.step("workspace-update-draft", async () => updateWorkspaceDraft(loopCtx, msg.body.sessionId, msg.body.text, msg.body.attachments, msg.body?.authSessionId)); + "task.command.workspace.update_draft": async (c, msg) => { + await updateWorkspaceDraft(c, msg.body.sessionId, msg.body.text, msg.body.attachments, msg.body?.authSessionId); await msg.complete({ ok: true }); }, - "task.command.workspace.change_model": async (loopCtx, msg) => { - await loopCtx.step("workspace-change-model", async () => changeWorkspaceModel(loopCtx, msg.body.sessionId, msg.body.model, msg.body?.authSessionId)); + "task.command.workspace.change_model": async (c, msg) => { + await changeWorkspaceModel(c, msg.body.sessionId, msg.body.model, msg.body?.authSessionId); await msg.complete({ ok: true }); }, - "task.command.workspace.send_message": async (loopCtx, msg) => { + "task.command.workspace.send_message": async (c, msg) => { try { - await loopCtx.step({ - name: "workspace-send-message", - timeout: 10 * 60_000, - run: async () => sendWorkspaceMessage(loopCtx, msg.body.sessionId, msg.body.text, msg.body.attachments, msg.body?.authSessionId), - }); + await sendWorkspaceMessage(c, msg.body.sessionId, msg.body.text, msg.body.attachments, msg.body?.authSessionId); await msg.complete({ ok: true }); } catch (error) { await msg.complete({ error: resolveErrorMessage(error) }); } }, - "task.command.workspace.stop_session": async (loopCtx, msg) => { - await loopCtx.step({ - name: "workspace-stop-session", - timeout: 5 * 60_000, - run: async () => stopWorkspaceSession(loopCtx, msg.body.sessionId), - }); + "task.command.workspace.stop_session": async (c, msg) => { + await stopWorkspaceSession(c, msg.body.sessionId); await msg.complete({ ok: true }); }, - "task.command.workspace.sync_session_status": async (loopCtx, msg) => { - await loopCtx.step("workspace-sync-session-status", async () => syncWorkspaceSessionStatus(loopCtx, msg.body.sessionId, msg.body.status, msg.body.at)); + "task.command.workspace.sync_session_status": async (c, msg) => { + await syncWorkspaceSessionStatus(c, msg.body.sessionId, msg.body.status, msg.body.at); await msg.complete({ ok: true }); }, - "task.command.workspace.refresh_derived": async (loopCtx, msg) => { - await loopCtx.step({ - name: "workspace-refresh-derived", - timeout: 5 * 60_000, - run: async () => refreshWorkspaceDerivedState(loopCtx), - }); + "task.command.workspace.refresh_derived": async (c, msg) => { + await refreshWorkspaceDerivedState(c); await msg.complete({ ok: true }); }, - "task.command.workspace.refresh_session_transcript": async (loopCtx, msg) => { - await loopCtx.step({ - name: "workspace-refresh-session-transcript", - timeout: 60_000, - run: async () => refreshWorkspaceSessionTranscript(loopCtx, msg.body.sessionId), - }); + "task.command.workspace.refresh_session_transcript": async (c, msg) => { + await refreshWorkspaceSessionTranscript(c, msg.body.sessionId); await msg.complete({ ok: true }); }, - "task.command.workspace.close_session": async (loopCtx, msg) => { - await loopCtx.step({ - name: "workspace-close-session", - timeout: 5 * 60_000, - run: async () => closeWorkspaceSession(loopCtx, msg.body.sessionId, msg.body?.authSessionId), - }); + "task.command.workspace.close_session": async (c, msg) => { + await closeWorkspaceSession(c, msg.body.sessionId, msg.body?.authSessionId); await msg.complete({ ok: true }); }, - "task.command.workspace.publish_pr": async (loopCtx, msg) => { - await loopCtx.step({ - name: "workspace-publish-pr", - timeout: 10 * 60_000, - run: async () => publishWorkspacePr(loopCtx), - }); + "task.command.workspace.publish_pr": async (c, msg) => { + await publishWorkspacePr(c); await msg.complete({ ok: true }); }, - "task.command.workspace.revert_file": async (loopCtx, msg) => { - await loopCtx.step({ - name: "workspace-revert-file", - timeout: 5 * 60_000, - run: async () => revertWorkspaceFile(loopCtx, msg.body.path), - }); + "task.command.workspace.revert_file": async (c, msg) => { + await revertWorkspaceFile(c, msg.body.path); await msg.complete({ ok: true }); }, }; -export async function runTaskWorkflow(ctx: any): Promise { - await ctx.loop("task-command-loop", async (loopCtx: any) => { - const msg = await loopCtx.queue.next("next-command", { - names: [...TASK_QUEUE_NAMES], - completable: true, - }); - if (!msg) { - return Loop.continue(undefined); - } +/** + * Plain run handler (no workflow engine). Drains the queue using `c.queue.iter()` + * with completable messages. + */ +export async function runTaskCommandLoop(c: any): Promise { + for await (const msg of c.queue.iter({ names: [...TASK_QUEUE_NAMES], completable: true })) { const handler = commandHandlers[msg.name as TaskQueueName]; if (handler) { try { - await handler(loopCtx, msg); + await handler(c, msg); } catch (error) { const message = resolveErrorMessage(error); - logActorWarning("task.workflow", "task workflow command failed", { + logActorWarning("task.workflow", "task command failed", { queueName: msg.name, error: message, }); await msg.complete({ error: message }).catch(() => {}); } + } else { + logActorWarning("task.workflow", "unknown queue message", { queueName: msg.name }); + await msg.complete({ error: `Unknown command: ${msg.name}` }); } - return Loop.continue(undefined); - }); + } } diff --git a/foundry/packages/backend/src/actors/user/index.ts b/foundry/packages/backend/src/actors/user/index.ts index 28d48e6..01d6e0f 100644 --- a/foundry/packages/backend/src/actors/user/index.ts +++ b/foundry/packages/backend/src/actors/user/index.ts @@ -1,9 +1,8 @@ import { actor, queue } from "rivetkit"; -import { workflow } from "rivetkit/workflow"; import { userDb } from "./db/db.js"; import { betterAuthActions } from "./actions/better-auth.js"; import { userActions } from "./actions/user.js"; -import { USER_QUEUE_NAMES, runUserWorkflow } from "./workflow.js"; +import { USER_QUEUE_NAMES, runUserCommandLoop } from "./workflow.js"; export const user = actor({ db: userDb, @@ -20,5 +19,5 @@ export const user = actor({ ...betterAuthActions, ...userActions, }, - run: workflow(runUserWorkflow), + run: runUserCommandLoop, }); diff --git a/foundry/packages/backend/src/actors/user/workflow.ts b/foundry/packages/backend/src/actors/user/workflow.ts index 3d61f6c..87f5326 100644 --- a/foundry/packages/backend/src/actors/user/workflow.ts +++ b/foundry/packages/backend/src/actors/user/workflow.ts @@ -1,5 +1,4 @@ import { eq, count as sqlCount, and } from "drizzle-orm"; -import { Loop } from "rivetkit/workflow"; import { DEFAULT_WORKSPACE_MODEL_ID } from "@sandbox-agent/foundry-shared"; import { logActorWarning, resolveErrorMessage } from "../logging.js"; import { authUsers, sessionState, userProfiles, userTaskState } from "./db/schema.js"; @@ -26,8 +25,15 @@ export function userWorkflowQueueName(name: UserQueueName): UserQueueName { async function createAuthRecordMutation(c: any, input: { model: string; data: Record }) { const table = tableFor(input.model); const persisted = persistInput(input.model, input.data); - await c.db.insert(table).values(persisted as any).run(); - const row = await c.db.select().from(table).where(eq(columnFor(input.model, table, "id"), input.data.id as any)).get(); + await c.db + .insert(table) + .values(persisted as any) + .run(); + const row = await c.db + .select() + .from(table) + .where(eq(columnFor(input.model, table, "id"), input.data.id as any)) + .get(); return materializeRow(input.model, row); } @@ -37,7 +43,11 @@ async function updateAuthRecordMutation(c: any, input: { model: string; where: a if (!predicate) { throw new Error("updateAuthRecord requires a where clause"); } - await c.db.update(table).set(persistPatch(input.model, input.update) as any).where(predicate).run(); + await c.db + .update(table) + .set(persistPatch(input.model, input.update) as any) + .where(predicate) + .run(); return materializeRow(input.model, await c.db.select().from(table).where(predicate).get()); } @@ -47,7 +57,11 @@ async function updateManyAuthRecordsMutation(c: any, input: { model: string; whe if (!predicate) { throw new Error("updateManyAuthRecords requires a where clause"); } - await c.db.update(table).set(persistPatch(input.model, input.update) as any).where(predicate).run(); + await c.db + .update(table) + .set(persistPatch(input.model, input.update) as any) + .where(predicate) + .run(); const row = await c.db.select({ value: sqlCount() }).from(table).where(predicate).get(); return row?.value ?? 0; } @@ -222,60 +236,46 @@ async function deleteTaskStateMutation(c: any, input: { taskId: string; sessionI await c.db.delete(userTaskState).where(eq(userTaskState.taskId, input.taskId)).run(); } -export async function runUserWorkflow(ctx: any): Promise { - await ctx.loop("user-command-loop", async (loopCtx: any) => { - const msg = await loopCtx.queue.next("next-user-command", { - names: [...USER_QUEUE_NAMES], - completable: true, - }); - if (!msg) { - return Loop.continue(undefined); - } +const COMMAND_HANDLERS: Record Promise> = { + "user.command.auth.create": (c, body) => createAuthRecordMutation(c, body), + "user.command.auth.update": (c, body) => updateAuthRecordMutation(c, body), + "user.command.auth.update_many": (c, body) => updateManyAuthRecordsMutation(c, body), + "user.command.auth.delete": async (c, body) => { + await deleteAuthRecordMutation(c, body); + return { ok: true }; + }, + "user.command.auth.delete_many": (c, body) => deleteManyAuthRecordsMutation(c, body), + "user.command.profile.upsert": (c, body) => upsertUserProfileMutation(c, body), + "user.command.session_state.upsert": (c, body) => upsertSessionStateMutation(c, body), + "user.command.task_state.upsert": (c, body) => upsertTaskStateMutation(c, body), + "user.command.task_state.delete": async (c, body) => { + await deleteTaskStateMutation(c, body); + return { ok: true }; + }, +}; +/** + * Plain run handler (no workflow engine). Drains the queue using `c.queue.iter()` + * with completable messages. + */ +export async function runUserCommandLoop(c: any): Promise { + for await (const msg of c.queue.iter({ names: [...USER_QUEUE_NAMES], completable: true })) { try { - let result: unknown; - switch (msg.name) { - case "user.command.auth.create": - result = await loopCtx.step({ name: "user-auth-create", timeout: 60_000, run: async () => createAuthRecordMutation(loopCtx, msg.body) }); - break; - case "user.command.auth.update": - result = await loopCtx.step({ name: "user-auth-update", timeout: 60_000, run: async () => updateAuthRecordMutation(loopCtx, msg.body) }); - break; - case "user.command.auth.update_many": - result = await loopCtx.step({ name: "user-auth-update-many", timeout: 60_000, run: async () => updateManyAuthRecordsMutation(loopCtx, msg.body) }); - break; - case "user.command.auth.delete": - result = await loopCtx.step({ name: "user-auth-delete", timeout: 60_000, run: async () => deleteAuthRecordMutation(loopCtx, msg.body) }); - break; - case "user.command.auth.delete_many": - result = await loopCtx.step({ name: "user-auth-delete-many", timeout: 60_000, run: async () => deleteManyAuthRecordsMutation(loopCtx, msg.body) }); - break; - case "user.command.profile.upsert": - result = await loopCtx.step({ name: "user-profile-upsert", timeout: 60_000, run: async () => upsertUserProfileMutation(loopCtx, msg.body) }); - break; - case "user.command.session_state.upsert": - result = await loopCtx.step({ name: "user-session-state-upsert", timeout: 60_000, run: async () => upsertSessionStateMutation(loopCtx, msg.body) }); - break; - case "user.command.task_state.upsert": - result = await loopCtx.step({ name: "user-task-state-upsert", timeout: 60_000, run: async () => upsertTaskStateMutation(loopCtx, msg.body) }); - break; - case "user.command.task_state.delete": - result = await loopCtx.step({ name: "user-task-state-delete", timeout: 60_000, run: async () => deleteTaskStateMutation(loopCtx, msg.body) }); - break; - default: - return Loop.continue(undefined); + const handler = COMMAND_HANDLERS[msg.name]; + if (handler) { + const result = await handler(c, msg.body); + await msg.complete(result); + } else { + logActorWarning("user", "unknown queue message", { queueName: msg.name }); + await msg.complete({ error: `Unknown command: ${msg.name}` }); } - - await msg.complete(result); } catch (error) { const message = resolveErrorMessage(error); - logActorWarning("user", "user workflow command failed", { + logActorWarning("user", "user command failed", { queueName: msg.name, error: message, }); await msg.complete({ error: message }).catch(() => {}); } - - return Loop.continue(undefined); - }); + } } diff --git a/foundry/packages/client/src/backend-client.ts b/foundry/packages/client/src/backend-client.ts index a0ff36c..8331bf6 100644 --- a/foundry/packages/client/src/backend-client.ts +++ b/foundry/packages/client/src/backend-client.ts @@ -183,6 +183,7 @@ export interface BackendClientOptions { endpoint: string; defaultOrganizationId?: string; mode?: "remote" | "mock"; + encoding?: "json" | "cbor" | "bare"; } export interface BackendClient { @@ -413,7 +414,7 @@ export function createBackendClient(options: BackendClientOptions): BackendClien const endpoints = deriveBackendEndpoints(options.endpoint); const rivetApiEndpoint = endpoints.rivetEndpoint; const appApiEndpoint = endpoints.appEndpoint; - const client = createClient({ endpoint: rivetApiEndpoint }) as unknown as RivetClient; + const client = createClient({ endpoint: rivetApiEndpoint, encoding: options.encoding }) as unknown as RivetClient; const workspaceSubscriptions = new Map< string, { @@ -514,10 +515,7 @@ export function createBackendClient(options: BackendClientOptions): BackendClien const sandboxes = detail.sandboxes as Array<(typeof detail.sandboxes)[number] & { sandboxActorId?: string }>; const sandbox = sandboxes.find( (sb) => - sb.sandboxId === sandboxId && - sb.sandboxProviderId === sandboxProviderId && - typeof sb.sandboxActorId === "string" && - sb.sandboxActorId.length > 0, + sb.sandboxId === sandboxId && sb.sandboxProviderId === sandboxProviderId && typeof sb.sandboxActorId === "string" && sb.sandboxActorId.length > 0, ); if (sandbox?.sandboxActorId) { return (client as any).taskSandbox.getForId(sandbox.sandboxActorId); @@ -582,12 +580,7 @@ export function createBackendClient(options: BackendClientOptions): BackendClien return (await task(organizationId, repoId, taskIdValue)).getTaskDetail(await getAuthSessionInput()); }; - const getSessionDetailWithAuth = async ( - organizationId: string, - repoId: string, - taskIdValue: string, - sessionId: string, - ): Promise => { + const getSessionDetailWithAuth = async (organizationId: string, repoId: string, taskIdValue: string, sessionId: string): Promise => { return (await task(organizationId, repoId, taskIdValue)).getSessionDetail(await withAuthSessionInput({ sessionId })); }; @@ -596,67 +589,67 @@ export function createBackendClient(options: BackendClientOptions): BackendClien const summary = await (await organization(organizationId)).getOrganizationSummary({ organizationId }); const resolvedTasks = await Promise.all( summary.taskSummaries.map(async (taskSummary) => { - let detail; - try { - const taskHandle = await task(organizationId, taskSummary.repoId, taskSummary.id); - detail = await taskHandle.getTaskDetail(authSessionInput); - } catch (error) { - if (isActorNotFoundError(error)) { - return null; - } - throw error; + let detail; + try { + const taskHandle = await task(organizationId, taskSummary.repoId, taskSummary.id); + detail = await taskHandle.getTaskDetail(authSessionInput); + } catch (error) { + if (isActorNotFoundError(error)) { + return null; } - const sessionDetails = await Promise.all( - detail.sessionsSummary.map(async (session) => { - try { - const full = await (await task(organizationId, detail.repoId, detail.id)).getSessionDetail({ - sessionId: session.id, - ...(authSessionInput ?? {}), - }); - return [session.id, full] as const; - } catch (error) { - if (isActorNotFoundError(error)) { - return null; - } - throw error; + throw error; + } + const sessionDetails = await Promise.all( + detail.sessionsSummary.map(async (session) => { + try { + const full = await (await task(organizationId, detail.repoId, detail.id)).getSessionDetail({ + sessionId: session.id, + ...(authSessionInput ?? {}), + }); + return [session.id, full] as const; + } catch (error) { + if (isActorNotFoundError(error)) { + return null; } - }), - ); - const sessionDetailsById = new Map(sessionDetails.filter((entry): entry is readonly [string, WorkspaceSessionDetail] => entry !== null)); - return { - id: detail.id, - repoId: detail.repoId, - title: detail.title, - status: detail.status, - repoName: detail.repoName, - updatedAtMs: detail.updatedAtMs, - branch: detail.branch, - pullRequest: detail.pullRequest, - activeSessionId: detail.activeSessionId ?? null, - sessions: detail.sessionsSummary.map((session) => { - const full = sessionDetailsById.get(session.id); - return { - id: session.id, - sessionId: session.sessionId, - sessionName: session.sessionName, - agent: session.agent, - model: session.model, - status: session.status, - thinkingSinceMs: session.thinkingSinceMs, - unread: session.unread, - created: session.created, - draft: full?.draft ?? { text: "", attachments: [], updatedAtMs: null }, - transcript: full?.transcript ?? [], - }; - }), - fileChanges: detail.fileChanges, - diffs: detail.diffs, - fileTree: detail.fileTree, - minutesUsed: detail.minutesUsed, - activeSandboxId: detail.activeSandboxId ?? null, - }; - }), - ); + throw error; + } + }), + ); + const sessionDetailsById = new Map(sessionDetails.filter((entry): entry is readonly [string, WorkspaceSessionDetail] => entry !== null)); + return { + id: detail.id, + repoId: detail.repoId, + title: detail.title, + status: detail.status, + repoName: detail.repoName, + updatedAtMs: detail.updatedAtMs, + branch: detail.branch, + pullRequest: detail.pullRequest, + activeSessionId: detail.activeSessionId ?? null, + sessions: detail.sessionsSummary.map((session) => { + const full = sessionDetailsById.get(session.id); + return { + id: session.id, + sessionId: session.sessionId, + sessionName: session.sessionName, + agent: session.agent, + model: session.model, + status: session.status, + thinkingSinceMs: session.thinkingSinceMs, + unread: session.unread, + created: session.created, + draft: full?.draft ?? { text: "", attachments: [], updatedAtMs: null }, + transcript: full?.transcript ?? [], + }; + }), + fileChanges: detail.fileChanges, + diffs: detail.diffs, + fileTree: detail.fileTree, + minutesUsed: detail.minutesUsed, + activeSandboxId: detail.activeSandboxId ?? null, + }; + }), + ); const tasks = resolvedTasks.filter((task): task is Exclude<(typeof resolvedTasks)[number], null> => task !== null); const repositories = summary.repos @@ -1205,11 +1198,7 @@ export function createBackendClient(options: BackendClientOptions): BackendClien return await withSandboxHandle(organizationId, sandboxProviderId, sandboxId, async (handle) => handle.sandboxAgentConnection()); }, - async getSandboxWorkspaceModelGroups( - organizationId: string, - sandboxProviderId: SandboxProviderId, - sandboxId: string, - ): Promise { + async getSandboxWorkspaceModelGroups(organizationId: string, sandboxProviderId: SandboxProviderId, sandboxId: string): Promise { return await withSandboxHandle(organizationId, sandboxProviderId, sandboxId, async (handle) => handle.listWorkspaceModelGroups()); }, diff --git a/foundry/packages/frontend/src/components/mock-layout.tsx b/foundry/packages/frontend/src/components/mock-layout.tsx index b1dadeb..72c2616 100644 --- a/foundry/packages/frontend/src/components/mock-layout.tsx +++ b/foundry/packages/frontend/src/components/mock-layout.tsx @@ -207,15 +207,38 @@ function sessionStateMessage(tab: Task["sessions"][number] | null | undefined): return null; } -function groupRepositories(repos: Array<{ id: string; label: string }>, tasks: Task[]) { +function groupRepositories( + repos: Array<{ id: string; label: string }>, + tasks: Task[], + openPullRequests?: Array<{ + repoId: string; + repoFullName: string; + number: number; + title: string; + state: string; + url: string; + headRefName: string; + authorLogin: string | null; + isDraft: boolean; + }>, +) { return repos .map((repo) => ({ id: repo.id, label: repo.label, updatedAtMs: tasks.filter((task) => task.repoId === repo.id).reduce((latest, task) => Math.max(latest, task.updatedAtMs), 0), tasks: tasks.filter((task) => task.repoId === repo.id).sort((left, right) => right.updatedAtMs - left.updatedAtMs), + pullRequests: (openPullRequests ?? []).filter((pr) => pr.repoId === repo.id), })) - .filter((repo) => repo.tasks.length > 0); + .sort((a, b) => { + // Repos with tasks first, then repos with PRs, then alphabetical + const aHasActivity = a.tasks.length > 0 || a.pullRequests.length > 0; + const bHasActivity = b.tasks.length > 0 || b.pullRequests.length > 0; + if (aHasActivity && !bHasActivity) return -1; + if (!aHasActivity && bHasActivity) return 1; + if (a.updatedAtMs !== b.updatedAtMs) return b.updatedAtMs - a.updatedAtMs; + return a.label.localeCompare(b.label); + }); } interface WorkspaceActions { @@ -1378,7 +1401,8 @@ export function MockLayout({ organizationId, selectedTaskId, selectedSessionId } ); return hydratedTasks.sort((left, right) => right.updatedAtMs - left.updatedAtMs); }, [selectedTaskSummary, selectedSessionId, sessionState.data, taskState.data, taskSummaries, organizationId]); - const rawRepositories = useMemo(() => groupRepositories(organizationRepos, tasks), [tasks, organizationRepos]); + const openPullRequests = organizationState.data?.openPullRequests ?? []; + const rawRepositories = useMemo(() => groupRepositories(organizationRepos, tasks, openPullRequests), [tasks, organizationRepos, openPullRequests]); const appSnapshot = useMockAppSnapshot(); const currentUser = activeMockUser(appSnapshot); const activeOrg = activeMockOrganization(appSnapshot); diff --git a/foundry/packages/frontend/src/components/mock-layout/sidebar.tsx b/foundry/packages/frontend/src/components/mock-layout/sidebar.tsx index c8cafe6..4e8b7ce 100644 --- a/foundry/packages/frontend/src/components/mock-layout/sidebar.tsx +++ b/foundry/packages/frontend/src/components/mock-layout/sidebar.tsx @@ -506,6 +506,7 @@ export const Sidebar = memo(function Sidebar({ return (
{ if (node) { @@ -663,6 +664,7 @@ export const Sidebar = memo(function Sidebar({ return (
{ @@ -775,6 +777,7 @@ export const Sidebar = memo(function Sidebar({ return (
{ @@ -812,6 +815,7 @@ export const Sidebar = memo(function Sidebar({ return (
{ if (node) { diff --git a/foundry/packages/frontend/src/lib/backend.ts b/foundry/packages/frontend/src/lib/backend.ts index b57cc51..be58cdd 100644 --- a/foundry/packages/frontend/src/lib/backend.ts +++ b/foundry/packages/frontend/src/lib/backend.ts @@ -5,4 +5,5 @@ export const backendClient = createBackendClient({ endpoint: backendEndpoint, defaultOrganizationId, mode: frontendClientMode, + encoding: import.meta.env.DEV ? "json" : undefined, }); diff --git a/foundry/packages/shared/src/workspace.ts b/foundry/packages/shared/src/workspace.ts index d1bda9c..d8dff2f 100644 --- a/foundry/packages/shared/src/workspace.ts +++ b/foundry/packages/shared/src/workspace.ts @@ -174,12 +174,27 @@ export interface OrganizationGithubSummary { totalRepositoryCount: number; } +export interface WorkspaceOpenPullRequest { + repoId: string; + repoFullName: string; + number: number; + title: string; + status: string; + state: string; + url: string; + headRefName: string; + baseRefName: string; + authorLogin: string | null; + isDraft: boolean; +} + /** Organization-level snapshot — initial fetch for the organization topic. */ export interface OrganizationSummarySnapshot { organizationId: string; github: OrganizationGithubSummary; repos: WorkspaceRepositorySummary[]; taskSummaries: WorkspaceTaskSummary[]; + openPullRequests?: WorkspaceOpenPullRequest[]; } export interface WorkspaceSession extends WorkspaceSessionSummary {