diff --git a/foundry/CLAUDE.md b/foundry/CLAUDE.md index ab51fc6..3cb5ee4 100644 --- a/foundry/CLAUDE.md +++ b/foundry/CLAUDE.md @@ -56,6 +56,8 @@ Use `pnpm` workspaces and Turborepo. - mock frontend changes: `just foundry-mock` or restart with `just foundry-mock-down && just foundry-mock` - local frontend-only work outside Docker: restart `pnpm --filter @sandbox-agent/foundry-frontend dev` or `just foundry-dev-mock` as appropriate - The backend does **not** hot reload. Bun's `--hot` flag causes the server to re-bind on a different port (e.g. 6421 instead of 6420), breaking all client connections while the container still exposes the original port. After backend code changes, restart the backend container: `just foundry-dev-down && just foundry-dev`. +- The dev server has debug logging enabled by default (`RIVET_LOG_LEVEL=debug`, `FOUNDRY_LOG_LEVEL=debug`) via `compose.dev.yaml`. Error stacks and timestamps are also enabled. +- The frontend client uses JSON encoding for RivetKit in development (`import.meta.env.DEV`) for easier debugging. Production uses the default encoding. ## Railway Logs @@ -77,9 +79,10 @@ Use `pnpm` workspaces and Turborepo. - Keep frontend route/state coverage current in code and tests; there is no separate page-inventory doc to maintain. - If Foundry uses a shared component from `@sandbox-agent/react`, make changes in `sdks/react` instead of copying or forking that component into Foundry. - When changing shared React components in `sdks/react` for Foundry, verify they still work in the Sandbox Agent Inspector before finishing. -- When making UI changes, verify the live flow with `agent-browser`, take screenshots of the updated UI, and offer to open those screenshots in Preview when you finish. +- When making UI changes, verify the live flow with the Chrome DevTools MCP or `agent-browser`, take screenshots of the updated UI, and offer to open those screenshots in Preview when you finish. - When asked for screenshots, capture all relevant affected screens and modal states, not just a single viewport. Include empty, populated, success, and blocked/error states when they are part of the changed flow. - If a screenshot catches a transition frame, blank modal, or otherwise misleading state, retake it before reporting it. +- When verifying UI in the browser, attempt to sign in by navigating to `/signin` and clicking "Continue with GitHub". If the browser lands on the GitHub login page (github.com/login) and you don't have credentials, stop and ask the user to complete the sign-in. Do not assume the session is invalid just because you see the Foundry sign-in page — always attempt the OAuth flow first. ## Realtime Data Architecture diff --git a/foundry/packages/backend/CLAUDE.md b/foundry/packages/backend/CLAUDE.md index 7018642..2d980d4 100644 --- a/foundry/packages/backend/CLAUDE.md +++ b/foundry/packages/backend/CLAUDE.md @@ -29,7 +29,6 @@ Children push updates **up** to their direct coordinator only. Coordinators broa OrganizationActor (coordinator for tasks + auth users) │ │ Index tables: -│ ├─ repos → Repository catalog (GitHub sync) │ ├─ taskIndex → TaskActor index (taskId → repoId + branchName) │ ├─ taskSummaries → TaskActor materialized sidebar projection │ ├─ authSessionIndex → UserActor index (session token → userId) diff --git a/foundry/packages/backend/src/actors/audit-log/index.ts b/foundry/packages/backend/src/actors/audit-log/index.ts index ce0a654..317974d 100644 --- a/foundry/packages/backend/src/actors/audit-log/index.ts +++ b/foundry/packages/backend/src/actors/audit-log/index.ts @@ -1,11 +1,10 @@ // @ts-nocheck import { and, desc, eq } from "drizzle-orm"; import { actor, queue } from "rivetkit"; -import { workflow } from "rivetkit/workflow"; import type { AuditLogEvent } from "@sandbox-agent/foundry-shared"; import { auditLogDb } from "./db/db.js"; import { events } from "./db/schema.js"; -import { AUDIT_LOG_QUEUE_NAMES, runAuditLogWorkflow } from "./workflow.js"; +import { AUDIT_LOG_QUEUE_NAMES, runAuditLogCommandLoop } from "./workflow.js"; export interface AuditLogInput { organizationId: string; @@ -82,5 +81,5 @@ export const auditLog = actor({ })); }, }, - run: workflow(runAuditLogWorkflow), + run: runAuditLogCommandLoop, }); diff --git a/foundry/packages/backend/src/actors/audit-log/workflow.ts b/foundry/packages/backend/src/actors/audit-log/workflow.ts index 3c2437a..6c48074 100644 --- a/foundry/packages/backend/src/actors/audit-log/workflow.ts +++ b/foundry/packages/backend/src/actors/audit-log/workflow.ts @@ -1,13 +1,13 @@ // @ts-nocheck -import { Loop } from "rivetkit/workflow"; +import { logActorWarning, resolveErrorMessage } from "../logging.js"; import { events } from "./db/schema.js"; import type { AppendAuditLogCommand } from "./index.js"; export const AUDIT_LOG_QUEUE_NAMES = ["auditLog.command.append"] as const; -async function appendAuditLogRow(loopCtx: any, body: AppendAuditLogCommand): Promise { +async function appendAuditLogRow(c: any, body: AppendAuditLogCommand): Promise { const now = Date.now(); - await loopCtx.db + await c.db .insert(events) .values({ repoId: body.repoId ?? null, @@ -20,21 +20,19 @@ async function appendAuditLogRow(loopCtx: any, body: AppendAuditLogCommand): Pro .run(); } -export async function runAuditLogWorkflow(ctx: any): Promise { - await ctx.loop("audit-log-command-loop", async (loopCtx: any) => { - const msg = await loopCtx.queue.next("next-audit-log-command", { - names: [...AUDIT_LOG_QUEUE_NAMES], - completable: true, - }); - if (!msg) { - return Loop.continue(undefined); +export async function runAuditLogCommandLoop(c: any): Promise { + for await (const msg of c.queue.iter({ names: [...AUDIT_LOG_QUEUE_NAMES], completable: true })) { + try { + if (msg.name === "auditLog.command.append") { + await appendAuditLogRow(c, msg.body as AppendAuditLogCommand); + await msg.complete({ ok: true }); + continue; + } + await msg.complete({ error: `Unknown command: ${msg.name}` }); + } catch (error) { + const message = resolveErrorMessage(error); + logActorWarning("auditLog", "audit-log command failed", { queueName: msg.name, error: message }); + await msg.complete({ error: message }).catch(() => {}); } - - if (msg.name === "auditLog.command.append") { - await loopCtx.step("append-audit-log-row", async () => appendAuditLogRow(loopCtx, msg.body as AppendAuditLogCommand)); - await msg.complete({ ok: true }); - } - - return Loop.continue(undefined); - }); + } } diff --git a/foundry/packages/backend/src/actors/github-data/index.ts b/foundry/packages/backend/src/actors/github-data/index.ts index d477c81..e18f02f 100644 --- a/foundry/packages/backend/src/actors/github-data/index.ts +++ b/foundry/packages/backend/src/actors/github-data/index.ts @@ -1,7 +1,6 @@ // @ts-nocheck -import { eq } from "drizzle-orm"; +import { eq, inArray } from "drizzle-orm"; import { actor, queue } from "rivetkit"; -import { workflow } from "rivetkit/workflow"; import type { FoundryOrganization } from "@sandbox-agent/foundry-shared"; import { getActorRuntimeContext } from "../context.js"; import { getOrCreateOrganization, getTask } from "../handles.js"; @@ -12,7 +11,7 @@ import { organizationWorkflowQueueName } from "../organization/queues.js"; import { taskWorkflowQueueName } from "../task/workflow/index.js"; import { githubDataDb } from "./db/db.js"; import { githubBranches, githubMembers, githubMeta, githubPullRequests, githubRepositories } from "./db/schema.js"; -import { GITHUB_DATA_QUEUE_NAMES, runGithubDataWorkflow } from "./workflow.js"; +import { GITHUB_DATA_QUEUE_NAMES, runGithubDataCommandLoop } from "./workflow.js"; const META_ROW_ID = 1; const SYNC_REPOSITORY_BATCH_SIZE = 10; @@ -701,21 +700,6 @@ export async function fullSyncSetup(c: any, input: FullSyncInput = {}): Promise< await upsertRepositories(c, repositories, startedAt, syncGeneration); - const organization = await getOrCreateOrganization(c, c.state.organizationId); - await sendOrganizationCommand(organization, "organization.command.github.data_projection.apply", { - connectedAccount: context.connectedAccount, - installationStatus: context.installationStatus, - installationId: context.installationId, - syncStatus: "syncing", - lastSyncLabel: totalRepositoryCount > 0 ? `Imported ${totalRepositoryCount} repositories` : "No repositories available", - lastSyncAt: currentMeta.lastSyncAt, - syncGeneration, - syncPhase: totalRepositoryCount > 0 ? "syncing_branches" : null, - processedRepositoryCount: 0, - totalRepositoryCount, - repositories, - }); - return { syncGeneration, startedAt, @@ -784,7 +768,7 @@ export async function fullSyncMembers(c: any, config: FullSyncConfig): Promise { - const repos = readRepositoriesFromDb(c); + const repos = await readRepositoriesFromDb(c); const batches = chunkItems(repos, SYNC_REPOSITORY_BATCH_SIZE); if (batchIndex >= batches.length) return true; @@ -817,22 +801,6 @@ export async function fullSyncFinalize(c: any, config: FullSyncConfig): Promise< await sweepPullRequests(c, config.syncGeneration); await sweepRepositories(c, config.syncGeneration); - const repos = readRepositoriesFromDb(c); - const organization = await getOrCreateOrganization(c, c.state.organizationId); - await sendOrganizationCommand(organization, "organization.command.github.data_projection.apply", { - connectedAccount: config.connectedAccount, - installationStatus: config.installationStatus, - installationId: config.installationId, - syncStatus: "synced", - lastSyncLabel: config.totalRepositoryCount > 0 ? `Synced ${config.totalRepositoryCount} repositories` : "No repositories available", - lastSyncAt: config.startedAt, - syncGeneration: config.syncGeneration, - syncPhase: null, - processedRepositoryCount: config.totalRepositoryCount, - totalRepositoryCount: config.totalRepositoryCount, - repositories: repos, - }); - await writeMeta(c, { connectedAccount: config.connectedAccount, installationStatus: config.installationStatus, @@ -908,7 +876,7 @@ export const githubData = actor({ createState: (_c, input: GithubDataInput) => ({ organizationId: input.organizationId, }), - run: workflow(runGithubDataWorkflow), + run: runGithubDataCommandLoop, actions: { async getSummary(c) { const repositories = await c.db.select().from(githubRepositories).all(); @@ -949,6 +917,15 @@ export const githubData = actor({ }; }, + async listOpenPullRequests(c) { + const rows = await c.db + .select() + .from(githubPullRequests) + .where(inArray(githubPullRequests.state, ["OPEN", "DRAFT"])) + .all(); + return rows.map((row) => pullRequestSummaryFromRow(row)); + }, + async listBranchesForRepository(c, input: { repoId: string }) { const rows = await c.db.select().from(githubBranches).where(eq(githubBranches.repoId, input.repoId)).all(); return rows @@ -1015,11 +992,6 @@ export async function reloadRepositoryMutation(c: any, input: { repoId: string } updatedAt, ); - const organization = await getOrCreateOrganization(c, c.state.organizationId); - await sendOrganizationCommand(organization, "organization.command.github.repository_projection.apply", { - repoId: input.repoId, - remoteUrl: repository.cloneUrl, - }); return { repoId: input.repoId, fullName: repository.fullName, @@ -1049,20 +1021,6 @@ export async function clearStateMutation(c: any, input: ClearStateInput) { totalRepositoryCount: 0, }); - const organization = await getOrCreateOrganization(c, c.state.organizationId); - await sendOrganizationCommand(organization, "organization.command.github.data_projection.apply", { - connectedAccount: input.connectedAccount, - installationStatus: input.installationStatus, - installationId: input.installationId, - syncStatus: "pending", - lastSyncLabel: input.label, - lastSyncAt: null, - syncGeneration: currentMeta.syncGeneration, - syncPhase: null, - processedRepositoryCount: 0, - totalRepositoryCount: 0, - repositories: [], - }); await emitPullRequestChangeEvents(c, beforeRows, []); } @@ -1150,12 +1108,6 @@ export async function handlePullRequestWebhookMutation(c: any, input: PullReques totalRepositoryCount: 0, }); - const organization = await getOrCreateOrganization(c, c.state.organizationId); - await sendOrganizationCommand(organization, "organization.command.github.repository_projection.apply", { - repoId, - remoteUrl: input.repository.cloneUrl, - }); - const afterRows = await readAllPullRequestRows(c); await emitPullRequestChangeEvents(c, beforeRows, afterRows); if (state === "CLOSED" || state === "MERGED") { diff --git a/foundry/packages/backend/src/actors/github-data/workflow.ts b/foundry/packages/backend/src/actors/github-data/workflow.ts index 5e7c2cb..3497381 100644 --- a/foundry/packages/backend/src/actors/github-data/workflow.ts +++ b/foundry/packages/backend/src/actors/github-data/workflow.ts @@ -1,6 +1,11 @@ // @ts-nocheck -import { Loop } from "rivetkit/workflow"; -import { clearStateMutation, handlePullRequestWebhookMutation, reloadRepositoryMutation, runFullSync, fullSyncError } from "./index.js"; +import { logActorWarning, resolveErrorMessage } from "../logging.js"; + +// Dynamic imports to break circular dependency: index.ts imports workflow.ts, +// and workflow.ts needs functions from index.ts. +async function getIndexModule() { + return await import("./index.js"); +} export const GITHUB_DATA_QUEUE_NAMES = [ "githubData.command.syncRepos", @@ -15,78 +20,62 @@ export function githubDataWorkflowQueueName(name: GithubDataQueueName): GithubDa return name; } -export async function runGithubDataWorkflow(ctx: any): Promise { - // The org actor sends a "githubData.command.syncRepos" queue message when it - // creates this actor, so the command loop below handles the initial sync. - // - // IMPORTANT: Do NOT use workflow sub-loops (ctx.loop) inside command handlers. - // RivetKit workflow sub-loops inside a parent loop cause HistoryDivergedError - // on the second iteration because entries from the first iteration's sub-loop - // are still in history but not visited during replay of iteration 2. Use native - // JS loops inside a single step instead. See .context/rivetkit-subloop-bug.md. - - await ctx.loop("github-data-command-loop", async (loopCtx: any) => { - const msg = await loopCtx.queue.next("next-github-data-command", { - names: [...GITHUB_DATA_QUEUE_NAMES], - completable: true, - }); - if (!msg) { - return Loop.continue(undefined); - } - +/** + * Plain run handler (no workflow engine). Drains the queue using `c.queue.iter()` + * with completable messages. This avoids the RivetKit bug where actors created + * from another actor's workflow context never start their `run: workflow(...)`. + */ +export async function runGithubDataCommandLoop(c: any): Promise { + for await (const msg of c.queue.iter({ names: [...GITHUB_DATA_QUEUE_NAMES], completable: true })) { try { if (msg.name === "githubData.command.syncRepos") { try { - // Single opaque step for the entire sync. Do NOT decompose into - // sub-loops/sub-steps — see comment at top of function. - await loopCtx.step({ - name: "github-data-sync-repos", - timeout: 5 * 60_000, - run: async () => runFullSync(loopCtx, msg.body), - }); + const { runFullSync } = await getIndexModule(); + await runFullSync(c, msg.body); await msg.complete({ ok: true }); } catch (error) { - await loopCtx.step("sync-repos-error", async () => fullSyncError(loopCtx, error)); + const { fullSyncError } = await getIndexModule(); + try { + await fullSyncError(c, error); + } catch { + /* best effort */ + } const message = error instanceof Error ? error.message : String(error); await msg.complete({ error: message }).catch(() => {}); } - return Loop.continue(undefined); + continue; } if (msg.name === "githubData.command.reloadRepository") { - const result = await loopCtx.step({ - name: "github-data-reload-repository", - timeout: 5 * 60_000, - run: async () => reloadRepositoryMutation(loopCtx, msg.body), - }); + const { reloadRepositoryMutation } = await getIndexModule(); + const result = await reloadRepositoryMutation(c, msg.body); await msg.complete(result); - return Loop.continue(undefined); + continue; } if (msg.name === "githubData.command.clearState") { - await loopCtx.step({ - name: "github-data-clear-state", - timeout: 60_000, - run: async () => clearStateMutation(loopCtx, msg.body), - }); + const { clearStateMutation } = await getIndexModule(); + await clearStateMutation(c, msg.body); await msg.complete({ ok: true }); - return Loop.continue(undefined); + continue; } if (msg.name === "githubData.command.handlePullRequestWebhook") { - await loopCtx.step({ - name: "github-data-handle-pull-request-webhook", - timeout: 60_000, - run: async () => handlePullRequestWebhookMutation(loopCtx, msg.body), - }); + const { handlePullRequestWebhookMutation } = await getIndexModule(); + await handlePullRequestWebhookMutation(c, msg.body); await msg.complete({ ok: true }); - return Loop.continue(undefined); + continue; } + + logActorWarning("githubData", "unknown queue message", { queueName: msg.name }); + await msg.complete({ error: `Unknown command: ${msg.name}` }); } catch (error) { - const message = error instanceof Error ? error.message : String(error); + const message = resolveErrorMessage(error); + logActorWarning("githubData", "github-data command failed", { + queueName: msg.name, + error: message, + }); await msg.complete({ error: message }).catch(() => {}); } - - return Loop.continue(undefined); - }); + } } diff --git a/foundry/packages/backend/src/actors/organization/actions.ts b/foundry/packages/backend/src/actors/organization/actions.ts index 860fdc9..436765c 100644 --- a/foundry/packages/backend/src/actors/organization/actions.ts +++ b/foundry/packages/backend/src/actors/organization/actions.ts @@ -10,8 +10,8 @@ import type { OrganizationUseInput, } from "@sandbox-agent/foundry-shared"; import { logActorWarning, resolveErrorMessage } from "../logging.js"; -import { repoIdFromRemote } from "../../services/repo.js"; -import { organizationProfile, repos, taskSummaries } from "./db/schema.js"; +import { getOrCreateGithubData } from "../handles.js"; +import { organizationProfile, taskSummaries } from "./db/schema.js"; import { organizationAppActions } from "./actions/app.js"; import { organizationBetterAuthActions } from "./actions/better-auth.js"; import { organizationOnboardingActions } from "./actions/onboarding.js"; @@ -45,18 +45,6 @@ function repoLabelFromRemote(remoteUrl: string): string { return remoteUrl; } -function buildRepoSummary(repoRow: { repoId: string; remoteUrl: string; updatedAt: number }, taskRows: WorkspaceTaskSummary[]): WorkspaceRepositorySummary { - const repoTasks = taskRows.filter((task) => task.repoId === repoRow.repoId); - const latestActivityMs = repoTasks.reduce((latest, task) => Math.max(latest, task.updatedAtMs), repoRow.updatedAt); - - return { - id: repoRow.repoId, - label: repoLabelFromRemote(repoRow.remoteUrl), - taskCount: repoTasks.length, - latestActivityMs, - }; -} - function buildGithubSummary(profile: any, importedRepoCount: number): OrganizationGithubSummary { return { connectedAccount: profile?.githubConnectedAccount ?? "", @@ -81,18 +69,19 @@ function buildGithubSummary(profile: any, importedRepoCount: number): Organizati */ async function getOrganizationSummarySnapshot(c: any): Promise { const profile = await c.db.select().from(organizationProfile).where(eq(organizationProfile.id, ORGANIZATION_PROFILE_ROW_ID)).get(); - const repoRows = await c.db - .select({ - repoId: repos.repoId, - remoteUrl: repos.remoteUrl, - updatedAt: repos.updatedAt, - }) - .from(repos) - .orderBy(desc(repos.updatedAt)) - .all(); + + // Fetch repos + open PRs from github-data actor (single actor, not fan-out) + let repoRows: Array<{ repoId: string; fullName: string; cloneUrl: string; private: boolean; defaultBranch: string }> = []; + let openPullRequests: any[] = []; + try { + const githubData = await getOrCreateGithubData(c, c.state.organizationId); + [repoRows, openPullRequests] = await Promise.all([githubData.listRepositories({}), githubData.listOpenPullRequests({})]); + } catch { + // github-data actor may not exist yet + } const summaryRows = await c.db.select().from(taskSummaries).orderBy(desc(taskSummaries.updatedAtMs)).all(); - const summaries: WorkspaceTaskSummary[] = summaryRows.map((row) => ({ + const summaries = summaryRows.map((row) => ({ id: row.taskId, repoId: row.repoId, title: row.title, @@ -123,8 +112,20 @@ async function getOrganizationSummarySnapshot(c: any): Promise buildRepoSummary(row, summaries)).sort((left, right) => right.latestActivityMs - left.latestActivityMs), + repos: repoRows + .map((repo) => { + const repoTasks = summaries.filter((t) => t.repoId === repo.repoId); + const latestTaskMs = repoTasks.reduce((latest, t) => Math.max(latest, t.updatedAtMs), 0); + return { + id: repo.repoId, + label: repoLabelFromRemote(repo.cloneUrl), + taskCount: repoTasks.length, + latestActivityMs: latestTaskMs || Date.now(), + }; + }) + .sort((a, b) => b.latestActivityMs - a.latestActivityMs), taskSummaries: summaries, + openPullRequests, }; } @@ -149,25 +150,19 @@ export const organizationActions = { async listRepos(c: any, input: OrganizationUseInput): Promise { assertOrganization(c, input.organizationId); - - const rows = await c.db - .select({ - repoId: repos.repoId, - remoteUrl: repos.remoteUrl, - createdAt: repos.createdAt, - updatedAt: repos.updatedAt, - }) - .from(repos) - .orderBy(desc(repos.updatedAt)) - .all(); - - return rows.map((row) => ({ - organizationId: c.state.organizationId, - repoId: row.repoId, - remoteUrl: row.remoteUrl, - createdAt: row.createdAt, - updatedAt: row.updatedAt, - })); + try { + const githubData = await getOrCreateGithubData(c, c.state.organizationId); + const rows = await githubData.listRepositories({}); + return rows.map((row: any) => ({ + organizationId: c.state.organizationId, + repoId: row.repoId, + remoteUrl: row.cloneUrl, + createdAt: row.updatedAt ?? Date.now(), + updatedAt: row.updatedAt ?? Date.now(), + })); + } catch { + return []; + } }, async getOrganizationSummary(c: any, input: OrganizationUseInput): Promise { @@ -176,103 +171,6 @@ export const organizationActions = { }, }; -export async function applyGithubRepositoryProjectionMutation(c: any, input: { repoId: string; remoteUrl: string }): Promise { - const now = Date.now(); - await c.db - .insert(repos) - .values({ - repoId: input.repoId, - remoteUrl: input.remoteUrl, - createdAt: now, - updatedAt: now, - }) - .onConflictDoUpdate({ - target: repos.repoId, - set: { - remoteUrl: input.remoteUrl, - updatedAt: now, - }, - }) - .run(); - await refreshOrganizationSnapshotMutation(c); -} - -export async function applyGithubDataProjectionMutation( - c: any, - input: { - connectedAccount: string; - installationStatus: string; - installationId: number | null; - syncStatus: string; - lastSyncLabel: string; - lastSyncAt: number | null; - syncGeneration: number; - syncPhase: string | null; - processedRepositoryCount: number; - totalRepositoryCount: number; - repositories: Array<{ fullName: string; cloneUrl: string; private: boolean }>; - }, -): Promise { - const existingRepos = await c.db.select({ repoId: repos.repoId }).from(repos).all(); - const nextRepoIds = new Set(); - const now = Date.now(); - - const profile = await c.db - .select({ id: organizationProfile.id }) - .from(organizationProfile) - .where(eq(organizationProfile.id, ORGANIZATION_PROFILE_ROW_ID)) - .get(); - if (profile) { - await c.db - .update(organizationProfile) - .set({ - githubConnectedAccount: input.connectedAccount, - githubInstallationStatus: input.installationStatus, - githubSyncStatus: input.syncStatus, - githubInstallationId: input.installationId, - githubLastSyncLabel: input.lastSyncLabel, - githubLastSyncAt: input.lastSyncAt, - githubSyncGeneration: input.syncGeneration, - githubSyncPhase: input.syncPhase, - githubProcessedRepositoryCount: input.processedRepositoryCount, - githubTotalRepositoryCount: input.totalRepositoryCount, - updatedAt: now, - }) - .where(eq(organizationProfile.id, ORGANIZATION_PROFILE_ROW_ID)) - .run(); - } - - for (const repository of input.repositories) { - const repoId = repoIdFromRemote(repository.cloneUrl); - nextRepoIds.add(repoId); - await c.db - .insert(repos) - .values({ - repoId, - remoteUrl: repository.cloneUrl, - createdAt: now, - updatedAt: now, - }) - .onConflictDoUpdate({ - target: repos.repoId, - set: { - remoteUrl: repository.cloneUrl, - updatedAt: now, - }, - }) - .run(); - } - - for (const repo of existingRepos) { - if (nextRepoIds.has(repo.repoId)) { - continue; - } - await c.db.delete(repos).where(eq(repos.repoId, repo.repoId)).run(); - } - - await refreshOrganizationSnapshotMutation(c); -} - export async function applyGithubSyncProgressMutation( c: any, input: { diff --git a/foundry/packages/backend/src/actors/organization/actions/task-mutations.ts b/foundry/packages/backend/src/actors/organization/actions/task-mutations.ts index 72edf92..c6b0c02 100644 --- a/foundry/packages/backend/src/actors/organization/actions/task-mutations.ts +++ b/foundry/packages/backend/src/actors/organization/actions/task-mutations.ts @@ -17,7 +17,7 @@ import { deriveFallbackTitle, resolveCreateFlowDecision } from "../../../service import { expectQueueResponse } from "../../../services/queue.js"; import { isActorNotFoundError, logActorWarning, resolveErrorMessage } from "../../logging.js"; import { defaultSandboxProviderId } from "../../../sandbox-config.js"; -import { taskIndex, taskSummaries, repos } from "../db/schema.js"; +import { taskIndex, taskSummaries } from "../db/schema.js"; import { refreshOrganizationSnapshotMutation } from "../actions.js"; interface CreateTaskCommand { @@ -120,11 +120,6 @@ async function listGitHubBranches(c: any, repoId: string): Promise { - const repoRow = await c.db.select({ remoteUrl: repos.remoteUrl }).from(repos).where(eq(repos.repoId, repoId)).get(); - if (repoRow?.remoteUrl) { - return repoRow.remoteUrl; - } - const repository = await resolveGitHubRepository(c, repoId); const remoteUrl = repository?.cloneUrl?.trim(); if (!remoteUrl) { diff --git a/foundry/packages/backend/src/actors/organization/actions/tasks.ts b/foundry/packages/backend/src/actors/organization/actions/tasks.ts index be2cd62..b29baa1 100644 --- a/foundry/packages/backend/src/actors/organization/actions/tasks.ts +++ b/foundry/packages/backend/src/actors/organization/actions/tasks.ts @@ -21,12 +21,11 @@ import type { TaskWorkspaceUpdateDraftInput, } from "@sandbox-agent/foundry-shared"; import { getActorRuntimeContext } from "../../context.js"; -import { getOrCreateAuditLog, getTask as getTaskHandle, selfOrganization } from "../../handles.js"; +import { getOrCreateAuditLog, getOrCreateGithubData, getTask as getTaskHandle, selfOrganization } from "../../handles.js"; import { defaultSandboxProviderId } from "../../../sandbox-config.js"; import { expectQueueResponse } from "../../../services/queue.js"; import { logActorWarning, resolveErrorMessage } from "../../logging.js"; import { taskWorkflowQueueName } from "../../task/workflow/index.js"; -import { repos } from "../db/schema.js"; import { organizationWorkflowQueueName } from "../queues.js"; import { createTaskMutation, @@ -44,8 +43,9 @@ function assertOrganization(c: { state: { organizationId: string } }, organizati } async function requireRepoExists(c: any, repoId: string): Promise { - const repoRow = await c.db.select({ repoId: repos.repoId }).from(repos).where(eq(repos.repoId, repoId)).get(); - if (!repoRow) { + const githubData = await getOrCreateGithubData(c, c.state.organizationId); + const repo = await githubData.getRepository({ repoId }); + if (!repo) { throw new Error(`Unknown repo: ${repoId}`); } } diff --git a/foundry/packages/backend/src/actors/organization/app-shell.ts b/foundry/packages/backend/src/actors/organization/app-shell.ts index 4a95681..dddca16 100644 --- a/foundry/packages/backend/src/actors/organization/app-shell.ts +++ b/foundry/packages/backend/src/actors/organization/app-shell.ts @@ -19,7 +19,7 @@ import { getBetterAuthService } from "../../services/better-auth.js"; import { expectQueueResponse } from "../../services/queue.js"; import { repoIdFromRemote, repoLabelFromRemote } from "../../services/repo.js"; import { logger } from "../../logging.js"; -import { invoices, organizationMembers, organizationProfile, repos, seatAssignments, stripeLookup } from "./db/schema.js"; +import { invoices, organizationMembers, organizationProfile, seatAssignments, stripeLookup } from "./db/schema.js"; import { APP_SHELL_ORGANIZATION_ID } from "./constants.js"; import { organizationWorkflowQueueName } from "./queues.js"; @@ -575,8 +575,13 @@ async function listOrganizationInvoices(c: any): Promise { assertOrganizationShell(c); - const rows = await c.db.select({ remoteUrl: repos.remoteUrl }).from(repos).orderBy(desc(repos.updatedAt)).all(); - return rows.map((row) => repoLabelFromRemote(row.remoteUrl)).sort((left, right) => left.localeCompare(right)); + try { + const githubData = await getOrCreateGithubData(c, c.state.organizationId); + const rows = await githubData.listRepositories({}); + return rows.map((row: any) => repoLabelFromRemote(row.cloneUrl)).sort((a: string, b: string) => a.localeCompare(b)); + } catch { + return []; + } } export async function buildOrganizationState(c: any) { diff --git a/foundry/packages/backend/src/actors/organization/db/schema.ts b/foundry/packages/backend/src/actors/organization/db/schema.ts index 2f19fe5..5071a25 100644 --- a/foundry/packages/backend/src/actors/organization/db/schema.ts +++ b/foundry/packages/backend/src/actors/organization/db/schema.ts @@ -4,17 +4,6 @@ import { DEFAULT_WORKSPACE_MODEL_ID } from "@sandbox-agent/foundry-shared"; // SQLite is per organization actor instance, so no organizationId column needed. -/** - * Repository catalog. Rows are created/removed when repos are added/removed - * from the organization via GitHub sync. - */ -export const repos = sqliteTable("repos", { - repoId: text("repo_id").notNull().primaryKey(), - remoteUrl: text("remote_url").notNull(), - createdAt: integer("created_at").notNull(), - updatedAt: integer("updated_at").notNull(), -}); - /** * Coordinator index of TaskActor instances. * The organization actor is the direct coordinator for tasks (not a per-repo diff --git a/foundry/packages/backend/src/actors/organization/index.ts b/foundry/packages/backend/src/actors/organization/index.ts index 2fff662..08f979c 100644 --- a/foundry/packages/backend/src/actors/organization/index.ts +++ b/foundry/packages/backend/src/actors/organization/index.ts @@ -1,9 +1,8 @@ import { actor, queue } from "rivetkit"; -import { workflow } from "rivetkit/workflow"; import { organizationDb } from "./db/db.js"; import { organizationActions } from "./actions.js"; import { ORGANIZATION_QUEUE_NAMES } from "./queues.js"; -import { runOrganizationWorkflow } from "./workflow.js"; +import { runOrganizationCommandLoop } from "./workflow.js"; export const organization = actor({ db: organizationDb, @@ -17,5 +16,5 @@ export const organization = actor({ organizationId, }), actions: organizationActions, - run: workflow(runOrganizationWorkflow), + run: runOrganizationCommandLoop, }); diff --git a/foundry/packages/backend/src/actors/organization/queues.ts b/foundry/packages/backend/src/actors/organization/queues.ts index 4fbfe0a..f84e818 100644 --- a/foundry/packages/backend/src/actors/organization/queues.ts +++ b/foundry/packages/backend/src/actors/organization/queues.ts @@ -18,8 +18,6 @@ export const ORGANIZATION_QUEUE_NAMES = [ "organization.command.better_auth.verification.update_many", "organization.command.better_auth.verification.delete", "organization.command.better_auth.verification.delete_many", - "organization.command.github.repository_projection.apply", - "organization.command.github.data_projection.apply", "organization.command.github.sync_progress.apply", "organization.command.github.webhook_receipt.record", "organization.command.github.organization_shell.sync_from_github", diff --git a/foundry/packages/backend/src/actors/organization/workflow.ts b/foundry/packages/backend/src/actors/organization/workflow.ts index 5e10c99..bd7a205 100644 --- a/foundry/packages/backend/src/actors/organization/workflow.ts +++ b/foundry/packages/backend/src/actors/organization/workflow.ts @@ -1,13 +1,6 @@ // @ts-nocheck -import { Loop } from "rivetkit/workflow"; import { logActorWarning, resolveErrorMessage } from "../logging.js"; -import { - applyGithubDataProjectionMutation, - applyGithubRepositoryProjectionMutation, - applyGithubSyncProgressMutation, - recordGithubWebhookReceiptMutation, - refreshOrganizationSnapshotMutation, -} from "./actions.js"; +import { applyGithubSyncProgressMutation, recordGithubWebhookReceiptMutation, refreshOrganizationSnapshotMutation } from "./actions.js"; import { applyTaskSummaryUpdateMutation, createTaskMutation, @@ -42,363 +35,134 @@ import { } from "./app-shell.js"; import { ORGANIZATION_QUEUE_NAMES } from "./queues.js"; -export async function runOrganizationWorkflow(ctx: any): Promise { - await ctx.loop("organization-command-loop", async (loopCtx: any) => { - const msg = await loopCtx.queue.next("next-organization-command", { - names: [...ORGANIZATION_QUEUE_NAMES], - completable: true, - }); - if (!msg) { - return Loop.continue(undefined); - } +// Command handler dispatch table — maps queue name to handler function. +const COMMAND_HANDLERS: Record Promise> = { + "organization.command.createTask": (c, body) => createTaskMutation(c, body), + "organization.command.materializeTask": (c, body) => createTaskMutation(c, body), + "organization.command.registerTaskBranch": (c, body) => registerTaskBranchMutation(c, body), + "organization.command.applyTaskSummaryUpdate": async (c, body) => { + await applyTaskSummaryUpdateMutation(c, body); + return { ok: true }; + }, + "organization.command.removeTaskSummary": async (c, body) => { + await removeTaskSummaryMutation(c, body); + return { ok: true }; + }, + "organization.command.refreshTaskSummaryForBranch": async (c, body) => { + await refreshTaskSummaryForBranchMutation(c, body); + return { ok: true }; + }, + "organization.command.snapshot.broadcast": async (c, _body) => { + await refreshOrganizationSnapshotMutation(c); + return { ok: true }; + }, + "organization.command.syncGithubSession": async (c, body) => { + const { syncGithubOrganizations } = await import("./app-shell.js"); + await syncGithubOrganizations(c, body); + return { ok: true }; + }, + "organization.command.better_auth.session_index.upsert": (c, body) => betterAuthUpsertSessionIndexMutation(c, body), + "organization.command.better_auth.session_index.delete": async (c, body) => { + await betterAuthDeleteSessionIndexMutation(c, body); + return { ok: true }; + }, + "organization.command.better_auth.email_index.upsert": (c, body) => betterAuthUpsertEmailIndexMutation(c, body), + "organization.command.better_auth.email_index.delete": async (c, body) => { + await betterAuthDeleteEmailIndexMutation(c, body); + return { ok: true }; + }, + "organization.command.better_auth.account_index.upsert": (c, body) => betterAuthUpsertAccountIndexMutation(c, body), + "organization.command.better_auth.account_index.delete": async (c, body) => { + await betterAuthDeleteAccountIndexMutation(c, body); + return { ok: true }; + }, + "organization.command.better_auth.verification.create": (c, body) => betterAuthCreateVerificationMutation(c, body), + "organization.command.better_auth.verification.update": (c, body) => betterAuthUpdateVerificationMutation(c, body), + "organization.command.better_auth.verification.update_many": (c, body) => betterAuthUpdateManyVerificationMutation(c, body), + "organization.command.better_auth.verification.delete": async (c, body) => { + await betterAuthDeleteVerificationMutation(c, body); + return { ok: true }; + }, + "organization.command.better_auth.verification.delete_many": (c, body) => betterAuthDeleteManyVerificationMutation(c, body), + "organization.command.github.sync_progress.apply": async (c, body) => { + await applyGithubSyncProgressMutation(c, body); + return { ok: true }; + }, + "organization.command.github.webhook_receipt.record": async (c, body) => { + await recordGithubWebhookReceiptMutation(c, body); + return { ok: true }; + }, + "organization.command.github.organization_shell.sync_from_github": (c, body) => syncOrganizationShellFromGithubMutation(c, body), + "organization.command.shell.profile.update": async (c, body) => { + await updateOrganizationShellProfileMutation(c, body); + return { ok: true }; + }, + "organization.command.shell.sync_started.mark": async (c, body) => { + await markOrganizationSyncStartedMutation(c, body); + return { ok: true }; + }, + "organization.command.billing.stripe_customer.apply": async (c, body) => { + await applyOrganizationStripeCustomerMutation(c, body); + return { ok: true }; + }, + "organization.command.billing.stripe_subscription.apply": async (c, body) => { + await applyOrganizationStripeSubscriptionMutation(c, body); + return { ok: true }; + }, + "organization.command.billing.free_plan.apply": async (c, body) => { + await applyOrganizationFreePlanMutation(c, body); + return { ok: true }; + }, + "organization.command.billing.payment_method.set": async (c, body) => { + await setOrganizationBillingPaymentMethodMutation(c, body); + return { ok: true }; + }, + "organization.command.billing.status.set": async (c, body) => { + await setOrganizationBillingStatusMutation(c, body); + return { ok: true }; + }, + "organization.command.billing.invoice.upsert": async (c, body) => { + await upsertOrganizationInvoiceMutation(c, body); + return { ok: true }; + }, + "organization.command.billing.seat_usage.record": async (c, body) => { + await recordOrganizationSeatUsageMutation(c, body); + return { ok: true }; + }, +}; +/** + * Plain run handler (no workflow engine). Drains the queue using `c.queue.iter()` + * with completable messages. This avoids the RivetKit bug where actors created + * from another actor's workflow context never start their `run: workflow(...)`. + * + * The queue is still durable — messages survive restarts. Only in-flight processing + * of a single message is lost on crash (the message is retried). All mutations are + * idempotent, so this is safe. + */ +export async function runOrganizationCommandLoop(c: any): Promise { + for await (const msg of c.queue.iter({ names: [...ORGANIZATION_QUEUE_NAMES], completable: true })) { try { - if (msg.name === "organization.command.createTask") { - const result = await loopCtx.step({ - name: "organization-create-task", - timeout: 5 * 60_000, - run: async () => createTaskMutation(loopCtx, msg.body), - }); + const handler = COMMAND_HANDLERS[msg.name]; + if (handler) { + const result = await handler(c, msg.body); await msg.complete(result); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.materializeTask") { - const result = await loopCtx.step({ - name: "organization-materialize-task", - timeout: 5 * 60_000, - run: async () => createTaskMutation(loopCtx, msg.body), - }); - await msg.complete(result); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.registerTaskBranch") { - const result = await loopCtx.step({ - name: "organization-register-task-branch", - timeout: 60_000, - run: async () => registerTaskBranchMutation(loopCtx, msg.body), - }); - await msg.complete(result); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.applyTaskSummaryUpdate") { - await loopCtx.step({ - name: "organization-apply-task-summary-update", - timeout: 30_000, - run: async () => applyTaskSummaryUpdateMutation(loopCtx, msg.body), - }); - await msg.complete({ ok: true }); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.removeTaskSummary") { - await loopCtx.step({ - name: "organization-remove-task-summary", - timeout: 30_000, - run: async () => removeTaskSummaryMutation(loopCtx, msg.body), - }); - await msg.complete({ ok: true }); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.refreshTaskSummaryForBranch") { - await loopCtx.step({ - name: "organization-refresh-task-summary-for-branch", - timeout: 60_000, - run: async () => refreshTaskSummaryForBranchMutation(loopCtx, msg.body), - }); - await msg.complete({ ok: true }); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.snapshot.broadcast") { - await loopCtx.step({ - name: "organization-snapshot-broadcast", - timeout: 60_000, - run: async () => refreshOrganizationSnapshotMutation(loopCtx), - }); - await msg.complete({ ok: true }); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.syncGithubSession") { - await loopCtx.step({ - name: "organization-sync-github-session", - timeout: 60_000, - run: async () => { - const { syncGithubOrganizations } = await import("./app-shell.js"); - await syncGithubOrganizations(loopCtx, msg.body as { sessionId: string; accessToken: string }); - }, - }); - await msg.complete({ ok: true }); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.better_auth.session_index.upsert") { - const result = await loopCtx.step({ - name: "organization-better-auth-session-index-upsert", - timeout: 60_000, - run: async () => betterAuthUpsertSessionIndexMutation(loopCtx, msg.body), - }); - await msg.complete(result); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.better_auth.session_index.delete") { - await loopCtx.step({ - name: "organization-better-auth-session-index-delete", - timeout: 60_000, - run: async () => betterAuthDeleteSessionIndexMutation(loopCtx, msg.body), - }); - await msg.complete({ ok: true }); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.better_auth.email_index.upsert") { - const result = await loopCtx.step({ - name: "organization-better-auth-email-index-upsert", - timeout: 60_000, - run: async () => betterAuthUpsertEmailIndexMutation(loopCtx, msg.body), - }); - await msg.complete(result); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.better_auth.email_index.delete") { - await loopCtx.step({ - name: "organization-better-auth-email-index-delete", - timeout: 60_000, - run: async () => betterAuthDeleteEmailIndexMutation(loopCtx, msg.body), - }); - await msg.complete({ ok: true }); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.better_auth.account_index.upsert") { - const result = await loopCtx.step({ - name: "organization-better-auth-account-index-upsert", - timeout: 60_000, - run: async () => betterAuthUpsertAccountIndexMutation(loopCtx, msg.body), - }); - await msg.complete(result); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.better_auth.account_index.delete") { - await loopCtx.step({ - name: "organization-better-auth-account-index-delete", - timeout: 60_000, - run: async () => betterAuthDeleteAccountIndexMutation(loopCtx, msg.body), - }); - await msg.complete({ ok: true }); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.better_auth.verification.create") { - const result = await loopCtx.step({ - name: "organization-better-auth-verification-create", - timeout: 60_000, - run: async () => betterAuthCreateVerificationMutation(loopCtx, msg.body), - }); - await msg.complete(result); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.better_auth.verification.update") { - const result = await loopCtx.step({ - name: "organization-better-auth-verification-update", - timeout: 60_000, - run: async () => betterAuthUpdateVerificationMutation(loopCtx, msg.body), - }); - await msg.complete(result); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.better_auth.verification.update_many") { - const result = await loopCtx.step({ - name: "organization-better-auth-verification-update-many", - timeout: 60_000, - run: async () => betterAuthUpdateManyVerificationMutation(loopCtx, msg.body), - }); - await msg.complete(result); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.better_auth.verification.delete") { - await loopCtx.step({ - name: "organization-better-auth-verification-delete", - timeout: 60_000, - run: async () => betterAuthDeleteVerificationMutation(loopCtx, msg.body), - }); - await msg.complete({ ok: true }); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.better_auth.verification.delete_many") { - const result = await loopCtx.step({ - name: "organization-better-auth-verification-delete-many", - timeout: 60_000, - run: async () => betterAuthDeleteManyVerificationMutation(loopCtx, msg.body), - }); - await msg.complete(result); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.github.repository_projection.apply") { - await loopCtx.step({ - name: "organization-github-repository-projection-apply", - timeout: 60_000, - run: async () => applyGithubRepositoryProjectionMutation(loopCtx, msg.body), - }); - await msg.complete({ ok: true }); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.github.data_projection.apply") { - await loopCtx.step({ - name: "organization-github-data-projection-apply", - timeout: 60_000, - run: async () => applyGithubDataProjectionMutation(loopCtx, msg.body), - }); - await msg.complete({ ok: true }); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.github.sync_progress.apply") { - await loopCtx.step({ - name: "organization-github-sync-progress-apply", - timeout: 60_000, - run: async () => applyGithubSyncProgressMutation(loopCtx, msg.body), - }); - await msg.complete({ ok: true }); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.github.webhook_receipt.record") { - await loopCtx.step({ - name: "organization-github-webhook-receipt-record", - timeout: 60_000, - run: async () => recordGithubWebhookReceiptMutation(loopCtx, msg.body), - }); - await msg.complete({ ok: true }); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.github.organization_shell.sync_from_github") { - const result = await loopCtx.step({ - name: "organization-github-organization-shell-sync-from-github", - timeout: 60_000, - run: async () => syncOrganizationShellFromGithubMutation(loopCtx, msg.body), - }); - await msg.complete(result); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.shell.profile.update") { - await loopCtx.step({ - name: "organization-shell-profile-update", - timeout: 60_000, - run: async () => updateOrganizationShellProfileMutation(loopCtx, msg.body), - }); - await msg.complete({ ok: true }); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.shell.sync_started.mark") { - await loopCtx.step({ - name: "organization-shell-sync-started-mark", - timeout: 60_000, - run: async () => markOrganizationSyncStartedMutation(loopCtx, msg.body), - }); - await msg.complete({ ok: true }); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.billing.stripe_customer.apply") { - await loopCtx.step({ - name: "organization-billing-stripe-customer-apply", - timeout: 60_000, - run: async () => applyOrganizationStripeCustomerMutation(loopCtx, msg.body), - }); - await msg.complete({ ok: true }); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.billing.stripe_subscription.apply") { - await loopCtx.step({ - name: "organization-billing-stripe-subscription-apply", - timeout: 60_000, - run: async () => applyOrganizationStripeSubscriptionMutation(loopCtx, msg.body), - }); - await msg.complete({ ok: true }); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.billing.free_plan.apply") { - await loopCtx.step({ - name: "organization-billing-free-plan-apply", - timeout: 60_000, - run: async () => applyOrganizationFreePlanMutation(loopCtx, msg.body), - }); - await msg.complete({ ok: true }); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.billing.payment_method.set") { - await loopCtx.step({ - name: "organization-billing-payment-method-set", - timeout: 60_000, - run: async () => setOrganizationBillingPaymentMethodMutation(loopCtx, msg.body), - }); - await msg.complete({ ok: true }); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.billing.status.set") { - await loopCtx.step({ - name: "organization-billing-status-set", - timeout: 60_000, - run: async () => setOrganizationBillingStatusMutation(loopCtx, msg.body), - }); - await msg.complete({ ok: true }); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.billing.invoice.upsert") { - await loopCtx.step({ - name: "organization-billing-invoice-upsert", - timeout: 60_000, - run: async () => upsertOrganizationInvoiceMutation(loopCtx, msg.body), - }); - await msg.complete({ ok: true }); - return Loop.continue(undefined); - } - - if (msg.name === "organization.command.billing.seat_usage.record") { - await loopCtx.step({ - name: "organization-billing-seat-usage-record", - timeout: 60_000, - run: async () => recordOrganizationSeatUsageMutation(loopCtx, msg.body), - }); - await msg.complete({ ok: true }); - return Loop.continue(undefined); + } else { + logActorWarning("organization", "unknown queue message", { queueName: msg.name }); + await msg.complete({ error: `Unknown command: ${msg.name}` }); } } catch (error) { const message = resolveErrorMessage(error); - logActorWarning("organization", "organization workflow command failed", { + logActorWarning("organization", "organization command failed", { queueName: msg.name, error: message, }); await msg.complete({ error: message }).catch((completeError: unknown) => { - logActorWarning("organization", "organization workflow failed completing error response", { + logActorWarning("organization", "organization command failed completing error response", { queueName: msg.name, error: resolveErrorMessage(completeError), }); }); } - - return Loop.continue(undefined); - }); + } } diff --git a/foundry/packages/backend/src/actors/task/index.ts b/foundry/packages/backend/src/actors/task/index.ts index ad66f14..30f6836 100644 --- a/foundry/packages/backend/src/actors/task/index.ts +++ b/foundry/packages/backend/src/actors/task/index.ts @@ -1,10 +1,9 @@ import { actor, queue } from "rivetkit"; -import { workflow } from "rivetkit/workflow"; import type { TaskRecord } from "@sandbox-agent/foundry-shared"; import { taskDb } from "./db/db.js"; import { getCurrentRecord } from "./workflow/common.js"; import { getSessionDetail, getTaskDetail, getTaskSummary } from "./workspace.js"; -import { TASK_QUEUE_NAMES, runTaskWorkflow } from "./workflow/index.js"; +import { TASK_QUEUE_NAMES, runTaskCommandLoop } from "./workflow/index.js"; export interface TaskInput { organizationId: string; @@ -42,7 +41,7 @@ export const task = actor({ return await getSessionDetail(c, input.sessionId, input.authSessionId); }, }, - run: workflow(runTaskWorkflow), + run: runTaskCommandLoop, }); export { TASK_QUEUE_NAMES }; diff --git a/foundry/packages/backend/src/actors/task/workflow/index.ts b/foundry/packages/backend/src/actors/task/workflow/index.ts index 9b18d20..f23871a 100644 --- a/foundry/packages/backend/src/actors/task/workflow/index.ts +++ b/foundry/packages/backend/src/actors/task/workflow/index.ts @@ -1,4 +1,3 @@ -import { Loop } from "rivetkit/workflow"; import { logActorWarning, resolveErrorMessage } from "../../logging.js"; import { getCurrentRecord } from "./common.js"; import { initBootstrapDbActivity, initCompleteActivity, initEnqueueProvisionActivity, initFailedActivity } from "./init.js"; @@ -38,16 +37,14 @@ export { TASK_QUEUE_NAMES, taskWorkflowQueueName } from "./queue.js"; type TaskQueueName = (typeof TASK_QUEUE_NAMES)[number]; -type WorkflowHandler = (loopCtx: any, msg: { name: TaskQueueName; body: any; complete: (response: unknown) => Promise }) => Promise; +type CommandHandler = (c: any, msg: { name: TaskQueueName; body: any; complete: (response: unknown) => Promise }) => Promise; -const commandHandlers: Record = { - "task.command.initialize": async (loopCtx, msg) => { +const commandHandlers: Record = { + "task.command.initialize": async (c, msg) => { const body = msg.body; - - await loopCtx.step("init-bootstrap-db", async () => initBootstrapDbActivity(loopCtx, body)); - await loopCtx.step("init-enqueue-provision", async () => initEnqueueProvisionActivity(loopCtx, body)); - await loopCtx.removed("init-dispatch-provision-v2", "step"); - const currentRecord = await loopCtx.step("init-read-current-record", async () => getCurrentRecord(loopCtx)); + await initBootstrapDbActivity(c, body); + await initEnqueueProvisionActivity(c, body); + const currentRecord = await getCurrentRecord(c); try { await msg.complete(currentRecord); } catch (error) { @@ -57,23 +54,12 @@ const commandHandlers: Record = { } }, - "task.command.provision": async (loopCtx, msg) => { - await loopCtx.removed("init-failed", "step"); - await loopCtx.removed("init-failed-v2", "step"); + "task.command.provision": async (c, msg) => { try { - await loopCtx.removed("init-ensure-name", "step"); - await loopCtx.removed("init-assert-name", "step"); - await loopCtx.removed("init-create-sandbox", "step"); - await loopCtx.removed("init-ensure-agent", "step"); - await loopCtx.removed("init-start-sandbox-instance", "step"); - await loopCtx.removed("init-expose-sandbox", "step"); - await loopCtx.removed("init-create-session", "step"); - await loopCtx.removed("init-write-db", "step"); - await loopCtx.removed("init-start-status-sync", "step"); - await loopCtx.step("init-complete", async () => initCompleteActivity(loopCtx, msg.body)); + await initCompleteActivity(c, msg.body); await msg.complete({ ok: true }); } catch (error) { - await loopCtx.step("init-failed-v3", async () => initFailedActivity(loopCtx, error, msg.body)); + await initFailedActivity(c, error, msg.body); await msg.complete({ ok: false, error: resolveErrorMessage(error), @@ -81,79 +67,67 @@ const commandHandlers: Record = { } }, - "task.command.attach": async (loopCtx, msg) => { - await loopCtx.step("handle-attach", async () => handleAttachActivity(loopCtx, msg)); + "task.command.attach": async (c, msg) => { + await handleAttachActivity(c, msg); }, - "task.command.switch": async (loopCtx, msg) => { - await loopCtx.step("handle-switch", async () => handleSwitchActivity(loopCtx, msg)); + "task.command.switch": async (c, msg) => { + await handleSwitchActivity(c, msg); }, - "task.command.push": async (loopCtx, msg) => { - await loopCtx.step("handle-push", async () => handlePushActivity(loopCtx, msg)); + "task.command.push": async (c, msg) => { + await handlePushActivity(c, msg); }, - "task.command.sync": async (loopCtx, msg) => { - await loopCtx.step("handle-sync", async () => handleSimpleCommandActivity(loopCtx, msg, "task.sync")); + "task.command.sync": async (c, msg) => { + await handleSimpleCommandActivity(c, msg, "task.sync"); }, - "task.command.merge": async (loopCtx, msg) => { - await loopCtx.step("handle-merge", async () => handleSimpleCommandActivity(loopCtx, msg, "task.merge")); + "task.command.merge": async (c, msg) => { + await handleSimpleCommandActivity(c, msg, "task.merge"); }, - "task.command.archive": async (loopCtx, msg) => { - await loopCtx.step("handle-archive", async () => handleArchiveActivity(loopCtx, msg)); + "task.command.archive": async (c, msg) => { + await handleArchiveActivity(c, msg); }, - "task.command.kill": async (loopCtx, msg) => { - await loopCtx.step("kill-destroy-sandbox", async () => killDestroySandboxActivity(loopCtx)); - await loopCtx.step("kill-write-db", async () => killWriteDbActivity(loopCtx, msg)); + "task.command.kill": async (c, msg) => { + await killDestroySandboxActivity(c); + await killWriteDbActivity(c, msg); }, - "task.command.get": async (loopCtx, msg) => { - await loopCtx.step("handle-get", async () => handleGetActivity(loopCtx, msg)); + "task.command.get": async (c, msg) => { + await handleGetActivity(c, msg); }, - "task.command.pull_request.sync": async (loopCtx, msg) => { - await loopCtx.step("task-pull-request-sync", async () => syncTaskPullRequest(loopCtx, msg.body?.pullRequest ?? null)); + "task.command.pull_request.sync": async (c, msg) => { + await syncTaskPullRequest(c, msg.body?.pullRequest ?? null); await msg.complete({ ok: true }); }, - "task.command.workspace.mark_unread": async (loopCtx, msg) => { - await loopCtx.step("workspace-mark-unread", async () => markWorkspaceUnread(loopCtx, msg.body?.authSessionId)); + "task.command.workspace.mark_unread": async (c, msg) => { + await markWorkspaceUnread(c, msg.body?.authSessionId); await msg.complete({ ok: true }); }, - "task.command.workspace.rename_task": async (loopCtx, msg) => { - await loopCtx.step("workspace-rename-task", async () => renameWorkspaceTask(loopCtx, msg.body.value)); + "task.command.workspace.rename_task": async (c, msg) => { + await renameWorkspaceTask(c, msg.body.value); await msg.complete({ ok: true }); }, - "task.command.workspace.create_session": async (loopCtx, msg) => { + "task.command.workspace.create_session": async (c, msg) => { try { - const created = await loopCtx.step({ - name: "workspace-create-session", - timeout: 5 * 60_000, - run: async () => createWorkspaceSession(loopCtx, msg.body?.model, msg.body?.authSessionId), - }); + const created = await createWorkspaceSession(c, msg.body?.model, msg.body?.authSessionId); await msg.complete(created); } catch (error) { await msg.complete({ error: resolveErrorMessage(error) }); } }, - "task.command.workspace.create_session_and_send": async (loopCtx, msg) => { + "task.command.workspace.create_session_and_send": async (c, msg) => { try { - const created = await loopCtx.step({ - name: "workspace-create-session-for-send", - timeout: 5 * 60_000, - run: async () => createWorkspaceSession(loopCtx, msg.body?.model, msg.body?.authSessionId), - }); - await loopCtx.step({ - name: "workspace-send-initial-message", - timeout: 5 * 60_000, - run: async () => sendWorkspaceMessage(loopCtx, created.sessionId, msg.body.text, [], msg.body?.authSessionId), - }); + const created = await createWorkspaceSession(c, msg.body?.model, msg.body?.authSessionId); + await sendWorkspaceMessage(c, created.sessionId, msg.body.text, [], msg.body?.authSessionId); } catch (error) { logActorWarning("task.workflow", "create_session_and_send failed", { error: resolveErrorMessage(error), @@ -162,135 +136,102 @@ const commandHandlers: Record = { await msg.complete({ ok: true }); }, - "task.command.workspace.ensure_session": async (loopCtx, msg) => { - await loopCtx.step({ - name: "workspace-ensure-session", - timeout: 5 * 60_000, - run: async () => ensureWorkspaceSession(loopCtx, msg.body.sessionId, msg.body?.model, msg.body?.authSessionId), - }); + "task.command.workspace.ensure_session": async (c, msg) => { + await ensureWorkspaceSession(c, msg.body.sessionId, msg.body?.model, msg.body?.authSessionId); await msg.complete({ ok: true }); }, - "task.command.workspace.rename_session": async (loopCtx, msg) => { - await loopCtx.step("workspace-rename-session", async () => renameWorkspaceSession(loopCtx, msg.body.sessionId, msg.body.title)); + "task.command.workspace.rename_session": async (c, msg) => { + await renameWorkspaceSession(c, msg.body.sessionId, msg.body.title); await msg.complete({ ok: true }); }, - "task.command.workspace.select_session": async (loopCtx, msg) => { - await loopCtx.step("workspace-select-session", async () => selectWorkspaceSession(loopCtx, msg.body.sessionId, msg.body?.authSessionId)); + "task.command.workspace.select_session": async (c, msg) => { + await selectWorkspaceSession(c, msg.body.sessionId, msg.body?.authSessionId); await msg.complete({ ok: true }); }, - "task.command.workspace.set_session_unread": async (loopCtx, msg) => { - await loopCtx.step("workspace-set-session-unread", async () => setWorkspaceSessionUnread(loopCtx, msg.body.sessionId, msg.body.unread, msg.body?.authSessionId)); + "task.command.workspace.set_session_unread": async (c, msg) => { + await setWorkspaceSessionUnread(c, msg.body.sessionId, msg.body.unread, msg.body?.authSessionId); await msg.complete({ ok: true }); }, - "task.command.workspace.update_draft": async (loopCtx, msg) => { - await loopCtx.step("workspace-update-draft", async () => updateWorkspaceDraft(loopCtx, msg.body.sessionId, msg.body.text, msg.body.attachments, msg.body?.authSessionId)); + "task.command.workspace.update_draft": async (c, msg) => { + await updateWorkspaceDraft(c, msg.body.sessionId, msg.body.text, msg.body.attachments, msg.body?.authSessionId); await msg.complete({ ok: true }); }, - "task.command.workspace.change_model": async (loopCtx, msg) => { - await loopCtx.step("workspace-change-model", async () => changeWorkspaceModel(loopCtx, msg.body.sessionId, msg.body.model, msg.body?.authSessionId)); + "task.command.workspace.change_model": async (c, msg) => { + await changeWorkspaceModel(c, msg.body.sessionId, msg.body.model, msg.body?.authSessionId); await msg.complete({ ok: true }); }, - "task.command.workspace.send_message": async (loopCtx, msg) => { + "task.command.workspace.send_message": async (c, msg) => { try { - await loopCtx.step({ - name: "workspace-send-message", - timeout: 10 * 60_000, - run: async () => sendWorkspaceMessage(loopCtx, msg.body.sessionId, msg.body.text, msg.body.attachments, msg.body?.authSessionId), - }); + await sendWorkspaceMessage(c, msg.body.sessionId, msg.body.text, msg.body.attachments, msg.body?.authSessionId); await msg.complete({ ok: true }); } catch (error) { await msg.complete({ error: resolveErrorMessage(error) }); } }, - "task.command.workspace.stop_session": async (loopCtx, msg) => { - await loopCtx.step({ - name: "workspace-stop-session", - timeout: 5 * 60_000, - run: async () => stopWorkspaceSession(loopCtx, msg.body.sessionId), - }); + "task.command.workspace.stop_session": async (c, msg) => { + await stopWorkspaceSession(c, msg.body.sessionId); await msg.complete({ ok: true }); }, - "task.command.workspace.sync_session_status": async (loopCtx, msg) => { - await loopCtx.step("workspace-sync-session-status", async () => syncWorkspaceSessionStatus(loopCtx, msg.body.sessionId, msg.body.status, msg.body.at)); + "task.command.workspace.sync_session_status": async (c, msg) => { + await syncWorkspaceSessionStatus(c, msg.body.sessionId, msg.body.status, msg.body.at); await msg.complete({ ok: true }); }, - "task.command.workspace.refresh_derived": async (loopCtx, msg) => { - await loopCtx.step({ - name: "workspace-refresh-derived", - timeout: 5 * 60_000, - run: async () => refreshWorkspaceDerivedState(loopCtx), - }); + "task.command.workspace.refresh_derived": async (c, msg) => { + await refreshWorkspaceDerivedState(c); await msg.complete({ ok: true }); }, - "task.command.workspace.refresh_session_transcript": async (loopCtx, msg) => { - await loopCtx.step({ - name: "workspace-refresh-session-transcript", - timeout: 60_000, - run: async () => refreshWorkspaceSessionTranscript(loopCtx, msg.body.sessionId), - }); + "task.command.workspace.refresh_session_transcript": async (c, msg) => { + await refreshWorkspaceSessionTranscript(c, msg.body.sessionId); await msg.complete({ ok: true }); }, - "task.command.workspace.close_session": async (loopCtx, msg) => { - await loopCtx.step({ - name: "workspace-close-session", - timeout: 5 * 60_000, - run: async () => closeWorkspaceSession(loopCtx, msg.body.sessionId, msg.body?.authSessionId), - }); + "task.command.workspace.close_session": async (c, msg) => { + await closeWorkspaceSession(c, msg.body.sessionId, msg.body?.authSessionId); await msg.complete({ ok: true }); }, - "task.command.workspace.publish_pr": async (loopCtx, msg) => { - await loopCtx.step({ - name: "workspace-publish-pr", - timeout: 10 * 60_000, - run: async () => publishWorkspacePr(loopCtx), - }); + "task.command.workspace.publish_pr": async (c, msg) => { + await publishWorkspacePr(c); await msg.complete({ ok: true }); }, - "task.command.workspace.revert_file": async (loopCtx, msg) => { - await loopCtx.step({ - name: "workspace-revert-file", - timeout: 5 * 60_000, - run: async () => revertWorkspaceFile(loopCtx, msg.body.path), - }); + "task.command.workspace.revert_file": async (c, msg) => { + await revertWorkspaceFile(c, msg.body.path); await msg.complete({ ok: true }); }, }; -export async function runTaskWorkflow(ctx: any): Promise { - await ctx.loop("task-command-loop", async (loopCtx: any) => { - const msg = await loopCtx.queue.next("next-command", { - names: [...TASK_QUEUE_NAMES], - completable: true, - }); - if (!msg) { - return Loop.continue(undefined); - } +/** + * Plain run handler (no workflow engine). Drains the queue using `c.queue.iter()` + * with completable messages. + */ +export async function runTaskCommandLoop(c: any): Promise { + for await (const msg of c.queue.iter({ names: [...TASK_QUEUE_NAMES], completable: true })) { const handler = commandHandlers[msg.name as TaskQueueName]; if (handler) { try { - await handler(loopCtx, msg); + await handler(c, msg); } catch (error) { const message = resolveErrorMessage(error); - logActorWarning("task.workflow", "task workflow command failed", { + logActorWarning("task.workflow", "task command failed", { queueName: msg.name, error: message, }); await msg.complete({ error: message }).catch(() => {}); } + } else { + logActorWarning("task.workflow", "unknown queue message", { queueName: msg.name }); + await msg.complete({ error: `Unknown command: ${msg.name}` }); } - return Loop.continue(undefined); - }); + } } diff --git a/foundry/packages/backend/src/actors/user/index.ts b/foundry/packages/backend/src/actors/user/index.ts index 28d48e6..01d6e0f 100644 --- a/foundry/packages/backend/src/actors/user/index.ts +++ b/foundry/packages/backend/src/actors/user/index.ts @@ -1,9 +1,8 @@ import { actor, queue } from "rivetkit"; -import { workflow } from "rivetkit/workflow"; import { userDb } from "./db/db.js"; import { betterAuthActions } from "./actions/better-auth.js"; import { userActions } from "./actions/user.js"; -import { USER_QUEUE_NAMES, runUserWorkflow } from "./workflow.js"; +import { USER_QUEUE_NAMES, runUserCommandLoop } from "./workflow.js"; export const user = actor({ db: userDb, @@ -20,5 +19,5 @@ export const user = actor({ ...betterAuthActions, ...userActions, }, - run: workflow(runUserWorkflow), + run: runUserCommandLoop, }); diff --git a/foundry/packages/backend/src/actors/user/workflow.ts b/foundry/packages/backend/src/actors/user/workflow.ts index 3d61f6c..87f5326 100644 --- a/foundry/packages/backend/src/actors/user/workflow.ts +++ b/foundry/packages/backend/src/actors/user/workflow.ts @@ -1,5 +1,4 @@ import { eq, count as sqlCount, and } from "drizzle-orm"; -import { Loop } from "rivetkit/workflow"; import { DEFAULT_WORKSPACE_MODEL_ID } from "@sandbox-agent/foundry-shared"; import { logActorWarning, resolveErrorMessage } from "../logging.js"; import { authUsers, sessionState, userProfiles, userTaskState } from "./db/schema.js"; @@ -26,8 +25,15 @@ export function userWorkflowQueueName(name: UserQueueName): UserQueueName { async function createAuthRecordMutation(c: any, input: { model: string; data: Record }) { const table = tableFor(input.model); const persisted = persistInput(input.model, input.data); - await c.db.insert(table).values(persisted as any).run(); - const row = await c.db.select().from(table).where(eq(columnFor(input.model, table, "id"), input.data.id as any)).get(); + await c.db + .insert(table) + .values(persisted as any) + .run(); + const row = await c.db + .select() + .from(table) + .where(eq(columnFor(input.model, table, "id"), input.data.id as any)) + .get(); return materializeRow(input.model, row); } @@ -37,7 +43,11 @@ async function updateAuthRecordMutation(c: any, input: { model: string; where: a if (!predicate) { throw new Error("updateAuthRecord requires a where clause"); } - await c.db.update(table).set(persistPatch(input.model, input.update) as any).where(predicate).run(); + await c.db + .update(table) + .set(persistPatch(input.model, input.update) as any) + .where(predicate) + .run(); return materializeRow(input.model, await c.db.select().from(table).where(predicate).get()); } @@ -47,7 +57,11 @@ async function updateManyAuthRecordsMutation(c: any, input: { model: string; whe if (!predicate) { throw new Error("updateManyAuthRecords requires a where clause"); } - await c.db.update(table).set(persistPatch(input.model, input.update) as any).where(predicate).run(); + await c.db + .update(table) + .set(persistPatch(input.model, input.update) as any) + .where(predicate) + .run(); const row = await c.db.select({ value: sqlCount() }).from(table).where(predicate).get(); return row?.value ?? 0; } @@ -222,60 +236,46 @@ async function deleteTaskStateMutation(c: any, input: { taskId: string; sessionI await c.db.delete(userTaskState).where(eq(userTaskState.taskId, input.taskId)).run(); } -export async function runUserWorkflow(ctx: any): Promise { - await ctx.loop("user-command-loop", async (loopCtx: any) => { - const msg = await loopCtx.queue.next("next-user-command", { - names: [...USER_QUEUE_NAMES], - completable: true, - }); - if (!msg) { - return Loop.continue(undefined); - } +const COMMAND_HANDLERS: Record Promise> = { + "user.command.auth.create": (c, body) => createAuthRecordMutation(c, body), + "user.command.auth.update": (c, body) => updateAuthRecordMutation(c, body), + "user.command.auth.update_many": (c, body) => updateManyAuthRecordsMutation(c, body), + "user.command.auth.delete": async (c, body) => { + await deleteAuthRecordMutation(c, body); + return { ok: true }; + }, + "user.command.auth.delete_many": (c, body) => deleteManyAuthRecordsMutation(c, body), + "user.command.profile.upsert": (c, body) => upsertUserProfileMutation(c, body), + "user.command.session_state.upsert": (c, body) => upsertSessionStateMutation(c, body), + "user.command.task_state.upsert": (c, body) => upsertTaskStateMutation(c, body), + "user.command.task_state.delete": async (c, body) => { + await deleteTaskStateMutation(c, body); + return { ok: true }; + }, +}; +/** + * Plain run handler (no workflow engine). Drains the queue using `c.queue.iter()` + * with completable messages. + */ +export async function runUserCommandLoop(c: any): Promise { + for await (const msg of c.queue.iter({ names: [...USER_QUEUE_NAMES], completable: true })) { try { - let result: unknown; - switch (msg.name) { - case "user.command.auth.create": - result = await loopCtx.step({ name: "user-auth-create", timeout: 60_000, run: async () => createAuthRecordMutation(loopCtx, msg.body) }); - break; - case "user.command.auth.update": - result = await loopCtx.step({ name: "user-auth-update", timeout: 60_000, run: async () => updateAuthRecordMutation(loopCtx, msg.body) }); - break; - case "user.command.auth.update_many": - result = await loopCtx.step({ name: "user-auth-update-many", timeout: 60_000, run: async () => updateManyAuthRecordsMutation(loopCtx, msg.body) }); - break; - case "user.command.auth.delete": - result = await loopCtx.step({ name: "user-auth-delete", timeout: 60_000, run: async () => deleteAuthRecordMutation(loopCtx, msg.body) }); - break; - case "user.command.auth.delete_many": - result = await loopCtx.step({ name: "user-auth-delete-many", timeout: 60_000, run: async () => deleteManyAuthRecordsMutation(loopCtx, msg.body) }); - break; - case "user.command.profile.upsert": - result = await loopCtx.step({ name: "user-profile-upsert", timeout: 60_000, run: async () => upsertUserProfileMutation(loopCtx, msg.body) }); - break; - case "user.command.session_state.upsert": - result = await loopCtx.step({ name: "user-session-state-upsert", timeout: 60_000, run: async () => upsertSessionStateMutation(loopCtx, msg.body) }); - break; - case "user.command.task_state.upsert": - result = await loopCtx.step({ name: "user-task-state-upsert", timeout: 60_000, run: async () => upsertTaskStateMutation(loopCtx, msg.body) }); - break; - case "user.command.task_state.delete": - result = await loopCtx.step({ name: "user-task-state-delete", timeout: 60_000, run: async () => deleteTaskStateMutation(loopCtx, msg.body) }); - break; - default: - return Loop.continue(undefined); + const handler = COMMAND_HANDLERS[msg.name]; + if (handler) { + const result = await handler(c, msg.body); + await msg.complete(result); + } else { + logActorWarning("user", "unknown queue message", { queueName: msg.name }); + await msg.complete({ error: `Unknown command: ${msg.name}` }); } - - await msg.complete(result); } catch (error) { const message = resolveErrorMessage(error); - logActorWarning("user", "user workflow command failed", { + logActorWarning("user", "user command failed", { queueName: msg.name, error: message, }); await msg.complete({ error: message }).catch(() => {}); } - - return Loop.continue(undefined); - }); + } } diff --git a/foundry/packages/client/src/backend-client.ts b/foundry/packages/client/src/backend-client.ts index a0ff36c..8331bf6 100644 --- a/foundry/packages/client/src/backend-client.ts +++ b/foundry/packages/client/src/backend-client.ts @@ -183,6 +183,7 @@ export interface BackendClientOptions { endpoint: string; defaultOrganizationId?: string; mode?: "remote" | "mock"; + encoding?: "json" | "cbor" | "bare"; } export interface BackendClient { @@ -413,7 +414,7 @@ export function createBackendClient(options: BackendClientOptions): BackendClien const endpoints = deriveBackendEndpoints(options.endpoint); const rivetApiEndpoint = endpoints.rivetEndpoint; const appApiEndpoint = endpoints.appEndpoint; - const client = createClient({ endpoint: rivetApiEndpoint }) as unknown as RivetClient; + const client = createClient({ endpoint: rivetApiEndpoint, encoding: options.encoding }) as unknown as RivetClient; const workspaceSubscriptions = new Map< string, { @@ -514,10 +515,7 @@ export function createBackendClient(options: BackendClientOptions): BackendClien const sandboxes = detail.sandboxes as Array<(typeof detail.sandboxes)[number] & { sandboxActorId?: string }>; const sandbox = sandboxes.find( (sb) => - sb.sandboxId === sandboxId && - sb.sandboxProviderId === sandboxProviderId && - typeof sb.sandboxActorId === "string" && - sb.sandboxActorId.length > 0, + sb.sandboxId === sandboxId && sb.sandboxProviderId === sandboxProviderId && typeof sb.sandboxActorId === "string" && sb.sandboxActorId.length > 0, ); if (sandbox?.sandboxActorId) { return (client as any).taskSandbox.getForId(sandbox.sandboxActorId); @@ -582,12 +580,7 @@ export function createBackendClient(options: BackendClientOptions): BackendClien return (await task(organizationId, repoId, taskIdValue)).getTaskDetail(await getAuthSessionInput()); }; - const getSessionDetailWithAuth = async ( - organizationId: string, - repoId: string, - taskIdValue: string, - sessionId: string, - ): Promise => { + const getSessionDetailWithAuth = async (organizationId: string, repoId: string, taskIdValue: string, sessionId: string): Promise => { return (await task(organizationId, repoId, taskIdValue)).getSessionDetail(await withAuthSessionInput({ sessionId })); }; @@ -596,67 +589,67 @@ export function createBackendClient(options: BackendClientOptions): BackendClien const summary = await (await organization(organizationId)).getOrganizationSummary({ organizationId }); const resolvedTasks = await Promise.all( summary.taskSummaries.map(async (taskSummary) => { - let detail; - try { - const taskHandle = await task(organizationId, taskSummary.repoId, taskSummary.id); - detail = await taskHandle.getTaskDetail(authSessionInput); - } catch (error) { - if (isActorNotFoundError(error)) { - return null; - } - throw error; + let detail; + try { + const taskHandle = await task(organizationId, taskSummary.repoId, taskSummary.id); + detail = await taskHandle.getTaskDetail(authSessionInput); + } catch (error) { + if (isActorNotFoundError(error)) { + return null; } - const sessionDetails = await Promise.all( - detail.sessionsSummary.map(async (session) => { - try { - const full = await (await task(organizationId, detail.repoId, detail.id)).getSessionDetail({ - sessionId: session.id, - ...(authSessionInput ?? {}), - }); - return [session.id, full] as const; - } catch (error) { - if (isActorNotFoundError(error)) { - return null; - } - throw error; + throw error; + } + const sessionDetails = await Promise.all( + detail.sessionsSummary.map(async (session) => { + try { + const full = await (await task(organizationId, detail.repoId, detail.id)).getSessionDetail({ + sessionId: session.id, + ...(authSessionInput ?? {}), + }); + return [session.id, full] as const; + } catch (error) { + if (isActorNotFoundError(error)) { + return null; } - }), - ); - const sessionDetailsById = new Map(sessionDetails.filter((entry): entry is readonly [string, WorkspaceSessionDetail] => entry !== null)); - return { - id: detail.id, - repoId: detail.repoId, - title: detail.title, - status: detail.status, - repoName: detail.repoName, - updatedAtMs: detail.updatedAtMs, - branch: detail.branch, - pullRequest: detail.pullRequest, - activeSessionId: detail.activeSessionId ?? null, - sessions: detail.sessionsSummary.map((session) => { - const full = sessionDetailsById.get(session.id); - return { - id: session.id, - sessionId: session.sessionId, - sessionName: session.sessionName, - agent: session.agent, - model: session.model, - status: session.status, - thinkingSinceMs: session.thinkingSinceMs, - unread: session.unread, - created: session.created, - draft: full?.draft ?? { text: "", attachments: [], updatedAtMs: null }, - transcript: full?.transcript ?? [], - }; - }), - fileChanges: detail.fileChanges, - diffs: detail.diffs, - fileTree: detail.fileTree, - minutesUsed: detail.minutesUsed, - activeSandboxId: detail.activeSandboxId ?? null, - }; - }), - ); + throw error; + } + }), + ); + const sessionDetailsById = new Map(sessionDetails.filter((entry): entry is readonly [string, WorkspaceSessionDetail] => entry !== null)); + return { + id: detail.id, + repoId: detail.repoId, + title: detail.title, + status: detail.status, + repoName: detail.repoName, + updatedAtMs: detail.updatedAtMs, + branch: detail.branch, + pullRequest: detail.pullRequest, + activeSessionId: detail.activeSessionId ?? null, + sessions: detail.sessionsSummary.map((session) => { + const full = sessionDetailsById.get(session.id); + return { + id: session.id, + sessionId: session.sessionId, + sessionName: session.sessionName, + agent: session.agent, + model: session.model, + status: session.status, + thinkingSinceMs: session.thinkingSinceMs, + unread: session.unread, + created: session.created, + draft: full?.draft ?? { text: "", attachments: [], updatedAtMs: null }, + transcript: full?.transcript ?? [], + }; + }), + fileChanges: detail.fileChanges, + diffs: detail.diffs, + fileTree: detail.fileTree, + minutesUsed: detail.minutesUsed, + activeSandboxId: detail.activeSandboxId ?? null, + }; + }), + ); const tasks = resolvedTasks.filter((task): task is Exclude<(typeof resolvedTasks)[number], null> => task !== null); const repositories = summary.repos @@ -1205,11 +1198,7 @@ export function createBackendClient(options: BackendClientOptions): BackendClien return await withSandboxHandle(organizationId, sandboxProviderId, sandboxId, async (handle) => handle.sandboxAgentConnection()); }, - async getSandboxWorkspaceModelGroups( - organizationId: string, - sandboxProviderId: SandboxProviderId, - sandboxId: string, - ): Promise { + async getSandboxWorkspaceModelGroups(organizationId: string, sandboxProviderId: SandboxProviderId, sandboxId: string): Promise { return await withSandboxHandle(organizationId, sandboxProviderId, sandboxId, async (handle) => handle.listWorkspaceModelGroups()); }, diff --git a/foundry/packages/frontend/src/components/mock-layout.tsx b/foundry/packages/frontend/src/components/mock-layout.tsx index b1dadeb..72c2616 100644 --- a/foundry/packages/frontend/src/components/mock-layout.tsx +++ b/foundry/packages/frontend/src/components/mock-layout.tsx @@ -207,15 +207,38 @@ function sessionStateMessage(tab: Task["sessions"][number] | null | undefined): return null; } -function groupRepositories(repos: Array<{ id: string; label: string }>, tasks: Task[]) { +function groupRepositories( + repos: Array<{ id: string; label: string }>, + tasks: Task[], + openPullRequests?: Array<{ + repoId: string; + repoFullName: string; + number: number; + title: string; + state: string; + url: string; + headRefName: string; + authorLogin: string | null; + isDraft: boolean; + }>, +) { return repos .map((repo) => ({ id: repo.id, label: repo.label, updatedAtMs: tasks.filter((task) => task.repoId === repo.id).reduce((latest, task) => Math.max(latest, task.updatedAtMs), 0), tasks: tasks.filter((task) => task.repoId === repo.id).sort((left, right) => right.updatedAtMs - left.updatedAtMs), + pullRequests: (openPullRequests ?? []).filter((pr) => pr.repoId === repo.id), })) - .filter((repo) => repo.tasks.length > 0); + .sort((a, b) => { + // Repos with tasks first, then repos with PRs, then alphabetical + const aHasActivity = a.tasks.length > 0 || a.pullRequests.length > 0; + const bHasActivity = b.tasks.length > 0 || b.pullRequests.length > 0; + if (aHasActivity && !bHasActivity) return -1; + if (!aHasActivity && bHasActivity) return 1; + if (a.updatedAtMs !== b.updatedAtMs) return b.updatedAtMs - a.updatedAtMs; + return a.label.localeCompare(b.label); + }); } interface WorkspaceActions { @@ -1378,7 +1401,8 @@ export function MockLayout({ organizationId, selectedTaskId, selectedSessionId } ); return hydratedTasks.sort((left, right) => right.updatedAtMs - left.updatedAtMs); }, [selectedTaskSummary, selectedSessionId, sessionState.data, taskState.data, taskSummaries, organizationId]); - const rawRepositories = useMemo(() => groupRepositories(organizationRepos, tasks), [tasks, organizationRepos]); + const openPullRequests = organizationState.data?.openPullRequests ?? []; + const rawRepositories = useMemo(() => groupRepositories(organizationRepos, tasks, openPullRequests), [tasks, organizationRepos, openPullRequests]); const appSnapshot = useMockAppSnapshot(); const currentUser = activeMockUser(appSnapshot); const activeOrg = activeMockOrganization(appSnapshot); diff --git a/foundry/packages/frontend/src/components/mock-layout/sidebar.tsx b/foundry/packages/frontend/src/components/mock-layout/sidebar.tsx index c8cafe6..4e8b7ce 100644 --- a/foundry/packages/frontend/src/components/mock-layout/sidebar.tsx +++ b/foundry/packages/frontend/src/components/mock-layout/sidebar.tsx @@ -506,6 +506,7 @@ export const Sidebar = memo(function Sidebar({ return (
{ if (node) { @@ -663,6 +664,7 @@ export const Sidebar = memo(function Sidebar({ return (
{ @@ -775,6 +777,7 @@ export const Sidebar = memo(function Sidebar({ return (
{ @@ -812,6 +815,7 @@ export const Sidebar = memo(function Sidebar({ return (
{ if (node) { diff --git a/foundry/packages/frontend/src/lib/backend.ts b/foundry/packages/frontend/src/lib/backend.ts index b57cc51..be58cdd 100644 --- a/foundry/packages/frontend/src/lib/backend.ts +++ b/foundry/packages/frontend/src/lib/backend.ts @@ -5,4 +5,5 @@ export const backendClient = createBackendClient({ endpoint: backendEndpoint, defaultOrganizationId, mode: frontendClientMode, + encoding: import.meta.env.DEV ? "json" : undefined, }); diff --git a/foundry/packages/shared/src/workspace.ts b/foundry/packages/shared/src/workspace.ts index d1bda9c..d8dff2f 100644 --- a/foundry/packages/shared/src/workspace.ts +++ b/foundry/packages/shared/src/workspace.ts @@ -174,12 +174,27 @@ export interface OrganizationGithubSummary { totalRepositoryCount: number; } +export interface WorkspaceOpenPullRequest { + repoId: string; + repoFullName: string; + number: number; + title: string; + status: string; + state: string; + url: string; + headRefName: string; + baseRefName: string; + authorLogin: string | null; + isDraft: boolean; +} + /** Organization-level snapshot — initial fetch for the organization topic. */ export interface OrganizationSummarySnapshot { organizationId: string; github: OrganizationGithubSummary; repos: WorkspaceRepositorySummary[]; taskSummaries: WorkspaceTaskSummary[]; + openPullRequests?: WorkspaceOpenPullRequest[]; } export interface WorkspaceSession extends WorkspaceSessionSummary {