From 78cd38d826852b87ac16103129b984f60a6b3383 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Mon, 16 Mar 2026 14:17:24 -0700 Subject: [PATCH] Convert all actors from queues/workflows to direct actions, lazy task creation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Major refactor replacing all queue-based workflow communication with direct RivetKit action calls across all actors. This works around a RivetKit bug where c.queue.iter() deadlocks for actors created from another actor's context. Key changes: - All actors (organization, task, user, audit-log, github-data) converted from run: workflow(...) to actions-only (no run handler, no queues) - PR sync creates virtual task entries in org local DB instead of spawning task actors — prevents OOM from 200+ actors created simultaneously - Task actors created lazily on first user interaction via getOrCreate, self-initialize from org's getTaskIndexEntry data - Removed requireRepoExists cross-actor call (caused 500s), replaced with local resolveTaskRepoId from org's taskIndex table - Fixed getOrganizationContext to thread overrides through all sync phases - Fixed sandbox repo path (/home/user/repo for E2B compatibility) - Fixed buildSessionDetail to skip transcript fetch for pending sessions - Added process crash protection (uncaughtException/unhandledRejection) - Fixed React infinite render loop in mock-layout useEffect dependencies - Added sandbox listProcesses error handling for expired E2B sandboxes - Set E2B sandbox timeout to 1 hour (was 5 min default) - Updated CLAUDE.md with lazy task creation rules, no-silent-catch policy, React hook dependency safety rules Co-Authored-By: Claude Opus 4.6 (1M context) --- foundry/CLAUDE.md | 13 +- foundry/packages/backend/CLAUDE.md | 37 +++ .../backend/src/actors/audit-log/index.ts | 21 +- .../backend/src/actors/audit-log/workflow.ts | 38 --- .../backend/src/actors/github-data/index.ts | 98 ++++-- .../src/actors/organization/actions/github.ts | 27 +- .../organization/actions/organization.ts | 20 +- .../organization/actions/task-mutations.ts | 139 +++++---- .../src/actors/organization/actions/tasks.ts | 253 +++++++--------- .../src/actors/organization/app-shell.ts | 211 ++++++------- .../backend/src/actors/organization/index.ts | 12 +- .../src/actors/organization/workflow.ts | 137 +++++---- .../backend/src/actors/sandbox/index.ts | 26 +- .../packages/backend/src/actors/task/index.ts | 12 +- .../src/actors/task/workflow/common.ts | 73 ++++- .../backend/src/actors/task/workflow/index.ts | 279 ++++++++++-------- .../backend/src/actors/task/workflow/init.ts | 6 +- .../backend/src/actors/task/workspace.ts | 91 +++--- .../packages/backend/src/actors/user/index.ts | 45 ++- .../backend/src/actors/user/workflow.ts | 114 +------ foundry/packages/backend/src/index.ts | 13 + .../backend/src/services/better-auth.ts | 92 ++---- foundry/packages/client/src/backend-client.ts | 10 +- .../frontend/src/components/mock-layout.tsx | 7 +- 24 files changed, 887 insertions(+), 887 deletions(-) delete mode 100644 foundry/packages/backend/src/actors/audit-log/workflow.ts diff --git a/foundry/CLAUDE.md b/foundry/CLAUDE.md index 3cb5ee4..268b04c 100644 --- a/foundry/CLAUDE.md +++ b/foundry/CLAUDE.md @@ -144,6 +144,15 @@ The client subscribes to `app` always, `organization` when entering an organizat - Do not add backend git clone paths, `git fetch`, `git for-each-ref`, or direct backend git CLI calls. If you need git data, either read stored GitHub metadata or run the command inside a sandbox. - The `BackendDriver` has no `GitDriver` or `StackDriver`. Only `GithubDriver` and `TmuxDriver` remain. +## React Hook Dependency Safety + +- **Never use unstable references as `useEffect`/`useMemo`/`useCallback` dependencies.** React compares dependencies by reference, not value. Expressions like `?? []`, `?? {}`, `.map(...)`, `.filter(...)`, or object/array literals create new references every render, causing infinite re-render loops when used as dependencies. +- If the upstream value may be `undefined`/`null` and you need a fallback, either: + - Use the raw upstream value as the dependency and apply the fallback inside the effect body: `useEffect(() => { doThing(value ?? []); }, [value]);` + - Derive a stable primitive key: `const key = JSON.stringify(value ?? []);` then depend on `key` + - Memoize: `const stable = useMemo(() => value ?? [], [value]);` +- When reviewing code, treat any `?? []`, `?? {}`, or inline `.map()/.filter()` in a dependency array as a bug. + ## UI System - Foundry's base UI system is `BaseUI` with `Styletron`, plus Foundry-specific theme/tokens on top. Treat that as the default UI foundation. @@ -168,6 +177,7 @@ The client subscribes to `app` always, `organization` when entering an organizat - If the system reaches an unexpected state, raise an explicit error with actionable context. - Do not fail silently, swallow errors, or auto-ignore inconsistent data. - Prefer fail-fast behavior over hidden degradation when correctness is uncertain. +- **Never use bare `catch {}` or `catch { }` blocks.** Every catch must at minimum log the error with `logActorWarning` or `console.warn`. Silent catches hide bugs and make debugging impossible. If a catch is intentionally degrading (e.g. returning empty data when a sandbox is expired), it must still log so operators can see what happened. Use `catch (error) { logActorWarning(..., { error: resolveErrorMessage(error) }); }` or equivalent. ## RivetKit Dependency Policy @@ -208,8 +218,9 @@ For all Rivet/RivetKit implementation: - Do not add custom backend REST endpoints (no `/v1/*` shim layer). - We own the sandbox-agent project; treat sandbox-agent defects as first-party bugs and fix them instead of working around them. - Keep strict single-writer ownership: each table/row has exactly one actor writer. -- Parent actors (`organization`, `repository`, `task`, `history`, `sandbox-instance`) use command-only loops with no timeout. +- Parent actors (`organization`, `task`, `sandbox-instance`) use command-only loops with no timeout. - Periodic syncing lives in dedicated child actors with one timeout cadence each. +- **Task actors must be created lazily** — never during sync or bulk operations. PR sync writes virtual entries to the org's local `taskIndex`/`taskSummaries` tables. The task actor is created on first user interaction via `getOrCreate`. See `packages/backend/CLAUDE.md` "Lazy Task Actor Creation" for details. - Do not build blocking flows that wait on external systems to become ready or complete. Prefer push-based progression driven by actor messages, events, webhooks, or queue/workflow state changes. - Use workflows/background commands for any repo sync, sandbox provisioning, agent install, branch restack/rebase, or other multi-step external work. Do not keep user-facing actions/requests open while that work runs. - `send` policy: always `await` the `send(...)` call itself so enqueue failures surface immediately, but default to `wait: false`. diff --git a/foundry/packages/backend/CLAUDE.md b/foundry/packages/backend/CLAUDE.md index 2d980d4..ae4257e 100644 --- a/foundry/packages/backend/CLAUDE.md +++ b/foundry/packages/backend/CLAUDE.md @@ -49,6 +49,43 @@ OrganizationActor (coordinator for tasks + auth users) When adding a new index table, annotate it in the schema file with a doc comment identifying it as a coordinator index and which child actor it indexes (see existing examples). +## Lazy Task Actor Creation — CRITICAL + +**Task actors must NEVER be created during GitHub sync or bulk operations.** Creating hundreds of task actors simultaneously causes OOM crashes. An org can have 200+ PRs; spawning an actor per PR kills the process. + +### The two creation points + +There are exactly **two** places that may create a task actor: + +1. **`createTaskMutation`** in `task-mutations.ts` — the only backend code that calls `getOrCreateTask`. Triggered by explicit user action ("New Task" button). One actor at a time. + +2. **`backend-client.ts` client helper** — calls `client.task.getOrCreate(...)`. This is the lazy materialization point: when a user clicks a virtual task in the sidebar, the client creates the actor, and it self-initializes in `getCurrentRecord()` (`workflow/common.ts`) by reading branch/title from the org's `getTaskIndexEntry` action. + +### The rule + +### The rule + +**Never use `getOrCreateTask` inside a sync loop, webhook handler, or any bulk operation.** That's what caused the OOM — 186 actors spawned simultaneously during PR sync. + +`getOrCreateTask` IS allowed in: +- `createTaskMutation` — explicit user "New Task" action +- `requireWorkspaceTask` — user-initiated actions (createSession, sendMessage, etc.) that may hit a virtual task +- `getTask` action on the org — called by sandbox actor and client, needs to materialize virtual tasks +- `backend-client.ts` client helper — lazy materialization when user views a task + +### Virtual tasks (PR-driven) + +During PR sync, `refreshTaskSummaryForBranchMutation` is called for every changed PR (via github-data's `emitPullRequestChangeEvents`). It writes **virtual task entries** to the org actor's local `taskIndex` + `taskSummaries` tables only. No task actor is spawned. No cross-actor calls to task actors. + +When the user interacts with a virtual task (clicks it, creates a session): +1. Client or org actor calls `getOrCreate` on the task actor key → actor is created with empty DB +2. Any action on the actor calls `getCurrentRecord()` → sees empty DB → reads branch/title from org's `getTaskIndexEntry` → calls `initBootstrapDbActivity` + `initCompleteActivity` → task is now real + +### Call sites to watch + +- `refreshTaskSummaryForBranchMutation` — called in bulk during sync. Must ONLY write to org local tables. Never create task actors or call task actor actions. +- `emitPullRequestChangeEvents` in github-data — iterates all changed PRs. Must remain fire-and-forget with no actor fan-out. + ## Ownership Rules - `OrganizationActor` is the organization coordinator, direct coordinator for tasks, and lookup/index owner. It owns the task index, task summaries, and repo catalog. diff --git a/foundry/packages/backend/src/actors/audit-log/index.ts b/foundry/packages/backend/src/actors/audit-log/index.ts index 317974d..e189011 100644 --- a/foundry/packages/backend/src/actors/audit-log/index.ts +++ b/foundry/packages/backend/src/actors/audit-log/index.ts @@ -1,10 +1,9 @@ // @ts-nocheck import { and, desc, eq } from "drizzle-orm"; -import { actor, queue } from "rivetkit"; +import { actor } from "rivetkit"; import type { AuditLogEvent } from "@sandbox-agent/foundry-shared"; import { auditLogDb } from "./db/db.js"; import { events } from "./db/schema.js"; -import { AUDIT_LOG_QUEUE_NAMES, runAuditLogCommandLoop } from "./workflow.js"; export interface AuditLogInput { organizationId: string; @@ -36,7 +35,6 @@ export interface ListAuditLogParams { */ export const auditLog = actor({ db: auditLogDb, - queues: Object.fromEntries(AUDIT_LOG_QUEUE_NAMES.map((name) => [name, queue()])), options: { name: "Audit Log", icon: "database", @@ -45,6 +43,22 @@ export const auditLog = actor({ organizationId: input.organizationId, }), actions: { + async append(c, body: AppendAuditLogCommand): Promise<{ ok: true }> { + const now = Date.now(); + await c.db + .insert(events) + .values({ + repoId: body.repoId ?? null, + taskId: body.taskId ?? null, + branchName: body.branchName ?? null, + kind: body.kind, + payloadJson: JSON.stringify(body.payload), + createdAt: now, + }) + .run(); + return { ok: true }; + }, + async list(c, params?: ListAuditLogParams): Promise { const whereParts = []; if (params?.repoId) { @@ -81,5 +95,4 @@ export const auditLog = actor({ })); }, }, - run: runAuditLogCommandLoop, }); diff --git a/foundry/packages/backend/src/actors/audit-log/workflow.ts b/foundry/packages/backend/src/actors/audit-log/workflow.ts deleted file mode 100644 index 6c48074..0000000 --- a/foundry/packages/backend/src/actors/audit-log/workflow.ts +++ /dev/null @@ -1,38 +0,0 @@ -// @ts-nocheck -import { logActorWarning, resolveErrorMessage } from "../logging.js"; -import { events } from "./db/schema.js"; -import type { AppendAuditLogCommand } from "./index.js"; - -export const AUDIT_LOG_QUEUE_NAMES = ["auditLog.command.append"] as const; - -async function appendAuditLogRow(c: any, body: AppendAuditLogCommand): Promise { - const now = Date.now(); - await c.db - .insert(events) - .values({ - repoId: body.repoId ?? null, - taskId: body.taskId ?? null, - branchName: body.branchName ?? null, - kind: body.kind, - payloadJson: JSON.stringify(body.payload), - createdAt: now, - }) - .run(); -} - -export async function runAuditLogCommandLoop(c: any): Promise { - for await (const msg of c.queue.iter({ names: [...AUDIT_LOG_QUEUE_NAMES], completable: true })) { - try { - if (msg.name === "auditLog.command.append") { - await appendAuditLogRow(c, msg.body as AppendAuditLogCommand); - await msg.complete({ ok: true }); - continue; - } - await msg.complete({ error: `Unknown command: ${msg.name}` }); - } catch (error) { - const message = resolveErrorMessage(error); - logActorWarning("auditLog", "audit-log command failed", { queueName: msg.name, error: message }); - await msg.complete({ error: message }).catch(() => {}); - } - } -} diff --git a/foundry/packages/backend/src/actors/github-data/index.ts b/foundry/packages/backend/src/actors/github-data/index.ts index e18f02f..a7d65a0 100644 --- a/foundry/packages/backend/src/actors/github-data/index.ts +++ b/foundry/packages/backend/src/actors/github-data/index.ts @@ -1,17 +1,15 @@ // @ts-nocheck import { eq, inArray } from "drizzle-orm"; -import { actor, queue } from "rivetkit"; +import { actor } from "rivetkit"; import type { FoundryOrganization } from "@sandbox-agent/foundry-shared"; import { getActorRuntimeContext } from "../context.js"; import { getOrCreateOrganization, getTask } from "../handles.js"; import { repoIdFromRemote } from "../../services/repo.js"; import { resolveOrganizationGithubAuth } from "../../services/github-auth.js"; -import { expectQueueResponse } from "../../services/queue.js"; -import { organizationWorkflowQueueName } from "../organization/queues.js"; -import { taskWorkflowQueueName } from "../task/workflow/index.js"; +// actions called directly (no queue) import { githubDataDb } from "./db/db.js"; import { githubBranches, githubMembers, githubMeta, githubPullRequests, githubRepositories } from "./db/schema.js"; -import { GITHUB_DATA_QUEUE_NAMES, runGithubDataCommandLoop } from "./workflow.js"; +// workflow.ts is no longer used — commands are actions now const META_ROW_ID = 1; const SYNC_REPOSITORY_BATCH_SIZE = 10; @@ -76,9 +74,7 @@ interface ClearStateInput { label: string; } -async function sendOrganizationCommand(organization: any, name: Parameters[0], body: unknown): Promise { - await expectQueueResponse<{ ok: true }>(await organization.send(organizationWorkflowQueueName(name), body, { wait: true, timeout: 60_000 })); -} +// sendOrganizationCommand removed — org actions called directly interface PullRequestWebhookInput { connectedAccount: string; @@ -213,7 +209,7 @@ async function writeMeta(c: any, patch: Partial) { async function publishSyncProgress(c: any, patch: Partial): Promise { const meta = await writeMeta(c, patch); const organization = await getOrCreateOrganization(c, c.state.organizationId); - await sendOrganizationCommand(organization, "organization.command.github.sync_progress.apply", { + await organization.commandApplyGithubSyncProgress({ connectedAccount: meta.connectedAccount, installationStatus: meta.installationStatus, installationId: meta.installationId, @@ -229,21 +225,29 @@ async function publishSyncProgress(c: any, patch: Partial): Pro } async function getOrganizationContext(c: any, overrides?: FullSyncInput) { + // Try to read the org profile for fallback values, but don't require it. + // Webhook-triggered syncs can arrive before the user signs in and creates the + // org profile row. The webhook callers already pass the necessary overrides + // (connectedAccount, installationId, githubLogin, kind), so we can proceed + // without the profile as long as overrides cover the required fields. const organizationHandle = await getOrCreateOrganization(c, c.state.organizationId); const organizationState = await organizationHandle.getOrganizationShellStateIfInitialized({}); - if (!organizationState) { - throw new Error(`Organization ${c.state.organizationId} is not initialized`); + + // If the org profile doesn't exist and overrides don't provide enough context, fail. + if (!organizationState && !overrides?.connectedAccount) { + throw new Error(`Organization ${c.state.organizationId} is not initialized and no override context was provided`); } + const auth = await resolveOrganizationGithubAuth(c, c.state.organizationId); return { - kind: overrides?.kind ?? organizationState.snapshot.kind, - githubLogin: overrides?.githubLogin ?? organizationState.githubLogin, - connectedAccount: overrides?.connectedAccount ?? organizationState.snapshot.github.connectedAccount ?? organizationState.githubLogin, - installationId: overrides?.installationId ?? organizationState.githubInstallationId ?? null, + kind: overrides?.kind ?? organizationState?.snapshot.kind, + githubLogin: overrides?.githubLogin ?? organizationState?.githubLogin, + connectedAccount: overrides?.connectedAccount ?? organizationState?.snapshot.github.connectedAccount ?? organizationState?.githubLogin, + installationId: overrides?.installationId ?? organizationState?.githubInstallationId ?? null, installationStatus: overrides?.installationStatus ?? - organizationState.snapshot.github.installationStatus ?? - (organizationState.snapshot.kind === "personal" ? "connected" : "reconnect_required"), + organizationState?.snapshot.github.installationStatus ?? + (organizationState?.snapshot.kind === "personal" ? "connected" : "reconnect_required"), accessToken: overrides?.accessToken ?? auth?.githubToken ?? null, }; } @@ -420,11 +424,7 @@ async function refreshTaskSummaryForBranch(c: any, repoId: string, branchName: s return; } const organization = await getOrCreateOrganization(c, c.state.organizationId); - await organization.send( - organizationWorkflowQueueName("organization.command.refreshTaskSummaryForBranch"), - { repoId, branchName, pullRequest }, - { wait: false }, - ); + void organization.commandRefreshTaskSummaryForBranch({ repoId, branchName, pullRequest, repoName: repositoryRecord.fullName ?? undefined }).catch(() => {}); } async function emitPullRequestChangeEvents(c: any, beforeRows: any[], afterRows: any[]) { @@ -472,7 +472,7 @@ async function autoArchiveTaskForClosedPullRequest(c: any, row: any) { } try { const task = getTask(c, c.state.organizationId, row.repoId, match.taskId); - await task.send(taskWorkflowQueueName("task.command.archive"), { reason: `PR ${String(row.state).toLowerCase()}` }, { wait: false }); + void task.archive({ reason: `PR ${String(row.state).toLowerCase()}` }).catch(() => {}); } catch { // Best-effort only. Task summary refresh will still clear the PR state. } @@ -721,7 +721,11 @@ export async function fullSyncBranchBatch(c: any, config: FullSyncConfig, batchI if (batchIndex >= batches.length) return true; const batch = batches[batchIndex]!; - const context = await getOrganizationContext(c); + const context = await getOrganizationContext(c, { + connectedAccount: config.connectedAccount, + installationStatus: config.installationStatus as any, + installationId: config.installationId, + }); const batchBranches = (await Promise.all(batch.map((repo) => listRepositoryBranchesForContext(context, repo)))).flat(); await upsertBranches(c, batchBranches, config.startedAt, config.syncGeneration); @@ -757,7 +761,11 @@ export async function fullSyncMembers(c: any, config: FullSyncConfig): Promise= batches.length) return true; const batch = batches[batchIndex]!; - const context = await getOrganizationContext(c); + const context = await getOrganizationContext(c, { + connectedAccount: config.connectedAccount, + installationStatus: config.installationStatus as any, + installationId: config.installationId, + }); const batchPRs = await listPullRequestsForRepositories(context, batch); await upsertPullRequests(c, batchPRs, config.syncGeneration); @@ -801,7 +813,7 @@ export async function fullSyncFinalize(c: any, config: FullSyncConfig): Promise< await sweepPullRequests(c, config.syncGeneration); await sweepRepositories(c, config.syncGeneration); - await writeMeta(c, { + await publishSyncProgress(c, { connectedAccount: config.connectedAccount, installationStatus: config.installationStatus, installationId: config.installationId, @@ -867,16 +879,14 @@ export async function fullSyncError(c: any, error: unknown): Promise { export const githubData = actor({ db: githubDataDb, - queues: Object.fromEntries(GITHUB_DATA_QUEUE_NAMES.map((name) => [name, queue()])), options: { name: "GitHub Data", icon: "github", - actionTimeout: 5 * 60_000, + actionTimeout: 10 * 60_000, }, createState: (_c, input: GithubDataInput) => ({ organizationId: input.organizationId, }), - run: runGithubDataCommandLoop, actions: { async getSummary(c) { const repositories = await c.db.select().from(githubRepositories).all(); @@ -935,6 +945,34 @@ export const githubData = actor({ })) .sort((left, right) => left.branchName.localeCompare(right.branchName)); }, + + async syncRepos(c, body: any) { + try { + await runFullSync(c, body); + return { ok: true }; + } catch (error) { + try { + await fullSyncError(c, error); + } catch { + /* best effort */ + } + throw error; + } + }, + + async reloadRepository(c, body: { repoId: string }) { + return await reloadRepositoryMutation(c, body); + }, + + async clearState(c, body: any) { + await clearStateMutation(c, body); + return { ok: true }; + }, + + async handlePullRequestWebhook(c, body: any) { + await handlePullRequestWebhookMutation(c, body); + return { ok: true }; + }, }, }); diff --git a/foundry/packages/backend/src/actors/organization/actions/github.ts b/foundry/packages/backend/src/actors/organization/actions/github.ts index d176d52..ff14d7e 100644 --- a/foundry/packages/backend/src/actors/organization/actions/github.ts +++ b/foundry/packages/backend/src/actors/organization/actions/github.ts @@ -2,16 +2,15 @@ import { desc } from "drizzle-orm"; import type { FoundryAppSnapshot } from "@sandbox-agent/foundry-shared"; import { getOrCreateGithubData, getOrCreateOrganization } from "../../handles.js"; import { authSessionIndex } from "../db/schema.js"; -import { githubDataWorkflowQueueName } from "../../github-data/workflow.js"; import { assertAppOrganization, buildAppSnapshot, requireEligibleOrganization, requireSignedInSession, + markOrganizationSyncStartedMutation, } from "../app-shell.js"; import { getBetterAuthService } from "../../../services/better-auth.js"; -import { expectQueueResponse } from "../../../services/queue.js"; -import { organizationWorkflowQueueName } from "../queues.js"; +import { refreshOrganizationSnapshotMutation } from "../actions.js"; export const organizationGithubActions = { async resolveAppGithubToken( @@ -59,33 +58,21 @@ export const organizationGithubActions = { } const organizationHandle = await getOrCreateOrganization(c, input.organizationId); - await expectQueueResponse<{ ok: true }>( - await organizationHandle.send( - organizationWorkflowQueueName("organization.command.shell.sync_started.mark"), - { label: "Importing repository catalog..." }, - { wait: true, timeout: 10_000 }, - ), - ); - await expectQueueResponse<{ ok: true }>( - await organizationHandle.send(organizationWorkflowQueueName("organization.command.snapshot.broadcast"), {}, { wait: true, timeout: 10_000 }), - ); + await organizationHandle.commandMarkSyncStarted({ label: "Importing repository catalog..." }); + await organizationHandle.commandBroadcastSnapshot({}); - await githubData.send("githubData.command.syncRepos", { label: "Importing repository catalog..." }, { wait: false }); + void githubData.syncRepos({ label: "Importing repository catalog..." }).catch(() => {}); return await buildAppSnapshot(c, input.sessionId); }, async adminReloadGithubOrganization(c: any): Promise { const githubData = await getOrCreateGithubData(c, c.state.organizationId); - await expectQueueResponse<{ ok: true }>( - await githubData.send(githubDataWorkflowQueueName("githubData.command.syncRepos"), { label: "Reloading GitHub organization..." }, { wait: true, timeout: 10_000 }), - ); + await githubData.syncRepos({ label: "Reloading GitHub organization..." }); }, async adminReloadGithubRepository(c: any, input: { repoId: string }): Promise { const githubData = await getOrCreateGithubData(c, c.state.organizationId); - await expectQueueResponse( - await githubData.send(githubDataWorkflowQueueName("githubData.command.reloadRepository"), input, { wait: true, timeout: 10_000 }), - ); + await githubData.reloadRepository(input); }, }; diff --git a/foundry/packages/backend/src/actors/organization/actions/organization.ts b/foundry/packages/backend/src/actors/organization/actions/organization.ts index 8bc3b6d..d38e113 100644 --- a/foundry/packages/backend/src/actors/organization/actions/organization.ts +++ b/foundry/packages/backend/src/actors/organization/actions/organization.ts @@ -1,7 +1,7 @@ import type { FoundryAppSnapshot, UpdateFoundryOrganizationProfileInput, WorkspaceModelId } from "@sandbox-agent/foundry-shared"; import { getBetterAuthService } from "../../../services/better-auth.js"; import { getOrCreateOrganization } from "../../handles.js"; -import { expectQueueResponse } from "../../../services/queue.js"; +// actions called directly (no queue) import { assertAppOrganization, assertOrganizationShell, @@ -11,7 +11,7 @@ import { requireEligibleOrganization, requireSignedInSession, } from "../app-shell.js"; -import { organizationWorkflowQueueName } from "../queues.js"; +// org queue names removed — using direct actions export const organizationShellActions = { async getAppSnapshot(c: any, input: { sessionId: string }): Promise { @@ -35,17 +35,11 @@ export const organizationShellActions = { const session = await requireSignedInSession(c, input.sessionId); requireEligibleOrganization(session, input.organizationId); const organization = await getOrCreateOrganization(c, input.organizationId); - await expectQueueResponse<{ ok: true }>( - await organization.send( - organizationWorkflowQueueName("organization.command.shell.profile.update"), - { - displayName: input.displayName, - slug: input.slug, - primaryDomain: input.primaryDomain, - }, - { wait: true, timeout: 10_000 }, - ), - ); + await organization.commandUpdateShellProfile({ + displayName: input.displayName, + slug: input.slug, + primaryDomain: input.primaryDomain, + }); return await buildAppSnapshot(c, input.sessionId); }, diff --git a/foundry/packages/backend/src/actors/organization/actions/task-mutations.ts b/foundry/packages/backend/src/actors/organization/actions/task-mutations.ts index c6b0c02..73abea2 100644 --- a/foundry/packages/backend/src/actors/organization/actions/task-mutations.ts +++ b/foundry/packages/backend/src/actors/organization/actions/task-mutations.ts @@ -12,9 +12,9 @@ import type { } from "@sandbox-agent/foundry-shared"; import { getActorRuntimeContext } from "../../context.js"; import { getGithubData, getOrCreateAuditLog, getOrCreateTask, getTask } from "../../handles.js"; -import { taskWorkflowQueueName } from "../../task/workflow/index.js"; +// task actions called directly (no queue) import { deriveFallbackTitle, resolveCreateFlowDecision } from "../../../services/create-flow.js"; -import { expectQueueResponse } from "../../../services/queue.js"; +// actions return directly (no queue response unwrapping) import { isActorNotFoundError, logActorWarning, resolveErrorMessage } from "../../logging.js"; import { defaultSandboxProviderId } from "../../../sandbox-config.js"; import { taskIndex, taskSummaries } from "../db/schema.js"; @@ -128,6 +128,16 @@ async function resolveRepositoryRemoteUrl(c: any, repoId: string): Promise { const organizationId = c.state.organizationId; const repoId = cmd.repoId; @@ -188,21 +198,12 @@ export async function createTaskMutation(c: any, cmd: CreateTaskCommand): Promis throw error; } - const created = await expectQueueResponse( - await taskHandle.send( - taskWorkflowQueueName("task.command.initialize"), - { - sandboxProviderId: cmd.sandboxProviderId, - branchName: initialBranchName, - title: initialTitle, - task: cmd.task, - }, - { - wait: true, - timeout: 10_000, - }, - ), - ); + const created = await taskHandle.initialize({ + sandboxProviderId: cmd.sandboxProviderId, + branchName: initialBranchName, + title: initialTitle, + task: cmd.task, + }); try { await upsertTaskSummary(c, await taskHandle.getTaskSummary({})); @@ -217,21 +218,15 @@ export async function createTaskMutation(c: any, cmd: CreateTaskCommand): Promis } const auditLog = await getOrCreateAuditLog(c, organizationId); - await auditLog.send( - "auditLog.command.append", - { - kind: "task.created", + void auditLog.append({ + kind: "task.created", + repoId, + taskId, + payload: { repoId, - taskId, - payload: { - repoId, - sandboxProviderId: cmd.sandboxProviderId, - }, + sandboxProviderId: cmd.sandboxProviderId, }, - { - wait: false, - }, - ); + }); try { const taskSummary = await taskHandle.getTaskSummary({}); @@ -319,9 +314,15 @@ export async function removeTaskSummaryMutation(c: any, input: { taskId: string await refreshOrganizationSnapshotMutation(c); } +/** + * Called for every changed PR during sync and on webhook PR events. + * Runs in a bulk loop — MUST NOT create task actors or make cross-actor calls + * to task actors. Only writes to the org's local taskIndex/taskSummaries tables. + * Task actors are created lazily when the user views the task. + */ export async function refreshTaskSummaryForBranchMutation( c: any, - input: { repoId: string; branchName: string; pullRequest?: WorkspacePullRequestSummary | null }, + input: { repoId: string; branchName: string; pullRequest?: WorkspacePullRequestSummary | null; repoName?: string }, ): Promise { const pullRequest = input.pullRequest ?? null; let rows = await c.db @@ -331,34 +332,62 @@ export async function refreshTaskSummaryForBranchMutation( .all(); if (rows.length === 0 && pullRequest) { - const { config } = getActorRuntimeContext(); - const created = await createTaskMutation(c, { - repoId: input.repoId, - task: pullRequest.title?.trim() || `Review ${input.branchName}`, - sandboxProviderId: defaultSandboxProviderId(config), - explicitTitle: pullRequest.title?.trim() || input.branchName, - explicitBranchName: null, - onBranch: input.branchName, - }); - rows = [{ taskId: created.taskId }]; - } + // Create a virtual task entry in the org's local tables only. + // No task actor is spawned — it will be created lazily when the user + // clicks on the task in the sidebar (the "materialize" path). + const taskId = randomUUID(); + const now = Date.now(); + const title = pullRequest.title?.trim() || input.branchName; + const repoName = input.repoName ?? `${c.state.organizationId}/${input.repoId}`; - for (const row of rows) { - try { - const task = getTask(c, c.state.organizationId, input.repoId, row.taskId); - await expectQueueResponse<{ ok: true }>( - await task.send(taskWorkflowQueueName("task.command.pull_request.sync"), { pullRequest }, { wait: true, timeout: 10_000 }), - ); - } catch (error) { - logActorWarning("organization", "failed refreshing task summary for branch", { - organizationId: c.state.organizationId, + await c.db + .insert(taskIndex) + .values({ taskId, repoId: input.repoId, branchName: input.branchName, createdAt: now, updatedAt: now }) + .onConflictDoNothing() + .run(); + + await c.db + .insert(taskSummaries) + .values({ + taskId, repoId: input.repoId, - branchName: input.branchName, - taskId: row.taskId, - error: resolveErrorMessage(error), - }); + title, + status: "init_complete", + repoName, + updatedAtMs: pullRequest.updatedAtMs ?? now, + branch: input.branchName, + pullRequestJson: JSON.stringify(pullRequest), + sessionsSummaryJson: "[]", + }) + .onConflictDoNothing() + .run(); + + rows = [{ taskId }]; + } else { + // Update PR data on existing task summaries locally. + // If a real task actor exists, also notify it. + for (const row of rows) { + // Update the local summary with the new PR data + await c.db + .update(taskSummaries) + .set({ + pullRequestJson: pullRequest ? JSON.stringify(pullRequest) : null, + updatedAtMs: pullRequest?.updatedAtMs ?? Date.now(), + }) + .where(eq(taskSummaries.taskId, row.taskId)) + .run(); + + // Best-effort notify the task actor if it exists (fire-and-forget) + try { + const task = getTask(c, c.state.organizationId, input.repoId, row.taskId); + void task.pullRequestSync({ pullRequest }).catch(() => {}); + } catch { + // Task actor doesn't exist yet — that's fine, it's virtual + } } } + + await refreshOrganizationSnapshotMutation(c); } export function sortOverviewBranches( diff --git a/foundry/packages/backend/src/actors/organization/actions/tasks.ts b/foundry/packages/backend/src/actors/organization/actions/tasks.ts index b29baa1..118ff15 100644 --- a/foundry/packages/backend/src/actors/organization/actions/tasks.ts +++ b/foundry/packages/backend/src/actors/organization/actions/tasks.ts @@ -21,12 +21,10 @@ import type { TaskWorkspaceUpdateDraftInput, } from "@sandbox-agent/foundry-shared"; import { getActorRuntimeContext } from "../../context.js"; -import { getOrCreateAuditLog, getOrCreateGithubData, getTask as getTaskHandle, selfOrganization } from "../../handles.js"; +import { getOrCreateAuditLog, getOrCreateTask, getTask as getTaskHandle } from "../../handles.js"; import { defaultSandboxProviderId } from "../../../sandbox-config.js"; -import { expectQueueResponse } from "../../../services/queue.js"; import { logActorWarning, resolveErrorMessage } from "../../logging.js"; -import { taskWorkflowQueueName } from "../../task/workflow/index.js"; -import { organizationWorkflowQueueName } from "../queues.js"; +import { taskIndex, taskSummaries } from "../db/schema.js"; import { createTaskMutation, getRepoOverviewFromOrg, @@ -42,16 +40,35 @@ function assertOrganization(c: { state: { organizationId: string } }, organizati } } -async function requireRepoExists(c: any, repoId: string): Promise { - const githubData = await getOrCreateGithubData(c, c.state.organizationId); - const repo = await githubData.getRepository({ repoId }); - if (!repo) { - throw new Error(`Unknown repo: ${repoId}`); +/** + * Look up the repoId for a task from the local task index. + * Used when callers (e.g. sandbox actor) only have taskId but need repoId + * to construct the task actor key. + */ +async function resolveTaskRepoId(c: any, taskId: string): Promise { + const row = await c.db.select({ repoId: taskIndex.repoId }).from(taskIndex).where(eq(taskIndex.taskId, taskId)).get(); + if (!row) { + throw new Error(`Task ${taskId} not found in task index`); } + return row.repoId; } +/** + * Get or lazily create a task actor for a user-initiated action. + * Uses getOrCreate because the user may be interacting with a virtual task + * (PR-driven) that has no actor yet. The task actor self-initializes in + * getCurrentRecord() from the org's getTaskIndexEntry data. + * + * This is safe because requireWorkspaceTask is only called from user-initiated + * actions (createSession, sendMessage, etc.), never from sync loops. + * See CLAUDE.md "Lazy Task Actor Creation". + */ async function requireWorkspaceTask(c: any, repoId: string, taskId: string) { - return getTaskHandle(c, c.state.organizationId, repoId, taskId); + return getOrCreateTask(c, c.state.organizationId, repoId, taskId, { + organizationId: c.state.organizationId, + repoId, + taskId, + }); } interface GetTaskInput { @@ -76,46 +93,30 @@ export const organizationTaskActions = { assertOrganization(c, input.organizationId); const { config } = getActorRuntimeContext(); const sandboxProviderId = input.sandboxProviderId ?? defaultSandboxProviderId(config); - await requireRepoExists(c, input.repoId); - const self = selfOrganization(c); - return expectQueueResponse( - await self.send( - organizationWorkflowQueueName("organization.command.createTask"), - { - repoId: input.repoId, - task: input.task, - sandboxProviderId, - explicitTitle: input.explicitTitle ?? null, - explicitBranchName: input.explicitBranchName ?? null, - onBranch: input.onBranch ?? null, - }, - { - wait: true, - timeout: 10_000, - }, - ), - ); + // Self-call: call the mutation directly since we're inside the org actor + return await createTaskMutation(c, { + repoId: input.repoId, + task: input.task, + sandboxProviderId, + explicitTitle: input.explicitTitle ?? null, + explicitBranchName: input.explicitBranchName ?? null, + onBranch: input.onBranch ?? null, + }); }, async materializeTask(c: any, input: { organizationId: string; repoId: string; virtualTaskId: string }): Promise { assertOrganization(c, input.organizationId); const { config } = getActorRuntimeContext(); - const self = selfOrganization(c); - return expectQueueResponse( - await self.send( - organizationWorkflowQueueName("organization.command.materializeTask"), - { - repoId: input.repoId, - task: input.virtualTaskId, - sandboxProviderId: defaultSandboxProviderId(config), - explicitTitle: null, - explicitBranchName: null, - onBranch: null, - }, - { wait: true, timeout: 10_000 }, - ), - ); + // Self-call: call the mutation directly + return await createTaskMutation(c, { + repoId: input.repoId, + task: input.virtualTaskId, + sandboxProviderId: defaultSandboxProviderId(config), + explicitTitle: null, + explicitBranchName: null, + onBranch: null, + }); }, async createWorkspaceTask(c: any, input: TaskWorkspaceCreateTaskInput): Promise<{ taskId: string; sessionId?: string }> { @@ -128,171 +129,117 @@ export const organizationTaskActions = { }); const task = await requireWorkspaceTask(c, input.repoId, created.taskId); - await task.send( - taskWorkflowQueueName("task.command.workspace.create_session_and_send"), - { + void task + .createSessionAndSend({ model: input.model, text: input.task, authSessionId: input.authSessionId, - }, - { wait: false }, - ); + }) + .catch(() => {}); return { taskId: created.taskId }; }, async markWorkspaceUnread(c: any, input: TaskWorkspaceSelectInput): Promise { const task = await requireWorkspaceTask(c, input.repoId, input.taskId); - await expectQueueResponse<{ ok: true }>( - await task.send(taskWorkflowQueueName("task.command.workspace.mark_unread"), { authSessionId: input.authSessionId }, { wait: true, timeout: 10_000 }), - ); + await task.markUnread({ authSessionId: input.authSessionId }); }, async renameWorkspaceTask(c: any, input: TaskWorkspaceRenameInput): Promise { const task = await requireWorkspaceTask(c, input.repoId, input.taskId); - await expectQueueResponse<{ ok: true }>( - await task.send(taskWorkflowQueueName("task.command.workspace.rename_task"), { value: input.value }, { wait: true, timeout: 20_000 }), - ); + await task.renameTask({ value: input.value }); }, async createWorkspaceSession(c: any, input: TaskWorkspaceSelectInput & { model?: string }): Promise<{ sessionId: string }> { const task = await requireWorkspaceTask(c, input.repoId, input.taskId); - return await expectQueueResponse<{ sessionId: string }>( - await task.send( - taskWorkflowQueueName("task.command.workspace.create_session"), - { - ...(input.model ? { model: input.model } : {}), - ...(input.authSessionId ? { authSessionId: input.authSessionId } : {}), - }, - { wait: true, timeout: 10_000 }, - ), - ); + return await task.createSession({ + ...(input.model ? { model: input.model } : {}), + ...(input.authSessionId ? { authSessionId: input.authSessionId } : {}), + }); }, async renameWorkspaceSession(c: any, input: TaskWorkspaceRenameSessionInput): Promise { const task = await requireWorkspaceTask(c, input.repoId, input.taskId); - await expectQueueResponse<{ ok: true }>( - await task.send( - taskWorkflowQueueName("task.command.workspace.rename_session"), - { sessionId: input.sessionId, title: input.title, authSessionId: input.authSessionId }, - { wait: true, timeout: 10_000 }, - ), - ); + await task.renameSession({ sessionId: input.sessionId, title: input.title, authSessionId: input.authSessionId }); }, async selectWorkspaceSession(c: any, input: TaskWorkspaceSessionInput): Promise { const task = await requireWorkspaceTask(c, input.repoId, input.taskId); - await expectQueueResponse<{ ok: true }>( - await task.send( - taskWorkflowQueueName("task.command.workspace.select_session"), - { sessionId: input.sessionId, authSessionId: input.authSessionId }, - { wait: true, timeout: 10_000 }, - ), - ); + await task.selectSession({ sessionId: input.sessionId, authSessionId: input.authSessionId }); }, async setWorkspaceSessionUnread(c: any, input: TaskWorkspaceSetSessionUnreadInput): Promise { const task = await requireWorkspaceTask(c, input.repoId, input.taskId); - await expectQueueResponse<{ ok: true }>( - await task.send( - taskWorkflowQueueName("task.command.workspace.set_session_unread"), - { sessionId: input.sessionId, unread: input.unread, authSessionId: input.authSessionId }, - { wait: true, timeout: 10_000 }, - ), - ); + await task.setSessionUnread({ sessionId: input.sessionId, unread: input.unread, authSessionId: input.authSessionId }); }, async updateWorkspaceDraft(c: any, input: TaskWorkspaceUpdateDraftInput): Promise { const task = await requireWorkspaceTask(c, input.repoId, input.taskId); - await task.send( - taskWorkflowQueueName("task.command.workspace.update_draft"), - { + void task + .updateDraft({ sessionId: input.sessionId, text: input.text, attachments: input.attachments, authSessionId: input.authSessionId, - }, - { wait: false }, - ); + }) + .catch(() => {}); }, async changeWorkspaceModel(c: any, input: TaskWorkspaceChangeModelInput): Promise { const task = await requireWorkspaceTask(c, input.repoId, input.taskId); - await expectQueueResponse<{ ok: true }>( - await task.send( - taskWorkflowQueueName("task.command.workspace.change_model"), - { sessionId: input.sessionId, model: input.model, authSessionId: input.authSessionId }, - { wait: true, timeout: 10_000 }, - ), - ); + await task.changeModel({ sessionId: input.sessionId, model: input.model, authSessionId: input.authSessionId }); }, async sendWorkspaceMessage(c: any, input: TaskWorkspaceSendMessageInput): Promise { const task = await requireWorkspaceTask(c, input.repoId, input.taskId); - await task.send( - taskWorkflowQueueName("task.command.workspace.send_message"), - { + void task + .sendMessage({ sessionId: input.sessionId, text: input.text, attachments: input.attachments, authSessionId: input.authSessionId, - }, - { wait: false }, - ); + }) + .catch(() => {}); }, async stopWorkspaceSession(c: any, input: TaskWorkspaceSessionInput): Promise { const task = await requireWorkspaceTask(c, input.repoId, input.taskId); - await task.send( - taskWorkflowQueueName("task.command.workspace.stop_session"), - { sessionId: input.sessionId, authSessionId: input.authSessionId }, - { wait: false }, - ); + void task.stopSession({ sessionId: input.sessionId, authSessionId: input.authSessionId }).catch(() => {}); }, async closeWorkspaceSession(c: any, input: TaskWorkspaceSessionInput): Promise { const task = await requireWorkspaceTask(c, input.repoId, input.taskId); - await task.send( - taskWorkflowQueueName("task.command.workspace.close_session"), - { sessionId: input.sessionId, authSessionId: input.authSessionId }, - { wait: false }, - ); + void task.closeSession({ sessionId: input.sessionId, authSessionId: input.authSessionId }).catch(() => {}); }, async publishWorkspacePr(c: any, input: TaskWorkspaceSelectInput): Promise { const task = await requireWorkspaceTask(c, input.repoId, input.taskId); - await task.send(taskWorkflowQueueName("task.command.workspace.publish_pr"), {}, { wait: false }); + void task.publishPr({}).catch(() => {}); }, async revertWorkspaceFile(c: any, input: TaskWorkspaceDiffInput): Promise { const task = await requireWorkspaceTask(c, input.repoId, input.taskId); - await task.send(taskWorkflowQueueName("task.command.workspace.revert_file"), input, { wait: false }); + void task.revertFile(input).catch(() => {}); }, async getRepoOverview(c: any, input: RepoOverviewInput): Promise { assertOrganization(c, input.organizationId); - await requireRepoExists(c, input.repoId); + return await getRepoOverviewFromOrg(c, input.repoId); }, async listTasks(c: any, input: ListTasksInput): Promise { assertOrganization(c, input.organizationId); - if (input.repoId) { return await listTaskSummariesForRepo(c, input.repoId, true); } - return await listAllTaskSummaries(c, true); }, async switchTask(c: any, input: { repoId: string; taskId: string }): Promise { - await requireRepoExists(c, input.repoId); const h = getTaskHandle(c, c.state.organizationId, input.repoId, input.taskId); const record = await h.get(); - const switched = await expectQueueResponse<{ switchTarget: string }>( - await h.send(taskWorkflowQueueName("task.command.switch"), {}, { wait: true, timeout: 10_000 }), - ); - + const switched = await h.switchTask({}); return { organizationId: c.state.organizationId, taskId: input.taskId, @@ -303,7 +250,6 @@ export const organizationTaskActions = { async auditLog(c: any, input: HistoryQueryInput): Promise { assertOrganization(c, input.organizationId); - const auditLog = await getOrCreateAuditLog(c, c.state.organizationId); return await auditLog.list({ repoId: input.repoId, @@ -315,52 +261,58 @@ export const organizationTaskActions = { async getTask(c: any, input: GetTaskInput): Promise { assertOrganization(c, input.organizationId); - await requireRepoExists(c, input.repoId); - return await getTaskHandle(c, c.state.organizationId, input.repoId, input.taskId).get(); + // Resolve repoId from local task index if not provided (e.g. sandbox actor only has taskId) + const repoId = input.repoId || (await resolveTaskRepoId(c, input.taskId)); + // Use getOrCreate — the task may be virtual (PR-driven, no actor yet). + // The task actor self-initializes in getCurrentRecord(). + const handle = await getOrCreateTask(c, c.state.organizationId, repoId, input.taskId, { + organizationId: c.state.organizationId, + repoId, + taskId: input.taskId, + }); + return await handle.get(); }, async attachTask(c: any, input: TaskProxyActionInput): Promise<{ target: string; sessionId: string | null }> { assertOrganization(c, input.organizationId); - await requireRepoExists(c, input.repoId); + const h = getTaskHandle(c, c.state.organizationId, input.repoId, input.taskId); - return await expectQueueResponse<{ target: string; sessionId: string | null }>( - await h.send(taskWorkflowQueueName("task.command.attach"), { reason: input.reason }, { wait: true, timeout: 10_000 }), - ); + return await h.attach({ reason: input.reason }); }, async pushTask(c: any, input: TaskProxyActionInput): Promise { assertOrganization(c, input.organizationId); - await requireRepoExists(c, input.repoId); + const h = getTaskHandle(c, c.state.organizationId, input.repoId, input.taskId); - await h.send(taskWorkflowQueueName("task.command.push"), { reason: input.reason }, { wait: false }); + void h.push({ reason: input.reason }).catch(() => {}); }, async syncTask(c: any, input: TaskProxyActionInput): Promise { assertOrganization(c, input.organizationId); - await requireRepoExists(c, input.repoId); + const h = getTaskHandle(c, c.state.organizationId, input.repoId, input.taskId); - await h.send(taskWorkflowQueueName("task.command.sync"), { reason: input.reason }, { wait: false }); + void h.sync({ reason: input.reason }).catch(() => {}); }, async mergeTask(c: any, input: TaskProxyActionInput): Promise { assertOrganization(c, input.organizationId); - await requireRepoExists(c, input.repoId); + const h = getTaskHandle(c, c.state.organizationId, input.repoId, input.taskId); - await h.send(taskWorkflowQueueName("task.command.merge"), { reason: input.reason }, { wait: false }); + void h.merge({ reason: input.reason }).catch(() => {}); }, async archiveTask(c: any, input: TaskProxyActionInput): Promise { assertOrganization(c, input.organizationId); - await requireRepoExists(c, input.repoId); + const h = getTaskHandle(c, c.state.organizationId, input.repoId, input.taskId); - await h.send(taskWorkflowQueueName("task.command.archive"), { reason: input.reason }, { wait: false }); + void h.archive({ reason: input.reason }).catch(() => {}); }, async killTask(c: any, input: TaskProxyActionInput): Promise { assertOrganization(c, input.organizationId); - await requireRepoExists(c, input.repoId); + const h = getTaskHandle(c, c.state.organizationId, input.repoId, input.taskId); - await h.send(taskWorkflowQueueName("task.command.kill"), { reason: input.reason }, { wait: false }); + void h.kill({ reason: input.reason }).catch(() => {}); }, async getRepositoryMetadata(c: any, input: { repoId: string }): Promise<{ defaultBranch: string | null; fullName: string | null; remoteUrl: string }> { @@ -370,4 +322,19 @@ export const organizationTaskActions = { async findTaskForBranch(c: any, input: { repoId: string; branchName: string }): Promise<{ taskId: string | null }> { return await findTaskForBranch(c, input.repoId, input.branchName); }, + + /** + * Lightweight read of task index + summary data. Used by the task actor + * to self-initialize when lazily materialized from a virtual task. + * Does NOT trigger materialization — no circular dependency. + */ + async getTaskIndexEntry(c: any, input: { taskId: string }): Promise<{ branchName: string | null; title: string | null } | null> { + const idx = await c.db.select({ branchName: taskIndex.branchName }).from(taskIndex).where(eq(taskIndex.taskId, input.taskId)).get(); + const summary = await c.db.select({ title: taskSummaries.title }).from(taskSummaries).where(eq(taskSummaries.taskId, input.taskId)).get(); + if (!idx && !summary) return null; + return { + branchName: idx?.branchName ?? null, + title: summary?.title ?? null, + }; + }, }; diff --git a/foundry/packages/backend/src/actors/organization/app-shell.ts b/foundry/packages/backend/src/actors/organization/app-shell.ts index dddca16..dce5855 100644 --- a/foundry/packages/backend/src/actors/organization/app-shell.ts +++ b/foundry/packages/backend/src/actors/organization/app-shell.ts @@ -13,15 +13,12 @@ import type { import { DEFAULT_WORKSPACE_MODEL_ID } from "@sandbox-agent/foundry-shared"; import { getActorRuntimeContext } from "../context.js"; import { getOrCreateGithubData, getOrCreateOrganization, selfOrganization } from "../handles.js"; -import { githubDataWorkflowQueueName } from "../github-data/workflow.js"; import { GitHubAppError } from "../../services/app-github.js"; import { getBetterAuthService } from "../../services/better-auth.js"; -import { expectQueueResponse } from "../../services/queue.js"; import { repoIdFromRemote, repoLabelFromRemote } from "../../services/repo.js"; import { logger } from "../../logging.js"; import { invoices, organizationMembers, organizationProfile, seatAssignments, stripeLookup } from "./db/schema.js"; import { APP_SHELL_ORGANIZATION_ID } from "./constants.js"; -import { organizationWorkflowQueueName } from "./queues.js"; const githubWebhookLogger = logger.child({ scope: "github-webhook", @@ -142,13 +139,7 @@ function stripeWebhookSubscription(event: any) { }; } -async function sendOrganizationCommand( - organization: any, - name: Parameters[0], - body: unknown, -): Promise { - return expectQueueResponse(await organization.send(organizationWorkflowQueueName(name), body, { wait: true, timeout: 60_000 })); -} +// sendOrganizationCommand removed — org actions called directly export async function getOrganizationState(organization: any) { return await organization.getOrganizationShellState({}); @@ -491,7 +482,7 @@ async function syncGithubOrganizationsInternal(c: any, input: { sessionId: strin const organizationId = organizationOrganizationId(account.kind, account.githubLogin); const installation = installations.find((candidate) => candidate.accountLogin === account.githubLogin) ?? null; const organization = await getOrCreateOrganization(c, organizationId); - await sendOrganizationCommand<{ organizationId: string }>(organization, "organization.command.github.organization_shell.sync_from_github", { + await organization.commandSyncOrganizationShellFromGithub({ userId: githubUserId, userName: viewer.name || viewer.login, userEmail: viewer.email ?? `${viewer.login}@users.noreply.github.com`, @@ -686,7 +677,7 @@ async function applySubscriptionState( }, fallbackPlanId: FoundryBillingPlanId, ): Promise { - await sendOrganizationCommand<{ ok: true }>(organization, "organization.command.billing.stripe_subscription.apply", { + await organization.commandApplyStripeSubscription({ subscription, fallbackPlanId, }); @@ -702,7 +693,7 @@ export const organizationAppActions = { const organizationState = await getOrganizationState(organizationHandle); if (input.planId === "free") { - await sendOrganizationCommand<{ ok: true }>(organizationHandle, "organization.command.billing.free_plan.apply", { + await organizationHandle.commandApplyFreePlan({ clearSubscription: false, }); return { @@ -723,7 +714,7 @@ export const organizationAppActions = { email: session.currentUserEmail, }) ).id; - await sendOrganizationCommand<{ ok: true }>(organizationHandle, "organization.command.billing.stripe_customer.apply", { + await organizationHandle.commandApplyStripeCustomer({ customerId, }); await upsertStripeLookupEntries(c, input.organizationId, customerId, null); @@ -753,7 +744,7 @@ export const organizationAppActions = { const completion = await appShell.stripe.retrieveCheckoutCompletion(input.checkoutSessionId); if (completion.customerId) { - await sendOrganizationCommand<{ ok: true }>(organizationHandle, "organization.command.billing.stripe_customer.apply", { + await organizationHandle.commandApplyStripeCustomer({ customerId: completion.customerId, }); } @@ -765,7 +756,7 @@ export const organizationAppActions = { } if (completion.paymentMethodLabel) { - await sendOrganizationCommand<{ ok: true }>(organizationHandle, "organization.command.billing.payment_method.set", { + await organizationHandle.commandSetPaymentMethod({ label: completion.paymentMethodLabel, }); } @@ -805,7 +796,7 @@ export const organizationAppActions = { await applySubscriptionState(organizationHandle, subscription, organizationState.billingPlanId); await upsertStripeLookupEntries(c, input.organizationId, subscription.customerId ?? organizationState.stripeCustomerId, subscription.id); } else { - await sendOrganizationCommand<{ ok: true }>(organizationHandle, "organization.command.billing.status.set", { + await organizationHandle.commandSetBillingStatus({ status: "scheduled_cancel", }); } @@ -826,7 +817,7 @@ export const organizationAppActions = { await applySubscriptionState(organizationHandle, subscription, organizationState.billingPlanId); await upsertStripeLookupEntries(c, input.organizationId, subscription.customerId ?? organizationState.stripeCustomerId, subscription.id); } else { - await sendOrganizationCommand<{ ok: true }>(organizationHandle, "organization.command.billing.status.set", { + await organizationHandle.commandSetBillingStatus({ status: "active", }); } @@ -839,7 +830,7 @@ export const organizationAppActions = { const session = await requireSignedInSession(c, input.sessionId); requireEligibleOrganization(session, input.organizationId); const organization = await getOrCreateOrganization(c, input.organizationId); - await sendOrganizationCommand<{ ok: true }>(organization, "organization.command.billing.seat_usage.record", { + await organization.commandRecordSeatUsage({ email: session.currentUserEmail, }); return await buildAppSnapshot(c, input.sessionId); @@ -862,7 +853,7 @@ export const organizationAppActions = { if (organizationId) { const organization = await getOrCreateOrganization(c, organizationId); if (typeof object.customer === "string") { - await sendOrganizationCommand<{ ok: true }>(organization, "organization.command.billing.stripe_customer.apply", { + await organization.commandApplyStripeCustomer({ customerId: object.customer, }); } @@ -897,7 +888,7 @@ export const organizationAppActions = { const organizationId = await findOrganizationIdForStripeEvent(c, subscription.customerId, subscription.id); if (organizationId) { const organization = await getOrCreateOrganization(c, organizationId); - await sendOrganizationCommand<{ ok: true }>(organization, "organization.command.billing.free_plan.apply", { + await organization.commandApplyFreePlan({ clearSubscription: true, }); } @@ -911,7 +902,7 @@ export const organizationAppActions = { const organization = await getOrCreateOrganization(c, organizationId); const rawAmount = typeof invoice.amount_paid === "number" ? invoice.amount_paid : invoice.amount_due; const amountUsd = Math.round((typeof rawAmount === "number" ? rawAmount : 0) / 100); - await sendOrganizationCommand<{ ok: true }>(organization, "organization.command.billing.invoice.upsert", { + await organization.commandUpsertInvoice({ id: String(invoice.id), label: typeof invoice.number === "string" ? `Invoice ${invoice.number}` : "Stripe invoice", issuedAt: formatUnixDate(typeof invoice.created === "number" ? invoice.created : Math.floor(Date.now() / 1000)), @@ -947,7 +938,7 @@ export const organizationAppActions = { const organizationId = organizationOrganizationId(kind, accountLogin); const receivedAt = Date.now(); const organization = await getOrCreateOrganization(c, organizationId); - await sendOrganizationCommand<{ ok: true }>(organization, "organization.command.github.webhook_receipt.record", { + await organization.commandRecordGithubWebhookReceipt({ organizationId: organizationId, event, action: body.action ?? null, @@ -966,61 +957,41 @@ export const organizationAppActions = { "installation_event", ); if (body.action === "deleted") { - await expectQueueResponse<{ ok: true }>( - await githubData.send( - githubDataWorkflowQueueName("githubData.command.clearState"), - { - connectedAccount: accountLogin, - installationStatus: "install_required", - installationId: null, - label: "GitHub App installation removed", - }, - { wait: true, timeout: 10_000 }, - ), - ); + await githubData.clearState({ + connectedAccount: accountLogin, + installationStatus: "install_required", + installationId: null, + label: "GitHub App installation removed", + }); } else if (body.action === "created") { - await expectQueueResponse<{ ok: true }>( - await githubData.send( - githubDataWorkflowQueueName("githubData.command.syncRepos"), - { - connectedAccount: accountLogin, - installationStatus: "connected", - installationId: body.installation?.id ?? null, - githubLogin: accountLogin, - kind, - label: "Syncing GitHub data from installation webhook...", - }, - { wait: true, timeout: 10_000 }, - ), - ); + void githubData + .syncRepos({ + connectedAccount: accountLogin, + installationStatus: "connected", + installationId: body.installation?.id ?? null, + githubLogin: accountLogin, + kind, + label: "Syncing GitHub data from installation webhook...", + }) + .catch(() => {}); } else if (body.action === "suspend") { - await expectQueueResponse<{ ok: true }>( - await githubData.send( - githubDataWorkflowQueueName("githubData.command.clearState"), - { - connectedAccount: accountLogin, - installationStatus: "reconnect_required", - installationId: body.installation?.id ?? null, - label: "GitHub App installation suspended", - }, - { wait: true, timeout: 10_000 }, - ), - ); + await githubData.clearState({ + connectedAccount: accountLogin, + installationStatus: "reconnect_required", + installationId: body.installation?.id ?? null, + label: "GitHub App installation suspended", + }); } else if (body.action === "unsuspend") { - await expectQueueResponse<{ ok: true }>( - await githubData.send( - githubDataWorkflowQueueName("githubData.command.syncRepos"), - { - connectedAccount: accountLogin, - installationStatus: "connected", - installationId: body.installation?.id ?? null, - githubLogin: accountLogin, - kind, - label: "Resyncing GitHub data after unsuspend...", - }, - { wait: true, timeout: 10_000 }, - ), - ); + void githubData + .syncRepos({ + connectedAccount: accountLogin, + installationStatus: "connected", + installationId: body.installation?.id ?? null, + githubLogin: accountLogin, + kind, + label: "Resyncing GitHub data after unsuspend...", + }) + .catch(() => {}); } return { ok: true }; } @@ -1037,20 +1008,16 @@ export const organizationAppActions = { }, "repository_membership_changed", ); - await expectQueueResponse<{ ok: true }>( - await githubData.send( - githubDataWorkflowQueueName("githubData.command.syncRepos"), - { - connectedAccount: accountLogin, - installationStatus: "connected", - installationId: body.installation?.id ?? null, - githubLogin: accountLogin, - kind, - label: "Resyncing GitHub data after repository access change...", - }, - { wait: true, timeout: 10_000 }, - ), - ); + void githubData + .syncRepos({ + connectedAccount: accountLogin, + installationStatus: "connected", + installationId: body.installation?.id ?? null, + githubLogin: accountLogin, + kind, + label: "Resyncing GitHub data after repository access change...", + }) + .catch(() => {}); return { ok: true }; } @@ -1078,43 +1045,35 @@ export const organizationAppActions = { "repository_event", ); if (event === "pull_request" && body.repository?.clone_url && body.pull_request) { - await expectQueueResponse<{ ok: true }>( - await githubData.send( - githubDataWorkflowQueueName("githubData.command.handlePullRequestWebhook"), - { - connectedAccount: accountLogin, - installationStatus: "connected", - installationId: body.installation?.id ?? null, - repository: { - fullName: body.repository.full_name, - cloneUrl: body.repository.clone_url, - private: Boolean(body.repository.private), - }, - pullRequest: { - number: body.pull_request.number, - status: body.pull_request.draft ? "draft" : "ready", - title: body.pull_request.title ?? "", - body: body.pull_request.body ?? null, - state: body.pull_request.state ?? "open", - url: body.pull_request.html_url ?? `https://github.com/${body.repository.full_name}/pull/${body.pull_request.number}`, - headRefName: body.pull_request.head?.ref ?? "", - baseRefName: body.pull_request.base?.ref ?? "", - authorLogin: body.pull_request.user?.login ?? null, - isDraft: Boolean(body.pull_request.draft), - merged: Boolean(body.pull_request.merged), - }, - }, - { wait: true, timeout: 10_000 }, - ), - ); + await githubData.handlePullRequestWebhook({ + connectedAccount: accountLogin, + installationStatus: "connected", + installationId: body.installation?.id ?? null, + repository: { + fullName: body.repository.full_name, + cloneUrl: body.repository.clone_url, + private: Boolean(body.repository.private), + }, + pullRequest: { + number: body.pull_request.number, + status: body.pull_request.draft ? "draft" : "ready", + title: body.pull_request.title ?? "", + body: body.pull_request.body ?? null, + state: body.pull_request.state ?? "open", + url: body.pull_request.html_url ?? `https://github.com/${body.repository.full_name}/pull/${body.pull_request.number}`, + headRefName: body.pull_request.head?.ref ?? "", + baseRefName: body.pull_request.base?.ref ?? "", + authorLogin: body.pull_request.user?.login ?? null, + isDraft: Boolean(body.pull_request.draft), + merged: Boolean(body.pull_request.merged), + }, + }); } if ((event === "push" || event === "create" || event === "delete") && body.repository?.clone_url) { const repoId = repoIdFromRemote(body.repository.clone_url); const knownRepository = await githubData.getRepository({ repoId }); if (knownRepository) { - await expectQueueResponse( - await githubData.send(githubDataWorkflowQueueName("githubData.command.reloadRepository"), { repoId }, { wait: true, timeout: 10_000 }), - ); + await githubData.reloadRepository({ repoId }); } } } @@ -1272,18 +1231,16 @@ export async function syncOrganizationShellFromGithubMutation( const needsInitialSync = installationStatus === "connected" && syncStatus === "pending"; if (needsInitialSync) { const githubData = await getOrCreateGithubData(c, organizationId); - await githubData.send( - githubDataWorkflowQueueName("githubData.command.syncRepos"), - { + void githubData + .syncRepos({ connectedAccount: input.githubLogin, installationStatus: "connected", installationId: input.installationId, githubLogin: input.githubLogin, kind: input.kind, label: "Initial repository sync...", - }, - { wait: false }, - ); + }) + .catch(() => {}); } return { organizationId }; diff --git a/foundry/packages/backend/src/actors/organization/index.ts b/foundry/packages/backend/src/actors/organization/index.ts index 08f979c..1bd8896 100644 --- a/foundry/packages/backend/src/actors/organization/index.ts +++ b/foundry/packages/backend/src/actors/organization/index.ts @@ -1,12 +1,10 @@ -import { actor, queue } from "rivetkit"; +import { actor } from "rivetkit"; import { organizationDb } from "./db/db.js"; import { organizationActions } from "./actions.js"; -import { ORGANIZATION_QUEUE_NAMES } from "./queues.js"; -import { runOrganizationCommandLoop } from "./workflow.js"; +import { organizationCommandActions } from "./workflow.js"; export const organization = actor({ db: organizationDb, - queues: Object.fromEntries(ORGANIZATION_QUEUE_NAMES.map((name) => [name, queue()])), options: { name: "Organization", icon: "compass", @@ -15,6 +13,8 @@ export const organization = actor({ createState: (_c, organizationId: string) => ({ organizationId, }), - actions: organizationActions, - run: runOrganizationCommandLoop, + actions: { + ...organizationActions, + ...organizationCommandActions, + }, }); diff --git a/foundry/packages/backend/src/actors/organization/workflow.ts b/foundry/packages/backend/src/actors/organization/workflow.ts index bd7a205..189225b 100644 --- a/foundry/packages/backend/src/actors/organization/workflow.ts +++ b/foundry/packages/backend/src/actors/organization/workflow.ts @@ -1,5 +1,8 @@ // @ts-nocheck -import { logActorWarning, resolveErrorMessage } from "../logging.js"; +/** + * Organization command actions — converted from queue handlers to direct actions. + * Each export becomes an action on the organization actor. + */ import { applyGithubSyncProgressMutation, recordGithubWebhookReceiptMutation, refreshOrganizationSnapshotMutation } from "./actions.js"; import { applyTaskSummaryUpdateMutation, @@ -33,136 +36,128 @@ import { updateOrganizationShellProfileMutation, upsertOrganizationInvoiceMutation, } from "./app-shell.js"; -import { ORGANIZATION_QUEUE_NAMES } from "./queues.js"; -// Command handler dispatch table — maps queue name to handler function. -const COMMAND_HANDLERS: Record Promise> = { - "organization.command.createTask": (c, body) => createTaskMutation(c, body), - "organization.command.materializeTask": (c, body) => createTaskMutation(c, body), - "organization.command.registerTaskBranch": (c, body) => registerTaskBranchMutation(c, body), - "organization.command.applyTaskSummaryUpdate": async (c, body) => { +export const organizationCommandActions = { + async commandCreateTask(c: any, body: any) { + return await createTaskMutation(c, body); + }, + async commandMaterializeTask(c: any, body: any) { + return await createTaskMutation(c, body); + }, + async commandRegisterTaskBranch(c: any, body: any) { + return await registerTaskBranchMutation(c, body); + }, + async commandApplyTaskSummaryUpdate(c: any, body: any) { await applyTaskSummaryUpdateMutation(c, body); return { ok: true }; }, - "organization.command.removeTaskSummary": async (c, body) => { + async commandRemoveTaskSummary(c: any, body: any) { await removeTaskSummaryMutation(c, body); return { ok: true }; }, - "organization.command.refreshTaskSummaryForBranch": async (c, body) => { + async commandRefreshTaskSummaryForBranch(c: any, body: any) { await refreshTaskSummaryForBranchMutation(c, body); return { ok: true }; }, - "organization.command.snapshot.broadcast": async (c, _body) => { + async commandBroadcastSnapshot(c: any, _body: any) { await refreshOrganizationSnapshotMutation(c); return { ok: true }; }, - "organization.command.syncGithubSession": async (c, body) => { + async commandSyncGithubSession(c: any, body: any) { const { syncGithubOrganizations } = await import("./app-shell.js"); await syncGithubOrganizations(c, body); return { ok: true }; }, - "organization.command.better_auth.session_index.upsert": (c, body) => betterAuthUpsertSessionIndexMutation(c, body), - "organization.command.better_auth.session_index.delete": async (c, body) => { + + // Better Auth index actions + async commandBetterAuthSessionIndexUpsert(c: any, body: any) { + return await betterAuthUpsertSessionIndexMutation(c, body); + }, + async commandBetterAuthSessionIndexDelete(c: any, body: any) { await betterAuthDeleteSessionIndexMutation(c, body); return { ok: true }; }, - "organization.command.better_auth.email_index.upsert": (c, body) => betterAuthUpsertEmailIndexMutation(c, body), - "organization.command.better_auth.email_index.delete": async (c, body) => { + async commandBetterAuthEmailIndexUpsert(c: any, body: any) { + return await betterAuthUpsertEmailIndexMutation(c, body); + }, + async commandBetterAuthEmailIndexDelete(c: any, body: any) { await betterAuthDeleteEmailIndexMutation(c, body); return { ok: true }; }, - "organization.command.better_auth.account_index.upsert": (c, body) => betterAuthUpsertAccountIndexMutation(c, body), - "organization.command.better_auth.account_index.delete": async (c, body) => { + async commandBetterAuthAccountIndexUpsert(c: any, body: any) { + return await betterAuthUpsertAccountIndexMutation(c, body); + }, + async commandBetterAuthAccountIndexDelete(c: any, body: any) { await betterAuthDeleteAccountIndexMutation(c, body); return { ok: true }; }, - "organization.command.better_auth.verification.create": (c, body) => betterAuthCreateVerificationMutation(c, body), - "organization.command.better_auth.verification.update": (c, body) => betterAuthUpdateVerificationMutation(c, body), - "organization.command.better_auth.verification.update_many": (c, body) => betterAuthUpdateManyVerificationMutation(c, body), - "organization.command.better_auth.verification.delete": async (c, body) => { + async commandBetterAuthVerificationCreate(c: any, body: any) { + return await betterAuthCreateVerificationMutation(c, body); + }, + async commandBetterAuthVerificationUpdate(c: any, body: any) { + return await betterAuthUpdateVerificationMutation(c, body); + }, + async commandBetterAuthVerificationUpdateMany(c: any, body: any) { + return await betterAuthUpdateManyVerificationMutation(c, body); + }, + async commandBetterAuthVerificationDelete(c: any, body: any) { await betterAuthDeleteVerificationMutation(c, body); return { ok: true }; }, - "organization.command.better_auth.verification.delete_many": (c, body) => betterAuthDeleteManyVerificationMutation(c, body), - "organization.command.github.sync_progress.apply": async (c, body) => { + async commandBetterAuthVerificationDeleteMany(c: any, body: any) { + return await betterAuthDeleteManyVerificationMutation(c, body); + }, + + // GitHub sync actions + async commandApplyGithubSyncProgress(c: any, body: any) { await applyGithubSyncProgressMutation(c, body); return { ok: true }; }, - "organization.command.github.webhook_receipt.record": async (c, body) => { + async commandRecordGithubWebhookReceipt(c: any, body: any) { await recordGithubWebhookReceiptMutation(c, body); return { ok: true }; }, - "organization.command.github.organization_shell.sync_from_github": (c, body) => syncOrganizationShellFromGithubMutation(c, body), - "organization.command.shell.profile.update": async (c, body) => { + async commandSyncOrganizationShellFromGithub(c: any, body: any) { + return await syncOrganizationShellFromGithubMutation(c, body); + }, + + // Shell/profile actions + async commandUpdateShellProfile(c: any, body: any) { await updateOrganizationShellProfileMutation(c, body); return { ok: true }; }, - "organization.command.shell.sync_started.mark": async (c, body) => { + async commandMarkSyncStarted(c: any, body: any) { await markOrganizationSyncStartedMutation(c, body); return { ok: true }; }, - "organization.command.billing.stripe_customer.apply": async (c, body) => { + + // Billing actions + async commandApplyStripeCustomer(c: any, body: any) { await applyOrganizationStripeCustomerMutation(c, body); return { ok: true }; }, - "organization.command.billing.stripe_subscription.apply": async (c, body) => { + async commandApplyStripeSubscription(c: any, body: any) { await applyOrganizationStripeSubscriptionMutation(c, body); return { ok: true }; }, - "organization.command.billing.free_plan.apply": async (c, body) => { + async commandApplyFreePlan(c: any, body: any) { await applyOrganizationFreePlanMutation(c, body); return { ok: true }; }, - "organization.command.billing.payment_method.set": async (c, body) => { + async commandSetPaymentMethod(c: any, body: any) { await setOrganizationBillingPaymentMethodMutation(c, body); return { ok: true }; }, - "organization.command.billing.status.set": async (c, body) => { + async commandSetBillingStatus(c: any, body: any) { await setOrganizationBillingStatusMutation(c, body); return { ok: true }; }, - "organization.command.billing.invoice.upsert": async (c, body) => { + async commandUpsertInvoice(c: any, body: any) { await upsertOrganizationInvoiceMutation(c, body); return { ok: true }; }, - "organization.command.billing.seat_usage.record": async (c, body) => { + async commandRecordSeatUsage(c: any, body: any) { await recordOrganizationSeatUsageMutation(c, body); return { ok: true }; }, }; - -/** - * Plain run handler (no workflow engine). Drains the queue using `c.queue.iter()` - * with completable messages. This avoids the RivetKit bug where actors created - * from another actor's workflow context never start their `run: workflow(...)`. - * - * The queue is still durable — messages survive restarts. Only in-flight processing - * of a single message is lost on crash (the message is retried). All mutations are - * idempotent, so this is safe. - */ -export async function runOrganizationCommandLoop(c: any): Promise { - for await (const msg of c.queue.iter({ names: [...ORGANIZATION_QUEUE_NAMES], completable: true })) { - try { - const handler = COMMAND_HANDLERS[msg.name]; - if (handler) { - const result = await handler(c, msg.body); - await msg.complete(result); - } else { - logActorWarning("organization", "unknown queue message", { queueName: msg.name }); - await msg.complete({ error: `Unknown command: ${msg.name}` }); - } - } catch (error) { - const message = resolveErrorMessage(error); - logActorWarning("organization", "organization command failed", { - queueName: msg.name, - error: message, - }); - await msg.complete({ error: message }).catch((completeError: unknown) => { - logActorWarning("organization", "organization command failed completing error response", { - queueName: msg.name, - error: resolveErrorMessage(completeError), - }); - }); - } - } -} diff --git a/foundry/packages/backend/src/actors/sandbox/index.ts b/foundry/packages/backend/src/actors/sandbox/index.ts index 58186f2..c565a2d 100644 --- a/foundry/packages/backend/src/actors/sandbox/index.ts +++ b/foundry/packages/backend/src/actors/sandbox/index.ts @@ -6,9 +6,10 @@ import { DEFAULT_WORKSPACE_MODEL_GROUPS, workspaceModelGroupsFromSandboxAgents, import { SandboxAgent } from "sandbox-agent"; import { getActorRuntimeContext } from "../context.js"; import { organizationKey } from "../keys.js"; +import { logActorWarning, resolveErrorMessage } from "../logging.js"; import { resolveSandboxProviderId } from "../../sandbox-config.js"; -const SANDBOX_REPO_CWD = "/home/sandbox/organization/repo"; +const SANDBOX_REPO_CWD = "/home/user/repo"; const DEFAULT_LOCAL_SANDBOX_IMAGE = "rivetdev/sandbox-agent:full"; const DEFAULT_LOCAL_SANDBOX_PORT = 2468; const dockerClient = new Dockerode({ socketPath: "/var/run/docker.sock" }); @@ -204,6 +205,10 @@ const baseTaskSandbox = sandboxActor({ create: () => ({ template: config.sandboxProviders.e2b.template ?? "sandbox-agent-full-0.3.x", envs: sandboxEnvObject(), + // Default E2B timeout is 5 minutes which is too short for task work. + // Set to 1 hour. TODO: use betaCreate + autoPause instead so sandboxes + // pause (preserving state) rather than being killed on timeout. + timeoutMs: 60 * 60 * 1000, }), installAgents: ["claude", "codex"], }); @@ -220,8 +225,12 @@ async function broadcastProcesses(c: any, actions: Record { + try { + return await baseActions.listProcesses(c); + } catch (error) { + // Sandbox may be gone (E2B timeout, destroyed, etc.) — degrade to empty + logActorWarning("taskSandbox", "listProcesses failed, sandbox may be expired", { + sandboxId: c.state.sandboxId, + error: resolveErrorMessage(error), + }); + return { processes: [] }; + } + }, + async createProcess(c: any, request: any): Promise { const created = await baseActions.createProcess(c, request); await broadcastProcesses(c, baseActions); diff --git a/foundry/packages/backend/src/actors/task/index.ts b/foundry/packages/backend/src/actors/task/index.ts index 30f6836..7e1c5e2 100644 --- a/foundry/packages/backend/src/actors/task/index.ts +++ b/foundry/packages/backend/src/actors/task/index.ts @@ -1,9 +1,9 @@ -import { actor, queue } from "rivetkit"; +import { actor } from "rivetkit"; import type { TaskRecord } from "@sandbox-agent/foundry-shared"; import { taskDb } from "./db/db.js"; import { getCurrentRecord } from "./workflow/common.js"; import { getSessionDetail, getTaskDetail, getTaskSummary } from "./workspace.js"; -import { TASK_QUEUE_NAMES, runTaskCommandLoop } from "./workflow/index.js"; +import { taskCommandActions } from "./workflow/index.js"; export interface TaskInput { organizationId: string; @@ -13,11 +13,10 @@ export interface TaskInput { export const task = actor({ db: taskDb, - queues: Object.fromEntries(TASK_QUEUE_NAMES.map((name) => [name, queue()])), options: { name: "Task", icon: "wrench", - actionTimeout: 5 * 60_000, + actionTimeout: 10 * 60_000, }, createState: (_c, input: TaskInput) => ({ organizationId: input.organizationId, @@ -40,8 +39,9 @@ export const task = actor({ async getSessionDetail(c, input: { sessionId: string; authSessionId?: string }) { return await getSessionDetail(c, input.sessionId, input.authSessionId); }, + + ...taskCommandActions, }, - run: runTaskCommandLoop, }); -export { TASK_QUEUE_NAMES }; +export { taskWorkflowQueueName } from "./workflow/index.js"; diff --git a/foundry/packages/backend/src/actors/task/workflow/common.ts b/foundry/packages/backend/src/actors/task/workflow/common.ts index d942e01..cbe63e6 100644 --- a/foundry/packages/backend/src/actors/task/workflow/common.ts +++ b/foundry/packages/backend/src/actors/task/workflow/common.ts @@ -4,6 +4,8 @@ import type { TaskRecord, TaskStatus } from "@sandbox-agent/foundry-shared"; import { task as taskTable, taskRuntime, taskSandboxes } from "../db/schema.js"; import { getOrCreateAuditLog, getOrCreateOrganization } from "../../handles.js"; import { broadcastTaskUpdate } from "../workspace.js"; +import { getActorRuntimeContext } from "../../context.js"; +import { defaultSandboxProviderId } from "../../../sandbox-config.js"; export const TASK_ROW_ID = 1; @@ -64,10 +66,16 @@ export async function setTaskState(ctx: any, status: TaskStatus): Promise await broadcastTaskUpdate(ctx); } +/** + * Read the task's current record from its local SQLite DB. + * If the task actor was lazily created (virtual task from PR sync) and has no + * DB rows yet, auto-initializes by reading branch/title from the org actor's + * getTaskIndexEntry. This is the self-initialization path for lazy task actors. + */ export async function getCurrentRecord(ctx: any): Promise { const db = ctx.db; const organization = await getOrCreateOrganization(ctx, ctx.state.organizationId); - const row = await db + let row = await db .select({ branchName: taskTable.branchName, title: taskTable.title, @@ -85,7 +93,48 @@ export async function getCurrentRecord(ctx: any): Promise { .get(); if (!row) { - throw new Error(`Task not found: ${ctx.state.taskId}`); + // Virtual task — auto-initialize from org actor's task index data + let branchName: string | null = null; + let title = "Untitled"; + try { + const entry = await organization.getTaskIndexEntry({ taskId: ctx.state.taskId }); + branchName = entry?.branchName ?? null; + title = entry?.title ?? title; + } catch {} + + const { config } = getActorRuntimeContext(); + const { initBootstrapDbActivity, initCompleteActivity } = await import("./init.js"); + await initBootstrapDbActivity(ctx, { + sandboxProviderId: defaultSandboxProviderId(config), + branchName, + title, + task: title, + }); + await initCompleteActivity(ctx, { sandboxProviderId: defaultSandboxProviderId(config) }); + + // Re-read the row after initialization + const initialized = await db + .select({ + branchName: taskTable.branchName, + title: taskTable.title, + task: taskTable.task, + sandboxProviderId: taskTable.sandboxProviderId, + status: taskTable.status, + pullRequestJson: taskTable.pullRequestJson, + activeSandboxId: taskRuntime.activeSandboxId, + createdAt: taskTable.createdAt, + updatedAt: taskTable.updatedAt, + }) + .from(taskTable) + .leftJoin(taskRuntime, eq(taskTable.id, taskRuntime.id)) + .where(eq(taskTable.id, TASK_ROW_ID)) + .get(); + + if (!initialized) { + throw new Error(`Task not found after initialization: ${ctx.state.taskId}`); + } + + row = initialized; } const repositoryMetadata = await organization.getRepositoryMetadata({ repoId: ctx.state.repoId }); @@ -140,19 +189,13 @@ export async function getCurrentRecord(ctx: any): Promise { export async function appendAuditLog(ctx: any, kind: string, payload: Record): Promise { const row = await ctx.db.select({ branchName: taskTable.branchName }).from(taskTable).where(eq(taskTable.id, TASK_ROW_ID)).get(); const auditLog = await getOrCreateAuditLog(ctx, ctx.state.organizationId); - await auditLog.send( - "auditLog.command.append", - { - kind, - repoId: ctx.state.repoId, - taskId: ctx.state.taskId, - branchName: row?.branchName ?? null, - payload, - }, - { - wait: false, - }, - ); + void auditLog.append({ + kind, + repoId: ctx.state.repoId, + taskId: ctx.state.taskId, + branchName: row?.branchName ?? null, + payload, + }); await broadcastTaskUpdate(ctx); } diff --git a/foundry/packages/backend/src/actors/task/workflow/index.ts b/foundry/packages/backend/src/actors/task/workflow/index.ts index f23871a..69004ee 100644 --- a/foundry/packages/backend/src/actors/task/workflow/index.ts +++ b/foundry/packages/backend/src/actors/task/workflow/index.ts @@ -11,7 +11,6 @@ import { killDestroySandboxActivity, killWriteDbActivity, } from "./commands.js"; -import { TASK_QUEUE_NAMES } from "./queue.js"; import { changeWorkspaceModel, closeWorkspaceSession, @@ -33,205 +32,233 @@ import { updateWorkspaceDraft, } from "../workspace.js"; -export { TASK_QUEUE_NAMES, taskWorkflowQueueName } from "./queue.js"; +export { taskWorkflowQueueName } from "./queue.js"; -type TaskQueueName = (typeof TASK_QUEUE_NAMES)[number]; - -type CommandHandler = (c: any, msg: { name: TaskQueueName; body: any; complete: (response: unknown) => Promise }) => Promise; - -const commandHandlers: Record = { - "task.command.initialize": async (c, msg) => { - const body = msg.body; +/** + * Task command actions — converted from queue/workflow handlers to direct actions. + * Each export becomes an action on the task actor. + */ +export const taskCommandActions = { + async initialize(c: any, body: any) { await initBootstrapDbActivity(c, body); await initEnqueueProvisionActivity(c, body); - const currentRecord = await getCurrentRecord(c); + return await getCurrentRecord(c); + }, + + async provision(c: any, body: any) { try { - await msg.complete(currentRecord); + await initCompleteActivity(c, body); + return { ok: true }; } catch (error) { - logActorWarning("task.workflow", "initialize completion failed", { - error: resolveErrorMessage(error), - }); + await initFailedActivity(c, error, body); + return { ok: false, error: resolveErrorMessage(error) }; } }, - "task.command.provision": async (c, msg) => { - try { - await initCompleteActivity(c, msg.body); - await msg.complete({ ok: true }); - } catch (error) { - await initFailedActivity(c, error, msg.body); - await msg.complete({ - ok: false, - error: resolveErrorMessage(error), - }); - } - }, - - "task.command.attach": async (c, msg) => { + async attach(c: any, body: any) { + // handleAttachActivity expects msg with complete — adapt + const result = { value: undefined as any }; + const msg = { + name: "task.command.attach", + body, + complete: async (v: any) => { + result.value = v; + }, + }; await handleAttachActivity(c, msg); + return result.value; }, - "task.command.switch": async (c, msg) => { + async switchTask(c: any, body: any) { + const result = { value: undefined as any }; + const msg = { + name: "task.command.switch", + body, + complete: async (v: any) => { + result.value = v; + }, + }; await handleSwitchActivity(c, msg); + return result.value; }, - "task.command.push": async (c, msg) => { + async push(c: any, body: any) { + const result = { value: undefined as any }; + const msg = { + name: "task.command.push", + body, + complete: async (v: any) => { + result.value = v; + }, + }; await handlePushActivity(c, msg); + return result.value; }, - "task.command.sync": async (c, msg) => { + async sync(c: any, body: any) { + const result = { value: undefined as any }; + const msg = { + name: "task.command.sync", + body, + complete: async (v: any) => { + result.value = v; + }, + }; await handleSimpleCommandActivity(c, msg, "task.sync"); + return result.value; }, - "task.command.merge": async (c, msg) => { + async merge(c: any, body: any) { + const result = { value: undefined as any }; + const msg = { + name: "task.command.merge", + body, + complete: async (v: any) => { + result.value = v; + }, + }; await handleSimpleCommandActivity(c, msg, "task.merge"); + return result.value; }, - "task.command.archive": async (c, msg) => { + async archive(c: any, body: any) { + const result = { value: undefined as any }; + const msg = { + name: "task.command.archive", + body, + complete: async (v: any) => { + result.value = v; + }, + }; await handleArchiveActivity(c, msg); + return result.value; }, - "task.command.kill": async (c, msg) => { + async kill(c: any, body: any) { + const result = { value: undefined as any }; + const msg = { + name: "task.command.kill", + body, + complete: async (v: any) => { + result.value = v; + }, + }; await killDestroySandboxActivity(c); await killWriteDbActivity(c, msg); + return result.value; }, - "task.command.get": async (c, msg) => { + async getRecord(c: any, body: any) { + const result = { value: undefined as any }; + const msg = { + name: "task.command.get", + body, + complete: async (v: any) => { + result.value = v; + }, + }; await handleGetActivity(c, msg); + return result.value; }, - "task.command.pull_request.sync": async (c, msg) => { - await syncTaskPullRequest(c, msg.body?.pullRequest ?? null); - await msg.complete({ ok: true }); + async pullRequestSync(c: any, body: any) { + await syncTaskPullRequest(c, body?.pullRequest ?? null); + return { ok: true }; }, - "task.command.workspace.mark_unread": async (c, msg) => { - await markWorkspaceUnread(c, msg.body?.authSessionId); - await msg.complete({ ok: true }); + async markUnread(c: any, body: any) { + await markWorkspaceUnread(c, body?.authSessionId); + return { ok: true }; }, - "task.command.workspace.rename_task": async (c, msg) => { - await renameWorkspaceTask(c, msg.body.value); - await msg.complete({ ok: true }); + async renameTask(c: any, body: any) { + await renameWorkspaceTask(c, body.value); + return { ok: true }; }, - "task.command.workspace.create_session": async (c, msg) => { + async createSession(c: any, body: any) { + return await createWorkspaceSession(c, body?.model, body?.authSessionId); + }, + + async createSessionAndSend(c: any, body: any) { try { - const created = await createWorkspaceSession(c, msg.body?.model, msg.body?.authSessionId); - await msg.complete(created); - } catch (error) { - await msg.complete({ error: resolveErrorMessage(error) }); - } - }, - - "task.command.workspace.create_session_and_send": async (c, msg) => { - try { - const created = await createWorkspaceSession(c, msg.body?.model, msg.body?.authSessionId); - await sendWorkspaceMessage(c, created.sessionId, msg.body.text, [], msg.body?.authSessionId); + const created = await createWorkspaceSession(c, body?.model, body?.authSessionId); + await sendWorkspaceMessage(c, created.sessionId, body.text, [], body?.authSessionId); } catch (error) { logActorWarning("task.workflow", "create_session_and_send failed", { error: resolveErrorMessage(error), }); } - await msg.complete({ ok: true }); + return { ok: true }; }, - "task.command.workspace.ensure_session": async (c, msg) => { - await ensureWorkspaceSession(c, msg.body.sessionId, msg.body?.model, msg.body?.authSessionId); - await msg.complete({ ok: true }); + async ensureSession(c: any, body: any) { + await ensureWorkspaceSession(c, body.sessionId, body?.model, body?.authSessionId); + return { ok: true }; }, - "task.command.workspace.rename_session": async (c, msg) => { - await renameWorkspaceSession(c, msg.body.sessionId, msg.body.title); - await msg.complete({ ok: true }); + async renameSession(c: any, body: any) { + await renameWorkspaceSession(c, body.sessionId, body.title); + return { ok: true }; }, - "task.command.workspace.select_session": async (c, msg) => { - await selectWorkspaceSession(c, msg.body.sessionId, msg.body?.authSessionId); - await msg.complete({ ok: true }); + async selectSession(c: any, body: any) { + await selectWorkspaceSession(c, body.sessionId, body?.authSessionId); + return { ok: true }; }, - "task.command.workspace.set_session_unread": async (c, msg) => { - await setWorkspaceSessionUnread(c, msg.body.sessionId, msg.body.unread, msg.body?.authSessionId); - await msg.complete({ ok: true }); + async setSessionUnread(c: any, body: any) { + await setWorkspaceSessionUnread(c, body.sessionId, body.unread, body?.authSessionId); + return { ok: true }; }, - "task.command.workspace.update_draft": async (c, msg) => { - await updateWorkspaceDraft(c, msg.body.sessionId, msg.body.text, msg.body.attachments, msg.body?.authSessionId); - await msg.complete({ ok: true }); + async updateDraft(c: any, body: any) { + await updateWorkspaceDraft(c, body.sessionId, body.text, body.attachments, body?.authSessionId); + return { ok: true }; }, - "task.command.workspace.change_model": async (c, msg) => { - await changeWorkspaceModel(c, msg.body.sessionId, msg.body.model, msg.body?.authSessionId); - await msg.complete({ ok: true }); + async changeModel(c: any, body: any) { + await changeWorkspaceModel(c, body.sessionId, body.model, body?.authSessionId); + return { ok: true }; }, - "task.command.workspace.send_message": async (c, msg) => { - try { - await sendWorkspaceMessage(c, msg.body.sessionId, msg.body.text, msg.body.attachments, msg.body?.authSessionId); - await msg.complete({ ok: true }); - } catch (error) { - await msg.complete({ error: resolveErrorMessage(error) }); - } + async sendMessage(c: any, body: any) { + await sendWorkspaceMessage(c, body.sessionId, body.text, body.attachments, body?.authSessionId); + return { ok: true }; }, - "task.command.workspace.stop_session": async (c, msg) => { - await stopWorkspaceSession(c, msg.body.sessionId); - await msg.complete({ ok: true }); + async stopSession(c: any, body: any) { + await stopWorkspaceSession(c, body.sessionId); + return { ok: true }; }, - "task.command.workspace.sync_session_status": async (c, msg) => { - await syncWorkspaceSessionStatus(c, msg.body.sessionId, msg.body.status, msg.body.at); - await msg.complete({ ok: true }); + async syncSessionStatus(c: any, body: any) { + await syncWorkspaceSessionStatus(c, body.sessionId, body.status, body.at); + return { ok: true }; }, - "task.command.workspace.refresh_derived": async (c, msg) => { + async refreshDerived(c: any, _body: any) { await refreshWorkspaceDerivedState(c); - await msg.complete({ ok: true }); + return { ok: true }; }, - "task.command.workspace.refresh_session_transcript": async (c, msg) => { - await refreshWorkspaceSessionTranscript(c, msg.body.sessionId); - await msg.complete({ ok: true }); + async refreshSessionTranscript(c: any, body: any) { + await refreshWorkspaceSessionTranscript(c, body.sessionId); + return { ok: true }; }, - "task.command.workspace.close_session": async (c, msg) => { - await closeWorkspaceSession(c, msg.body.sessionId, msg.body?.authSessionId); - await msg.complete({ ok: true }); + async closeSession(c: any, body: any) { + await closeWorkspaceSession(c, body.sessionId, body?.authSessionId); + return { ok: true }; }, - "task.command.workspace.publish_pr": async (c, msg) => { + async publishPr(c: any, _body: any) { await publishWorkspacePr(c); - await msg.complete({ ok: true }); + return { ok: true }; }, - "task.command.workspace.revert_file": async (c, msg) => { - await revertWorkspaceFile(c, msg.body.path); - await msg.complete({ ok: true }); + async revertFile(c: any, body: any) { + await revertWorkspaceFile(c, body.path); + return { ok: true }; }, }; - -/** - * Plain run handler (no workflow engine). Drains the queue using `c.queue.iter()` - * with completable messages. - */ -export async function runTaskCommandLoop(c: any): Promise { - for await (const msg of c.queue.iter({ names: [...TASK_QUEUE_NAMES], completable: true })) { - const handler = commandHandlers[msg.name as TaskQueueName]; - if (handler) { - try { - await handler(c, msg); - } catch (error) { - const message = resolveErrorMessage(error); - logActorWarning("task.workflow", "task command failed", { - queueName: msg.name, - error: message, - }); - await msg.complete({ error: message }).catch(() => {}); - } - } else { - logActorWarning("task.workflow", "unknown queue message", { queueName: msg.name }); - await msg.complete({ error: `Unknown command: ${msg.name}` }); - } - } -} diff --git a/foundry/packages/backend/src/actors/task/workflow/init.ts b/foundry/packages/backend/src/actors/task/workflow/init.ts index bcfcba9..08085e8 100644 --- a/foundry/packages/backend/src/actors/task/workflow/init.ts +++ b/foundry/packages/backend/src/actors/task/workflow/init.ts @@ -6,7 +6,7 @@ import { resolveErrorMessage } from "../../logging.js"; import { defaultSandboxProviderId } from "../../../sandbox-config.js"; import { task as taskTable, taskRuntime } from "../db/schema.js"; import { TASK_ROW_ID, appendAuditLog, collectErrorMessages, resolveErrorDetail, setTaskState } from "./common.js"; -import { taskWorkflowQueueName } from "./queue.js"; +// task actions called directly (no queue) export async function initBootstrapDbActivity(loopCtx: any, body: any): Promise { const { config } = getActorRuntimeContext(); @@ -72,9 +72,7 @@ export async function initEnqueueProvisionActivity(loopCtx: any, body: any): Pro const self = selfTask(loopCtx); try { - await self.send(taskWorkflowQueueName("task.command.provision"), body, { - wait: false, - }); + void self.provision(body).catch(() => {}); } catch (error) { logActorWarning("task.init", "background provision command failed", { organizationId: loopCtx.state.organizationId, diff --git a/foundry/packages/backend/src/actors/task/workspace.ts b/foundry/packages/backend/src/actors/task/workspace.ts index 4f27c58..7505d01 100644 --- a/foundry/packages/backend/src/actors/task/workspace.ts +++ b/foundry/packages/backend/src/actors/task/workspace.ts @@ -10,14 +10,15 @@ import { } from "@sandbox-agent/foundry-shared"; import { getActorRuntimeContext } from "../context.js"; import { getOrCreateOrganization, getOrCreateTaskSandbox, getOrCreateUser, getTaskSandbox, selfTask } from "../handles.js"; +import { logActorWarning, resolveErrorMessage } from "../logging.js"; import { SANDBOX_REPO_CWD } from "../sandbox/index.js"; import { resolveSandboxProviderId } from "../../sandbox-config.js"; import { getBetterAuthService } from "../../services/better-auth.js"; -import { expectQueueResponse } from "../../services/queue.js"; +// expectQueueResponse removed — actions return values directly import { resolveOrganizationGithubAuth } from "../../services/github-auth.js"; import { githubRepoFullNameFromRemote } from "../../services/repo.js"; -import { organizationWorkflowQueueName } from "../organization/queues.js"; -import { userWorkflowQueueName } from "../user/workflow.js"; +// organization actions called directly (no queue) + import { task as taskTable, taskRuntime, taskSandboxes, taskWorkspaceSessions } from "./db/schema.js"; import { getCurrentRecord } from "./workflow/common.js"; @@ -239,17 +240,11 @@ async function upsertUserTaskState(c: any, authSessionId: string | null | undefi } const user = await getOrCreateUser(c, userId); - expectQueueResponse( - await user.send( - userWorkflowQueueName("user.command.task_state.upsert"), - { - taskId: c.state.taskId, - sessionId, - patch, - }, - { wait: true, timeout: 60_000 }, - ), - ); + await user.taskStateUpsert({ + taskId: c.state.taskId, + sessionId, + patch, + }); } async function deleteUserTaskState(c: any, authSessionId: string | null | undefined, sessionId: string): Promise { @@ -264,16 +259,10 @@ async function deleteUserTaskState(c: any, authSessionId: string | null | undefi } const user = await getOrCreateUser(c, userId); - expectQueueResponse( - await user.send( - userWorkflowQueueName("user.command.task_state.delete"), - { - taskId: c.state.taskId, - sessionId, - }, - { wait: true, timeout: 60_000 }, - ), - ); + await user.taskStateDelete({ + taskId: c.state.taskId, + sessionId, + }); } async function resolveDefaultModel(c: any, authSessionId?: string | null): Promise { @@ -750,21 +739,17 @@ async function enqueueWorkspaceRefresh( command: "task.command.workspace.refresh_derived" | "task.command.workspace.refresh_session_transcript", body: Record, ): Promise { - const self = selfTask(c); - await self.send(command, body, { wait: false }); + // Call directly since we're inside the task actor (no queue needed) + if (command === "task.command.workspace.refresh_derived") { + void refreshWorkspaceDerivedState(c).catch(() => {}); + } else { + void refreshWorkspaceSessionTranscript(c, body.sessionId as string).catch(() => {}); + } } async function enqueueWorkspaceEnsureSession(c: any, sessionId: string): Promise { - const self = selfTask(c); - await self.send( - "task.command.workspace.ensure_session", - { - sessionId, - }, - { - wait: false, - }, - ); + // Call directly since we're inside the task actor + void ensureWorkspaceSession(c, sessionId).catch(() => {}); } function pendingWorkspaceSessionStatus(record: any): "pending_provision" | "pending_session_create" { @@ -930,7 +915,10 @@ export async function buildSessionDetail(c: any, sessionId: string, authSessionI const userTaskState = await getUserTaskState(c, authSessionId); const userSessionState = userTaskState.bySessionId.get(sessionId); - if (!meta.sandboxSessionId) { + // Skip live transcript fetch if the sandbox session doesn't exist yet or + // the session is still provisioning — the sandbox API will block/timeout. + const isPending = meta.status === "pending_provision" || meta.status === "pending_session_create"; + if (!meta.sandboxSessionId || isPending) { return buildSessionDetailFromMeta(meta, userSessionState); } @@ -947,8 +935,13 @@ export async function buildSessionDetail(c: any, sessionId: string, authSessionI userSessionState, ); } - } catch { - // Session detail reads should degrade to cached transcript data if the live sandbox is unavailable. + } catch (error) { + // Session detail reads degrade to cached transcript when sandbox is unavailable. + logActorWarning("task", "readSessionTranscript failed, using cached transcript", { + taskId: c.state.taskId, + sessionId, + error: resolveErrorMessage(error), + }); } return buildSessionDetailFromMeta(meta, userSessionState); @@ -976,13 +969,7 @@ export async function getSessionDetail(c: any, sessionId: string, authSessionId? */ export async function broadcastTaskUpdate(c: any, options?: { sessionId?: string }): Promise { const organization = await getOrCreateOrganization(c, c.state.organizationId); - await expectQueueResponse<{ ok: true }>( - await organization.send( - organizationWorkflowQueueName("organization.command.applyTaskSummaryUpdate"), - { taskSummary: await buildTaskSummary(c) }, - { wait: true, timeout: 10_000 }, - ), - ); + await organization.commandApplyTaskSummaryUpdate({ taskSummary: await buildTaskSummary(c) }); c.broadcast("taskUpdated", { type: "taskUpdated", detail: await buildTaskDetail(c), @@ -1119,22 +1106,12 @@ export async function ensureWorkspaceSession(c: any, sessionId: string, model?: } export async function enqueuePendingWorkspaceSessions(c: any): Promise { - const self = selfTask(c); const pending = (await listSessionMetaRows(c, { includeClosed: true })).filter( (row) => row.closed !== true && row.status !== "ready" && row.status !== "error", ); for (const row of pending) { - await self.send( - "task.command.workspace.ensure_session", - { - sessionId: row.sessionId, - model: row.model, - }, - { - wait: false, - }, - ); + void ensureWorkspaceSession(c, row.sessionId, row.model).catch(() => {}); } } diff --git a/foundry/packages/backend/src/actors/user/index.ts b/foundry/packages/backend/src/actors/user/index.ts index 01d6e0f..8a15b58 100644 --- a/foundry/packages/backend/src/actors/user/index.ts +++ b/foundry/packages/backend/src/actors/user/index.ts @@ -1,12 +1,21 @@ -import { actor, queue } from "rivetkit"; +import { actor } from "rivetkit"; import { userDb } from "./db/db.js"; import { betterAuthActions } from "./actions/better-auth.js"; import { userActions } from "./actions/user.js"; -import { USER_QUEUE_NAMES, runUserCommandLoop } from "./workflow.js"; +import { + createAuthRecordMutation, + updateAuthRecordMutation, + updateManyAuthRecordsMutation, + deleteAuthRecordMutation, + deleteManyAuthRecordsMutation, + upsertUserProfileMutation, + upsertSessionStateMutation, + upsertTaskStateMutation, + deleteTaskStateMutation, +} from "./workflow.js"; export const user = actor({ db: userDb, - queues: Object.fromEntries(USER_QUEUE_NAMES.map((name) => [name, queue()])), options: { name: "User", icon: "shield", @@ -18,6 +27,34 @@ export const user = actor({ actions: { ...betterAuthActions, ...userActions, + async authCreate(c, body) { + return await createAuthRecordMutation(c, body); + }, + async authUpdate(c, body) { + return await updateAuthRecordMutation(c, body); + }, + async authUpdateMany(c, body) { + return await updateManyAuthRecordsMutation(c, body); + }, + async authDelete(c, body) { + await deleteAuthRecordMutation(c, body); + return { ok: true }; + }, + async authDeleteMany(c, body) { + return await deleteManyAuthRecordsMutation(c, body); + }, + async profileUpsert(c, body) { + return await upsertUserProfileMutation(c, body); + }, + async sessionStateUpsert(c, body) { + return await upsertSessionStateMutation(c, body); + }, + async taskStateUpsert(c, body) { + return await upsertTaskStateMutation(c, body); + }, + async taskStateDelete(c, body) { + await deleteTaskStateMutation(c, body); + return { ok: true }; + }, }, - run: runUserCommandLoop, }); diff --git a/foundry/packages/backend/src/actors/user/workflow.ts b/foundry/packages/backend/src/actors/user/workflow.ts index 87f5326..9bf2675 100644 --- a/foundry/packages/backend/src/actors/user/workflow.ts +++ b/foundry/packages/backend/src/actors/user/workflow.ts @@ -1,28 +1,9 @@ import { eq, count as sqlCount, and } from "drizzle-orm"; import { DEFAULT_WORKSPACE_MODEL_ID } from "@sandbox-agent/foundry-shared"; -import { logActorWarning, resolveErrorMessage } from "../logging.js"; import { authUsers, sessionState, userProfiles, userTaskState } from "./db/schema.js"; import { buildWhere, columnFor, materializeRow, persistInput, persistPatch, tableFor } from "./query-helpers.js"; -export const USER_QUEUE_NAMES = [ - "user.command.auth.create", - "user.command.auth.update", - "user.command.auth.update_many", - "user.command.auth.delete", - "user.command.auth.delete_many", - "user.command.profile.upsert", - "user.command.session_state.upsert", - "user.command.task_state.upsert", - "user.command.task_state.delete", -] as const; - -export type UserQueueName = (typeof USER_QUEUE_NAMES)[number]; - -export function userWorkflowQueueName(name: UserQueueName): UserQueueName { - return name; -} - -async function createAuthRecordMutation(c: any, input: { model: string; data: Record }) { +export async function createAuthRecordMutation(c: any, input: { model: string; data: Record }) { const table = tableFor(input.model); const persisted = persistInput(input.model, input.data); await c.db @@ -37,12 +18,10 @@ async function createAuthRecordMutation(c: any, input: { model: string; data: Re return materializeRow(input.model, row); } -async function updateAuthRecordMutation(c: any, input: { model: string; where: any[]; update: Record }) { +export async function updateAuthRecordMutation(c: any, input: { model: string; where: any[]; update: Record }) { const table = tableFor(input.model); const predicate = buildWhere(table, input.where); - if (!predicate) { - throw new Error("updateAuthRecord requires a where clause"); - } + if (!predicate) throw new Error("updateAuthRecord requires a where clause"); await c.db .update(table) .set(persistPatch(input.model, input.update) as any) @@ -51,12 +30,10 @@ async function updateAuthRecordMutation(c: any, input: { model: string; where: a return materializeRow(input.model, await c.db.select().from(table).where(predicate).get()); } -async function updateManyAuthRecordsMutation(c: any, input: { model: string; where: any[]; update: Record }) { +export async function updateManyAuthRecordsMutation(c: any, input: { model: string; where: any[]; update: Record }) { const table = tableFor(input.model); const predicate = buildWhere(table, input.where); - if (!predicate) { - throw new Error("updateManyAuthRecords requires a where clause"); - } + if (!predicate) throw new Error("updateManyAuthRecords requires a where clause"); await c.db .update(table) .set(persistPatch(input.model, input.update) as any) @@ -66,27 +43,23 @@ async function updateManyAuthRecordsMutation(c: any, input: { model: string; whe return row?.value ?? 0; } -async function deleteAuthRecordMutation(c: any, input: { model: string; where: any[] }) { +export async function deleteAuthRecordMutation(c: any, input: { model: string; where: any[] }) { const table = tableFor(input.model); const predicate = buildWhere(table, input.where); - if (!predicate) { - throw new Error("deleteAuthRecord requires a where clause"); - } + if (!predicate) throw new Error("deleteAuthRecord requires a where clause"); await c.db.delete(table).where(predicate).run(); } -async function deleteManyAuthRecordsMutation(c: any, input: { model: string; where: any[] }) { +export async function deleteManyAuthRecordsMutation(c: any, input: { model: string; where: any[] }) { const table = tableFor(input.model); const predicate = buildWhere(table, input.where); - if (!predicate) { - throw new Error("deleteManyAuthRecords requires a where clause"); - } + if (!predicate) throw new Error("deleteManyAuthRecords requires a where clause"); const rows = await c.db.select().from(table).where(predicate).all(); await c.db.delete(table).where(predicate).run(); return rows.length; } -async function upsertUserProfileMutation( +export async function upsertUserProfileMutation( c: any, input: { userId: string; @@ -134,11 +107,10 @@ async function upsertUserProfileMutation( }, }) .run(); - return await c.db.select().from(userProfiles).where(eq(userProfiles.userId, input.userId)).get(); } -async function upsertSessionStateMutation(c: any, input: { sessionId: string; activeOrganizationId: string | null }) { +export async function upsertSessionStateMutation(c: any, input: { sessionId: string; activeOrganizationId: string | null }) { const now = Date.now(); await c.db .insert(sessionState) @@ -150,17 +122,13 @@ async function upsertSessionStateMutation(c: any, input: { sessionId: string; ac }) .onConflictDoUpdate({ target: sessionState.sessionId, - set: { - activeOrganizationId: input.activeOrganizationId, - updatedAt: now, - }, + set: { activeOrganizationId: input.activeOrganizationId, updatedAt: now }, }) .run(); - return await c.db.select().from(sessionState).where(eq(sessionState.sessionId, input.sessionId)).get(); } -async function upsertTaskStateMutation( +export async function upsertTaskStateMutation( c: any, input: { taskId: string; @@ -182,14 +150,7 @@ async function upsertTaskStateMutation( .get(); if (input.patch.activeSessionId !== undefined) { - await c.db - .update(userTaskState) - .set({ - activeSessionId: input.patch.activeSessionId, - updatedAt: now, - }) - .where(eq(userTaskState.taskId, input.taskId)) - .run(); + await c.db.update(userTaskState).set({ activeSessionId: input.patch.activeSessionId, updatedAt: now }).where(eq(userTaskState.taskId, input.taskId)).run(); } await c.db @@ -224,7 +185,7 @@ async function upsertTaskStateMutation( .get(); } -async function deleteTaskStateMutation(c: any, input: { taskId: string; sessionId?: string }) { +export async function deleteTaskStateMutation(c: any, input: { taskId: string; sessionId?: string }) { if (input.sessionId) { await c.db .delete(userTaskState) @@ -232,50 +193,5 @@ async function deleteTaskStateMutation(c: any, input: { taskId: string; sessionI .run(); return; } - await c.db.delete(userTaskState).where(eq(userTaskState.taskId, input.taskId)).run(); } - -const COMMAND_HANDLERS: Record Promise> = { - "user.command.auth.create": (c, body) => createAuthRecordMutation(c, body), - "user.command.auth.update": (c, body) => updateAuthRecordMutation(c, body), - "user.command.auth.update_many": (c, body) => updateManyAuthRecordsMutation(c, body), - "user.command.auth.delete": async (c, body) => { - await deleteAuthRecordMutation(c, body); - return { ok: true }; - }, - "user.command.auth.delete_many": (c, body) => deleteManyAuthRecordsMutation(c, body), - "user.command.profile.upsert": (c, body) => upsertUserProfileMutation(c, body), - "user.command.session_state.upsert": (c, body) => upsertSessionStateMutation(c, body), - "user.command.task_state.upsert": (c, body) => upsertTaskStateMutation(c, body), - "user.command.task_state.delete": async (c, body) => { - await deleteTaskStateMutation(c, body); - return { ok: true }; - }, -}; - -/** - * Plain run handler (no workflow engine). Drains the queue using `c.queue.iter()` - * with completable messages. - */ -export async function runUserCommandLoop(c: any): Promise { - for await (const msg of c.queue.iter({ names: [...USER_QUEUE_NAMES], completable: true })) { - try { - const handler = COMMAND_HANDLERS[msg.name]; - if (handler) { - const result = await handler(c, msg.body); - await msg.complete(result); - } else { - logActorWarning("user", "unknown queue message", { queueName: msg.name }); - await msg.complete({ error: `Unknown command: ${msg.name}` }); - } - } catch (error) { - const message = resolveErrorMessage(error); - logActorWarning("user", "user command failed", { - queueName: msg.name, - error: message, - }); - await msg.complete({ error: message }).catch(() => {}); - } - } -} diff --git a/foundry/packages/backend/src/index.ts b/foundry/packages/backend/src/index.ts index 60cd178..8f82d8b 100644 --- a/foundry/packages/backend/src/index.ts +++ b/foundry/packages/backend/src/index.ts @@ -48,6 +48,19 @@ function isRivetRequest(request: Request): boolean { } export async function startBackend(options: BackendStartOptions = {}): Promise { + // Prevent the sandbox-agent SDK's unhandled SQLite constraint errors from + // crashing the entire process. The SDK has a bug where duplicate event + // inserts (sandbox_agent_events UNIQUE constraint) throw from an internal + // async path with no catch. Log and continue. + process.on("uncaughtException", (error) => { + logger.error({ error: error?.message ?? String(error), stack: error?.stack }, "uncaughtException (kept alive)"); + }); + process.on("unhandledRejection", (reason) => { + const msg = reason instanceof Error ? reason.message : String(reason); + const stack = reason instanceof Error ? reason.stack : undefined; + logger.error({ error: msg, stack }, "unhandledRejection (kept alive)"); + }); + // sandbox-agent agent plugins vary on which env var they read for OpenAI/Codex auth. // Normalize to keep local dev + docker-compose simple. if (!process.env.CODEX_API_KEY && process.env.OPENAI_API_KEY) { diff --git a/foundry/packages/backend/src/services/better-auth.ts b/foundry/packages/backend/src/services/better-auth.ts index 215646f..c36b900 100644 --- a/foundry/packages/backend/src/services/better-auth.ts +++ b/foundry/packages/backend/src/services/better-auth.ts @@ -1,11 +1,11 @@ import { betterAuth } from "better-auth"; import { createAdapterFactory } from "better-auth/adapters"; import { APP_SHELL_ORGANIZATION_ID } from "../actors/organization/constants.js"; -import { organizationWorkflowQueueName } from "../actors/organization/queues.js"; -import { userWorkflowQueueName } from "../actors/user/workflow.js"; +// organization actions are called directly (no queue) +// user actor actions are called directly (no queue) import { organizationKey, userKey } from "../actors/keys.js"; import { logger } from "../logging.js"; -import { expectQueueResponse } from "./queue.js"; +// expectQueueResponse removed — actions return values directly const AUTH_BASE_PATH = "/v1/auth"; const SESSION_COOKIE = "better-auth.session_token"; @@ -62,11 +62,7 @@ function resolveRouteUserId(organization: any, resolved: any): string | null { return null; } -async function sendOrganizationCommand(organization: any, name: Parameters[0], body: unknown): Promise { - return expectQueueResponse( - await organization.send(organizationWorkflowQueueName(name), body, { wait: true, timeout: 60_000 }), - ); -} +// sendOrganizationCommand removed — org actions are called directly export interface BetterAuthService { auth: any; @@ -166,9 +162,9 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin return null; }; - const ensureOrganizationVerification = async (method: Parameters[0], payload: Record) => { + const ensureOrganizationVerification = async (actionName: string, payload: Record) => { const organization = await appOrganization(); - return await sendOrganizationCommand(organization, method, payload); + return await (organization as any)[actionName](payload); }; return { @@ -179,7 +175,7 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin create: async ({ model, data }) => { const transformed = await transformInput(data, model, "create", true); if (model === "verification") { - return await ensureOrganizationVerification("organization.command.better_auth.verification.create", { data: transformed }); + return await ensureOrganizationVerification("commandBetterAuthVerificationCreate", { data: transformed }); } const userId = await resolveUserIdForQuery(model, undefined, transformed); @@ -188,20 +184,18 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin } const userActor = await getUser(userId); - const created = expectQueueResponse( - await userActor.send(userWorkflowQueueName("user.command.auth.create"), { model, data: transformed }, { wait: true, timeout: 60_000 }), - ); + const created = await userActor.authCreate({ model, data: transformed }); const organization = await appOrganization(); if (model === "user" && typeof transformed.email === "string" && transformed.email.length > 0) { - await sendOrganizationCommand(organization, "organization.command.better_auth.email_index.upsert", { + await organization.commandBetterAuthEmailIndexUpsert({ email: transformed.email.toLowerCase(), userId, }); } if (model === "session") { - await sendOrganizationCommand(organization, "organization.command.better_auth.session_index.upsert", { + await organization.commandBetterAuthSessionIndexUpsert({ sessionId: String(created.id), sessionToken: String(created.token), userId, @@ -209,7 +203,7 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin } if (model === "account") { - await sendOrganizationCommand(organization, "organization.command.better_auth.account_index.upsert", { + await organization.commandBetterAuthAccountIndexUpsert({ id: String(created.id), providerId: String(created.providerId), accountId: String(created.accountId), @@ -297,7 +291,7 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin const transformedWhere = transformWhereClause({ model, where, action: "update" }); const transformedUpdate = (await transformInput(update as Record, model, "update", true)) as Record; if (model === "verification") { - return await ensureOrganizationVerification("organization.command.better_auth.verification.update", { + return await ensureOrganizationVerification("commandBetterAuthVerificationUpdate", { where: transformedWhere, update: transformedUpdate, }); @@ -317,23 +311,17 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin : model === "session" ? await userActor.betterAuthFindOneRecord({ model, where: transformedWhere }) : null; - const updated = expectQueueResponse( - await userActor.send( - userWorkflowQueueName("user.command.auth.update"), - { model, where: transformedWhere, update: transformedUpdate }, - { wait: true, timeout: 60_000 }, - ), - ); + const updated = await userActor.authUpdate({ model, where: transformedWhere, update: transformedUpdate }); const organization = await appOrganization(); if (model === "user" && updated) { if (before?.email && before.email !== updated.email) { - await sendOrganizationCommand(organization, "organization.command.better_auth.email_index.delete", { + await organization.commandBetterAuthEmailIndexDelete({ email: before.email.toLowerCase(), }); } if (updated.email) { - await sendOrganizationCommand(organization, "organization.command.better_auth.email_index.upsert", { + await organization.commandBetterAuthEmailIndexUpsert({ email: updated.email.toLowerCase(), userId, }); @@ -341,7 +329,7 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin } if (model === "session" && updated) { - await sendOrganizationCommand(organization, "organization.command.better_auth.session_index.upsert", { + await organization.commandBetterAuthSessionIndexUpsert({ sessionId: String(updated.id), sessionToken: String(updated.token), userId, @@ -349,7 +337,7 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin } if (model === "account" && updated) { - await sendOrganizationCommand(organization, "organization.command.better_auth.account_index.upsert", { + await organization.commandBetterAuthAccountIndexUpsert({ id: String(updated.id), providerId: String(updated.providerId), accountId: String(updated.accountId), @@ -364,7 +352,7 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin const transformedWhere = transformWhereClause({ model, where, action: "updateMany" }); const transformedUpdate = (await transformInput(update as Record, model, "update", true)) as Record; if (model === "verification") { - return await ensureOrganizationVerification("organization.command.better_auth.verification.update_many", { + return await ensureOrganizationVerification("commandBetterAuthVerificationUpdateMany", { where: transformedWhere, update: transformedUpdate, }); @@ -376,20 +364,14 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin } const userActor = await getUser(userId); - return expectQueueResponse( - await userActor.send( - userWorkflowQueueName("user.command.auth.update_many"), - { model, where: transformedWhere, update: transformedUpdate }, - { wait: true, timeout: 60_000 }, - ), - ); + return await userActor.authUpdateMany({ model, where: transformedWhere, update: transformedUpdate }); }, delete: async ({ model, where }) => { const transformedWhere = transformWhereClause({ model, where, action: "delete" }); if (model === "verification") { const organization = await appOrganization(); - await sendOrganizationCommand(organization, "organization.command.better_auth.verification.delete", { where: transformedWhere }); + await organization.commandBetterAuthVerificationDelete({ where: transformedWhere }); return; } @@ -401,19 +383,17 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin const userActor = await getUser(userId); const organization = await appOrganization(); const before = await userActor.betterAuthFindOneRecord({ model, where: transformedWhere }); - expectQueueResponse( - await userActor.send(userWorkflowQueueName("user.command.auth.delete"), { model, where: transformedWhere }, { wait: true, timeout: 60_000 }), - ); + await userActor.authDelete({ model, where: transformedWhere }); if (model === "session" && before) { - await sendOrganizationCommand(organization, "organization.command.better_auth.session_index.delete", { + await organization.commandBetterAuthSessionIndexDelete({ sessionId: before.id, sessionToken: before.token, }); } if (model === "account" && before) { - await sendOrganizationCommand(organization, "organization.command.better_auth.account_index.delete", { + await organization.commandBetterAuthAccountIndexDelete({ id: before.id, providerId: before.providerId, accountId: before.accountId, @@ -421,7 +401,7 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin } if (model === "user" && before?.email) { - await sendOrganizationCommand(organization, "organization.command.better_auth.email_index.delete", { + await organization.commandBetterAuthEmailIndexDelete({ email: before.email.toLowerCase(), }); } @@ -430,7 +410,7 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin deleteMany: async ({ model, where }) => { const transformedWhere = transformWhereClause({ model, where, action: "deleteMany" }); if (model === "verification") { - return await ensureOrganizationVerification("organization.command.better_auth.verification.delete_many", { where: transformedWhere }); + return await ensureOrganizationVerification("commandBetterAuthVerificationDeleteMany", { where: transformedWhere }); } if (model === "session") { @@ -441,11 +421,9 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin const userActor = await getUser(userId); const organization = await appOrganization(); const sessions = await userActor.betterAuthFindManyRecords({ model, where: transformedWhere, limit: 5000 }); - const deleted = expectQueueResponse( - await userActor.send(userWorkflowQueueName("user.command.auth.delete_many"), { model, where: transformedWhere }, { wait: true, timeout: 60_000 }), - ); + const deleted = await userActor.authDeleteMany({ model, where: transformedWhere }); for (const session of sessions) { - await sendOrganizationCommand(organization, "organization.command.better_auth.session_index.delete", { + await organization.commandBetterAuthSessionIndexDelete({ sessionId: session.id, sessionToken: session.token, }); @@ -459,9 +437,7 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin } const userActor = await getUser(userId); - const deleted = expectQueueResponse( - await userActor.send(userWorkflowQueueName("user.command.auth.delete_many"), { model, where: transformedWhere }, { wait: true, timeout: 60_000 }), - ); + const deleted = await userActor.authDeleteMany({ model, where: transformedWhere }); return deleted; }, @@ -533,9 +509,7 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin async upsertUserProfile(userId: string, patch: Record) { const userActor = await getUser(userId); - return expectQueueResponse( - await userActor.send(userWorkflowQueueName("user.command.profile.upsert"), { userId, patch }, { wait: true, timeout: 60_000 }), - ); + return await userActor.profileUpsert({ userId, patch }); }, async setActiveOrganization(sessionId: string, activeOrganizationId: string | null) { @@ -544,13 +518,7 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin throw new Error(`Unknown auth session ${sessionId}`); } const userActor = await getUser(authState.user.id); - return expectQueueResponse( - await userActor.send( - userWorkflowQueueName("user.command.session_state.upsert"), - { sessionId, activeOrganizationId }, - { wait: true, timeout: 60_000 }, - ), - ); + return await userActor.sessionStateUpsert({ sessionId, activeOrganizationId }); }, async getAccessTokenForSession(sessionId: string) { diff --git a/foundry/packages/client/src/backend-client.ts b/foundry/packages/client/src/backend-client.ts index 8331bf6..0903aa8 100644 --- a/foundry/packages/client/src/backend-client.ts +++ b/foundry/packages/client/src/backend-client.ts @@ -486,7 +486,15 @@ export function createBackendClient(options: BackendClientOptions): BackendClien createWithInput: "app", }) as unknown as AppOrganizationHandle; - const task = async (organizationId: string, repoId: string, taskId: string): Promise => client.task.get(taskKey(organizationId, repoId, taskId)); + // getOrCreate is intentional here — this is the ONLY lazy creation point for + // virtual tasks (PR-driven entries that exist in the org's local tables but + // have no task actor yet). The task actor self-initializes from org data in + // getCurrentRecord(). Backend code must NEVER use getOrCreateTask except in + // createTaskMutation. See backend/CLAUDE.md "Lazy Task Actor Creation". + const task = async (organizationId: string, repoId: string, taskId: string): Promise => + client.task.getOrCreate(taskKey(organizationId, repoId, taskId), { + createWithInput: { organizationId, repoId, taskId }, + }); const sandboxByKey = async (organizationId: string, _providerId: SandboxProviderId, sandboxId: string): Promise => { return (client as any).taskSandbox.get(taskSandboxKey(organizationId, sandboxId)); diff --git a/foundry/packages/frontend/src/components/mock-layout.tsx b/foundry/packages/frontend/src/components/mock-layout.tsx index 72c2616..e198ee6 100644 --- a/foundry/packages/frontend/src/components/mock-layout.tsx +++ b/foundry/packages/frontend/src/components/mock-layout.tsx @@ -346,16 +346,17 @@ const TranscriptPanel = memo(function TranscriptPanel({ (activeAgentSession.status === "pending_provision" || activeAgentSession.status === "pending_session_create" || activeAgentSession.status === "error") && activeMessages.length === 0; const serverDraft = promptSession?.draft.text ?? ""; - const serverAttachments = promptSession?.draft.attachments ?? []; + const serverAttachments = promptSession?.draft.attachments; + const serverAttachmentsJson = JSON.stringify(serverAttachments ?? []); // Sync server → local only when user hasn't typed recently (3s cooldown) const DRAFT_SYNC_COOLDOWN_MS = 3_000; useEffect(() => { if (Date.now() - lastEditTimeRef.current > DRAFT_SYNC_COOLDOWN_MS) { setLocalDraft(serverDraft); - setLocalAttachments(serverAttachments); + setLocalAttachments(serverAttachments ?? []); } - }, [serverDraft, serverAttachments]); + }, [serverDraft, serverAttachmentsJson]); // Reset local draft immediately on session/task switch useEffect(() => {