Convert all actors from queues/workflows to direct actions, lazy task creation

Major refactor replacing all queue-based workflow communication with direct
RivetKit action calls across all actors. This works around a RivetKit bug
where c.queue.iter() deadlocks for actors created from another actor's context.

Key changes:
- All actors (organization, task, user, audit-log, github-data) converted
  from run: workflow(...) to actions-only (no run handler, no queues)
- PR sync creates virtual task entries in org local DB instead of spawning
  task actors — prevents OOM from 200+ actors created simultaneously
- Task actors created lazily on first user interaction via getOrCreate,
  self-initialize from org's getTaskIndexEntry data
- Removed requireRepoExists cross-actor call (caused 500s), replaced with
  local resolveTaskRepoId from org's taskIndex table
- Fixed getOrganizationContext to thread overrides through all sync phases
- Fixed sandbox repo path (/home/user/repo for E2B compatibility)
- Fixed buildSessionDetail to skip transcript fetch for pending sessions
- Added process crash protection (uncaughtException/unhandledRejection)
- Fixed React infinite render loop in mock-layout useEffect dependencies
- Added sandbox listProcesses error handling for expired E2B sandboxes
- Set E2B sandbox timeout to 1 hour (was 5 min default)
- Updated CLAUDE.md with lazy task creation rules, no-silent-catch policy,
  React hook dependency safety rules

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Nathan Flurry 2026-03-16 14:17:24 -07:00
parent 29e5821fef
commit 78cd38d826
24 changed files with 887 additions and 887 deletions

View file

@ -144,6 +144,15 @@ The client subscribes to `app` always, `organization` when entering an organizat
- Do not add backend git clone paths, `git fetch`, `git for-each-ref`, or direct backend git CLI calls. If you need git data, either read stored GitHub metadata or run the command inside a sandbox.
- The `BackendDriver` has no `GitDriver` or `StackDriver`. Only `GithubDriver` and `TmuxDriver` remain.
## React Hook Dependency Safety
- **Never use unstable references as `useEffect`/`useMemo`/`useCallback` dependencies.** React compares dependencies by reference, not value. Expressions like `?? []`, `?? {}`, `.map(...)`, `.filter(...)`, or object/array literals create new references every render, causing infinite re-render loops when used as dependencies.
- If the upstream value may be `undefined`/`null` and you need a fallback, either:
- Use the raw upstream value as the dependency and apply the fallback inside the effect body: `useEffect(() => { doThing(value ?? []); }, [value]);`
- Derive a stable primitive key: `const key = JSON.stringify(value ?? []);` then depend on `key`
- Memoize: `const stable = useMemo(() => value ?? [], [value]);`
- When reviewing code, treat any `?? []`, `?? {}`, or inline `.map()/.filter()` in a dependency array as a bug.
## UI System
- Foundry's base UI system is `BaseUI` with `Styletron`, plus Foundry-specific theme/tokens on top. Treat that as the default UI foundation.
@ -168,6 +177,7 @@ The client subscribes to `app` always, `organization` when entering an organizat
- If the system reaches an unexpected state, raise an explicit error with actionable context.
- Do not fail silently, swallow errors, or auto-ignore inconsistent data.
- Prefer fail-fast behavior over hidden degradation when correctness is uncertain.
- **Never use bare `catch {}` or `catch { }` blocks.** Every catch must at minimum log the error with `logActorWarning` or `console.warn`. Silent catches hide bugs and make debugging impossible. If a catch is intentionally degrading (e.g. returning empty data when a sandbox is expired), it must still log so operators can see what happened. Use `catch (error) { logActorWarning(..., { error: resolveErrorMessage(error) }); }` or equivalent.
## RivetKit Dependency Policy
@ -208,8 +218,9 @@ For all Rivet/RivetKit implementation:
- Do not add custom backend REST endpoints (no `/v1/*` shim layer).
- We own the sandbox-agent project; treat sandbox-agent defects as first-party bugs and fix them instead of working around them.
- Keep strict single-writer ownership: each table/row has exactly one actor writer.
- Parent actors (`organization`, `repository`, `task`, `history`, `sandbox-instance`) use command-only loops with no timeout.
- Parent actors (`organization`, `task`, `sandbox-instance`) use command-only loops with no timeout.
- Periodic syncing lives in dedicated child actors with one timeout cadence each.
- **Task actors must be created lazily** — never during sync or bulk operations. PR sync writes virtual entries to the org's local `taskIndex`/`taskSummaries` tables. The task actor is created on first user interaction via `getOrCreate`. See `packages/backend/CLAUDE.md` "Lazy Task Actor Creation" for details.
- Do not build blocking flows that wait on external systems to become ready or complete. Prefer push-based progression driven by actor messages, events, webhooks, or queue/workflow state changes.
- Use workflows/background commands for any repo sync, sandbox provisioning, agent install, branch restack/rebase, or other multi-step external work. Do not keep user-facing actions/requests open while that work runs.
- `send` policy: always `await` the `send(...)` call itself so enqueue failures surface immediately, but default to `wait: false`.

View file

@ -49,6 +49,43 @@ OrganizationActor (coordinator for tasks + auth users)
When adding a new index table, annotate it in the schema file with a doc comment identifying it as a coordinator index and which child actor it indexes (see existing examples).
## Lazy Task Actor Creation — CRITICAL
**Task actors must NEVER be created during GitHub sync or bulk operations.** Creating hundreds of task actors simultaneously causes OOM crashes. An org can have 200+ PRs; spawning an actor per PR kills the process.
### The two creation points
There are exactly **two** places that may create a task actor:
1. **`createTaskMutation`** in `task-mutations.ts` — the only backend code that calls `getOrCreateTask`. Triggered by explicit user action ("New Task" button). One actor at a time.
2. **`backend-client.ts` client helper** — calls `client.task.getOrCreate(...)`. This is the lazy materialization point: when a user clicks a virtual task in the sidebar, the client creates the actor, and it self-initializes in `getCurrentRecord()` (`workflow/common.ts`) by reading branch/title from the org's `getTaskIndexEntry` action.
### The rule
### The rule
**Never use `getOrCreateTask` inside a sync loop, webhook handler, or any bulk operation.** That's what caused the OOM — 186 actors spawned simultaneously during PR sync.
`getOrCreateTask` IS allowed in:
- `createTaskMutation` — explicit user "New Task" action
- `requireWorkspaceTask` — user-initiated actions (createSession, sendMessage, etc.) that may hit a virtual task
- `getTask` action on the org — called by sandbox actor and client, needs to materialize virtual tasks
- `backend-client.ts` client helper — lazy materialization when user views a task
### Virtual tasks (PR-driven)
During PR sync, `refreshTaskSummaryForBranchMutation` is called for every changed PR (via github-data's `emitPullRequestChangeEvents`). It writes **virtual task entries** to the org actor's local `taskIndex` + `taskSummaries` tables only. No task actor is spawned. No cross-actor calls to task actors.
When the user interacts with a virtual task (clicks it, creates a session):
1. Client or org actor calls `getOrCreate` on the task actor key → actor is created with empty DB
2. Any action on the actor calls `getCurrentRecord()` → sees empty DB → reads branch/title from org's `getTaskIndexEntry` → calls `initBootstrapDbActivity` + `initCompleteActivity` → task is now real
### Call sites to watch
- `refreshTaskSummaryForBranchMutation` — called in bulk during sync. Must ONLY write to org local tables. Never create task actors or call task actor actions.
- `emitPullRequestChangeEvents` in github-data — iterates all changed PRs. Must remain fire-and-forget with no actor fan-out.
## Ownership Rules
- `OrganizationActor` is the organization coordinator, direct coordinator for tasks, and lookup/index owner. It owns the task index, task summaries, and repo catalog.

View file

@ -1,10 +1,9 @@
// @ts-nocheck
import { and, desc, eq } from "drizzle-orm";
import { actor, queue } from "rivetkit";
import { actor } from "rivetkit";
import type { AuditLogEvent } from "@sandbox-agent/foundry-shared";
import { auditLogDb } from "./db/db.js";
import { events } from "./db/schema.js";
import { AUDIT_LOG_QUEUE_NAMES, runAuditLogCommandLoop } from "./workflow.js";
export interface AuditLogInput {
organizationId: string;
@ -36,7 +35,6 @@ export interface ListAuditLogParams {
*/
export const auditLog = actor({
db: auditLogDb,
queues: Object.fromEntries(AUDIT_LOG_QUEUE_NAMES.map((name) => [name, queue()])),
options: {
name: "Audit Log",
icon: "database",
@ -45,6 +43,22 @@ export const auditLog = actor({
organizationId: input.organizationId,
}),
actions: {
async append(c, body: AppendAuditLogCommand): Promise<{ ok: true }> {
const now = Date.now();
await c.db
.insert(events)
.values({
repoId: body.repoId ?? null,
taskId: body.taskId ?? null,
branchName: body.branchName ?? null,
kind: body.kind,
payloadJson: JSON.stringify(body.payload),
createdAt: now,
})
.run();
return { ok: true };
},
async list(c, params?: ListAuditLogParams): Promise<AuditLogEvent[]> {
const whereParts = [];
if (params?.repoId) {
@ -81,5 +95,4 @@ export const auditLog = actor({
}));
},
},
run: runAuditLogCommandLoop,
});

View file

@ -1,38 +0,0 @@
// @ts-nocheck
import { logActorWarning, resolveErrorMessage } from "../logging.js";
import { events } from "./db/schema.js";
import type { AppendAuditLogCommand } from "./index.js";
export const AUDIT_LOG_QUEUE_NAMES = ["auditLog.command.append"] as const;
async function appendAuditLogRow(c: any, body: AppendAuditLogCommand): Promise<void> {
const now = Date.now();
await c.db
.insert(events)
.values({
repoId: body.repoId ?? null,
taskId: body.taskId ?? null,
branchName: body.branchName ?? null,
kind: body.kind,
payloadJson: JSON.stringify(body.payload),
createdAt: now,
})
.run();
}
export async function runAuditLogCommandLoop(c: any): Promise<void> {
for await (const msg of c.queue.iter({ names: [...AUDIT_LOG_QUEUE_NAMES], completable: true })) {
try {
if (msg.name === "auditLog.command.append") {
await appendAuditLogRow(c, msg.body as AppendAuditLogCommand);
await msg.complete({ ok: true });
continue;
}
await msg.complete({ error: `Unknown command: ${msg.name}` });
} catch (error) {
const message = resolveErrorMessage(error);
logActorWarning("auditLog", "audit-log command failed", { queueName: msg.name, error: message });
await msg.complete({ error: message }).catch(() => {});
}
}
}

View file

@ -1,17 +1,15 @@
// @ts-nocheck
import { eq, inArray } from "drizzle-orm";
import { actor, queue } from "rivetkit";
import { actor } from "rivetkit";
import type { FoundryOrganization } from "@sandbox-agent/foundry-shared";
import { getActorRuntimeContext } from "../context.js";
import { getOrCreateOrganization, getTask } from "../handles.js";
import { repoIdFromRemote } from "../../services/repo.js";
import { resolveOrganizationGithubAuth } from "../../services/github-auth.js";
import { expectQueueResponse } from "../../services/queue.js";
import { organizationWorkflowQueueName } from "../organization/queues.js";
import { taskWorkflowQueueName } from "../task/workflow/index.js";
// actions called directly (no queue)
import { githubDataDb } from "./db/db.js";
import { githubBranches, githubMembers, githubMeta, githubPullRequests, githubRepositories } from "./db/schema.js";
import { GITHUB_DATA_QUEUE_NAMES, runGithubDataCommandLoop } from "./workflow.js";
// workflow.ts is no longer used — commands are actions now
const META_ROW_ID = 1;
const SYNC_REPOSITORY_BATCH_SIZE = 10;
@ -76,9 +74,7 @@ interface ClearStateInput {
label: string;
}
async function sendOrganizationCommand(organization: any, name: Parameters<typeof organizationWorkflowQueueName>[0], body: unknown): Promise<void> {
await expectQueueResponse<{ ok: true }>(await organization.send(organizationWorkflowQueueName(name), body, { wait: true, timeout: 60_000 }));
}
// sendOrganizationCommand removed — org actions called directly
interface PullRequestWebhookInput {
connectedAccount: string;
@ -213,7 +209,7 @@ async function writeMeta(c: any, patch: Partial<GithubMetaState>) {
async function publishSyncProgress(c: any, patch: Partial<GithubMetaState>): Promise<GithubMetaState> {
const meta = await writeMeta(c, patch);
const organization = await getOrCreateOrganization(c, c.state.organizationId);
await sendOrganizationCommand(organization, "organization.command.github.sync_progress.apply", {
await organization.commandApplyGithubSyncProgress({
connectedAccount: meta.connectedAccount,
installationStatus: meta.installationStatus,
installationId: meta.installationId,
@ -229,21 +225,29 @@ async function publishSyncProgress(c: any, patch: Partial<GithubMetaState>): Pro
}
async function getOrganizationContext(c: any, overrides?: FullSyncInput) {
// Try to read the org profile for fallback values, but don't require it.
// Webhook-triggered syncs can arrive before the user signs in and creates the
// org profile row. The webhook callers already pass the necessary overrides
// (connectedAccount, installationId, githubLogin, kind), so we can proceed
// without the profile as long as overrides cover the required fields.
const organizationHandle = await getOrCreateOrganization(c, c.state.organizationId);
const organizationState = await organizationHandle.getOrganizationShellStateIfInitialized({});
if (!organizationState) {
throw new Error(`Organization ${c.state.organizationId} is not initialized`);
// If the org profile doesn't exist and overrides don't provide enough context, fail.
if (!organizationState && !overrides?.connectedAccount) {
throw new Error(`Organization ${c.state.organizationId} is not initialized and no override context was provided`);
}
const auth = await resolveOrganizationGithubAuth(c, c.state.organizationId);
return {
kind: overrides?.kind ?? organizationState.snapshot.kind,
githubLogin: overrides?.githubLogin ?? organizationState.githubLogin,
connectedAccount: overrides?.connectedAccount ?? organizationState.snapshot.github.connectedAccount ?? organizationState.githubLogin,
installationId: overrides?.installationId ?? organizationState.githubInstallationId ?? null,
kind: overrides?.kind ?? organizationState?.snapshot.kind,
githubLogin: overrides?.githubLogin ?? organizationState?.githubLogin,
connectedAccount: overrides?.connectedAccount ?? organizationState?.snapshot.github.connectedAccount ?? organizationState?.githubLogin,
installationId: overrides?.installationId ?? organizationState?.githubInstallationId ?? null,
installationStatus:
overrides?.installationStatus ??
organizationState.snapshot.github.installationStatus ??
(organizationState.snapshot.kind === "personal" ? "connected" : "reconnect_required"),
organizationState?.snapshot.github.installationStatus ??
(organizationState?.snapshot.kind === "personal" ? "connected" : "reconnect_required"),
accessToken: overrides?.accessToken ?? auth?.githubToken ?? null,
};
}
@ -420,11 +424,7 @@ async function refreshTaskSummaryForBranch(c: any, repoId: string, branchName: s
return;
}
const organization = await getOrCreateOrganization(c, c.state.organizationId);
await organization.send(
organizationWorkflowQueueName("organization.command.refreshTaskSummaryForBranch"),
{ repoId, branchName, pullRequest },
{ wait: false },
);
void organization.commandRefreshTaskSummaryForBranch({ repoId, branchName, pullRequest, repoName: repositoryRecord.fullName ?? undefined }).catch(() => {});
}
async function emitPullRequestChangeEvents(c: any, beforeRows: any[], afterRows: any[]) {
@ -472,7 +472,7 @@ async function autoArchiveTaskForClosedPullRequest(c: any, row: any) {
}
try {
const task = getTask(c, c.state.organizationId, row.repoId, match.taskId);
await task.send(taskWorkflowQueueName("task.command.archive"), { reason: `PR ${String(row.state).toLowerCase()}` }, { wait: false });
void task.archive({ reason: `PR ${String(row.state).toLowerCase()}` }).catch(() => {});
} catch {
// Best-effort only. Task summary refresh will still clear the PR state.
}
@ -721,7 +721,11 @@ export async function fullSyncBranchBatch(c: any, config: FullSyncConfig, batchI
if (batchIndex >= batches.length) return true;
const batch = batches[batchIndex]!;
const context = await getOrganizationContext(c);
const context = await getOrganizationContext(c, {
connectedAccount: config.connectedAccount,
installationStatus: config.installationStatus as any,
installationId: config.installationId,
});
const batchBranches = (await Promise.all(batch.map((repo) => listRepositoryBranchesForContext(context, repo)))).flat();
await upsertBranches(c, batchBranches, config.startedAt, config.syncGeneration);
@ -757,7 +761,11 @@ export async function fullSyncMembers(c: any, config: FullSyncConfig): Promise<v
totalRepositoryCount: config.totalRepositoryCount,
});
const context = await getOrganizationContext(c);
const context = await getOrganizationContext(c, {
connectedAccount: config.connectedAccount,
installationStatus: config.installationStatus as any,
installationId: config.installationId,
});
const members = await resolveMembers(c, context);
await upsertMembers(c, members, config.startedAt, config.syncGeneration);
await sweepMembers(c, config.syncGeneration);
@ -773,7 +781,11 @@ export async function fullSyncPullRequestBatch(c: any, config: FullSyncConfig, b
if (batchIndex >= batches.length) return true;
const batch = batches[batchIndex]!;
const context = await getOrganizationContext(c);
const context = await getOrganizationContext(c, {
connectedAccount: config.connectedAccount,
installationStatus: config.installationStatus as any,
installationId: config.installationId,
});
const batchPRs = await listPullRequestsForRepositories(context, batch);
await upsertPullRequests(c, batchPRs, config.syncGeneration);
@ -801,7 +813,7 @@ export async function fullSyncFinalize(c: any, config: FullSyncConfig): Promise<
await sweepPullRequests(c, config.syncGeneration);
await sweepRepositories(c, config.syncGeneration);
await writeMeta(c, {
await publishSyncProgress(c, {
connectedAccount: config.connectedAccount,
installationStatus: config.installationStatus,
installationId: config.installationId,
@ -867,16 +879,14 @@ export async function fullSyncError(c: any, error: unknown): Promise<void> {
export const githubData = actor({
db: githubDataDb,
queues: Object.fromEntries(GITHUB_DATA_QUEUE_NAMES.map((name) => [name, queue()])),
options: {
name: "GitHub Data",
icon: "github",
actionTimeout: 5 * 60_000,
actionTimeout: 10 * 60_000,
},
createState: (_c, input: GithubDataInput) => ({
organizationId: input.organizationId,
}),
run: runGithubDataCommandLoop,
actions: {
async getSummary(c) {
const repositories = await c.db.select().from(githubRepositories).all();
@ -935,6 +945,34 @@ export const githubData = actor({
}))
.sort((left, right) => left.branchName.localeCompare(right.branchName));
},
async syncRepos(c, body: any) {
try {
await runFullSync(c, body);
return { ok: true };
} catch (error) {
try {
await fullSyncError(c, error);
} catch {
/* best effort */
}
throw error;
}
},
async reloadRepository(c, body: { repoId: string }) {
return await reloadRepositoryMutation(c, body);
},
async clearState(c, body: any) {
await clearStateMutation(c, body);
return { ok: true };
},
async handlePullRequestWebhook(c, body: any) {
await handlePullRequestWebhookMutation(c, body);
return { ok: true };
},
},
});

View file

@ -2,16 +2,15 @@ import { desc } from "drizzle-orm";
import type { FoundryAppSnapshot } from "@sandbox-agent/foundry-shared";
import { getOrCreateGithubData, getOrCreateOrganization } from "../../handles.js";
import { authSessionIndex } from "../db/schema.js";
import { githubDataWorkflowQueueName } from "../../github-data/workflow.js";
import {
assertAppOrganization,
buildAppSnapshot,
requireEligibleOrganization,
requireSignedInSession,
markOrganizationSyncStartedMutation,
} from "../app-shell.js";
import { getBetterAuthService } from "../../../services/better-auth.js";
import { expectQueueResponse } from "../../../services/queue.js";
import { organizationWorkflowQueueName } from "../queues.js";
import { refreshOrganizationSnapshotMutation } from "../actions.js";
export const organizationGithubActions = {
async resolveAppGithubToken(
@ -59,33 +58,21 @@ export const organizationGithubActions = {
}
const organizationHandle = await getOrCreateOrganization(c, input.organizationId);
await expectQueueResponse<{ ok: true }>(
await organizationHandle.send(
organizationWorkflowQueueName("organization.command.shell.sync_started.mark"),
{ label: "Importing repository catalog..." },
{ wait: true, timeout: 10_000 },
),
);
await expectQueueResponse<{ ok: true }>(
await organizationHandle.send(organizationWorkflowQueueName("organization.command.snapshot.broadcast"), {}, { wait: true, timeout: 10_000 }),
);
await organizationHandle.commandMarkSyncStarted({ label: "Importing repository catalog..." });
await organizationHandle.commandBroadcastSnapshot({});
await githubData.send("githubData.command.syncRepos", { label: "Importing repository catalog..." }, { wait: false });
void githubData.syncRepos({ label: "Importing repository catalog..." }).catch(() => {});
return await buildAppSnapshot(c, input.sessionId);
},
async adminReloadGithubOrganization(c: any): Promise<void> {
const githubData = await getOrCreateGithubData(c, c.state.organizationId);
await expectQueueResponse<{ ok: true }>(
await githubData.send(githubDataWorkflowQueueName("githubData.command.syncRepos"), { label: "Reloading GitHub organization..." }, { wait: true, timeout: 10_000 }),
);
await githubData.syncRepos({ label: "Reloading GitHub organization..." });
},
async adminReloadGithubRepository(c: any, input: { repoId: string }): Promise<void> {
const githubData = await getOrCreateGithubData(c, c.state.organizationId);
await expectQueueResponse<unknown>(
await githubData.send(githubDataWorkflowQueueName("githubData.command.reloadRepository"), input, { wait: true, timeout: 10_000 }),
);
await githubData.reloadRepository(input);
},
};

View file

@ -1,7 +1,7 @@
import type { FoundryAppSnapshot, UpdateFoundryOrganizationProfileInput, WorkspaceModelId } from "@sandbox-agent/foundry-shared";
import { getBetterAuthService } from "../../../services/better-auth.js";
import { getOrCreateOrganization } from "../../handles.js";
import { expectQueueResponse } from "../../../services/queue.js";
// actions called directly (no queue)
import {
assertAppOrganization,
assertOrganizationShell,
@ -11,7 +11,7 @@ import {
requireEligibleOrganization,
requireSignedInSession,
} from "../app-shell.js";
import { organizationWorkflowQueueName } from "../queues.js";
// org queue names removed — using direct actions
export const organizationShellActions = {
async getAppSnapshot(c: any, input: { sessionId: string }): Promise<FoundryAppSnapshot> {
@ -35,17 +35,11 @@ export const organizationShellActions = {
const session = await requireSignedInSession(c, input.sessionId);
requireEligibleOrganization(session, input.organizationId);
const organization = await getOrCreateOrganization(c, input.organizationId);
await expectQueueResponse<{ ok: true }>(
await organization.send(
organizationWorkflowQueueName("organization.command.shell.profile.update"),
{
displayName: input.displayName,
slug: input.slug,
primaryDomain: input.primaryDomain,
},
{ wait: true, timeout: 10_000 },
),
);
await organization.commandUpdateShellProfile({
displayName: input.displayName,
slug: input.slug,
primaryDomain: input.primaryDomain,
});
return await buildAppSnapshot(c, input.sessionId);
},

View file

@ -12,9 +12,9 @@ import type {
} from "@sandbox-agent/foundry-shared";
import { getActorRuntimeContext } from "../../context.js";
import { getGithubData, getOrCreateAuditLog, getOrCreateTask, getTask } from "../../handles.js";
import { taskWorkflowQueueName } from "../../task/workflow/index.js";
// task actions called directly (no queue)
import { deriveFallbackTitle, resolveCreateFlowDecision } from "../../../services/create-flow.js";
import { expectQueueResponse } from "../../../services/queue.js";
// actions return directly (no queue response unwrapping)
import { isActorNotFoundError, logActorWarning, resolveErrorMessage } from "../../logging.js";
import { defaultSandboxProviderId } from "../../../sandbox-config.js";
import { taskIndex, taskSummaries } from "../db/schema.js";
@ -128,6 +128,16 @@ async function resolveRepositoryRemoteUrl(c: any, repoId: string): Promise<strin
return remoteUrl;
}
/**
* The ONLY backend code path that creates a task actor via getOrCreateTask.
* Called when a user explicitly creates a new task (not during sync/webhooks).
*
* All other code must use getTask (handles.ts) which calls .get() and will
* error if the actor doesn't exist. Virtual tasks created during PR sync
* are materialized lazily by the client's getOrCreate in backend-client.ts.
*
* NEVER call this from a sync loop or webhook handler.
*/
export async function createTaskMutation(c: any, cmd: CreateTaskCommand): Promise<TaskRecord> {
const organizationId = c.state.organizationId;
const repoId = cmd.repoId;
@ -188,21 +198,12 @@ export async function createTaskMutation(c: any, cmd: CreateTaskCommand): Promis
throw error;
}
const created = await expectQueueResponse<TaskRecord>(
await taskHandle.send(
taskWorkflowQueueName("task.command.initialize"),
{
sandboxProviderId: cmd.sandboxProviderId,
branchName: initialBranchName,
title: initialTitle,
task: cmd.task,
},
{
wait: true,
timeout: 10_000,
},
),
);
const created = await taskHandle.initialize({
sandboxProviderId: cmd.sandboxProviderId,
branchName: initialBranchName,
title: initialTitle,
task: cmd.task,
});
try {
await upsertTaskSummary(c, await taskHandle.getTaskSummary({}));
@ -217,21 +218,15 @@ export async function createTaskMutation(c: any, cmd: CreateTaskCommand): Promis
}
const auditLog = await getOrCreateAuditLog(c, organizationId);
await auditLog.send(
"auditLog.command.append",
{
kind: "task.created",
void auditLog.append({
kind: "task.created",
repoId,
taskId,
payload: {
repoId,
taskId,
payload: {
repoId,
sandboxProviderId: cmd.sandboxProviderId,
},
sandboxProviderId: cmd.sandboxProviderId,
},
{
wait: false,
},
);
});
try {
const taskSummary = await taskHandle.getTaskSummary({});
@ -319,9 +314,15 @@ export async function removeTaskSummaryMutation(c: any, input: { taskId: string
await refreshOrganizationSnapshotMutation(c);
}
/**
* Called for every changed PR during sync and on webhook PR events.
* Runs in a bulk loop MUST NOT create task actors or make cross-actor calls
* to task actors. Only writes to the org's local taskIndex/taskSummaries tables.
* Task actors are created lazily when the user views the task.
*/
export async function refreshTaskSummaryForBranchMutation(
c: any,
input: { repoId: string; branchName: string; pullRequest?: WorkspacePullRequestSummary | null },
input: { repoId: string; branchName: string; pullRequest?: WorkspacePullRequestSummary | null; repoName?: string },
): Promise<void> {
const pullRequest = input.pullRequest ?? null;
let rows = await c.db
@ -331,34 +332,62 @@ export async function refreshTaskSummaryForBranchMutation(
.all();
if (rows.length === 0 && pullRequest) {
const { config } = getActorRuntimeContext();
const created = await createTaskMutation(c, {
repoId: input.repoId,
task: pullRequest.title?.trim() || `Review ${input.branchName}`,
sandboxProviderId: defaultSandboxProviderId(config),
explicitTitle: pullRequest.title?.trim() || input.branchName,
explicitBranchName: null,
onBranch: input.branchName,
});
rows = [{ taskId: created.taskId }];
}
// Create a virtual task entry in the org's local tables only.
// No task actor is spawned — it will be created lazily when the user
// clicks on the task in the sidebar (the "materialize" path).
const taskId = randomUUID();
const now = Date.now();
const title = pullRequest.title?.trim() || input.branchName;
const repoName = input.repoName ?? `${c.state.organizationId}/${input.repoId}`;
for (const row of rows) {
try {
const task = getTask(c, c.state.organizationId, input.repoId, row.taskId);
await expectQueueResponse<{ ok: true }>(
await task.send(taskWorkflowQueueName("task.command.pull_request.sync"), { pullRequest }, { wait: true, timeout: 10_000 }),
);
} catch (error) {
logActorWarning("organization", "failed refreshing task summary for branch", {
organizationId: c.state.organizationId,
await c.db
.insert(taskIndex)
.values({ taskId, repoId: input.repoId, branchName: input.branchName, createdAt: now, updatedAt: now })
.onConflictDoNothing()
.run();
await c.db
.insert(taskSummaries)
.values({
taskId,
repoId: input.repoId,
branchName: input.branchName,
taskId: row.taskId,
error: resolveErrorMessage(error),
});
title,
status: "init_complete",
repoName,
updatedAtMs: pullRequest.updatedAtMs ?? now,
branch: input.branchName,
pullRequestJson: JSON.stringify(pullRequest),
sessionsSummaryJson: "[]",
})
.onConflictDoNothing()
.run();
rows = [{ taskId }];
} else {
// Update PR data on existing task summaries locally.
// If a real task actor exists, also notify it.
for (const row of rows) {
// Update the local summary with the new PR data
await c.db
.update(taskSummaries)
.set({
pullRequestJson: pullRequest ? JSON.stringify(pullRequest) : null,
updatedAtMs: pullRequest?.updatedAtMs ?? Date.now(),
})
.where(eq(taskSummaries.taskId, row.taskId))
.run();
// Best-effort notify the task actor if it exists (fire-and-forget)
try {
const task = getTask(c, c.state.organizationId, input.repoId, row.taskId);
void task.pullRequestSync({ pullRequest }).catch(() => {});
} catch {
// Task actor doesn't exist yet — that's fine, it's virtual
}
}
}
await refreshOrganizationSnapshotMutation(c);
}
export function sortOverviewBranches(

View file

@ -21,12 +21,10 @@ import type {
TaskWorkspaceUpdateDraftInput,
} from "@sandbox-agent/foundry-shared";
import { getActorRuntimeContext } from "../../context.js";
import { getOrCreateAuditLog, getOrCreateGithubData, getTask as getTaskHandle, selfOrganization } from "../../handles.js";
import { getOrCreateAuditLog, getOrCreateTask, getTask as getTaskHandle } from "../../handles.js";
import { defaultSandboxProviderId } from "../../../sandbox-config.js";
import { expectQueueResponse } from "../../../services/queue.js";
import { logActorWarning, resolveErrorMessage } from "../../logging.js";
import { taskWorkflowQueueName } from "../../task/workflow/index.js";
import { organizationWorkflowQueueName } from "../queues.js";
import { taskIndex, taskSummaries } from "../db/schema.js";
import {
createTaskMutation,
getRepoOverviewFromOrg,
@ -42,16 +40,35 @@ function assertOrganization(c: { state: { organizationId: string } }, organizati
}
}
async function requireRepoExists(c: any, repoId: string): Promise<void> {
const githubData = await getOrCreateGithubData(c, c.state.organizationId);
const repo = await githubData.getRepository({ repoId });
if (!repo) {
throw new Error(`Unknown repo: ${repoId}`);
/**
* Look up the repoId for a task from the local task index.
* Used when callers (e.g. sandbox actor) only have taskId but need repoId
* to construct the task actor key.
*/
async function resolveTaskRepoId(c: any, taskId: string): Promise<string> {
const row = await c.db.select({ repoId: taskIndex.repoId }).from(taskIndex).where(eq(taskIndex.taskId, taskId)).get();
if (!row) {
throw new Error(`Task ${taskId} not found in task index`);
}
return row.repoId;
}
/**
* Get or lazily create a task actor for a user-initiated action.
* Uses getOrCreate because the user may be interacting with a virtual task
* (PR-driven) that has no actor yet. The task actor self-initializes in
* getCurrentRecord() from the org's getTaskIndexEntry data.
*
* This is safe because requireWorkspaceTask is only called from user-initiated
* actions (createSession, sendMessage, etc.), never from sync loops.
* See CLAUDE.md "Lazy Task Actor Creation".
*/
async function requireWorkspaceTask(c: any, repoId: string, taskId: string) {
return getTaskHandle(c, c.state.organizationId, repoId, taskId);
return getOrCreateTask(c, c.state.organizationId, repoId, taskId, {
organizationId: c.state.organizationId,
repoId,
taskId,
});
}
interface GetTaskInput {
@ -76,46 +93,30 @@ export const organizationTaskActions = {
assertOrganization(c, input.organizationId);
const { config } = getActorRuntimeContext();
const sandboxProviderId = input.sandboxProviderId ?? defaultSandboxProviderId(config);
await requireRepoExists(c, input.repoId);
const self = selfOrganization(c);
return expectQueueResponse<TaskRecord>(
await self.send(
organizationWorkflowQueueName("organization.command.createTask"),
{
repoId: input.repoId,
task: input.task,
sandboxProviderId,
explicitTitle: input.explicitTitle ?? null,
explicitBranchName: input.explicitBranchName ?? null,
onBranch: input.onBranch ?? null,
},
{
wait: true,
timeout: 10_000,
},
),
);
// Self-call: call the mutation directly since we're inside the org actor
return await createTaskMutation(c, {
repoId: input.repoId,
task: input.task,
sandboxProviderId,
explicitTitle: input.explicitTitle ?? null,
explicitBranchName: input.explicitBranchName ?? null,
onBranch: input.onBranch ?? null,
});
},
async materializeTask(c: any, input: { organizationId: string; repoId: string; virtualTaskId: string }): Promise<TaskRecord> {
assertOrganization(c, input.organizationId);
const { config } = getActorRuntimeContext();
const self = selfOrganization(c);
return expectQueueResponse<TaskRecord>(
await self.send(
organizationWorkflowQueueName("organization.command.materializeTask"),
{
repoId: input.repoId,
task: input.virtualTaskId,
sandboxProviderId: defaultSandboxProviderId(config),
explicitTitle: null,
explicitBranchName: null,
onBranch: null,
},
{ wait: true, timeout: 10_000 },
),
);
// Self-call: call the mutation directly
return await createTaskMutation(c, {
repoId: input.repoId,
task: input.virtualTaskId,
sandboxProviderId: defaultSandboxProviderId(config),
explicitTitle: null,
explicitBranchName: null,
onBranch: null,
});
},
async createWorkspaceTask(c: any, input: TaskWorkspaceCreateTaskInput): Promise<{ taskId: string; sessionId?: string }> {
@ -128,171 +129,117 @@ export const organizationTaskActions = {
});
const task = await requireWorkspaceTask(c, input.repoId, created.taskId);
await task.send(
taskWorkflowQueueName("task.command.workspace.create_session_and_send"),
{
void task
.createSessionAndSend({
model: input.model,
text: input.task,
authSessionId: input.authSessionId,
},
{ wait: false },
);
})
.catch(() => {});
return { taskId: created.taskId };
},
async markWorkspaceUnread(c: any, input: TaskWorkspaceSelectInput): Promise<void> {
const task = await requireWorkspaceTask(c, input.repoId, input.taskId);
await expectQueueResponse<{ ok: true }>(
await task.send(taskWorkflowQueueName("task.command.workspace.mark_unread"), { authSessionId: input.authSessionId }, { wait: true, timeout: 10_000 }),
);
await task.markUnread({ authSessionId: input.authSessionId });
},
async renameWorkspaceTask(c: any, input: TaskWorkspaceRenameInput): Promise<void> {
const task = await requireWorkspaceTask(c, input.repoId, input.taskId);
await expectQueueResponse<{ ok: true }>(
await task.send(taskWorkflowQueueName("task.command.workspace.rename_task"), { value: input.value }, { wait: true, timeout: 20_000 }),
);
await task.renameTask({ value: input.value });
},
async createWorkspaceSession(c: any, input: TaskWorkspaceSelectInput & { model?: string }): Promise<{ sessionId: string }> {
const task = await requireWorkspaceTask(c, input.repoId, input.taskId);
return await expectQueueResponse<{ sessionId: string }>(
await task.send(
taskWorkflowQueueName("task.command.workspace.create_session"),
{
...(input.model ? { model: input.model } : {}),
...(input.authSessionId ? { authSessionId: input.authSessionId } : {}),
},
{ wait: true, timeout: 10_000 },
),
);
return await task.createSession({
...(input.model ? { model: input.model } : {}),
...(input.authSessionId ? { authSessionId: input.authSessionId } : {}),
});
},
async renameWorkspaceSession(c: any, input: TaskWorkspaceRenameSessionInput): Promise<void> {
const task = await requireWorkspaceTask(c, input.repoId, input.taskId);
await expectQueueResponse<{ ok: true }>(
await task.send(
taskWorkflowQueueName("task.command.workspace.rename_session"),
{ sessionId: input.sessionId, title: input.title, authSessionId: input.authSessionId },
{ wait: true, timeout: 10_000 },
),
);
await task.renameSession({ sessionId: input.sessionId, title: input.title, authSessionId: input.authSessionId });
},
async selectWorkspaceSession(c: any, input: TaskWorkspaceSessionInput): Promise<void> {
const task = await requireWorkspaceTask(c, input.repoId, input.taskId);
await expectQueueResponse<{ ok: true }>(
await task.send(
taskWorkflowQueueName("task.command.workspace.select_session"),
{ sessionId: input.sessionId, authSessionId: input.authSessionId },
{ wait: true, timeout: 10_000 },
),
);
await task.selectSession({ sessionId: input.sessionId, authSessionId: input.authSessionId });
},
async setWorkspaceSessionUnread(c: any, input: TaskWorkspaceSetSessionUnreadInput): Promise<void> {
const task = await requireWorkspaceTask(c, input.repoId, input.taskId);
await expectQueueResponse<{ ok: true }>(
await task.send(
taskWorkflowQueueName("task.command.workspace.set_session_unread"),
{ sessionId: input.sessionId, unread: input.unread, authSessionId: input.authSessionId },
{ wait: true, timeout: 10_000 },
),
);
await task.setSessionUnread({ sessionId: input.sessionId, unread: input.unread, authSessionId: input.authSessionId });
},
async updateWorkspaceDraft(c: any, input: TaskWorkspaceUpdateDraftInput): Promise<void> {
const task = await requireWorkspaceTask(c, input.repoId, input.taskId);
await task.send(
taskWorkflowQueueName("task.command.workspace.update_draft"),
{
void task
.updateDraft({
sessionId: input.sessionId,
text: input.text,
attachments: input.attachments,
authSessionId: input.authSessionId,
},
{ wait: false },
);
})
.catch(() => {});
},
async changeWorkspaceModel(c: any, input: TaskWorkspaceChangeModelInput): Promise<void> {
const task = await requireWorkspaceTask(c, input.repoId, input.taskId);
await expectQueueResponse<{ ok: true }>(
await task.send(
taskWorkflowQueueName("task.command.workspace.change_model"),
{ sessionId: input.sessionId, model: input.model, authSessionId: input.authSessionId },
{ wait: true, timeout: 10_000 },
),
);
await task.changeModel({ sessionId: input.sessionId, model: input.model, authSessionId: input.authSessionId });
},
async sendWorkspaceMessage(c: any, input: TaskWorkspaceSendMessageInput): Promise<void> {
const task = await requireWorkspaceTask(c, input.repoId, input.taskId);
await task.send(
taskWorkflowQueueName("task.command.workspace.send_message"),
{
void task
.sendMessage({
sessionId: input.sessionId,
text: input.text,
attachments: input.attachments,
authSessionId: input.authSessionId,
},
{ wait: false },
);
})
.catch(() => {});
},
async stopWorkspaceSession(c: any, input: TaskWorkspaceSessionInput): Promise<void> {
const task = await requireWorkspaceTask(c, input.repoId, input.taskId);
await task.send(
taskWorkflowQueueName("task.command.workspace.stop_session"),
{ sessionId: input.sessionId, authSessionId: input.authSessionId },
{ wait: false },
);
void task.stopSession({ sessionId: input.sessionId, authSessionId: input.authSessionId }).catch(() => {});
},
async closeWorkspaceSession(c: any, input: TaskWorkspaceSessionInput): Promise<void> {
const task = await requireWorkspaceTask(c, input.repoId, input.taskId);
await task.send(
taskWorkflowQueueName("task.command.workspace.close_session"),
{ sessionId: input.sessionId, authSessionId: input.authSessionId },
{ wait: false },
);
void task.closeSession({ sessionId: input.sessionId, authSessionId: input.authSessionId }).catch(() => {});
},
async publishWorkspacePr(c: any, input: TaskWorkspaceSelectInput): Promise<void> {
const task = await requireWorkspaceTask(c, input.repoId, input.taskId);
await task.send(taskWorkflowQueueName("task.command.workspace.publish_pr"), {}, { wait: false });
void task.publishPr({}).catch(() => {});
},
async revertWorkspaceFile(c: any, input: TaskWorkspaceDiffInput): Promise<void> {
const task = await requireWorkspaceTask(c, input.repoId, input.taskId);
await task.send(taskWorkflowQueueName("task.command.workspace.revert_file"), input, { wait: false });
void task.revertFile(input).catch(() => {});
},
async getRepoOverview(c: any, input: RepoOverviewInput): Promise<RepoOverview> {
assertOrganization(c, input.organizationId);
await requireRepoExists(c, input.repoId);
return await getRepoOverviewFromOrg(c, input.repoId);
},
async listTasks(c: any, input: ListTasksInput): Promise<TaskSummary[]> {
assertOrganization(c, input.organizationId);
if (input.repoId) {
return await listTaskSummariesForRepo(c, input.repoId, true);
}
return await listAllTaskSummaries(c, true);
},
async switchTask(c: any, input: { repoId: string; taskId: string }): Promise<SwitchResult> {
await requireRepoExists(c, input.repoId);
const h = getTaskHandle(c, c.state.organizationId, input.repoId, input.taskId);
const record = await h.get();
const switched = await expectQueueResponse<{ switchTarget: string }>(
await h.send(taskWorkflowQueueName("task.command.switch"), {}, { wait: true, timeout: 10_000 }),
);
const switched = await h.switchTask({});
return {
organizationId: c.state.organizationId,
taskId: input.taskId,
@ -303,7 +250,6 @@ export const organizationTaskActions = {
async auditLog(c: any, input: HistoryQueryInput): Promise<AuditLogEvent[]> {
assertOrganization(c, input.organizationId);
const auditLog = await getOrCreateAuditLog(c, c.state.organizationId);
return await auditLog.list({
repoId: input.repoId,
@ -315,52 +261,58 @@ export const organizationTaskActions = {
async getTask(c: any, input: GetTaskInput): Promise<TaskRecord> {
assertOrganization(c, input.organizationId);
await requireRepoExists(c, input.repoId);
return await getTaskHandle(c, c.state.organizationId, input.repoId, input.taskId).get();
// Resolve repoId from local task index if not provided (e.g. sandbox actor only has taskId)
const repoId = input.repoId || (await resolveTaskRepoId(c, input.taskId));
// Use getOrCreate — the task may be virtual (PR-driven, no actor yet).
// The task actor self-initializes in getCurrentRecord().
const handle = await getOrCreateTask(c, c.state.organizationId, repoId, input.taskId, {
organizationId: c.state.organizationId,
repoId,
taskId: input.taskId,
});
return await handle.get();
},
async attachTask(c: any, input: TaskProxyActionInput): Promise<{ target: string; sessionId: string | null }> {
assertOrganization(c, input.organizationId);
await requireRepoExists(c, input.repoId);
const h = getTaskHandle(c, c.state.organizationId, input.repoId, input.taskId);
return await expectQueueResponse<{ target: string; sessionId: string | null }>(
await h.send(taskWorkflowQueueName("task.command.attach"), { reason: input.reason }, { wait: true, timeout: 10_000 }),
);
return await h.attach({ reason: input.reason });
},
async pushTask(c: any, input: TaskProxyActionInput): Promise<void> {
assertOrganization(c, input.organizationId);
await requireRepoExists(c, input.repoId);
const h = getTaskHandle(c, c.state.organizationId, input.repoId, input.taskId);
await h.send(taskWorkflowQueueName("task.command.push"), { reason: input.reason }, { wait: false });
void h.push({ reason: input.reason }).catch(() => {});
},
async syncTask(c: any, input: TaskProxyActionInput): Promise<void> {
assertOrganization(c, input.organizationId);
await requireRepoExists(c, input.repoId);
const h = getTaskHandle(c, c.state.organizationId, input.repoId, input.taskId);
await h.send(taskWorkflowQueueName("task.command.sync"), { reason: input.reason }, { wait: false });
void h.sync({ reason: input.reason }).catch(() => {});
},
async mergeTask(c: any, input: TaskProxyActionInput): Promise<void> {
assertOrganization(c, input.organizationId);
await requireRepoExists(c, input.repoId);
const h = getTaskHandle(c, c.state.organizationId, input.repoId, input.taskId);
await h.send(taskWorkflowQueueName("task.command.merge"), { reason: input.reason }, { wait: false });
void h.merge({ reason: input.reason }).catch(() => {});
},
async archiveTask(c: any, input: TaskProxyActionInput): Promise<void> {
assertOrganization(c, input.organizationId);
await requireRepoExists(c, input.repoId);
const h = getTaskHandle(c, c.state.organizationId, input.repoId, input.taskId);
await h.send(taskWorkflowQueueName("task.command.archive"), { reason: input.reason }, { wait: false });
void h.archive({ reason: input.reason }).catch(() => {});
},
async killTask(c: any, input: TaskProxyActionInput): Promise<void> {
assertOrganization(c, input.organizationId);
await requireRepoExists(c, input.repoId);
const h = getTaskHandle(c, c.state.organizationId, input.repoId, input.taskId);
await h.send(taskWorkflowQueueName("task.command.kill"), { reason: input.reason }, { wait: false });
void h.kill({ reason: input.reason }).catch(() => {});
},
async getRepositoryMetadata(c: any, input: { repoId: string }): Promise<{ defaultBranch: string | null; fullName: string | null; remoteUrl: string }> {
@ -370,4 +322,19 @@ export const organizationTaskActions = {
async findTaskForBranch(c: any, input: { repoId: string; branchName: string }): Promise<{ taskId: string | null }> {
return await findTaskForBranch(c, input.repoId, input.branchName);
},
/**
* Lightweight read of task index + summary data. Used by the task actor
* to self-initialize when lazily materialized from a virtual task.
* Does NOT trigger materialization no circular dependency.
*/
async getTaskIndexEntry(c: any, input: { taskId: string }): Promise<{ branchName: string | null; title: string | null } | null> {
const idx = await c.db.select({ branchName: taskIndex.branchName }).from(taskIndex).where(eq(taskIndex.taskId, input.taskId)).get();
const summary = await c.db.select({ title: taskSummaries.title }).from(taskSummaries).where(eq(taskSummaries.taskId, input.taskId)).get();
if (!idx && !summary) return null;
return {
branchName: idx?.branchName ?? null,
title: summary?.title ?? null,
};
},
};

View file

@ -13,15 +13,12 @@ import type {
import { DEFAULT_WORKSPACE_MODEL_ID } from "@sandbox-agent/foundry-shared";
import { getActorRuntimeContext } from "../context.js";
import { getOrCreateGithubData, getOrCreateOrganization, selfOrganization } from "../handles.js";
import { githubDataWorkflowQueueName } from "../github-data/workflow.js";
import { GitHubAppError } from "../../services/app-github.js";
import { getBetterAuthService } from "../../services/better-auth.js";
import { expectQueueResponse } from "../../services/queue.js";
import { repoIdFromRemote, repoLabelFromRemote } from "../../services/repo.js";
import { logger } from "../../logging.js";
import { invoices, organizationMembers, organizationProfile, seatAssignments, stripeLookup } from "./db/schema.js";
import { APP_SHELL_ORGANIZATION_ID } from "./constants.js";
import { organizationWorkflowQueueName } from "./queues.js";
const githubWebhookLogger = logger.child({
scope: "github-webhook",
@ -142,13 +139,7 @@ function stripeWebhookSubscription(event: any) {
};
}
async function sendOrganizationCommand<TResponse>(
organization: any,
name: Parameters<typeof organizationWorkflowQueueName>[0],
body: unknown,
): Promise<TResponse> {
return expectQueueResponse<TResponse>(await organization.send(organizationWorkflowQueueName(name), body, { wait: true, timeout: 60_000 }));
}
// sendOrganizationCommand removed — org actions called directly
export async function getOrganizationState(organization: any) {
return await organization.getOrganizationShellState({});
@ -491,7 +482,7 @@ async function syncGithubOrganizationsInternal(c: any, input: { sessionId: strin
const organizationId = organizationOrganizationId(account.kind, account.githubLogin);
const installation = installations.find((candidate) => candidate.accountLogin === account.githubLogin) ?? null;
const organization = await getOrCreateOrganization(c, organizationId);
await sendOrganizationCommand<{ organizationId: string }>(organization, "organization.command.github.organization_shell.sync_from_github", {
await organization.commandSyncOrganizationShellFromGithub({
userId: githubUserId,
userName: viewer.name || viewer.login,
userEmail: viewer.email ?? `${viewer.login}@users.noreply.github.com`,
@ -686,7 +677,7 @@ async function applySubscriptionState(
},
fallbackPlanId: FoundryBillingPlanId,
): Promise<void> {
await sendOrganizationCommand<{ ok: true }>(organization, "organization.command.billing.stripe_subscription.apply", {
await organization.commandApplyStripeSubscription({
subscription,
fallbackPlanId,
});
@ -702,7 +693,7 @@ export const organizationAppActions = {
const organizationState = await getOrganizationState(organizationHandle);
if (input.planId === "free") {
await sendOrganizationCommand<{ ok: true }>(organizationHandle, "organization.command.billing.free_plan.apply", {
await organizationHandle.commandApplyFreePlan({
clearSubscription: false,
});
return {
@ -723,7 +714,7 @@ export const organizationAppActions = {
email: session.currentUserEmail,
})
).id;
await sendOrganizationCommand<{ ok: true }>(organizationHandle, "organization.command.billing.stripe_customer.apply", {
await organizationHandle.commandApplyStripeCustomer({
customerId,
});
await upsertStripeLookupEntries(c, input.organizationId, customerId, null);
@ -753,7 +744,7 @@ export const organizationAppActions = {
const completion = await appShell.stripe.retrieveCheckoutCompletion(input.checkoutSessionId);
if (completion.customerId) {
await sendOrganizationCommand<{ ok: true }>(organizationHandle, "organization.command.billing.stripe_customer.apply", {
await organizationHandle.commandApplyStripeCustomer({
customerId: completion.customerId,
});
}
@ -765,7 +756,7 @@ export const organizationAppActions = {
}
if (completion.paymentMethodLabel) {
await sendOrganizationCommand<{ ok: true }>(organizationHandle, "organization.command.billing.payment_method.set", {
await organizationHandle.commandSetPaymentMethod({
label: completion.paymentMethodLabel,
});
}
@ -805,7 +796,7 @@ export const organizationAppActions = {
await applySubscriptionState(organizationHandle, subscription, organizationState.billingPlanId);
await upsertStripeLookupEntries(c, input.organizationId, subscription.customerId ?? organizationState.stripeCustomerId, subscription.id);
} else {
await sendOrganizationCommand<{ ok: true }>(organizationHandle, "organization.command.billing.status.set", {
await organizationHandle.commandSetBillingStatus({
status: "scheduled_cancel",
});
}
@ -826,7 +817,7 @@ export const organizationAppActions = {
await applySubscriptionState(organizationHandle, subscription, organizationState.billingPlanId);
await upsertStripeLookupEntries(c, input.organizationId, subscription.customerId ?? organizationState.stripeCustomerId, subscription.id);
} else {
await sendOrganizationCommand<{ ok: true }>(organizationHandle, "organization.command.billing.status.set", {
await organizationHandle.commandSetBillingStatus({
status: "active",
});
}
@ -839,7 +830,7 @@ export const organizationAppActions = {
const session = await requireSignedInSession(c, input.sessionId);
requireEligibleOrganization(session, input.organizationId);
const organization = await getOrCreateOrganization(c, input.organizationId);
await sendOrganizationCommand<{ ok: true }>(organization, "organization.command.billing.seat_usage.record", {
await organization.commandRecordSeatUsage({
email: session.currentUserEmail,
});
return await buildAppSnapshot(c, input.sessionId);
@ -862,7 +853,7 @@ export const organizationAppActions = {
if (organizationId) {
const organization = await getOrCreateOrganization(c, organizationId);
if (typeof object.customer === "string") {
await sendOrganizationCommand<{ ok: true }>(organization, "organization.command.billing.stripe_customer.apply", {
await organization.commandApplyStripeCustomer({
customerId: object.customer,
});
}
@ -897,7 +888,7 @@ export const organizationAppActions = {
const organizationId = await findOrganizationIdForStripeEvent(c, subscription.customerId, subscription.id);
if (organizationId) {
const organization = await getOrCreateOrganization(c, organizationId);
await sendOrganizationCommand<{ ok: true }>(organization, "organization.command.billing.free_plan.apply", {
await organization.commandApplyFreePlan({
clearSubscription: true,
});
}
@ -911,7 +902,7 @@ export const organizationAppActions = {
const organization = await getOrCreateOrganization(c, organizationId);
const rawAmount = typeof invoice.amount_paid === "number" ? invoice.amount_paid : invoice.amount_due;
const amountUsd = Math.round((typeof rawAmount === "number" ? rawAmount : 0) / 100);
await sendOrganizationCommand<{ ok: true }>(organization, "organization.command.billing.invoice.upsert", {
await organization.commandUpsertInvoice({
id: String(invoice.id),
label: typeof invoice.number === "string" ? `Invoice ${invoice.number}` : "Stripe invoice",
issuedAt: formatUnixDate(typeof invoice.created === "number" ? invoice.created : Math.floor(Date.now() / 1000)),
@ -947,7 +938,7 @@ export const organizationAppActions = {
const organizationId = organizationOrganizationId(kind, accountLogin);
const receivedAt = Date.now();
const organization = await getOrCreateOrganization(c, organizationId);
await sendOrganizationCommand<{ ok: true }>(organization, "organization.command.github.webhook_receipt.record", {
await organization.commandRecordGithubWebhookReceipt({
organizationId: organizationId,
event,
action: body.action ?? null,
@ -966,61 +957,41 @@ export const organizationAppActions = {
"installation_event",
);
if (body.action === "deleted") {
await expectQueueResponse<{ ok: true }>(
await githubData.send(
githubDataWorkflowQueueName("githubData.command.clearState"),
{
connectedAccount: accountLogin,
installationStatus: "install_required",
installationId: null,
label: "GitHub App installation removed",
},
{ wait: true, timeout: 10_000 },
),
);
await githubData.clearState({
connectedAccount: accountLogin,
installationStatus: "install_required",
installationId: null,
label: "GitHub App installation removed",
});
} else if (body.action === "created") {
await expectQueueResponse<{ ok: true }>(
await githubData.send(
githubDataWorkflowQueueName("githubData.command.syncRepos"),
{
connectedAccount: accountLogin,
installationStatus: "connected",
installationId: body.installation?.id ?? null,
githubLogin: accountLogin,
kind,
label: "Syncing GitHub data from installation webhook...",
},
{ wait: true, timeout: 10_000 },
),
);
void githubData
.syncRepos({
connectedAccount: accountLogin,
installationStatus: "connected",
installationId: body.installation?.id ?? null,
githubLogin: accountLogin,
kind,
label: "Syncing GitHub data from installation webhook...",
})
.catch(() => {});
} else if (body.action === "suspend") {
await expectQueueResponse<{ ok: true }>(
await githubData.send(
githubDataWorkflowQueueName("githubData.command.clearState"),
{
connectedAccount: accountLogin,
installationStatus: "reconnect_required",
installationId: body.installation?.id ?? null,
label: "GitHub App installation suspended",
},
{ wait: true, timeout: 10_000 },
),
);
await githubData.clearState({
connectedAccount: accountLogin,
installationStatus: "reconnect_required",
installationId: body.installation?.id ?? null,
label: "GitHub App installation suspended",
});
} else if (body.action === "unsuspend") {
await expectQueueResponse<{ ok: true }>(
await githubData.send(
githubDataWorkflowQueueName("githubData.command.syncRepos"),
{
connectedAccount: accountLogin,
installationStatus: "connected",
installationId: body.installation?.id ?? null,
githubLogin: accountLogin,
kind,
label: "Resyncing GitHub data after unsuspend...",
},
{ wait: true, timeout: 10_000 },
),
);
void githubData
.syncRepos({
connectedAccount: accountLogin,
installationStatus: "connected",
installationId: body.installation?.id ?? null,
githubLogin: accountLogin,
kind,
label: "Resyncing GitHub data after unsuspend...",
})
.catch(() => {});
}
return { ok: true };
}
@ -1037,20 +1008,16 @@ export const organizationAppActions = {
},
"repository_membership_changed",
);
await expectQueueResponse<{ ok: true }>(
await githubData.send(
githubDataWorkflowQueueName("githubData.command.syncRepos"),
{
connectedAccount: accountLogin,
installationStatus: "connected",
installationId: body.installation?.id ?? null,
githubLogin: accountLogin,
kind,
label: "Resyncing GitHub data after repository access change...",
},
{ wait: true, timeout: 10_000 },
),
);
void githubData
.syncRepos({
connectedAccount: accountLogin,
installationStatus: "connected",
installationId: body.installation?.id ?? null,
githubLogin: accountLogin,
kind,
label: "Resyncing GitHub data after repository access change...",
})
.catch(() => {});
return { ok: true };
}
@ -1078,43 +1045,35 @@ export const organizationAppActions = {
"repository_event",
);
if (event === "pull_request" && body.repository?.clone_url && body.pull_request) {
await expectQueueResponse<{ ok: true }>(
await githubData.send(
githubDataWorkflowQueueName("githubData.command.handlePullRequestWebhook"),
{
connectedAccount: accountLogin,
installationStatus: "connected",
installationId: body.installation?.id ?? null,
repository: {
fullName: body.repository.full_name,
cloneUrl: body.repository.clone_url,
private: Boolean(body.repository.private),
},
pullRequest: {
number: body.pull_request.number,
status: body.pull_request.draft ? "draft" : "ready",
title: body.pull_request.title ?? "",
body: body.pull_request.body ?? null,
state: body.pull_request.state ?? "open",
url: body.pull_request.html_url ?? `https://github.com/${body.repository.full_name}/pull/${body.pull_request.number}`,
headRefName: body.pull_request.head?.ref ?? "",
baseRefName: body.pull_request.base?.ref ?? "",
authorLogin: body.pull_request.user?.login ?? null,
isDraft: Boolean(body.pull_request.draft),
merged: Boolean(body.pull_request.merged),
},
},
{ wait: true, timeout: 10_000 },
),
);
await githubData.handlePullRequestWebhook({
connectedAccount: accountLogin,
installationStatus: "connected",
installationId: body.installation?.id ?? null,
repository: {
fullName: body.repository.full_name,
cloneUrl: body.repository.clone_url,
private: Boolean(body.repository.private),
},
pullRequest: {
number: body.pull_request.number,
status: body.pull_request.draft ? "draft" : "ready",
title: body.pull_request.title ?? "",
body: body.pull_request.body ?? null,
state: body.pull_request.state ?? "open",
url: body.pull_request.html_url ?? `https://github.com/${body.repository.full_name}/pull/${body.pull_request.number}`,
headRefName: body.pull_request.head?.ref ?? "",
baseRefName: body.pull_request.base?.ref ?? "",
authorLogin: body.pull_request.user?.login ?? null,
isDraft: Boolean(body.pull_request.draft),
merged: Boolean(body.pull_request.merged),
},
});
}
if ((event === "push" || event === "create" || event === "delete") && body.repository?.clone_url) {
const repoId = repoIdFromRemote(body.repository.clone_url);
const knownRepository = await githubData.getRepository({ repoId });
if (knownRepository) {
await expectQueueResponse<unknown>(
await githubData.send(githubDataWorkflowQueueName("githubData.command.reloadRepository"), { repoId }, { wait: true, timeout: 10_000 }),
);
await githubData.reloadRepository({ repoId });
}
}
}
@ -1272,18 +1231,16 @@ export async function syncOrganizationShellFromGithubMutation(
const needsInitialSync = installationStatus === "connected" && syncStatus === "pending";
if (needsInitialSync) {
const githubData = await getOrCreateGithubData(c, organizationId);
await githubData.send(
githubDataWorkflowQueueName("githubData.command.syncRepos"),
{
void githubData
.syncRepos({
connectedAccount: input.githubLogin,
installationStatus: "connected",
installationId: input.installationId,
githubLogin: input.githubLogin,
kind: input.kind,
label: "Initial repository sync...",
},
{ wait: false },
);
})
.catch(() => {});
}
return { organizationId };

View file

@ -1,12 +1,10 @@
import { actor, queue } from "rivetkit";
import { actor } from "rivetkit";
import { organizationDb } from "./db/db.js";
import { organizationActions } from "./actions.js";
import { ORGANIZATION_QUEUE_NAMES } from "./queues.js";
import { runOrganizationCommandLoop } from "./workflow.js";
import { organizationCommandActions } from "./workflow.js";
export const organization = actor({
db: organizationDb,
queues: Object.fromEntries(ORGANIZATION_QUEUE_NAMES.map((name) => [name, queue()])),
options: {
name: "Organization",
icon: "compass",
@ -15,6 +13,8 @@ export const organization = actor({
createState: (_c, organizationId: string) => ({
organizationId,
}),
actions: organizationActions,
run: runOrganizationCommandLoop,
actions: {
...organizationActions,
...organizationCommandActions,
},
});

View file

@ -1,5 +1,8 @@
// @ts-nocheck
import { logActorWarning, resolveErrorMessage } from "../logging.js";
/**
* Organization command actions converted from queue handlers to direct actions.
* Each export becomes an action on the organization actor.
*/
import { applyGithubSyncProgressMutation, recordGithubWebhookReceiptMutation, refreshOrganizationSnapshotMutation } from "./actions.js";
import {
applyTaskSummaryUpdateMutation,
@ -33,136 +36,128 @@ import {
updateOrganizationShellProfileMutation,
upsertOrganizationInvoiceMutation,
} from "./app-shell.js";
import { ORGANIZATION_QUEUE_NAMES } from "./queues.js";
// Command handler dispatch table — maps queue name to handler function.
const COMMAND_HANDLERS: Record<string, (c: any, body: any) => Promise<any>> = {
"organization.command.createTask": (c, body) => createTaskMutation(c, body),
"organization.command.materializeTask": (c, body) => createTaskMutation(c, body),
"organization.command.registerTaskBranch": (c, body) => registerTaskBranchMutation(c, body),
"organization.command.applyTaskSummaryUpdate": async (c, body) => {
export const organizationCommandActions = {
async commandCreateTask(c: any, body: any) {
return await createTaskMutation(c, body);
},
async commandMaterializeTask(c: any, body: any) {
return await createTaskMutation(c, body);
},
async commandRegisterTaskBranch(c: any, body: any) {
return await registerTaskBranchMutation(c, body);
},
async commandApplyTaskSummaryUpdate(c: any, body: any) {
await applyTaskSummaryUpdateMutation(c, body);
return { ok: true };
},
"organization.command.removeTaskSummary": async (c, body) => {
async commandRemoveTaskSummary(c: any, body: any) {
await removeTaskSummaryMutation(c, body);
return { ok: true };
},
"organization.command.refreshTaskSummaryForBranch": async (c, body) => {
async commandRefreshTaskSummaryForBranch(c: any, body: any) {
await refreshTaskSummaryForBranchMutation(c, body);
return { ok: true };
},
"organization.command.snapshot.broadcast": async (c, _body) => {
async commandBroadcastSnapshot(c: any, _body: any) {
await refreshOrganizationSnapshotMutation(c);
return { ok: true };
},
"organization.command.syncGithubSession": async (c, body) => {
async commandSyncGithubSession(c: any, body: any) {
const { syncGithubOrganizations } = await import("./app-shell.js");
await syncGithubOrganizations(c, body);
return { ok: true };
},
"organization.command.better_auth.session_index.upsert": (c, body) => betterAuthUpsertSessionIndexMutation(c, body),
"organization.command.better_auth.session_index.delete": async (c, body) => {
// Better Auth index actions
async commandBetterAuthSessionIndexUpsert(c: any, body: any) {
return await betterAuthUpsertSessionIndexMutation(c, body);
},
async commandBetterAuthSessionIndexDelete(c: any, body: any) {
await betterAuthDeleteSessionIndexMutation(c, body);
return { ok: true };
},
"organization.command.better_auth.email_index.upsert": (c, body) => betterAuthUpsertEmailIndexMutation(c, body),
"organization.command.better_auth.email_index.delete": async (c, body) => {
async commandBetterAuthEmailIndexUpsert(c: any, body: any) {
return await betterAuthUpsertEmailIndexMutation(c, body);
},
async commandBetterAuthEmailIndexDelete(c: any, body: any) {
await betterAuthDeleteEmailIndexMutation(c, body);
return { ok: true };
},
"organization.command.better_auth.account_index.upsert": (c, body) => betterAuthUpsertAccountIndexMutation(c, body),
"organization.command.better_auth.account_index.delete": async (c, body) => {
async commandBetterAuthAccountIndexUpsert(c: any, body: any) {
return await betterAuthUpsertAccountIndexMutation(c, body);
},
async commandBetterAuthAccountIndexDelete(c: any, body: any) {
await betterAuthDeleteAccountIndexMutation(c, body);
return { ok: true };
},
"organization.command.better_auth.verification.create": (c, body) => betterAuthCreateVerificationMutation(c, body),
"organization.command.better_auth.verification.update": (c, body) => betterAuthUpdateVerificationMutation(c, body),
"organization.command.better_auth.verification.update_many": (c, body) => betterAuthUpdateManyVerificationMutation(c, body),
"organization.command.better_auth.verification.delete": async (c, body) => {
async commandBetterAuthVerificationCreate(c: any, body: any) {
return await betterAuthCreateVerificationMutation(c, body);
},
async commandBetterAuthVerificationUpdate(c: any, body: any) {
return await betterAuthUpdateVerificationMutation(c, body);
},
async commandBetterAuthVerificationUpdateMany(c: any, body: any) {
return await betterAuthUpdateManyVerificationMutation(c, body);
},
async commandBetterAuthVerificationDelete(c: any, body: any) {
await betterAuthDeleteVerificationMutation(c, body);
return { ok: true };
},
"organization.command.better_auth.verification.delete_many": (c, body) => betterAuthDeleteManyVerificationMutation(c, body),
"organization.command.github.sync_progress.apply": async (c, body) => {
async commandBetterAuthVerificationDeleteMany(c: any, body: any) {
return await betterAuthDeleteManyVerificationMutation(c, body);
},
// GitHub sync actions
async commandApplyGithubSyncProgress(c: any, body: any) {
await applyGithubSyncProgressMutation(c, body);
return { ok: true };
},
"organization.command.github.webhook_receipt.record": async (c, body) => {
async commandRecordGithubWebhookReceipt(c: any, body: any) {
await recordGithubWebhookReceiptMutation(c, body);
return { ok: true };
},
"organization.command.github.organization_shell.sync_from_github": (c, body) => syncOrganizationShellFromGithubMutation(c, body),
"organization.command.shell.profile.update": async (c, body) => {
async commandSyncOrganizationShellFromGithub(c: any, body: any) {
return await syncOrganizationShellFromGithubMutation(c, body);
},
// Shell/profile actions
async commandUpdateShellProfile(c: any, body: any) {
await updateOrganizationShellProfileMutation(c, body);
return { ok: true };
},
"organization.command.shell.sync_started.mark": async (c, body) => {
async commandMarkSyncStarted(c: any, body: any) {
await markOrganizationSyncStartedMutation(c, body);
return { ok: true };
},
"organization.command.billing.stripe_customer.apply": async (c, body) => {
// Billing actions
async commandApplyStripeCustomer(c: any, body: any) {
await applyOrganizationStripeCustomerMutation(c, body);
return { ok: true };
},
"organization.command.billing.stripe_subscription.apply": async (c, body) => {
async commandApplyStripeSubscription(c: any, body: any) {
await applyOrganizationStripeSubscriptionMutation(c, body);
return { ok: true };
},
"organization.command.billing.free_plan.apply": async (c, body) => {
async commandApplyFreePlan(c: any, body: any) {
await applyOrganizationFreePlanMutation(c, body);
return { ok: true };
},
"organization.command.billing.payment_method.set": async (c, body) => {
async commandSetPaymentMethod(c: any, body: any) {
await setOrganizationBillingPaymentMethodMutation(c, body);
return { ok: true };
},
"organization.command.billing.status.set": async (c, body) => {
async commandSetBillingStatus(c: any, body: any) {
await setOrganizationBillingStatusMutation(c, body);
return { ok: true };
},
"organization.command.billing.invoice.upsert": async (c, body) => {
async commandUpsertInvoice(c: any, body: any) {
await upsertOrganizationInvoiceMutation(c, body);
return { ok: true };
},
"organization.command.billing.seat_usage.record": async (c, body) => {
async commandRecordSeatUsage(c: any, body: any) {
await recordOrganizationSeatUsageMutation(c, body);
return { ok: true };
},
};
/**
* Plain run handler (no workflow engine). Drains the queue using `c.queue.iter()`
* with completable messages. This avoids the RivetKit bug where actors created
* from another actor's workflow context never start their `run: workflow(...)`.
*
* The queue is still durable messages survive restarts. Only in-flight processing
* of a single message is lost on crash (the message is retried). All mutations are
* idempotent, so this is safe.
*/
export async function runOrganizationCommandLoop(c: any): Promise<void> {
for await (const msg of c.queue.iter({ names: [...ORGANIZATION_QUEUE_NAMES], completable: true })) {
try {
const handler = COMMAND_HANDLERS[msg.name];
if (handler) {
const result = await handler(c, msg.body);
await msg.complete(result);
} else {
logActorWarning("organization", "unknown queue message", { queueName: msg.name });
await msg.complete({ error: `Unknown command: ${msg.name}` });
}
} catch (error) {
const message = resolveErrorMessage(error);
logActorWarning("organization", "organization command failed", {
queueName: msg.name,
error: message,
});
await msg.complete({ error: message }).catch((completeError: unknown) => {
logActorWarning("organization", "organization command failed completing error response", {
queueName: msg.name,
error: resolveErrorMessage(completeError),
});
});
}
}
}

View file

@ -6,9 +6,10 @@ import { DEFAULT_WORKSPACE_MODEL_GROUPS, workspaceModelGroupsFromSandboxAgents,
import { SandboxAgent } from "sandbox-agent";
import { getActorRuntimeContext } from "../context.js";
import { organizationKey } from "../keys.js";
import { logActorWarning, resolveErrorMessage } from "../logging.js";
import { resolveSandboxProviderId } from "../../sandbox-config.js";
const SANDBOX_REPO_CWD = "/home/sandbox/organization/repo";
const SANDBOX_REPO_CWD = "/home/user/repo";
const DEFAULT_LOCAL_SANDBOX_IMAGE = "rivetdev/sandbox-agent:full";
const DEFAULT_LOCAL_SANDBOX_PORT = 2468;
const dockerClient = new Dockerode({ socketPath: "/var/run/docker.sock" });
@ -204,6 +205,10 @@ const baseTaskSandbox = sandboxActor({
create: () => ({
template: config.sandboxProviders.e2b.template ?? "sandbox-agent-full-0.3.x",
envs: sandboxEnvObject(),
// Default E2B timeout is 5 minutes which is too short for task work.
// Set to 1 hour. TODO: use betaCreate + autoPause instead so sandboxes
// pause (preserving state) rather than being killed on timeout.
timeoutMs: 60 * 60 * 1000,
}),
installAgents: ["claude", "codex"],
});
@ -220,8 +225,12 @@ async function broadcastProcesses(c: any, actions: Record<string, (...args: any[
type: "processesUpdated",
processes: listed.processes ?? [],
});
} catch {
} catch (error) {
// Process broadcasts are best-effort. Callers still receive the primary action result.
logActorWarning("taskSandbox", "broadcastProcesses failed", {
sandboxId: c.state?.sandboxId,
error: resolveErrorMessage(error),
});
}
}
@ -337,6 +346,19 @@ export const taskSandbox = actor({
return sanitizeActorResult(await session.prompt([{ type: "text", text }]));
},
async listProcesses(c: any): Promise<any> {
try {
return await baseActions.listProcesses(c);
} catch (error) {
// Sandbox may be gone (E2B timeout, destroyed, etc.) — degrade to empty
logActorWarning("taskSandbox", "listProcesses failed, sandbox may be expired", {
sandboxId: c.state.sandboxId,
error: resolveErrorMessage(error),
});
return { processes: [] };
}
},
async createProcess(c: any, request: any): Promise<any> {
const created = await baseActions.createProcess(c, request);
await broadcastProcesses(c, baseActions);

View file

@ -1,9 +1,9 @@
import { actor, queue } from "rivetkit";
import { actor } from "rivetkit";
import type { TaskRecord } from "@sandbox-agent/foundry-shared";
import { taskDb } from "./db/db.js";
import { getCurrentRecord } from "./workflow/common.js";
import { getSessionDetail, getTaskDetail, getTaskSummary } from "./workspace.js";
import { TASK_QUEUE_NAMES, runTaskCommandLoop } from "./workflow/index.js";
import { taskCommandActions } from "./workflow/index.js";
export interface TaskInput {
organizationId: string;
@ -13,11 +13,10 @@ export interface TaskInput {
export const task = actor({
db: taskDb,
queues: Object.fromEntries(TASK_QUEUE_NAMES.map((name) => [name, queue()])),
options: {
name: "Task",
icon: "wrench",
actionTimeout: 5 * 60_000,
actionTimeout: 10 * 60_000,
},
createState: (_c, input: TaskInput) => ({
organizationId: input.organizationId,
@ -40,8 +39,9 @@ export const task = actor({
async getSessionDetail(c, input: { sessionId: string; authSessionId?: string }) {
return await getSessionDetail(c, input.sessionId, input.authSessionId);
},
...taskCommandActions,
},
run: runTaskCommandLoop,
});
export { TASK_QUEUE_NAMES };
export { taskWorkflowQueueName } from "./workflow/index.js";

View file

@ -4,6 +4,8 @@ import type { TaskRecord, TaskStatus } from "@sandbox-agent/foundry-shared";
import { task as taskTable, taskRuntime, taskSandboxes } from "../db/schema.js";
import { getOrCreateAuditLog, getOrCreateOrganization } from "../../handles.js";
import { broadcastTaskUpdate } from "../workspace.js";
import { getActorRuntimeContext } from "../../context.js";
import { defaultSandboxProviderId } from "../../../sandbox-config.js";
export const TASK_ROW_ID = 1;
@ -64,10 +66,16 @@ export async function setTaskState(ctx: any, status: TaskStatus): Promise<void>
await broadcastTaskUpdate(ctx);
}
/**
* Read the task's current record from its local SQLite DB.
* If the task actor was lazily created (virtual task from PR sync) and has no
* DB rows yet, auto-initializes by reading branch/title from the org actor's
* getTaskIndexEntry. This is the self-initialization path for lazy task actors.
*/
export async function getCurrentRecord(ctx: any): Promise<TaskRecord> {
const db = ctx.db;
const organization = await getOrCreateOrganization(ctx, ctx.state.organizationId);
const row = await db
let row = await db
.select({
branchName: taskTable.branchName,
title: taskTable.title,
@ -85,7 +93,48 @@ export async function getCurrentRecord(ctx: any): Promise<TaskRecord> {
.get();
if (!row) {
throw new Error(`Task not found: ${ctx.state.taskId}`);
// Virtual task — auto-initialize from org actor's task index data
let branchName: string | null = null;
let title = "Untitled";
try {
const entry = await organization.getTaskIndexEntry({ taskId: ctx.state.taskId });
branchName = entry?.branchName ?? null;
title = entry?.title ?? title;
} catch {}
const { config } = getActorRuntimeContext();
const { initBootstrapDbActivity, initCompleteActivity } = await import("./init.js");
await initBootstrapDbActivity(ctx, {
sandboxProviderId: defaultSandboxProviderId(config),
branchName,
title,
task: title,
});
await initCompleteActivity(ctx, { sandboxProviderId: defaultSandboxProviderId(config) });
// Re-read the row after initialization
const initialized = await db
.select({
branchName: taskTable.branchName,
title: taskTable.title,
task: taskTable.task,
sandboxProviderId: taskTable.sandboxProviderId,
status: taskTable.status,
pullRequestJson: taskTable.pullRequestJson,
activeSandboxId: taskRuntime.activeSandboxId,
createdAt: taskTable.createdAt,
updatedAt: taskTable.updatedAt,
})
.from(taskTable)
.leftJoin(taskRuntime, eq(taskTable.id, taskRuntime.id))
.where(eq(taskTable.id, TASK_ROW_ID))
.get();
if (!initialized) {
throw new Error(`Task not found after initialization: ${ctx.state.taskId}`);
}
row = initialized;
}
const repositoryMetadata = await organization.getRepositoryMetadata({ repoId: ctx.state.repoId });
@ -140,19 +189,13 @@ export async function getCurrentRecord(ctx: any): Promise<TaskRecord> {
export async function appendAuditLog(ctx: any, kind: string, payload: Record<string, unknown>): Promise<void> {
const row = await ctx.db.select({ branchName: taskTable.branchName }).from(taskTable).where(eq(taskTable.id, TASK_ROW_ID)).get();
const auditLog = await getOrCreateAuditLog(ctx, ctx.state.organizationId);
await auditLog.send(
"auditLog.command.append",
{
kind,
repoId: ctx.state.repoId,
taskId: ctx.state.taskId,
branchName: row?.branchName ?? null,
payload,
},
{
wait: false,
},
);
void auditLog.append({
kind,
repoId: ctx.state.repoId,
taskId: ctx.state.taskId,
branchName: row?.branchName ?? null,
payload,
});
await broadcastTaskUpdate(ctx);
}

View file

@ -11,7 +11,6 @@ import {
killDestroySandboxActivity,
killWriteDbActivity,
} from "./commands.js";
import { TASK_QUEUE_NAMES } from "./queue.js";
import {
changeWorkspaceModel,
closeWorkspaceSession,
@ -33,205 +32,233 @@ import {
updateWorkspaceDraft,
} from "../workspace.js";
export { TASK_QUEUE_NAMES, taskWorkflowQueueName } from "./queue.js";
export { taskWorkflowQueueName } from "./queue.js";
type TaskQueueName = (typeof TASK_QUEUE_NAMES)[number];
type CommandHandler = (c: any, msg: { name: TaskQueueName; body: any; complete: (response: unknown) => Promise<void> }) => Promise<void>;
const commandHandlers: Record<TaskQueueName, CommandHandler> = {
"task.command.initialize": async (c, msg) => {
const body = msg.body;
/**
* Task command actions converted from queue/workflow handlers to direct actions.
* Each export becomes an action on the task actor.
*/
export const taskCommandActions = {
async initialize(c: any, body: any) {
await initBootstrapDbActivity(c, body);
await initEnqueueProvisionActivity(c, body);
const currentRecord = await getCurrentRecord(c);
return await getCurrentRecord(c);
},
async provision(c: any, body: any) {
try {
await msg.complete(currentRecord);
await initCompleteActivity(c, body);
return { ok: true };
} catch (error) {
logActorWarning("task.workflow", "initialize completion failed", {
error: resolveErrorMessage(error),
});
await initFailedActivity(c, error, body);
return { ok: false, error: resolveErrorMessage(error) };
}
},
"task.command.provision": async (c, msg) => {
try {
await initCompleteActivity(c, msg.body);
await msg.complete({ ok: true });
} catch (error) {
await initFailedActivity(c, error, msg.body);
await msg.complete({
ok: false,
error: resolveErrorMessage(error),
});
}
},
"task.command.attach": async (c, msg) => {
async attach(c: any, body: any) {
// handleAttachActivity expects msg with complete — adapt
const result = { value: undefined as any };
const msg = {
name: "task.command.attach",
body,
complete: async (v: any) => {
result.value = v;
},
};
await handleAttachActivity(c, msg);
return result.value;
},
"task.command.switch": async (c, msg) => {
async switchTask(c: any, body: any) {
const result = { value: undefined as any };
const msg = {
name: "task.command.switch",
body,
complete: async (v: any) => {
result.value = v;
},
};
await handleSwitchActivity(c, msg);
return result.value;
},
"task.command.push": async (c, msg) => {
async push(c: any, body: any) {
const result = { value: undefined as any };
const msg = {
name: "task.command.push",
body,
complete: async (v: any) => {
result.value = v;
},
};
await handlePushActivity(c, msg);
return result.value;
},
"task.command.sync": async (c, msg) => {
async sync(c: any, body: any) {
const result = { value: undefined as any };
const msg = {
name: "task.command.sync",
body,
complete: async (v: any) => {
result.value = v;
},
};
await handleSimpleCommandActivity(c, msg, "task.sync");
return result.value;
},
"task.command.merge": async (c, msg) => {
async merge(c: any, body: any) {
const result = { value: undefined as any };
const msg = {
name: "task.command.merge",
body,
complete: async (v: any) => {
result.value = v;
},
};
await handleSimpleCommandActivity(c, msg, "task.merge");
return result.value;
},
"task.command.archive": async (c, msg) => {
async archive(c: any, body: any) {
const result = { value: undefined as any };
const msg = {
name: "task.command.archive",
body,
complete: async (v: any) => {
result.value = v;
},
};
await handleArchiveActivity(c, msg);
return result.value;
},
"task.command.kill": async (c, msg) => {
async kill(c: any, body: any) {
const result = { value: undefined as any };
const msg = {
name: "task.command.kill",
body,
complete: async (v: any) => {
result.value = v;
},
};
await killDestroySandboxActivity(c);
await killWriteDbActivity(c, msg);
return result.value;
},
"task.command.get": async (c, msg) => {
async getRecord(c: any, body: any) {
const result = { value: undefined as any };
const msg = {
name: "task.command.get",
body,
complete: async (v: any) => {
result.value = v;
},
};
await handleGetActivity(c, msg);
return result.value;
},
"task.command.pull_request.sync": async (c, msg) => {
await syncTaskPullRequest(c, msg.body?.pullRequest ?? null);
await msg.complete({ ok: true });
async pullRequestSync(c: any, body: any) {
await syncTaskPullRequest(c, body?.pullRequest ?? null);
return { ok: true };
},
"task.command.workspace.mark_unread": async (c, msg) => {
await markWorkspaceUnread(c, msg.body?.authSessionId);
await msg.complete({ ok: true });
async markUnread(c: any, body: any) {
await markWorkspaceUnread(c, body?.authSessionId);
return { ok: true };
},
"task.command.workspace.rename_task": async (c, msg) => {
await renameWorkspaceTask(c, msg.body.value);
await msg.complete({ ok: true });
async renameTask(c: any, body: any) {
await renameWorkspaceTask(c, body.value);
return { ok: true };
},
"task.command.workspace.create_session": async (c, msg) => {
async createSession(c: any, body: any) {
return await createWorkspaceSession(c, body?.model, body?.authSessionId);
},
async createSessionAndSend(c: any, body: any) {
try {
const created = await createWorkspaceSession(c, msg.body?.model, msg.body?.authSessionId);
await msg.complete(created);
} catch (error) {
await msg.complete({ error: resolveErrorMessage(error) });
}
},
"task.command.workspace.create_session_and_send": async (c, msg) => {
try {
const created = await createWorkspaceSession(c, msg.body?.model, msg.body?.authSessionId);
await sendWorkspaceMessage(c, created.sessionId, msg.body.text, [], msg.body?.authSessionId);
const created = await createWorkspaceSession(c, body?.model, body?.authSessionId);
await sendWorkspaceMessage(c, created.sessionId, body.text, [], body?.authSessionId);
} catch (error) {
logActorWarning("task.workflow", "create_session_and_send failed", {
error: resolveErrorMessage(error),
});
}
await msg.complete({ ok: true });
return { ok: true };
},
"task.command.workspace.ensure_session": async (c, msg) => {
await ensureWorkspaceSession(c, msg.body.sessionId, msg.body?.model, msg.body?.authSessionId);
await msg.complete({ ok: true });
async ensureSession(c: any, body: any) {
await ensureWorkspaceSession(c, body.sessionId, body?.model, body?.authSessionId);
return { ok: true };
},
"task.command.workspace.rename_session": async (c, msg) => {
await renameWorkspaceSession(c, msg.body.sessionId, msg.body.title);
await msg.complete({ ok: true });
async renameSession(c: any, body: any) {
await renameWorkspaceSession(c, body.sessionId, body.title);
return { ok: true };
},
"task.command.workspace.select_session": async (c, msg) => {
await selectWorkspaceSession(c, msg.body.sessionId, msg.body?.authSessionId);
await msg.complete({ ok: true });
async selectSession(c: any, body: any) {
await selectWorkspaceSession(c, body.sessionId, body?.authSessionId);
return { ok: true };
},
"task.command.workspace.set_session_unread": async (c, msg) => {
await setWorkspaceSessionUnread(c, msg.body.sessionId, msg.body.unread, msg.body?.authSessionId);
await msg.complete({ ok: true });
async setSessionUnread(c: any, body: any) {
await setWorkspaceSessionUnread(c, body.sessionId, body.unread, body?.authSessionId);
return { ok: true };
},
"task.command.workspace.update_draft": async (c, msg) => {
await updateWorkspaceDraft(c, msg.body.sessionId, msg.body.text, msg.body.attachments, msg.body?.authSessionId);
await msg.complete({ ok: true });
async updateDraft(c: any, body: any) {
await updateWorkspaceDraft(c, body.sessionId, body.text, body.attachments, body?.authSessionId);
return { ok: true };
},
"task.command.workspace.change_model": async (c, msg) => {
await changeWorkspaceModel(c, msg.body.sessionId, msg.body.model, msg.body?.authSessionId);
await msg.complete({ ok: true });
async changeModel(c: any, body: any) {
await changeWorkspaceModel(c, body.sessionId, body.model, body?.authSessionId);
return { ok: true };
},
"task.command.workspace.send_message": async (c, msg) => {
try {
await sendWorkspaceMessage(c, msg.body.sessionId, msg.body.text, msg.body.attachments, msg.body?.authSessionId);
await msg.complete({ ok: true });
} catch (error) {
await msg.complete({ error: resolveErrorMessage(error) });
}
async sendMessage(c: any, body: any) {
await sendWorkspaceMessage(c, body.sessionId, body.text, body.attachments, body?.authSessionId);
return { ok: true };
},
"task.command.workspace.stop_session": async (c, msg) => {
await stopWorkspaceSession(c, msg.body.sessionId);
await msg.complete({ ok: true });
async stopSession(c: any, body: any) {
await stopWorkspaceSession(c, body.sessionId);
return { ok: true };
},
"task.command.workspace.sync_session_status": async (c, msg) => {
await syncWorkspaceSessionStatus(c, msg.body.sessionId, msg.body.status, msg.body.at);
await msg.complete({ ok: true });
async syncSessionStatus(c: any, body: any) {
await syncWorkspaceSessionStatus(c, body.sessionId, body.status, body.at);
return { ok: true };
},
"task.command.workspace.refresh_derived": async (c, msg) => {
async refreshDerived(c: any, _body: any) {
await refreshWorkspaceDerivedState(c);
await msg.complete({ ok: true });
return { ok: true };
},
"task.command.workspace.refresh_session_transcript": async (c, msg) => {
await refreshWorkspaceSessionTranscript(c, msg.body.sessionId);
await msg.complete({ ok: true });
async refreshSessionTranscript(c: any, body: any) {
await refreshWorkspaceSessionTranscript(c, body.sessionId);
return { ok: true };
},
"task.command.workspace.close_session": async (c, msg) => {
await closeWorkspaceSession(c, msg.body.sessionId, msg.body?.authSessionId);
await msg.complete({ ok: true });
async closeSession(c: any, body: any) {
await closeWorkspaceSession(c, body.sessionId, body?.authSessionId);
return { ok: true };
},
"task.command.workspace.publish_pr": async (c, msg) => {
async publishPr(c: any, _body: any) {
await publishWorkspacePr(c);
await msg.complete({ ok: true });
return { ok: true };
},
"task.command.workspace.revert_file": async (c, msg) => {
await revertWorkspaceFile(c, msg.body.path);
await msg.complete({ ok: true });
async revertFile(c: any, body: any) {
await revertWorkspaceFile(c, body.path);
return { ok: true };
},
};
/**
* Plain run handler (no workflow engine). Drains the queue using `c.queue.iter()`
* with completable messages.
*/
export async function runTaskCommandLoop(c: any): Promise<void> {
for await (const msg of c.queue.iter({ names: [...TASK_QUEUE_NAMES], completable: true })) {
const handler = commandHandlers[msg.name as TaskQueueName];
if (handler) {
try {
await handler(c, msg);
} catch (error) {
const message = resolveErrorMessage(error);
logActorWarning("task.workflow", "task command failed", {
queueName: msg.name,
error: message,
});
await msg.complete({ error: message }).catch(() => {});
}
} else {
logActorWarning("task.workflow", "unknown queue message", { queueName: msg.name });
await msg.complete({ error: `Unknown command: ${msg.name}` });
}
}
}

View file

@ -6,7 +6,7 @@ import { resolveErrorMessage } from "../../logging.js";
import { defaultSandboxProviderId } from "../../../sandbox-config.js";
import { task as taskTable, taskRuntime } from "../db/schema.js";
import { TASK_ROW_ID, appendAuditLog, collectErrorMessages, resolveErrorDetail, setTaskState } from "./common.js";
import { taskWorkflowQueueName } from "./queue.js";
// task actions called directly (no queue)
export async function initBootstrapDbActivity(loopCtx: any, body: any): Promise<void> {
const { config } = getActorRuntimeContext();
@ -72,9 +72,7 @@ export async function initEnqueueProvisionActivity(loopCtx: any, body: any): Pro
const self = selfTask(loopCtx);
try {
await self.send(taskWorkflowQueueName("task.command.provision"), body, {
wait: false,
});
void self.provision(body).catch(() => {});
} catch (error) {
logActorWarning("task.init", "background provision command failed", {
organizationId: loopCtx.state.organizationId,

View file

@ -10,14 +10,15 @@ import {
} from "@sandbox-agent/foundry-shared";
import { getActorRuntimeContext } from "../context.js";
import { getOrCreateOrganization, getOrCreateTaskSandbox, getOrCreateUser, getTaskSandbox, selfTask } from "../handles.js";
import { logActorWarning, resolveErrorMessage } from "../logging.js";
import { SANDBOX_REPO_CWD } from "../sandbox/index.js";
import { resolveSandboxProviderId } from "../../sandbox-config.js";
import { getBetterAuthService } from "../../services/better-auth.js";
import { expectQueueResponse } from "../../services/queue.js";
// expectQueueResponse removed — actions return values directly
import { resolveOrganizationGithubAuth } from "../../services/github-auth.js";
import { githubRepoFullNameFromRemote } from "../../services/repo.js";
import { organizationWorkflowQueueName } from "../organization/queues.js";
import { userWorkflowQueueName } from "../user/workflow.js";
// organization actions called directly (no queue)
import { task as taskTable, taskRuntime, taskSandboxes, taskWorkspaceSessions } from "./db/schema.js";
import { getCurrentRecord } from "./workflow/common.js";
@ -239,17 +240,11 @@ async function upsertUserTaskState(c: any, authSessionId: string | null | undefi
}
const user = await getOrCreateUser(c, userId);
expectQueueResponse(
await user.send(
userWorkflowQueueName("user.command.task_state.upsert"),
{
taskId: c.state.taskId,
sessionId,
patch,
},
{ wait: true, timeout: 60_000 },
),
);
await user.taskStateUpsert({
taskId: c.state.taskId,
sessionId,
patch,
});
}
async function deleteUserTaskState(c: any, authSessionId: string | null | undefined, sessionId: string): Promise<void> {
@ -264,16 +259,10 @@ async function deleteUserTaskState(c: any, authSessionId: string | null | undefi
}
const user = await getOrCreateUser(c, userId);
expectQueueResponse(
await user.send(
userWorkflowQueueName("user.command.task_state.delete"),
{
taskId: c.state.taskId,
sessionId,
},
{ wait: true, timeout: 60_000 },
),
);
await user.taskStateDelete({
taskId: c.state.taskId,
sessionId,
});
}
async function resolveDefaultModel(c: any, authSessionId?: string | null): Promise<string> {
@ -750,21 +739,17 @@ async function enqueueWorkspaceRefresh(
command: "task.command.workspace.refresh_derived" | "task.command.workspace.refresh_session_transcript",
body: Record<string, unknown>,
): Promise<void> {
const self = selfTask(c);
await self.send(command, body, { wait: false });
// Call directly since we're inside the task actor (no queue needed)
if (command === "task.command.workspace.refresh_derived") {
void refreshWorkspaceDerivedState(c).catch(() => {});
} else {
void refreshWorkspaceSessionTranscript(c, body.sessionId as string).catch(() => {});
}
}
async function enqueueWorkspaceEnsureSession(c: any, sessionId: string): Promise<void> {
const self = selfTask(c);
await self.send(
"task.command.workspace.ensure_session",
{
sessionId,
},
{
wait: false,
},
);
// Call directly since we're inside the task actor
void ensureWorkspaceSession(c, sessionId).catch(() => {});
}
function pendingWorkspaceSessionStatus(record: any): "pending_provision" | "pending_session_create" {
@ -930,7 +915,10 @@ export async function buildSessionDetail(c: any, sessionId: string, authSessionI
const userTaskState = await getUserTaskState(c, authSessionId);
const userSessionState = userTaskState.bySessionId.get(sessionId);
if (!meta.sandboxSessionId) {
// Skip live transcript fetch if the sandbox session doesn't exist yet or
// the session is still provisioning — the sandbox API will block/timeout.
const isPending = meta.status === "pending_provision" || meta.status === "pending_session_create";
if (!meta.sandboxSessionId || isPending) {
return buildSessionDetailFromMeta(meta, userSessionState);
}
@ -947,8 +935,13 @@ export async function buildSessionDetail(c: any, sessionId: string, authSessionI
userSessionState,
);
}
} catch {
// Session detail reads should degrade to cached transcript data if the live sandbox is unavailable.
} catch (error) {
// Session detail reads degrade to cached transcript when sandbox is unavailable.
logActorWarning("task", "readSessionTranscript failed, using cached transcript", {
taskId: c.state.taskId,
sessionId,
error: resolveErrorMessage(error),
});
}
return buildSessionDetailFromMeta(meta, userSessionState);
@ -976,13 +969,7 @@ export async function getSessionDetail(c: any, sessionId: string, authSessionId?
*/
export async function broadcastTaskUpdate(c: any, options?: { sessionId?: string }): Promise<void> {
const organization = await getOrCreateOrganization(c, c.state.organizationId);
await expectQueueResponse<{ ok: true }>(
await organization.send(
organizationWorkflowQueueName("organization.command.applyTaskSummaryUpdate"),
{ taskSummary: await buildTaskSummary(c) },
{ wait: true, timeout: 10_000 },
),
);
await organization.commandApplyTaskSummaryUpdate({ taskSummary: await buildTaskSummary(c) });
c.broadcast("taskUpdated", {
type: "taskUpdated",
detail: await buildTaskDetail(c),
@ -1119,22 +1106,12 @@ export async function ensureWorkspaceSession(c: any, sessionId: string, model?:
}
export async function enqueuePendingWorkspaceSessions(c: any): Promise<void> {
const self = selfTask(c);
const pending = (await listSessionMetaRows(c, { includeClosed: true })).filter(
(row) => row.closed !== true && row.status !== "ready" && row.status !== "error",
);
for (const row of pending) {
await self.send(
"task.command.workspace.ensure_session",
{
sessionId: row.sessionId,
model: row.model,
},
{
wait: false,
},
);
void ensureWorkspaceSession(c, row.sessionId, row.model).catch(() => {});
}
}

View file

@ -1,12 +1,21 @@
import { actor, queue } from "rivetkit";
import { actor } from "rivetkit";
import { userDb } from "./db/db.js";
import { betterAuthActions } from "./actions/better-auth.js";
import { userActions } from "./actions/user.js";
import { USER_QUEUE_NAMES, runUserCommandLoop } from "./workflow.js";
import {
createAuthRecordMutation,
updateAuthRecordMutation,
updateManyAuthRecordsMutation,
deleteAuthRecordMutation,
deleteManyAuthRecordsMutation,
upsertUserProfileMutation,
upsertSessionStateMutation,
upsertTaskStateMutation,
deleteTaskStateMutation,
} from "./workflow.js";
export const user = actor({
db: userDb,
queues: Object.fromEntries(USER_QUEUE_NAMES.map((name) => [name, queue()])),
options: {
name: "User",
icon: "shield",
@ -18,6 +27,34 @@ export const user = actor({
actions: {
...betterAuthActions,
...userActions,
async authCreate(c, body) {
return await createAuthRecordMutation(c, body);
},
async authUpdate(c, body) {
return await updateAuthRecordMutation(c, body);
},
async authUpdateMany(c, body) {
return await updateManyAuthRecordsMutation(c, body);
},
async authDelete(c, body) {
await deleteAuthRecordMutation(c, body);
return { ok: true };
},
async authDeleteMany(c, body) {
return await deleteManyAuthRecordsMutation(c, body);
},
async profileUpsert(c, body) {
return await upsertUserProfileMutation(c, body);
},
async sessionStateUpsert(c, body) {
return await upsertSessionStateMutation(c, body);
},
async taskStateUpsert(c, body) {
return await upsertTaskStateMutation(c, body);
},
async taskStateDelete(c, body) {
await deleteTaskStateMutation(c, body);
return { ok: true };
},
},
run: runUserCommandLoop,
});

View file

@ -1,28 +1,9 @@
import { eq, count as sqlCount, and } from "drizzle-orm";
import { DEFAULT_WORKSPACE_MODEL_ID } from "@sandbox-agent/foundry-shared";
import { logActorWarning, resolveErrorMessage } from "../logging.js";
import { authUsers, sessionState, userProfiles, userTaskState } from "./db/schema.js";
import { buildWhere, columnFor, materializeRow, persistInput, persistPatch, tableFor } from "./query-helpers.js";
export const USER_QUEUE_NAMES = [
"user.command.auth.create",
"user.command.auth.update",
"user.command.auth.update_many",
"user.command.auth.delete",
"user.command.auth.delete_many",
"user.command.profile.upsert",
"user.command.session_state.upsert",
"user.command.task_state.upsert",
"user.command.task_state.delete",
] as const;
export type UserQueueName = (typeof USER_QUEUE_NAMES)[number];
export function userWorkflowQueueName(name: UserQueueName): UserQueueName {
return name;
}
async function createAuthRecordMutation(c: any, input: { model: string; data: Record<string, unknown> }) {
export async function createAuthRecordMutation(c: any, input: { model: string; data: Record<string, unknown> }) {
const table = tableFor(input.model);
const persisted = persistInput(input.model, input.data);
await c.db
@ -37,12 +18,10 @@ async function createAuthRecordMutation(c: any, input: { model: string; data: Re
return materializeRow(input.model, row);
}
async function updateAuthRecordMutation(c: any, input: { model: string; where: any[]; update: Record<string, unknown> }) {
export async function updateAuthRecordMutation(c: any, input: { model: string; where: any[]; update: Record<string, unknown> }) {
const table = tableFor(input.model);
const predicate = buildWhere(table, input.where);
if (!predicate) {
throw new Error("updateAuthRecord requires a where clause");
}
if (!predicate) throw new Error("updateAuthRecord requires a where clause");
await c.db
.update(table)
.set(persistPatch(input.model, input.update) as any)
@ -51,12 +30,10 @@ async function updateAuthRecordMutation(c: any, input: { model: string; where: a
return materializeRow(input.model, await c.db.select().from(table).where(predicate).get());
}
async function updateManyAuthRecordsMutation(c: any, input: { model: string; where: any[]; update: Record<string, unknown> }) {
export async function updateManyAuthRecordsMutation(c: any, input: { model: string; where: any[]; update: Record<string, unknown> }) {
const table = tableFor(input.model);
const predicate = buildWhere(table, input.where);
if (!predicate) {
throw new Error("updateManyAuthRecords requires a where clause");
}
if (!predicate) throw new Error("updateManyAuthRecords requires a where clause");
await c.db
.update(table)
.set(persistPatch(input.model, input.update) as any)
@ -66,27 +43,23 @@ async function updateManyAuthRecordsMutation(c: any, input: { model: string; whe
return row?.value ?? 0;
}
async function deleteAuthRecordMutation(c: any, input: { model: string; where: any[] }) {
export async function deleteAuthRecordMutation(c: any, input: { model: string; where: any[] }) {
const table = tableFor(input.model);
const predicate = buildWhere(table, input.where);
if (!predicate) {
throw new Error("deleteAuthRecord requires a where clause");
}
if (!predicate) throw new Error("deleteAuthRecord requires a where clause");
await c.db.delete(table).where(predicate).run();
}
async function deleteManyAuthRecordsMutation(c: any, input: { model: string; where: any[] }) {
export async function deleteManyAuthRecordsMutation(c: any, input: { model: string; where: any[] }) {
const table = tableFor(input.model);
const predicate = buildWhere(table, input.where);
if (!predicate) {
throw new Error("deleteManyAuthRecords requires a where clause");
}
if (!predicate) throw new Error("deleteManyAuthRecords requires a where clause");
const rows = await c.db.select().from(table).where(predicate).all();
await c.db.delete(table).where(predicate).run();
return rows.length;
}
async function upsertUserProfileMutation(
export async function upsertUserProfileMutation(
c: any,
input: {
userId: string;
@ -134,11 +107,10 @@ async function upsertUserProfileMutation(
},
})
.run();
return await c.db.select().from(userProfiles).where(eq(userProfiles.userId, input.userId)).get();
}
async function upsertSessionStateMutation(c: any, input: { sessionId: string; activeOrganizationId: string | null }) {
export async function upsertSessionStateMutation(c: any, input: { sessionId: string; activeOrganizationId: string | null }) {
const now = Date.now();
await c.db
.insert(sessionState)
@ -150,17 +122,13 @@ async function upsertSessionStateMutation(c: any, input: { sessionId: string; ac
})
.onConflictDoUpdate({
target: sessionState.sessionId,
set: {
activeOrganizationId: input.activeOrganizationId,
updatedAt: now,
},
set: { activeOrganizationId: input.activeOrganizationId, updatedAt: now },
})
.run();
return await c.db.select().from(sessionState).where(eq(sessionState.sessionId, input.sessionId)).get();
}
async function upsertTaskStateMutation(
export async function upsertTaskStateMutation(
c: any,
input: {
taskId: string;
@ -182,14 +150,7 @@ async function upsertTaskStateMutation(
.get();
if (input.patch.activeSessionId !== undefined) {
await c.db
.update(userTaskState)
.set({
activeSessionId: input.patch.activeSessionId,
updatedAt: now,
})
.where(eq(userTaskState.taskId, input.taskId))
.run();
await c.db.update(userTaskState).set({ activeSessionId: input.patch.activeSessionId, updatedAt: now }).where(eq(userTaskState.taskId, input.taskId)).run();
}
await c.db
@ -224,7 +185,7 @@ async function upsertTaskStateMutation(
.get();
}
async function deleteTaskStateMutation(c: any, input: { taskId: string; sessionId?: string }) {
export async function deleteTaskStateMutation(c: any, input: { taskId: string; sessionId?: string }) {
if (input.sessionId) {
await c.db
.delete(userTaskState)
@ -232,50 +193,5 @@ async function deleteTaskStateMutation(c: any, input: { taskId: string; sessionI
.run();
return;
}
await c.db.delete(userTaskState).where(eq(userTaskState.taskId, input.taskId)).run();
}
const COMMAND_HANDLERS: Record<string, (c: any, body: any) => Promise<any>> = {
"user.command.auth.create": (c, body) => createAuthRecordMutation(c, body),
"user.command.auth.update": (c, body) => updateAuthRecordMutation(c, body),
"user.command.auth.update_many": (c, body) => updateManyAuthRecordsMutation(c, body),
"user.command.auth.delete": async (c, body) => {
await deleteAuthRecordMutation(c, body);
return { ok: true };
},
"user.command.auth.delete_many": (c, body) => deleteManyAuthRecordsMutation(c, body),
"user.command.profile.upsert": (c, body) => upsertUserProfileMutation(c, body),
"user.command.session_state.upsert": (c, body) => upsertSessionStateMutation(c, body),
"user.command.task_state.upsert": (c, body) => upsertTaskStateMutation(c, body),
"user.command.task_state.delete": async (c, body) => {
await deleteTaskStateMutation(c, body);
return { ok: true };
},
};
/**
* Plain run handler (no workflow engine). Drains the queue using `c.queue.iter()`
* with completable messages.
*/
export async function runUserCommandLoop(c: any): Promise<void> {
for await (const msg of c.queue.iter({ names: [...USER_QUEUE_NAMES], completable: true })) {
try {
const handler = COMMAND_HANDLERS[msg.name];
if (handler) {
const result = await handler(c, msg.body);
await msg.complete(result);
} else {
logActorWarning("user", "unknown queue message", { queueName: msg.name });
await msg.complete({ error: `Unknown command: ${msg.name}` });
}
} catch (error) {
const message = resolveErrorMessage(error);
logActorWarning("user", "user command failed", {
queueName: msg.name,
error: message,
});
await msg.complete({ error: message }).catch(() => {});
}
}
}

View file

@ -48,6 +48,19 @@ function isRivetRequest(request: Request): boolean {
}
export async function startBackend(options: BackendStartOptions = {}): Promise<void> {
// Prevent the sandbox-agent SDK's unhandled SQLite constraint errors from
// crashing the entire process. The SDK has a bug where duplicate event
// inserts (sandbox_agent_events UNIQUE constraint) throw from an internal
// async path with no catch. Log and continue.
process.on("uncaughtException", (error) => {
logger.error({ error: error?.message ?? String(error), stack: error?.stack }, "uncaughtException (kept alive)");
});
process.on("unhandledRejection", (reason) => {
const msg = reason instanceof Error ? reason.message : String(reason);
const stack = reason instanceof Error ? reason.stack : undefined;
logger.error({ error: msg, stack }, "unhandledRejection (kept alive)");
});
// sandbox-agent agent plugins vary on which env var they read for OpenAI/Codex auth.
// Normalize to keep local dev + docker-compose simple.
if (!process.env.CODEX_API_KEY && process.env.OPENAI_API_KEY) {

View file

@ -1,11 +1,11 @@
import { betterAuth } from "better-auth";
import { createAdapterFactory } from "better-auth/adapters";
import { APP_SHELL_ORGANIZATION_ID } from "../actors/organization/constants.js";
import { organizationWorkflowQueueName } from "../actors/organization/queues.js";
import { userWorkflowQueueName } from "../actors/user/workflow.js";
// organization actions are called directly (no queue)
// user actor actions are called directly (no queue)
import { organizationKey, userKey } from "../actors/keys.js";
import { logger } from "../logging.js";
import { expectQueueResponse } from "./queue.js";
// expectQueueResponse removed — actions return values directly
const AUTH_BASE_PATH = "/v1/auth";
const SESSION_COOKIE = "better-auth.session_token";
@ -62,11 +62,7 @@ function resolveRouteUserId(organization: any, resolved: any): string | null {
return null;
}
async function sendOrganizationCommand<TResponse>(organization: any, name: Parameters<typeof organizationWorkflowQueueName>[0], body: unknown): Promise<TResponse> {
return expectQueueResponse<TResponse>(
await organization.send(organizationWorkflowQueueName(name), body, { wait: true, timeout: 60_000 }),
);
}
// sendOrganizationCommand removed — org actions are called directly
export interface BetterAuthService {
auth: any;
@ -166,9 +162,9 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin
return null;
};
const ensureOrganizationVerification = async <TResponse>(method: Parameters<typeof organizationWorkflowQueueName>[0], payload: Record<string, unknown>) => {
const ensureOrganizationVerification = async (actionName: string, payload: Record<string, unknown>) => {
const organization = await appOrganization();
return await sendOrganizationCommand<TResponse>(organization, method, payload);
return await (organization as any)[actionName](payload);
};
return {
@ -179,7 +175,7 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin
create: async ({ model, data }) => {
const transformed = await transformInput(data, model, "create", true);
if (model === "verification") {
return await ensureOrganizationVerification<any>("organization.command.better_auth.verification.create", { data: transformed });
return await ensureOrganizationVerification("commandBetterAuthVerificationCreate", { data: transformed });
}
const userId = await resolveUserIdForQuery(model, undefined, transformed);
@ -188,20 +184,18 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin
}
const userActor = await getUser(userId);
const created = expectQueueResponse<any>(
await userActor.send(userWorkflowQueueName("user.command.auth.create"), { model, data: transformed }, { wait: true, timeout: 60_000 }),
);
const created = await userActor.authCreate({ model, data: transformed });
const organization = await appOrganization();
if (model === "user" && typeof transformed.email === "string" && transformed.email.length > 0) {
await sendOrganizationCommand(organization, "organization.command.better_auth.email_index.upsert", {
await organization.commandBetterAuthEmailIndexUpsert({
email: transformed.email.toLowerCase(),
userId,
});
}
if (model === "session") {
await sendOrganizationCommand(organization, "organization.command.better_auth.session_index.upsert", {
await organization.commandBetterAuthSessionIndexUpsert({
sessionId: String(created.id),
sessionToken: String(created.token),
userId,
@ -209,7 +203,7 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin
}
if (model === "account") {
await sendOrganizationCommand(organization, "organization.command.better_auth.account_index.upsert", {
await organization.commandBetterAuthAccountIndexUpsert({
id: String(created.id),
providerId: String(created.providerId),
accountId: String(created.accountId),
@ -297,7 +291,7 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin
const transformedWhere = transformWhereClause({ model, where, action: "update" });
const transformedUpdate = (await transformInput(update as Record<string, unknown>, model, "update", true)) as Record<string, unknown>;
if (model === "verification") {
return await ensureOrganizationVerification<any>("organization.command.better_auth.verification.update", {
return await ensureOrganizationVerification("commandBetterAuthVerificationUpdate", {
where: transformedWhere,
update: transformedUpdate,
});
@ -317,23 +311,17 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin
: model === "session"
? await userActor.betterAuthFindOneRecord({ model, where: transformedWhere })
: null;
const updated = expectQueueResponse<any>(
await userActor.send(
userWorkflowQueueName("user.command.auth.update"),
{ model, where: transformedWhere, update: transformedUpdate },
{ wait: true, timeout: 60_000 },
),
);
const updated = await userActor.authUpdate({ model, where: transformedWhere, update: transformedUpdate });
const organization = await appOrganization();
if (model === "user" && updated) {
if (before?.email && before.email !== updated.email) {
await sendOrganizationCommand(organization, "organization.command.better_auth.email_index.delete", {
await organization.commandBetterAuthEmailIndexDelete({
email: before.email.toLowerCase(),
});
}
if (updated.email) {
await sendOrganizationCommand(organization, "organization.command.better_auth.email_index.upsert", {
await organization.commandBetterAuthEmailIndexUpsert({
email: updated.email.toLowerCase(),
userId,
});
@ -341,7 +329,7 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin
}
if (model === "session" && updated) {
await sendOrganizationCommand(organization, "organization.command.better_auth.session_index.upsert", {
await organization.commandBetterAuthSessionIndexUpsert({
sessionId: String(updated.id),
sessionToken: String(updated.token),
userId,
@ -349,7 +337,7 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin
}
if (model === "account" && updated) {
await sendOrganizationCommand(organization, "organization.command.better_auth.account_index.upsert", {
await organization.commandBetterAuthAccountIndexUpsert({
id: String(updated.id),
providerId: String(updated.providerId),
accountId: String(updated.accountId),
@ -364,7 +352,7 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin
const transformedWhere = transformWhereClause({ model, where, action: "updateMany" });
const transformedUpdate = (await transformInput(update as Record<string, unknown>, model, "update", true)) as Record<string, unknown>;
if (model === "verification") {
return await ensureOrganizationVerification<number>("organization.command.better_auth.verification.update_many", {
return await ensureOrganizationVerification("commandBetterAuthVerificationUpdateMany", {
where: transformedWhere,
update: transformedUpdate,
});
@ -376,20 +364,14 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin
}
const userActor = await getUser(userId);
return expectQueueResponse<number>(
await userActor.send(
userWorkflowQueueName("user.command.auth.update_many"),
{ model, where: transformedWhere, update: transformedUpdate },
{ wait: true, timeout: 60_000 },
),
);
return await userActor.authUpdateMany({ model, where: transformedWhere, update: transformedUpdate });
},
delete: async ({ model, where }) => {
const transformedWhere = transformWhereClause({ model, where, action: "delete" });
if (model === "verification") {
const organization = await appOrganization();
await sendOrganizationCommand(organization, "organization.command.better_auth.verification.delete", { where: transformedWhere });
await organization.commandBetterAuthVerificationDelete({ where: transformedWhere });
return;
}
@ -401,19 +383,17 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin
const userActor = await getUser(userId);
const organization = await appOrganization();
const before = await userActor.betterAuthFindOneRecord({ model, where: transformedWhere });
expectQueueResponse<void>(
await userActor.send(userWorkflowQueueName("user.command.auth.delete"), { model, where: transformedWhere }, { wait: true, timeout: 60_000 }),
);
await userActor.authDelete({ model, where: transformedWhere });
if (model === "session" && before) {
await sendOrganizationCommand(organization, "organization.command.better_auth.session_index.delete", {
await organization.commandBetterAuthSessionIndexDelete({
sessionId: before.id,
sessionToken: before.token,
});
}
if (model === "account" && before) {
await sendOrganizationCommand(organization, "organization.command.better_auth.account_index.delete", {
await organization.commandBetterAuthAccountIndexDelete({
id: before.id,
providerId: before.providerId,
accountId: before.accountId,
@ -421,7 +401,7 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin
}
if (model === "user" && before?.email) {
await sendOrganizationCommand(organization, "organization.command.better_auth.email_index.delete", {
await organization.commandBetterAuthEmailIndexDelete({
email: before.email.toLowerCase(),
});
}
@ -430,7 +410,7 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin
deleteMany: async ({ model, where }) => {
const transformedWhere = transformWhereClause({ model, where, action: "deleteMany" });
if (model === "verification") {
return await ensureOrganizationVerification<number>("organization.command.better_auth.verification.delete_many", { where: transformedWhere });
return await ensureOrganizationVerification("commandBetterAuthVerificationDeleteMany", { where: transformedWhere });
}
if (model === "session") {
@ -441,11 +421,9 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin
const userActor = await getUser(userId);
const organization = await appOrganization();
const sessions = await userActor.betterAuthFindManyRecords({ model, where: transformedWhere, limit: 5000 });
const deleted = expectQueueResponse<number>(
await userActor.send(userWorkflowQueueName("user.command.auth.delete_many"), { model, where: transformedWhere }, { wait: true, timeout: 60_000 }),
);
const deleted = await userActor.authDeleteMany({ model, where: transformedWhere });
for (const session of sessions) {
await sendOrganizationCommand(organization, "organization.command.better_auth.session_index.delete", {
await organization.commandBetterAuthSessionIndexDelete({
sessionId: session.id,
sessionToken: session.token,
});
@ -459,9 +437,7 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin
}
const userActor = await getUser(userId);
const deleted = expectQueueResponse<number>(
await userActor.send(userWorkflowQueueName("user.command.auth.delete_many"), { model, where: transformedWhere }, { wait: true, timeout: 60_000 }),
);
const deleted = await userActor.authDeleteMany({ model, where: transformedWhere });
return deleted;
},
@ -533,9 +509,7 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin
async upsertUserProfile(userId: string, patch: Record<string, unknown>) {
const userActor = await getUser(userId);
return expectQueueResponse(
await userActor.send(userWorkflowQueueName("user.command.profile.upsert"), { userId, patch }, { wait: true, timeout: 60_000 }),
);
return await userActor.profileUpsert({ userId, patch });
},
async setActiveOrganization(sessionId: string, activeOrganizationId: string | null) {
@ -544,13 +518,7 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin
throw new Error(`Unknown auth session ${sessionId}`);
}
const userActor = await getUser(authState.user.id);
return expectQueueResponse(
await userActor.send(
userWorkflowQueueName("user.command.session_state.upsert"),
{ sessionId, activeOrganizationId },
{ wait: true, timeout: 60_000 },
),
);
return await userActor.sessionStateUpsert({ sessionId, activeOrganizationId });
},
async getAccessTokenForSession(sessionId: string) {

View file

@ -486,7 +486,15 @@ export function createBackendClient(options: BackendClientOptions): BackendClien
createWithInput: "app",
}) as unknown as AppOrganizationHandle;
const task = async (organizationId: string, repoId: string, taskId: string): Promise<TaskHandle> => client.task.get(taskKey(organizationId, repoId, taskId));
// getOrCreate is intentional here — this is the ONLY lazy creation point for
// virtual tasks (PR-driven entries that exist in the org's local tables but
// have no task actor yet). The task actor self-initializes from org data in
// getCurrentRecord(). Backend code must NEVER use getOrCreateTask except in
// createTaskMutation. See backend/CLAUDE.md "Lazy Task Actor Creation".
const task = async (organizationId: string, repoId: string, taskId: string): Promise<TaskHandle> =>
client.task.getOrCreate(taskKey(organizationId, repoId, taskId), {
createWithInput: { organizationId, repoId, taskId },
});
const sandboxByKey = async (organizationId: string, _providerId: SandboxProviderId, sandboxId: string): Promise<TaskSandboxHandle> => {
return (client as any).taskSandbox.get(taskSandboxKey(organizationId, sandboxId));

View file

@ -346,16 +346,17 @@ const TranscriptPanel = memo(function TranscriptPanel({
(activeAgentSession.status === "pending_provision" || activeAgentSession.status === "pending_session_create" || activeAgentSession.status === "error") &&
activeMessages.length === 0;
const serverDraft = promptSession?.draft.text ?? "";
const serverAttachments = promptSession?.draft.attachments ?? [];
const serverAttachments = promptSession?.draft.attachments;
const serverAttachmentsJson = JSON.stringify(serverAttachments ?? []);
// Sync server → local only when user hasn't typed recently (3s cooldown)
const DRAFT_SYNC_COOLDOWN_MS = 3_000;
useEffect(() => {
if (Date.now() - lastEditTimeRef.current > DRAFT_SYNC_COOLDOWN_MS) {
setLocalDraft(serverDraft);
setLocalAttachments(serverAttachments);
setLocalAttachments(serverAttachments ?? []);
}
}, [serverDraft, serverAttachments]);
}, [serverDraft, serverAttachmentsJson]);
// Reset local draft immediately on session/task switch
useEffect(() => {