diff --git a/.context/proposal-revert-actions-to-queues.md b/.context/proposal-revert-actions-to-queues.md index 7eed270..65a28e2 100644 --- a/.context/proposal-revert-actions-to-queues.md +++ b/.context/proposal-revert-actions-to-queues.md @@ -7,10 +7,10 @@ We converted all actors from queue/workflow-based communication to direct action ## Reference branches - **`main`** at commit `32f3c6c3` — the original queue/workflow code BEFORE the actions refactor -- **`queues-to-actions`** — the current refactored code using direct actions -- **`task-owner-git-auth`** at commit `f45a4674` — the merged PR #262 that introduced the actions pattern +- **`queues-to-actions`** — the actions refactor code with bug fixes (E2B, lazy tasks, etc.) +- **`task-owner-git-auth`** at commit `3684e2e5` — the CURRENT branch with all work including task owner system, lazy tasks, and actions refactor -Use `main` as the reference for the queue/workflow communication patterns. Use `queues-to-actions` as the reference for bug fixes and new features that MUST be preserved. +Use `main` as the reference for the queue/workflow communication patterns. Use `task-owner-git-auth` (current HEAD) as the authoritative source for ALL features and bug fixes that MUST be preserved — it has everything from `queues-to-actions` plus the task owner system. ## What to KEEP (do NOT revert these) @@ -60,6 +60,32 @@ These are bug fixes and improvements made during the actions refactor that are i - The audit-log actor was simplified to a single `append` action - Keep this simplification — audit-log doesn't need a workflow +### 11. Task owner (primary user) system +- New `task_owner` single-row table in task actor DB schema (`foundry/packages/backend/src/actors/task/db/schema.ts`) — stores `primaryUserId`, `primaryGithubLogin`, `primaryGithubEmail`, `primaryGithubAvatarUrl` +- New migration in `foundry/packages/backend/src/actors/task/db/migrations.ts` creating the `task_owner` table +- `primaryUserLogin` and `primaryUserAvatarUrl` columns added to org's `taskSummaries` table (`foundry/packages/backend/src/actors/organization/db/schema.ts`) + corresponding migration +- `readTaskOwner()`, `upsertTaskOwner()` helpers in `workspace.ts` +- `maybeSwapTaskOwner()` — called from `sendWorkspaceMessage()`, checks if a different user is sending and swaps owner + injects git credentials into sandbox +- `changeTaskOwnerManually()` — called from the new `changeOwner` action on the task actor, updates owner without injecting credentials (credentials injected on next message from that user) +- `injectGitCredentials()` — pushes `git config user.name/email` + credential store file into the sandbox via `runProcess` +- `resolveGithubIdentity()` — resolves user's GitHub login/email/avatar/accessToken from their auth session +- `buildTaskSummary()` now includes `primaryUserLogin` and `primaryUserAvatarUrl` in the summary pushed to org coordinator +- New `changeOwner` action on task actor in `workflow/index.ts` +- New `changeWorkspaceTaskOwner` action on org actor in `actions/tasks.ts` +- New `TaskWorkspaceChangeOwnerInput` type in shared types (`foundry/packages/shared/src/workspace.ts`) +- `TaskSummary` type extended with `primaryUserLogin` and `primaryUserAvatarUrl` + +### 12. Task owner UI +- New "Overview" tab in right sidebar (`foundry/packages/frontend/src/components/mock-layout/right-sidebar.tsx`) — shows current owner with avatar, click to open dropdown of org members to change owner +- `onChangeOwner` and `members` props added to `RightSidebar` component +- Primary user login shown in green in left sidebar task items (`foundry/packages/frontend/src/components/mock-layout/sidebar.tsx`) +- `changeWorkspaceTaskOwner` method added to backend client and workspace client interfaces + +### 13. Client changes for task owner +- `changeWorkspaceTaskOwner()` added to `backend-client.ts` and all workspace client implementations (mock, remote) +- Mock workspace client implements the owner change +- Subscription manager test updated for new task summary shape + ## What to REVERT (communication pattern only) For each actor, revert from direct action calls back to queue sends with `expectQueueResponse` / fire-and-forget patterns. The reference for the queue patterns is `main` at `32f3c6c3`. @@ -86,6 +112,7 @@ For each actor, revert from direct action calls back to queue sends with `expect - Keep `requireWorkspaceTask` using `getOrCreate` - Keep `getTask` using `getOrCreate` with `resolveTaskRepoId` - Keep `getTaskIndexEntry` +- Keep `changeWorkspaceTaskOwner` (new action — delegates to task actor's `changeOwner`) - Revert task actor calls from direct actions to queue sends where applicable **`actions/task-mutations.ts`:** @@ -109,9 +136,14 @@ For each actor, revert from direct action calls back to queue sends with `expect **`workflow/index.ts`:** - Restore `taskCommandActions` as queue handlers in the workflow command loop - Restore `TASK_QUEUE_NAMES` and dispatch map +- Add `changeOwner` to the queue dispatch map (new command, not in `main` — add as `task.command.changeOwner`) **`workspace.ts`:** - Revert sandbox/org action calls back to queue sends where they were queue-based before +- Keep ALL task owner code: `readTaskOwner`, `upsertTaskOwner`, `maybeSwapTaskOwner`, `changeTaskOwnerManually`, `injectGitCredentials`, `resolveGithubIdentity` +- Keep the `authSessionId` param added to `ensureSandboxRepo` +- Keep the `maybeSwapTaskOwner` call in `sendWorkspaceMessage` +- Keep `primaryUserLogin`/`primaryUserAvatarUrl` in `buildTaskSummary` ### 3. User actor (`foundry/packages/backend/src/actors/user/`) @@ -163,3 +195,8 @@ For each actor, revert from direct action calls back to queue sends with `expect - [ ] No 500 errors in backend logs (except expected E2B sandbox expiry) - [ ] Workflow history visible in RivetKit inspector for org, task, user actors - [ ] CLAUDE.md constraints still documented and respected +- [ ] Task owner shows in right sidebar "Overview" tab +- [ ] Owner dropdown shows org members and allows switching +- [ ] Sending a message as a different user swaps the owner +- [ ] Primary user login shown in green on sidebar task items +- [ ] Git credentials injected into sandbox on owner swap (check `/home/user/.git-token` exists) diff --git a/foundry/packages/backend/CLAUDE.md b/foundry/packages/backend/CLAUDE.md index ae4257e..8824682 100644 --- a/foundry/packages/backend/CLAUDE.md +++ b/foundry/packages/backend/CLAUDE.md @@ -198,6 +198,80 @@ curl -s -X POST 'http://127.0.0.1:6420/gateway//inspector/action/ { + const now = Date.now(); + await c.db + .insert(events) + .values({ + repoId: body.repoId ?? null, + taskId: body.taskId ?? null, + branchName: body.branchName ?? null, + kind: body.kind, + payloadJson: JSON.stringify(body.payload), + createdAt: now, + }) + .run(); + return { ok: true }; +} + +// --------------------------------------------------------------------------- +// Workflow command loop +// --------------------------------------------------------------------------- + +type AuditLogWorkflowHandler = (loopCtx: any, body: any) => Promise; + +const AUDIT_LOG_COMMAND_HANDLERS: Record = { + "auditLog.command.append": async (c, body) => appendMutation(c, body), +}; + +async function runAuditLogWorkflow(ctx: any): Promise { + await ctx.loop("audit-log-command-loop", async (loopCtx: any) => { + const msg = await loopCtx.queue.next("next-audit-log-command", { + names: [...AUDIT_LOG_QUEUE_NAMES], + completable: true, + }); + + if (!msg) { + return Loop.continue(undefined); + } + + const handler = AUDIT_LOG_COMMAND_HANDLERS[msg.name as AuditLogQueueName]; + if (!handler) { + logActorWarning("auditLog", "unknown audit-log command", { command: msg.name }); + await msg.complete({ error: `Unknown command: ${msg.name}` }).catch(() => {}); + return Loop.continue(undefined); + } + + try { + // Wrap in a step so c.state and c.db are accessible inside mutation functions. + const result = await loopCtx.step({ + name: msg.name, + timeout: 60_000, + run: async () => handler(loopCtx, msg.body), + }); + await msg.complete(result); + } catch (error) { + const message = resolveErrorMessage(error); + logActorWarning("auditLog", "audit-log workflow command failed", { + command: msg.name, + error: message, + }); + await msg.complete({ error: message }).catch(() => {}); + } + + return Loop.continue(undefined); + }); +} + +// --------------------------------------------------------------------------- +// Actor definition +// --------------------------------------------------------------------------- + /** * Organization-scoped audit log. One per org, not one per repo. * @@ -35,6 +123,7 @@ export interface ListAuditLogParams { */ export const auditLog = actor({ db: auditLogDb, + queues: Object.fromEntries(AUDIT_LOG_QUEUE_NAMES.map((name) => [name, queue()])), options: { name: "Audit Log", icon: "database", @@ -43,22 +132,14 @@ export const auditLog = actor({ organizationId: input.organizationId, }), actions: { - async append(c, body: AppendAuditLogCommand): Promise<{ ok: true }> { - const now = Date.now(); - await c.db - .insert(events) - .values({ - repoId: body.repoId ?? null, - taskId: body.taskId ?? null, - branchName: body.branchName ?? null, - kind: body.kind, - payloadJson: JSON.stringify(body.payload), - createdAt: now, - }) - .run(); + // Mutation — self-send to queue for workflow history + async append(c: any, body: AppendAuditLogCommand): Promise<{ ok: true }> { + const self = selfAuditLog(c); + await self.send(auditLogWorkflowQueueName("auditLog.command.append"), body, { wait: false }); return { ok: true }; }, + // Read — direct action (no queue) async list(c, params?: ListAuditLogParams): Promise { const whereParts = []; if (params?.repoId) { @@ -95,4 +176,5 @@ export const auditLog = actor({ })); }, }, + run: workflow(runAuditLogWorkflow), }); diff --git a/foundry/packages/backend/src/actors/github-data/index.ts b/foundry/packages/backend/src/actors/github-data/index.ts index a7d65a0..c010882 100644 --- a/foundry/packages/backend/src/actors/github-data/index.ts +++ b/foundry/packages/backend/src/actors/github-data/index.ts @@ -1,15 +1,17 @@ // @ts-nocheck import { eq, inArray } from "drizzle-orm"; -import { actor } from "rivetkit"; +import { actor, queue } from "rivetkit"; +import { workflow, Loop } from "rivetkit/workflow"; import type { FoundryOrganization } from "@sandbox-agent/foundry-shared"; import { getActorRuntimeContext } from "../context.js"; import { getOrCreateOrganization, getTask } from "../handles.js"; +import { logActorWarning, resolveErrorMessage } from "../logging.js"; +import { taskWorkflowQueueName } from "../task/workflow/queue.js"; import { repoIdFromRemote } from "../../services/repo.js"; import { resolveOrganizationGithubAuth } from "../../services/github-auth.js"; -// actions called directly (no queue) +import { organizationWorkflowQueueName } from "../organization/queues.js"; import { githubDataDb } from "./db/db.js"; import { githubBranches, githubMembers, githubMeta, githubPullRequests, githubRepositories } from "./db/schema.js"; -// workflow.ts is no longer used — commands are actions now const META_ROW_ID = 1; const SYNC_REPOSITORY_BATCH_SIZE = 10; @@ -74,7 +76,19 @@ interface ClearStateInput { label: string; } -// sendOrganizationCommand removed — org actions called directly +// Queue names for github-data actor +export const GITHUB_DATA_QUEUE_NAMES = [ + "githubData.command.syncRepos", + "githubData.command.handlePullRequestWebhook", + "githubData.command.clearState", + "githubData.command.reloadRepository", +] as const; + +type GithubDataQueueName = (typeof GITHUB_DATA_QUEUE_NAMES)[number]; + +export function githubDataWorkflowQueueName(name: GithubDataQueueName): GithubDataQueueName { + return name; +} interface PullRequestWebhookInput { connectedAccount: string; @@ -209,18 +223,22 @@ async function writeMeta(c: any, patch: Partial) { async function publishSyncProgress(c: any, patch: Partial): Promise { const meta = await writeMeta(c, patch); const organization = await getOrCreateOrganization(c, c.state.organizationId); - await organization.commandApplyGithubSyncProgress({ - connectedAccount: meta.connectedAccount, - installationStatus: meta.installationStatus, - installationId: meta.installationId, - syncStatus: meta.syncStatus, - lastSyncLabel: meta.lastSyncLabel, - lastSyncAt: meta.lastSyncAt, - syncGeneration: meta.syncGeneration, - syncPhase: meta.syncPhase, - processedRepositoryCount: meta.processedRepositoryCount, - totalRepositoryCount: meta.totalRepositoryCount, - }); + await organization.send( + organizationWorkflowQueueName("organization.command.github.sync_progress.apply"), + { + connectedAccount: meta.connectedAccount, + installationStatus: meta.installationStatus, + installationId: meta.installationId, + syncStatus: meta.syncStatus, + lastSyncLabel: meta.lastSyncLabel, + lastSyncAt: meta.lastSyncAt, + syncGeneration: meta.syncGeneration, + syncPhase: meta.syncPhase, + processedRepositoryCount: meta.processedRepositoryCount, + totalRepositoryCount: meta.totalRepositoryCount, + }, + { wait: false }, + ); return meta; } @@ -424,7 +442,13 @@ async function refreshTaskSummaryForBranch(c: any, repoId: string, branchName: s return; } const organization = await getOrCreateOrganization(c, c.state.organizationId); - void organization.commandRefreshTaskSummaryForBranch({ repoId, branchName, pullRequest, repoName: repositoryRecord.fullName ?? undefined }).catch(() => {}); + void organization + .send( + organizationWorkflowQueueName("organization.command.refreshTaskSummaryForBranch"), + { repoId, branchName, pullRequest, repoName: repositoryRecord.fullName ?? undefined }, + { wait: false }, + ) + .catch(() => {}); } async function emitPullRequestChangeEvents(c: any, beforeRows: any[], afterRows: any[]) { @@ -472,7 +496,7 @@ async function autoArchiveTaskForClosedPullRequest(c: any, row: any) { } try { const task = getTask(c, c.state.organizationId, row.repoId, match.taskId); - void task.archive({ reason: `PR ${String(row.state).toLowerCase()}` }).catch(() => {}); + void task.send(taskWorkflowQueueName("task.command.archive"), { reason: `PR ${String(row.state).toLowerCase()}` }, { wait: false }).catch(() => {}); } catch { // Best-effort only. Task summary refresh will still clear the PR state. } @@ -877,8 +901,79 @@ export async function fullSyncError(c: any, error: unknown): Promise { }); } +// --------------------------------------------------------------------------- +// Workflow command loop +// --------------------------------------------------------------------------- + +type GithubDataWorkflowHandler = (loopCtx: any, body: any) => Promise; + +const GITHUB_DATA_COMMAND_HANDLERS: Record = { + "githubData.command.syncRepos": async (c, body) => { + try { + await runFullSync(c, body); + return { ok: true }; + } catch (error) { + try { + await fullSyncError(c, error); + } catch { + /* best effort */ + } + throw error; + } + }, + "githubData.command.handlePullRequestWebhook": async (c, body) => { + await handlePullRequestWebhookMutation(c, body); + return { ok: true }; + }, + "githubData.command.clearState": async (c, body) => { + await clearStateMutation(c, body); + return { ok: true }; + }, + "githubData.command.reloadRepository": async (c, body) => reloadRepositoryMutation(c, body), +}; + +async function runGithubDataWorkflow(ctx: any): Promise { + await ctx.loop("github-data-command-loop", async (loopCtx: any) => { + const msg = await loopCtx.queue.next("next-github-data-command", { + names: [...GITHUB_DATA_QUEUE_NAMES], + completable: true, + }); + + if (!msg) { + return Loop.continue(undefined); + } + + const handler = GITHUB_DATA_COMMAND_HANDLERS[msg.name as GithubDataQueueName]; + if (!handler) { + logActorWarning("github-data", "unknown github-data command", { command: msg.name }); + await msg.complete({ error: `Unknown command: ${msg.name}` }).catch(() => {}); + return Loop.continue(undefined); + } + + try { + // Wrap in a step so c.state and c.db are accessible inside mutation functions. + const result = await loopCtx.step({ + name: msg.name, + timeout: 10 * 60_000, + run: async () => handler(loopCtx, msg.body), + }); + await msg.complete(result); + } catch (error) { + const message = resolveErrorMessage(error); + logActorWarning("github-data", "github-data workflow command failed", { + command: msg.name, + error: message, + }); + await msg.complete({ error: message }).catch(() => {}); + } + + return Loop.continue(undefined); + }); +} + export const githubData = actor({ db: githubDataDb, + queues: Object.fromEntries(GITHUB_DATA_QUEUE_NAMES.map((name) => [name, queue()])), options: { name: "GitHub Data", icon: "github", @@ -945,35 +1040,8 @@ export const githubData = actor({ })) .sort((left, right) => left.branchName.localeCompare(right.branchName)); }, - - async syncRepos(c, body: any) { - try { - await runFullSync(c, body); - return { ok: true }; - } catch (error) { - try { - await fullSyncError(c, error); - } catch { - /* best effort */ - } - throw error; - } - }, - - async reloadRepository(c, body: { repoId: string }) { - return await reloadRepositoryMutation(c, body); - }, - - async clearState(c, body: any) { - await clearStateMutation(c, body); - return { ok: true }; - }, - - async handlePullRequestWebhook(c, body: any) { - await handlePullRequestWebhookMutation(c, body); - return { ok: true }; - }, }, + run: workflow(runGithubDataWorkflow), }); export async function reloadRepositoryMutation(c: any, input: { repoId: string }) { diff --git a/foundry/packages/backend/src/actors/handles.ts b/foundry/packages/backend/src/actors/handles.ts index 2cc83d9..5aa5715 100644 --- a/foundry/packages/backend/src/actors/handles.ts +++ b/foundry/packages/backend/src/actors/handles.ts @@ -79,3 +79,7 @@ export function selfUser(c: any) { export function selfGithubData(c: any) { return actorClient(c).githubData.getForId(c.actorId); } + +export function selfTaskSandbox(c: any) { + return actorClient(c).taskSandbox.getForId(c.actorId); +} diff --git a/foundry/packages/backend/src/actors/organization/actions/github.ts b/foundry/packages/backend/src/actors/organization/actions/github.ts index ff14d7e..6966405 100644 --- a/foundry/packages/backend/src/actors/organization/actions/github.ts +++ b/foundry/packages/backend/src/actors/organization/actions/github.ts @@ -1,6 +1,7 @@ import { desc } from "drizzle-orm"; import type { FoundryAppSnapshot } from "@sandbox-agent/foundry-shared"; import { getOrCreateGithubData, getOrCreateOrganization } from "../../handles.js"; +import { githubDataWorkflowQueueName } from "../../github-data/index.js"; import { authSessionIndex } from "../db/schema.js"; import { assertAppOrganization, @@ -11,6 +12,7 @@ import { } from "../app-shell.js"; import { getBetterAuthService } from "../../../services/better-auth.js"; import { refreshOrganizationSnapshotMutation } from "../actions.js"; +import { organizationWorkflowQueueName } from "../queues.js"; export const organizationGithubActions = { async resolveAppGithubToken( @@ -58,21 +60,27 @@ export const organizationGithubActions = { } const organizationHandle = await getOrCreateOrganization(c, input.organizationId); - await organizationHandle.commandMarkSyncStarted({ label: "Importing repository catalog..." }); - await organizationHandle.commandBroadcastSnapshot({}); + await organizationHandle.send( + organizationWorkflowQueueName("organization.command.shell.sync_started.mark"), + { label: "Importing repository catalog..." }, + { wait: false }, + ); + await organizationHandle.send(organizationWorkflowQueueName("organization.command.snapshot.broadcast"), {}, { wait: false }); - void githubData.syncRepos({ label: "Importing repository catalog..." }).catch(() => {}); + void githubData + .send(githubDataWorkflowQueueName("githubData.command.syncRepos"), { label: "Importing repository catalog..." }, { wait: false }) + .catch(() => {}); return await buildAppSnapshot(c, input.sessionId); }, async adminReloadGithubOrganization(c: any): Promise { const githubData = await getOrCreateGithubData(c, c.state.organizationId); - await githubData.syncRepos({ label: "Reloading GitHub organization..." }); + await githubData.send(githubDataWorkflowQueueName("githubData.command.syncRepos"), { label: "Reloading GitHub organization..." }, { wait: false }); }, async adminReloadGithubRepository(c: any, input: { repoId: string }): Promise { const githubData = await getOrCreateGithubData(c, c.state.organizationId); - await githubData.reloadRepository(input); + await githubData.send(githubDataWorkflowQueueName("githubData.command.reloadRepository"), input, { wait: false }); }, }; diff --git a/foundry/packages/backend/src/actors/organization/actions/organization.ts b/foundry/packages/backend/src/actors/organization/actions/organization.ts index d38e113..978628b 100644 --- a/foundry/packages/backend/src/actors/organization/actions/organization.ts +++ b/foundry/packages/backend/src/actors/organization/actions/organization.ts @@ -1,7 +1,6 @@ import type { FoundryAppSnapshot, UpdateFoundryOrganizationProfileInput, WorkspaceModelId } from "@sandbox-agent/foundry-shared"; import { getBetterAuthService } from "../../../services/better-auth.js"; import { getOrCreateOrganization } from "../../handles.js"; -// actions called directly (no queue) import { assertAppOrganization, assertOrganizationShell, @@ -11,7 +10,7 @@ import { requireEligibleOrganization, requireSignedInSession, } from "../app-shell.js"; -// org queue names removed — using direct actions +import { organizationWorkflowQueueName } from "../queues.js"; export const organizationShellActions = { async getAppSnapshot(c: any, input: { sessionId: string }): Promise { @@ -35,11 +34,15 @@ export const organizationShellActions = { const session = await requireSignedInSession(c, input.sessionId); requireEligibleOrganization(session, input.organizationId); const organization = await getOrCreateOrganization(c, input.organizationId); - await organization.commandUpdateShellProfile({ - displayName: input.displayName, - slug: input.slug, - primaryDomain: input.primaryDomain, - }); + await organization.send( + organizationWorkflowQueueName("organization.command.shell.profile.update"), + { + displayName: input.displayName, + slug: input.slug, + primaryDomain: input.primaryDomain, + }, + { wait: true, timeout: 10_000 }, + ); return await buildAppSnapshot(c, input.sessionId); }, diff --git a/foundry/packages/backend/src/actors/organization/actions/task-mutations.ts b/foundry/packages/backend/src/actors/organization/actions/task-mutations.ts index 3049bb4..a067563 100644 --- a/foundry/packages/backend/src/actors/organization/actions/task-mutations.ts +++ b/foundry/packages/backend/src/actors/organization/actions/task-mutations.ts @@ -17,6 +17,8 @@ import { deriveFallbackTitle, resolveCreateFlowDecision } from "../../../service // actions return directly (no queue response unwrapping) import { isActorNotFoundError, logActorWarning, resolveErrorMessage } from "../../logging.js"; import { defaultSandboxProviderId } from "../../../sandbox-config.js"; +import { taskWorkflowQueueName } from "../../task/workflow/queue.js"; +import { expectQueueResponse } from "../../../services/queue.js"; import { taskIndex, taskSummaries } from "../db/schema.js"; import { refreshOrganizationSnapshotMutation } from "../actions.js"; @@ -202,12 +204,18 @@ export async function createTaskMutation(c: any, cmd: CreateTaskCommand): Promis throw error; } - const created = await taskHandle.initialize({ - sandboxProviderId: cmd.sandboxProviderId, - branchName: initialBranchName, - title: initialTitle, - task: cmd.task, - }); + const created = expectQueueResponse( + await taskHandle.send( + taskWorkflowQueueName("task.command.initialize"), + { + sandboxProviderId: cmd.sandboxProviderId, + branchName: initialBranchName, + title: initialTitle, + task: cmd.task, + }, + { wait: true, timeout: 10_000 }, + ), + ); try { await upsertTaskSummary(c, await taskHandle.getTaskSummary({})); @@ -384,7 +392,7 @@ export async function refreshTaskSummaryForBranchMutation( // Best-effort notify the task actor if it exists (fire-and-forget) try { const task = getTask(c, c.state.organizationId, input.repoId, row.taskId); - void task.pullRequestSync({ pullRequest }).catch(() => {}); + void task.send(taskWorkflowQueueName("task.command.pull_request.sync"), { pullRequest }, { wait: false }).catch(() => {}); } catch { // Task actor doesn't exist yet — that's fine, it's virtual } diff --git a/foundry/packages/backend/src/actors/organization/actions/tasks.ts b/foundry/packages/backend/src/actors/organization/actions/tasks.ts index c3794e1..48538e8 100644 --- a/foundry/packages/backend/src/actors/organization/actions/tasks.ts +++ b/foundry/packages/backend/src/actors/organization/actions/tasks.ts @@ -25,6 +25,8 @@ import { getActorRuntimeContext } from "../../context.js"; import { getOrCreateAuditLog, getOrCreateTask, getTask as getTaskHandle } from "../../handles.js"; import { defaultSandboxProviderId } from "../../../sandbox-config.js"; import { logActorWarning, resolveErrorMessage } from "../../logging.js"; +import { taskWorkflowQueueName } from "../../task/workflow/queue.js"; +import { expectQueueResponse } from "../../../services/queue.js"; import { taskIndex, taskSummaries } from "../db/schema.js"; import { createTaskMutation, @@ -131,11 +133,15 @@ export const organizationTaskActions = { const task = await requireWorkspaceTask(c, input.repoId, created.taskId); void task - .createSessionAndSend({ - model: input.model, - text: input.task, - authSessionId: input.authSessionId, - }) + .send( + taskWorkflowQueueName("task.command.workspace.create_session_and_send"), + { + model: input.model, + text: input.task, + authSessionId: input.authSessionId, + }, + { wait: false }, + ) .catch(() => {}); return { taskId: created.taskId }; @@ -143,94 +149,132 @@ export const organizationTaskActions = { async markWorkspaceUnread(c: any, input: TaskWorkspaceSelectInput): Promise { const task = await requireWorkspaceTask(c, input.repoId, input.taskId); - await task.markUnread({ authSessionId: input.authSessionId }); + await task.send(taskWorkflowQueueName("task.command.workspace.mark_unread"), { authSessionId: input.authSessionId }, { wait: false }); }, async renameWorkspaceTask(c: any, input: TaskWorkspaceRenameInput): Promise { const task = await requireWorkspaceTask(c, input.repoId, input.taskId); - await task.renameTask({ value: input.value }); + await task.send(taskWorkflowQueueName("task.command.workspace.rename_task"), { value: input.value }, { wait: false }); }, async createWorkspaceSession(c: any, input: TaskWorkspaceSelectInput & { model?: string }): Promise<{ sessionId: string }> { const task = await requireWorkspaceTask(c, input.repoId, input.taskId); - return await task.createSession({ - ...(input.model ? { model: input.model } : {}), - ...(input.authSessionId ? { authSessionId: input.authSessionId } : {}), - }); + return expectQueueResponse( + await task.send( + taskWorkflowQueueName("task.command.workspace.create_session"), + { + ...(input.model ? { model: input.model } : {}), + ...(input.authSessionId ? { authSessionId: input.authSessionId } : {}), + }, + { wait: true, timeout: 10_000 }, + ), + ); }, async renameWorkspaceSession(c: any, input: TaskWorkspaceRenameSessionInput): Promise { const task = await requireWorkspaceTask(c, input.repoId, input.taskId); - await task.renameSession({ sessionId: input.sessionId, title: input.title, authSessionId: input.authSessionId }); + await task.send( + taskWorkflowQueueName("task.command.workspace.rename_session"), + { sessionId: input.sessionId, title: input.title, authSessionId: input.authSessionId }, + { wait: false }, + ); }, async selectWorkspaceSession(c: any, input: TaskWorkspaceSessionInput): Promise { const task = await requireWorkspaceTask(c, input.repoId, input.taskId); - await task.selectSession({ sessionId: input.sessionId, authSessionId: input.authSessionId }); + await task.send( + taskWorkflowQueueName("task.command.workspace.select_session"), + { sessionId: input.sessionId, authSessionId: input.authSessionId }, + { wait: false }, + ); }, async setWorkspaceSessionUnread(c: any, input: TaskWorkspaceSetSessionUnreadInput): Promise { const task = await requireWorkspaceTask(c, input.repoId, input.taskId); - await task.setSessionUnread({ sessionId: input.sessionId, unread: input.unread, authSessionId: input.authSessionId }); + await task.send( + taskWorkflowQueueName("task.command.workspace.set_session_unread"), + { sessionId: input.sessionId, unread: input.unread, authSessionId: input.authSessionId }, + { wait: false }, + ); }, async updateWorkspaceDraft(c: any, input: TaskWorkspaceUpdateDraftInput): Promise { const task = await requireWorkspaceTask(c, input.repoId, input.taskId); void task - .updateDraft({ - sessionId: input.sessionId, - text: input.text, - attachments: input.attachments, - authSessionId: input.authSessionId, - }) + .send( + taskWorkflowQueueName("task.command.workspace.update_draft"), + { + sessionId: input.sessionId, + text: input.text, + attachments: input.attachments, + authSessionId: input.authSessionId, + }, + { wait: false }, + ) .catch(() => {}); }, async changeWorkspaceModel(c: any, input: TaskWorkspaceChangeModelInput): Promise { const task = await requireWorkspaceTask(c, input.repoId, input.taskId); - await task.changeModel({ sessionId: input.sessionId, model: input.model, authSessionId: input.authSessionId }); + await task.send( + taskWorkflowQueueName("task.command.workspace.change_model"), + { sessionId: input.sessionId, model: input.model, authSessionId: input.authSessionId }, + { wait: false }, + ); }, async sendWorkspaceMessage(c: any, input: TaskWorkspaceSendMessageInput): Promise { const task = await requireWorkspaceTask(c, input.repoId, input.taskId); void task - .sendMessage({ - sessionId: input.sessionId, - text: input.text, - attachments: input.attachments, - authSessionId: input.authSessionId, - }) + .send( + taskWorkflowQueueName("task.command.workspace.send_message"), + { + sessionId: input.sessionId, + text: input.text, + attachments: input.attachments, + authSessionId: input.authSessionId, + }, + { wait: false }, + ) .catch(() => {}); }, async stopWorkspaceSession(c: any, input: TaskWorkspaceSessionInput): Promise { const task = await requireWorkspaceTask(c, input.repoId, input.taskId); - void task.stopSession({ sessionId: input.sessionId, authSessionId: input.authSessionId }).catch(() => {}); + void task + .send(taskWorkflowQueueName("task.command.workspace.stop_session"), { sessionId: input.sessionId, authSessionId: input.authSessionId }, { wait: false }) + .catch(() => {}); }, async closeWorkspaceSession(c: any, input: TaskWorkspaceSessionInput): Promise { const task = await requireWorkspaceTask(c, input.repoId, input.taskId); - void task.closeSession({ sessionId: input.sessionId, authSessionId: input.authSessionId }).catch(() => {}); + void task + .send(taskWorkflowQueueName("task.command.workspace.close_session"), { sessionId: input.sessionId, authSessionId: input.authSessionId }, { wait: false }) + .catch(() => {}); }, async publishWorkspacePr(c: any, input: TaskWorkspaceSelectInput): Promise { const task = await requireWorkspaceTask(c, input.repoId, input.taskId); - void task.publishPr({}).catch(() => {}); + void task.send(taskWorkflowQueueName("task.command.workspace.publish_pr"), {}, { wait: false }).catch(() => {}); }, async changeWorkspaceTaskOwner(c: any, input: TaskWorkspaceChangeOwnerInput): Promise { const task = await requireWorkspaceTask(c, input.repoId, input.taskId); - await task.changeOwner({ - primaryUserId: input.targetUserId, - primaryGithubLogin: input.targetUserName, - primaryGithubEmail: input.targetUserEmail, - primaryGithubAvatarUrl: null, - }); + await task.send( + taskWorkflowQueueName("task.command.workspace.change_owner"), + { + primaryUserId: input.targetUserId, + primaryGithubLogin: input.targetUserName, + primaryGithubEmail: input.targetUserEmail, + primaryGithubAvatarUrl: null, + }, + { wait: false }, + ); }, async revertWorkspaceFile(c: any, input: TaskWorkspaceDiffInput): Promise { const task = await requireWorkspaceTask(c, input.repoId, input.taskId); - void task.revertFile(input).catch(() => {}); + void task.send(taskWorkflowQueueName("task.command.workspace.revert_file"), input, { wait: false }).catch(() => {}); }, async getRepoOverview(c: any, input: RepoOverviewInput): Promise { @@ -250,7 +294,9 @@ export const organizationTaskActions = { async switchTask(c: any, input: { repoId: string; taskId: string }): Promise { const h = getTaskHandle(c, c.state.organizationId, input.repoId, input.taskId); const record = await h.get(); - const switched = await h.switchTask({}); + const switched = expectQueueResponse<{ switchTarget: string | null }>( + await h.send(taskWorkflowQueueName("task.command.switch"), {}, { wait: true, timeout: 10_000 }), + ); return { organizationId: c.state.organizationId, taskId: input.taskId, @@ -288,42 +334,42 @@ export const organizationTaskActions = { assertOrganization(c, input.organizationId); const h = getTaskHandle(c, c.state.organizationId, input.repoId, input.taskId); - return await h.attach({ reason: input.reason }); + return expectQueueResponse(await h.send(taskWorkflowQueueName("task.command.attach"), { reason: input.reason }, { wait: true, timeout: 10_000 })); }, async pushTask(c: any, input: TaskProxyActionInput): Promise { assertOrganization(c, input.organizationId); const h = getTaskHandle(c, c.state.organizationId, input.repoId, input.taskId); - void h.push({ reason: input.reason }).catch(() => {}); + void h.send(taskWorkflowQueueName("task.command.push"), { reason: input.reason }, { wait: false }).catch(() => {}); }, async syncTask(c: any, input: TaskProxyActionInput): Promise { assertOrganization(c, input.organizationId); const h = getTaskHandle(c, c.state.organizationId, input.repoId, input.taskId); - void h.sync({ reason: input.reason }).catch(() => {}); + void h.send(taskWorkflowQueueName("task.command.sync"), { reason: input.reason }, { wait: false }).catch(() => {}); }, async mergeTask(c: any, input: TaskProxyActionInput): Promise { assertOrganization(c, input.organizationId); const h = getTaskHandle(c, c.state.organizationId, input.repoId, input.taskId); - void h.merge({ reason: input.reason }).catch(() => {}); + void h.send(taskWorkflowQueueName("task.command.merge"), { reason: input.reason }, { wait: false }).catch(() => {}); }, async archiveTask(c: any, input: TaskProxyActionInput): Promise { assertOrganization(c, input.organizationId); const h = getTaskHandle(c, c.state.organizationId, input.repoId, input.taskId); - void h.archive({ reason: input.reason }).catch(() => {}); + void h.send(taskWorkflowQueueName("task.command.archive"), { reason: input.reason }, { wait: false }).catch(() => {}); }, async killTask(c: any, input: TaskProxyActionInput): Promise { assertOrganization(c, input.organizationId); const h = getTaskHandle(c, c.state.organizationId, input.repoId, input.taskId); - void h.kill({ reason: input.reason }).catch(() => {}); + void h.send(taskWorkflowQueueName("task.command.kill"), { reason: input.reason }, { wait: false }).catch(() => {}); }, async getRepositoryMetadata(c: any, input: { repoId: string }): Promise<{ defaultBranch: string | null; fullName: string | null; remoteUrl: string }> { diff --git a/foundry/packages/backend/src/actors/organization/app-shell.ts b/foundry/packages/backend/src/actors/organization/app-shell.ts index dce5855..dc9de97 100644 --- a/foundry/packages/backend/src/actors/organization/app-shell.ts +++ b/foundry/packages/backend/src/actors/organization/app-shell.ts @@ -17,6 +17,8 @@ import { GitHubAppError } from "../../services/app-github.js"; import { getBetterAuthService } from "../../services/better-auth.js"; import { repoIdFromRemote, repoLabelFromRemote } from "../../services/repo.js"; import { logger } from "../../logging.js"; +import { githubDataWorkflowQueueName } from "../github-data/index.js"; +import { organizationWorkflowQueueName } from "./queues.js"; import { invoices, organizationMembers, organizationProfile, seatAssignments, stripeLookup } from "./db/schema.js"; import { APP_SHELL_ORGANIZATION_ID } from "./constants.js"; @@ -482,19 +484,23 @@ async function syncGithubOrganizationsInternal(c: any, input: { sessionId: strin const organizationId = organizationOrganizationId(account.kind, account.githubLogin); const installation = installations.find((candidate) => candidate.accountLogin === account.githubLogin) ?? null; const organization = await getOrCreateOrganization(c, organizationId); - await organization.commandSyncOrganizationShellFromGithub({ - userId: githubUserId, - userName: viewer.name || viewer.login, - userEmail: viewer.email ?? `${viewer.login}@users.noreply.github.com`, - githubUserLogin: viewer.login, - githubAccountId: account.githubAccountId, - githubLogin: account.githubLogin, - githubAccountType: account.githubAccountType, - kind: account.kind, - displayName: account.displayName, - installationId: installation?.id ?? null, - appConfigured: appShell.github.isAppConfigured(), - }); + await organization.send( + organizationWorkflowQueueName("organization.command.github.organization_shell.sync_from_github"), + { + userId: githubUserId, + userName: viewer.name || viewer.login, + userEmail: viewer.email ?? `${viewer.login}@users.noreply.github.com`, + githubUserLogin: viewer.login, + githubAccountId: account.githubAccountId, + githubLogin: account.githubLogin, + githubAccountType: account.githubAccountType, + kind: account.kind, + displayName: account.displayName, + installationId: installation?.id ?? null, + appConfigured: appShell.github.isAppConfigured(), + }, + { wait: true, timeout: 10_000 }, + ); linkedOrganizationIds.push(organizationId); } @@ -677,10 +683,14 @@ async function applySubscriptionState( }, fallbackPlanId: FoundryBillingPlanId, ): Promise { - await organization.commandApplyStripeSubscription({ - subscription, - fallbackPlanId, - }); + await organization.send( + organizationWorkflowQueueName("organization.command.billing.stripe_subscription.apply"), + { + subscription, + fallbackPlanId, + }, + { wait: true, timeout: 10_000 }, + ); } export const organizationAppActions = { @@ -693,9 +703,13 @@ export const organizationAppActions = { const organizationState = await getOrganizationState(organizationHandle); if (input.planId === "free") { - await organizationHandle.commandApplyFreePlan({ - clearSubscription: false, - }); + await organizationHandle.send( + organizationWorkflowQueueName("organization.command.billing.free_plan.apply"), + { + clearSubscription: false, + }, + { wait: true, timeout: 10_000 }, + ); return { url: `${appShell.appUrl}/organizations/${input.organizationId}/billing`, }; @@ -714,9 +728,13 @@ export const organizationAppActions = { email: session.currentUserEmail, }) ).id; - await organizationHandle.commandApplyStripeCustomer({ - customerId, - }); + await organizationHandle.send( + organizationWorkflowQueueName("organization.command.billing.stripe_customer.apply"), + { + customerId, + }, + { wait: true, timeout: 10_000 }, + ); await upsertStripeLookupEntries(c, input.organizationId, customerId, null); } @@ -744,9 +762,13 @@ export const organizationAppActions = { const completion = await appShell.stripe.retrieveCheckoutCompletion(input.checkoutSessionId); if (completion.customerId) { - await organizationHandle.commandApplyStripeCustomer({ - customerId: completion.customerId, - }); + await organizationHandle.send( + organizationWorkflowQueueName("organization.command.billing.stripe_customer.apply"), + { + customerId: completion.customerId, + }, + { wait: true, timeout: 10_000 }, + ); } await upsertStripeLookupEntries(c, input.organizationId, completion.customerId, completion.subscriptionId); @@ -756,9 +778,13 @@ export const organizationAppActions = { } if (completion.paymentMethodLabel) { - await organizationHandle.commandSetPaymentMethod({ - label: completion.paymentMethodLabel, - }); + await organizationHandle.send( + organizationWorkflowQueueName("organization.command.billing.payment_method.set"), + { + label: completion.paymentMethodLabel, + }, + { wait: true, timeout: 10_000 }, + ); } return { @@ -796,9 +822,13 @@ export const organizationAppActions = { await applySubscriptionState(organizationHandle, subscription, organizationState.billingPlanId); await upsertStripeLookupEntries(c, input.organizationId, subscription.customerId ?? organizationState.stripeCustomerId, subscription.id); } else { - await organizationHandle.commandSetBillingStatus({ - status: "scheduled_cancel", - }); + await organizationHandle.send( + organizationWorkflowQueueName("organization.command.billing.status.set"), + { + status: "scheduled_cancel", + }, + { wait: true, timeout: 10_000 }, + ); } return await buildAppSnapshot(c, input.sessionId); @@ -817,9 +847,13 @@ export const organizationAppActions = { await applySubscriptionState(organizationHandle, subscription, organizationState.billingPlanId); await upsertStripeLookupEntries(c, input.organizationId, subscription.customerId ?? organizationState.stripeCustomerId, subscription.id); } else { - await organizationHandle.commandSetBillingStatus({ - status: "active", - }); + await organizationHandle.send( + organizationWorkflowQueueName("organization.command.billing.status.set"), + { + status: "active", + }, + { wait: true, timeout: 10_000 }, + ); } return await buildAppSnapshot(c, input.sessionId); @@ -830,9 +864,13 @@ export const organizationAppActions = { const session = await requireSignedInSession(c, input.sessionId); requireEligibleOrganization(session, input.organizationId); const organization = await getOrCreateOrganization(c, input.organizationId); - await organization.commandRecordSeatUsage({ - email: session.currentUserEmail, - }); + await organization.send( + organizationWorkflowQueueName("organization.command.billing.seat_usage.record"), + { + email: session.currentUserEmail, + }, + { wait: true, timeout: 10_000 }, + ); return await buildAppSnapshot(c, input.sessionId); }, @@ -853,9 +891,13 @@ export const organizationAppActions = { if (organizationId) { const organization = await getOrCreateOrganization(c, organizationId); if (typeof object.customer === "string") { - await organization.commandApplyStripeCustomer({ - customerId: object.customer, - }); + await organization.send( + organizationWorkflowQueueName("organization.command.billing.stripe_customer.apply"), + { + customerId: object.customer, + }, + { wait: true, timeout: 10_000 }, + ); } await upsertStripeLookupEntries( c, @@ -888,9 +930,13 @@ export const organizationAppActions = { const organizationId = await findOrganizationIdForStripeEvent(c, subscription.customerId, subscription.id); if (organizationId) { const organization = await getOrCreateOrganization(c, organizationId); - await organization.commandApplyFreePlan({ - clearSubscription: true, - }); + await organization.send( + organizationWorkflowQueueName("organization.command.billing.free_plan.apply"), + { + clearSubscription: true, + }, + { wait: true, timeout: 10_000 }, + ); } return { ok: true }; } @@ -902,13 +948,17 @@ export const organizationAppActions = { const organization = await getOrCreateOrganization(c, organizationId); const rawAmount = typeof invoice.amount_paid === "number" ? invoice.amount_paid : invoice.amount_due; const amountUsd = Math.round((typeof rawAmount === "number" ? rawAmount : 0) / 100); - await organization.commandUpsertInvoice({ - id: String(invoice.id), - label: typeof invoice.number === "string" ? `Invoice ${invoice.number}` : "Stripe invoice", - issuedAt: formatUnixDate(typeof invoice.created === "number" ? invoice.created : Math.floor(Date.now() / 1000)), - amountUsd: Number.isFinite(amountUsd) ? amountUsd : 0, - status: event.type === "invoice.paid" ? "paid" : "open", - }); + await organization.send( + organizationWorkflowQueueName("organization.command.billing.invoice.upsert"), + { + id: String(invoice.id), + label: typeof invoice.number === "string" ? `Invoice ${invoice.number}` : "Stripe invoice", + issuedAt: formatUnixDate(typeof invoice.created === "number" ? invoice.created : Math.floor(Date.now() / 1000)), + amountUsd: Number.isFinite(amountUsd) ? amountUsd : 0, + status: event.type === "invoice.paid" ? "paid" : "open", + }, + { wait: true, timeout: 10_000 }, + ); } } @@ -938,12 +988,16 @@ export const organizationAppActions = { const organizationId = organizationOrganizationId(kind, accountLogin); const receivedAt = Date.now(); const organization = await getOrCreateOrganization(c, organizationId); - await organization.commandRecordGithubWebhookReceipt({ - organizationId: organizationId, - event, - action: body.action ?? null, - receivedAt, - }); + await organization.send( + organizationWorkflowQueueName("organization.command.github.webhook_receipt.record"), + { + organizationId: organizationId, + event, + action: body.action ?? null, + receivedAt, + }, + { wait: false }, + ); const githubData = await getOrCreateGithubData(c, organizationId); if (event === "installation" && (body.action === "created" || body.action === "deleted" || body.action === "suspend" || body.action === "unsuspend")) { @@ -957,40 +1011,56 @@ export const organizationAppActions = { "installation_event", ); if (body.action === "deleted") { - await githubData.clearState({ - connectedAccount: accountLogin, - installationStatus: "install_required", - installationId: null, - label: "GitHub App installation removed", - }); + await githubData.send( + githubDataWorkflowQueueName("githubData.command.clearState"), + { + connectedAccount: accountLogin, + installationStatus: "install_required", + installationId: null, + label: "GitHub App installation removed", + }, + { wait: false }, + ); } else if (body.action === "created") { void githubData - .syncRepos({ - connectedAccount: accountLogin, - installationStatus: "connected", - installationId: body.installation?.id ?? null, - githubLogin: accountLogin, - kind, - label: "Syncing GitHub data from installation webhook...", - }) + .send( + githubDataWorkflowQueueName("githubData.command.syncRepos"), + { + connectedAccount: accountLogin, + installationStatus: "connected", + installationId: body.installation?.id ?? null, + githubLogin: accountLogin, + kind, + label: "Syncing GitHub data from installation webhook...", + }, + { wait: false }, + ) .catch(() => {}); } else if (body.action === "suspend") { - await githubData.clearState({ - connectedAccount: accountLogin, - installationStatus: "reconnect_required", - installationId: body.installation?.id ?? null, - label: "GitHub App installation suspended", - }); + await githubData.send( + githubDataWorkflowQueueName("githubData.command.clearState"), + { + connectedAccount: accountLogin, + installationStatus: "reconnect_required", + installationId: body.installation?.id ?? null, + label: "GitHub App installation suspended", + }, + { wait: false }, + ); } else if (body.action === "unsuspend") { void githubData - .syncRepos({ - connectedAccount: accountLogin, - installationStatus: "connected", - installationId: body.installation?.id ?? null, - githubLogin: accountLogin, - kind, - label: "Resyncing GitHub data after unsuspend...", - }) + .send( + githubDataWorkflowQueueName("githubData.command.syncRepos"), + { + connectedAccount: accountLogin, + installationStatus: "connected", + installationId: body.installation?.id ?? null, + githubLogin: accountLogin, + kind, + label: "Resyncing GitHub data after unsuspend...", + }, + { wait: false }, + ) .catch(() => {}); } return { ok: true }; @@ -1009,14 +1079,18 @@ export const organizationAppActions = { "repository_membership_changed", ); void githubData - .syncRepos({ - connectedAccount: accountLogin, - installationStatus: "connected", - installationId: body.installation?.id ?? null, - githubLogin: accountLogin, - kind, - label: "Resyncing GitHub data after repository access change...", - }) + .send( + githubDataWorkflowQueueName("githubData.command.syncRepos"), + { + connectedAccount: accountLogin, + installationStatus: "connected", + installationId: body.installation?.id ?? null, + githubLogin: accountLogin, + kind, + label: "Resyncing GitHub data after repository access change...", + }, + { wait: false }, + ) .catch(() => {}); return { ok: true }; } @@ -1045,35 +1119,39 @@ export const organizationAppActions = { "repository_event", ); if (event === "pull_request" && body.repository?.clone_url && body.pull_request) { - await githubData.handlePullRequestWebhook({ - connectedAccount: accountLogin, - installationStatus: "connected", - installationId: body.installation?.id ?? null, - repository: { - fullName: body.repository.full_name, - cloneUrl: body.repository.clone_url, - private: Boolean(body.repository.private), + await githubData.send( + githubDataWorkflowQueueName("githubData.command.handlePullRequestWebhook"), + { + connectedAccount: accountLogin, + installationStatus: "connected", + installationId: body.installation?.id ?? null, + repository: { + fullName: body.repository.full_name, + cloneUrl: body.repository.clone_url, + private: Boolean(body.repository.private), + }, + pullRequest: { + number: body.pull_request.number, + status: body.pull_request.draft ? "draft" : "ready", + title: body.pull_request.title ?? "", + body: body.pull_request.body ?? null, + state: body.pull_request.state ?? "open", + url: body.pull_request.html_url ?? `https://github.com/${body.repository.full_name}/pull/${body.pull_request.number}`, + headRefName: body.pull_request.head?.ref ?? "", + baseRefName: body.pull_request.base?.ref ?? "", + authorLogin: body.pull_request.user?.login ?? null, + isDraft: Boolean(body.pull_request.draft), + merged: Boolean(body.pull_request.merged), + }, }, - pullRequest: { - number: body.pull_request.number, - status: body.pull_request.draft ? "draft" : "ready", - title: body.pull_request.title ?? "", - body: body.pull_request.body ?? null, - state: body.pull_request.state ?? "open", - url: body.pull_request.html_url ?? `https://github.com/${body.repository.full_name}/pull/${body.pull_request.number}`, - headRefName: body.pull_request.head?.ref ?? "", - baseRefName: body.pull_request.base?.ref ?? "", - authorLogin: body.pull_request.user?.login ?? null, - isDraft: Boolean(body.pull_request.draft), - merged: Boolean(body.pull_request.merged), - }, - }); + { wait: false }, + ); } if ((event === "push" || event === "create" || event === "delete") && body.repository?.clone_url) { const repoId = repoIdFromRemote(body.repository.clone_url); const knownRepository = await githubData.getRepository({ repoId }); if (knownRepository) { - await githubData.reloadRepository({ repoId }); + await githubData.send(githubDataWorkflowQueueName("githubData.command.reloadRepository"), { repoId }, { wait: false }); } } } @@ -1232,14 +1310,18 @@ export async function syncOrganizationShellFromGithubMutation( if (needsInitialSync) { const githubData = await getOrCreateGithubData(c, organizationId); void githubData - .syncRepos({ - connectedAccount: input.githubLogin, - installationStatus: "connected", - installationId: input.installationId, - githubLogin: input.githubLogin, - kind: input.kind, - label: "Initial repository sync...", - }) + .send( + githubDataWorkflowQueueName("githubData.command.syncRepos"), + { + connectedAccount: input.githubLogin, + installationStatus: "connected", + installationId: input.installationId, + githubLogin: input.githubLogin, + kind: input.kind, + label: "Initial repository sync...", + }, + { wait: false }, + ) .catch(() => {}); } diff --git a/foundry/packages/backend/src/actors/organization/index.ts b/foundry/packages/backend/src/actors/organization/index.ts index 1bd8896..9ceb27f 100644 --- a/foundry/packages/backend/src/actors/organization/index.ts +++ b/foundry/packages/backend/src/actors/organization/index.ts @@ -1,10 +1,13 @@ -import { actor } from "rivetkit"; +import { actor, queue } from "rivetkit"; +import { workflow } from "rivetkit/workflow"; import { organizationDb } from "./db/db.js"; import { organizationActions } from "./actions.js"; -import { organizationCommandActions } from "./workflow.js"; +import { runOrganizationWorkflow } from "./workflow.js"; +import { ORGANIZATION_QUEUE_NAMES } from "./queues.js"; export const organization = actor({ db: organizationDb, + queues: Object.fromEntries(ORGANIZATION_QUEUE_NAMES.map((name) => [name, queue()])), options: { name: "Organization", icon: "compass", @@ -15,6 +18,6 @@ export const organization = actor({ }), actions: { ...organizationActions, - ...organizationCommandActions, }, + run: workflow(runOrganizationWorkflow), }); diff --git a/foundry/packages/backend/src/actors/organization/workflow.ts b/foundry/packages/backend/src/actors/organization/workflow.ts index 189225b..b64c997 100644 --- a/foundry/packages/backend/src/actors/organization/workflow.ts +++ b/foundry/packages/backend/src/actors/organization/workflow.ts @@ -1,8 +1,17 @@ // @ts-nocheck /** - * Organization command actions — converted from queue handlers to direct actions. - * Each export becomes an action on the organization actor. + * Organization workflow — queue-based command loop. + * + * Mutations are dispatched through named queues and processed inside workflow + * steps so that every command appears in the RivetKit inspector's workflow + * history. Read actions remain direct (no queue). + * + * Callers send commands directly via `.send()` to the appropriate queue name. */ +import { Loop } from "rivetkit/workflow"; +import { logActorWarning, resolveErrorMessage } from "../logging.js"; +import { ORGANIZATION_QUEUE_NAMES, type OrganizationQueueName } from "./queues.js"; + import { applyGithubSyncProgressMutation, recordGithubWebhookReceiptMutation, refreshOrganizationSnapshotMutation } from "./actions.js"; import { applyTaskSummaryUpdateMutation, @@ -37,127 +46,164 @@ import { upsertOrganizationInvoiceMutation, } from "./app-shell.js"; -export const organizationCommandActions = { - async commandCreateTask(c: any, body: any) { - return await createTaskMutation(c, body); - }, - async commandMaterializeTask(c: any, body: any) { - return await createTaskMutation(c, body); - }, - async commandRegisterTaskBranch(c: any, body: any) { - return await registerTaskBranchMutation(c, body); - }, - async commandApplyTaskSummaryUpdate(c: any, body: any) { +// --------------------------------------------------------------------------- +// Workflow command loop — runs inside `run: workflow(runOrganizationWorkflow)` +// --------------------------------------------------------------------------- + +type WorkflowHandler = (loopCtx: any, body: any) => Promise; + +/** + * Maps queue names to their mutation handlers. + * Each handler receives the workflow loop context and the message body, + * executes the mutation, and returns the result (which is sent back via + * msg.complete). + */ +const COMMAND_HANDLERS: Record = { + // Task mutations + "organization.command.createTask": async (c, body) => createTaskMutation(c, body), + "organization.command.materializeTask": async (c, body) => createTaskMutation(c, body), + "organization.command.registerTaskBranch": async (c, body) => registerTaskBranchMutation(c, body), + "organization.command.applyTaskSummaryUpdate": async (c, body) => { await applyTaskSummaryUpdateMutation(c, body); return { ok: true }; }, - async commandRemoveTaskSummary(c: any, body: any) { + "organization.command.removeTaskSummary": async (c, body) => { await removeTaskSummaryMutation(c, body); return { ok: true }; }, - async commandRefreshTaskSummaryForBranch(c: any, body: any) { + "organization.command.refreshTaskSummaryForBranch": async (c, body) => { await refreshTaskSummaryForBranchMutation(c, body); return { ok: true }; }, - async commandBroadcastSnapshot(c: any, _body: any) { + "organization.command.snapshot.broadcast": async (c, _body) => { await refreshOrganizationSnapshotMutation(c); return { ok: true }; }, - async commandSyncGithubSession(c: any, body: any) { + "organization.command.syncGithubSession": async (c, body) => { const { syncGithubOrganizations } = await import("./app-shell.js"); await syncGithubOrganizations(c, body); return { ok: true }; }, - // Better Auth index actions - async commandBetterAuthSessionIndexUpsert(c: any, body: any) { - return await betterAuthUpsertSessionIndexMutation(c, body); - }, - async commandBetterAuthSessionIndexDelete(c: any, body: any) { + // Better Auth index mutations + "organization.command.better_auth.session_index.upsert": async (c, body) => betterAuthUpsertSessionIndexMutation(c, body), + "organization.command.better_auth.session_index.delete": async (c, body) => { await betterAuthDeleteSessionIndexMutation(c, body); return { ok: true }; }, - async commandBetterAuthEmailIndexUpsert(c: any, body: any) { - return await betterAuthUpsertEmailIndexMutation(c, body); - }, - async commandBetterAuthEmailIndexDelete(c: any, body: any) { + "organization.command.better_auth.email_index.upsert": async (c, body) => betterAuthUpsertEmailIndexMutation(c, body), + "organization.command.better_auth.email_index.delete": async (c, body) => { await betterAuthDeleteEmailIndexMutation(c, body); return { ok: true }; }, - async commandBetterAuthAccountIndexUpsert(c: any, body: any) { - return await betterAuthUpsertAccountIndexMutation(c, body); - }, - async commandBetterAuthAccountIndexDelete(c: any, body: any) { + "organization.command.better_auth.account_index.upsert": async (c, body) => betterAuthUpsertAccountIndexMutation(c, body), + "organization.command.better_auth.account_index.delete": async (c, body) => { await betterAuthDeleteAccountIndexMutation(c, body); return { ok: true }; }, - async commandBetterAuthVerificationCreate(c: any, body: any) { - return await betterAuthCreateVerificationMutation(c, body); - }, - async commandBetterAuthVerificationUpdate(c: any, body: any) { - return await betterAuthUpdateVerificationMutation(c, body); - }, - async commandBetterAuthVerificationUpdateMany(c: any, body: any) { - return await betterAuthUpdateManyVerificationMutation(c, body); - }, - async commandBetterAuthVerificationDelete(c: any, body: any) { + "organization.command.better_auth.verification.create": async (c, body) => betterAuthCreateVerificationMutation(c, body), + "organization.command.better_auth.verification.update": async (c, body) => betterAuthUpdateVerificationMutation(c, body), + "organization.command.better_auth.verification.update_many": async (c, body) => betterAuthUpdateManyVerificationMutation(c, body), + "organization.command.better_auth.verification.delete": async (c, body) => { await betterAuthDeleteVerificationMutation(c, body); return { ok: true }; }, - async commandBetterAuthVerificationDeleteMany(c: any, body: any) { - return await betterAuthDeleteManyVerificationMutation(c, body); - }, + "organization.command.better_auth.verification.delete_many": async (c, body) => betterAuthDeleteManyVerificationMutation(c, body), - // GitHub sync actions - async commandApplyGithubSyncProgress(c: any, body: any) { + // GitHub sync mutations + "organization.command.github.sync_progress.apply": async (c, body) => { await applyGithubSyncProgressMutation(c, body); return { ok: true }; }, - async commandRecordGithubWebhookReceipt(c: any, body: any) { + "organization.command.github.webhook_receipt.record": async (c, body) => { await recordGithubWebhookReceiptMutation(c, body); return { ok: true }; }, - async commandSyncOrganizationShellFromGithub(c: any, body: any) { - return await syncOrganizationShellFromGithubMutation(c, body); - }, + "organization.command.github.organization_shell.sync_from_github": async (c, body) => syncOrganizationShellFromGithubMutation(c, body), - // Shell/profile actions - async commandUpdateShellProfile(c: any, body: any) { + // Shell/profile mutations + "organization.command.shell.profile.update": async (c, body) => { await updateOrganizationShellProfileMutation(c, body); return { ok: true }; }, - async commandMarkSyncStarted(c: any, body: any) { + "organization.command.shell.sync_started.mark": async (c, body) => { await markOrganizationSyncStartedMutation(c, body); return { ok: true }; }, - // Billing actions - async commandApplyStripeCustomer(c: any, body: any) { + // Billing mutations + "organization.command.billing.stripe_customer.apply": async (c, body) => { await applyOrganizationStripeCustomerMutation(c, body); return { ok: true }; }, - async commandApplyStripeSubscription(c: any, body: any) { + "organization.command.billing.stripe_subscription.apply": async (c, body) => { await applyOrganizationStripeSubscriptionMutation(c, body); return { ok: true }; }, - async commandApplyFreePlan(c: any, body: any) { + "organization.command.billing.free_plan.apply": async (c, body) => { await applyOrganizationFreePlanMutation(c, body); return { ok: true }; }, - async commandSetPaymentMethod(c: any, body: any) { + "organization.command.billing.payment_method.set": async (c, body) => { await setOrganizationBillingPaymentMethodMutation(c, body); return { ok: true }; }, - async commandSetBillingStatus(c: any, body: any) { + "organization.command.billing.status.set": async (c, body) => { await setOrganizationBillingStatusMutation(c, body); return { ok: true }; }, - async commandUpsertInvoice(c: any, body: any) { + "organization.command.billing.invoice.upsert": async (c, body) => { await upsertOrganizationInvoiceMutation(c, body); return { ok: true }; }, - async commandRecordSeatUsage(c: any, body: any) { + "organization.command.billing.seat_usage.record": async (c, body) => { await recordOrganizationSeatUsageMutation(c, body); return { ok: true }; }, }; + +export async function runOrganizationWorkflow(ctx: any): Promise { + await ctx.loop("organization-command-loop", async (loopCtx: any) => { + const msg = await loopCtx.queue.next("next-organization-command", { + names: [...ORGANIZATION_QUEUE_NAMES], + completable: true, + }); + + if (!msg) { + return Loop.continue(undefined); + } + + const handler = COMMAND_HANDLERS[msg.name as OrganizationQueueName]; + if (!handler) { + logActorWarning("organization", "unknown organization command", { command: msg.name }); + await msg.complete({ error: `Unknown command: ${msg.name}` }).catch(() => {}); + return Loop.continue(undefined); + } + + try { + // Wrap in a step so c.state and c.db are accessible inside mutation functions. + const result = await loopCtx.step({ + name: msg.name, + timeout: 10 * 60_000, + run: async () => handler(loopCtx, msg.body), + }); + try { + await msg.complete(result); + } catch (completeError) { + logActorWarning("organization", "organization workflow failed completing response", { + command: msg.name, + error: resolveErrorMessage(completeError), + }); + } + } catch (error) { + const message = resolveErrorMessage(error); + logActorWarning("organization", "organization workflow command failed", { + command: msg.name, + error: message, + }); + await msg.complete({ error: message }).catch(() => {}); + } + + return Loop.continue(undefined); + }); +} diff --git a/foundry/packages/backend/src/actors/sandbox/index.ts b/foundry/packages/backend/src/actors/sandbox/index.ts index a35a149..c1f2780 100644 --- a/foundry/packages/backend/src/actors/sandbox/index.ts +++ b/foundry/packages/backend/src/actors/sandbox/index.ts @@ -1,4 +1,6 @@ -import { actor } from "rivetkit"; +// @ts-nocheck +import { actor, queue } from "rivetkit"; +import { workflow, Loop } from "rivetkit/workflow"; import { e2b, sandboxActor } from "rivetkit/sandbox"; import { existsSync } from "node:fs"; import Dockerode from "dockerode"; @@ -6,7 +8,9 @@ import { DEFAULT_WORKSPACE_MODEL_GROUPS, workspaceModelGroupsFromSandboxAgents, import { SandboxAgent } from "sandbox-agent"; import { getActorRuntimeContext } from "../context.js"; import { organizationKey } from "../keys.js"; +import { selfTaskSandbox } from "../handles.js"; import { logActorWarning, resolveErrorMessage } from "../logging.js"; +import { expectQueueResponse } from "../../services/queue.js"; import { resolveSandboxProviderId } from "../../sandbox-config.js"; const SANDBOX_REPO_CWD = "/home/user/repo"; @@ -293,36 +297,165 @@ async function listWorkspaceModelGroupsForSandbox(c: any): Promise Promise>; +// --------------------------------------------------------------------------- +// Queue names for sandbox actor +// --------------------------------------------------------------------------- + +const SANDBOX_QUEUE_NAMES = [ + "sandbox.command.createSession", + "sandbox.command.resumeOrCreateSession", + "sandbox.command.destroySession", + "sandbox.command.createProcess", + "sandbox.command.stopProcess", + "sandbox.command.killProcess", + "sandbox.command.deleteProcess", +] as const; + +type SandboxQueueName = (typeof SANDBOX_QUEUE_NAMES)[number]; + +function sandboxWorkflowQueueName(name: SandboxQueueName): SandboxQueueName { + return name; +} + +// --------------------------------------------------------------------------- +// Mutation handlers — executed inside the workflow command loop +// --------------------------------------------------------------------------- + +async function createSessionMutation(c: any, request: any): Promise { + const session = await baseActions.createSession(c, request); + const sessionId = typeof request?.id === "string" && request.id.length > 0 ? request.id : session?.id; + const modeId = modeIdForAgent(request?.agent); + if (sessionId && modeId) { + try { + await baseActions.rawSendSessionMethod(c, sessionId, "session/set_mode", { modeId }); + } catch { + // Session mode updates are best-effort. + } + } + return sanitizeActorResult(session); +} + +async function resumeOrCreateSessionMutation(c: any, request: any): Promise { + return sanitizeActorResult(await baseActions.resumeOrCreateSession(c, request)); +} + +async function destroySessionMutation(c: any, sessionId: string): Promise { + return sanitizeActorResult(await baseActions.destroySession(c, sessionId)); +} + +async function createProcessMutation(c: any, request: any): Promise { + const created = await baseActions.createProcess(c, request); + await broadcastProcesses(c, baseActions); + return created; +} + +async function runProcessMutation(c: any, request: any): Promise { + const result = await baseActions.runProcess(c, request); + await broadcastProcesses(c, baseActions); + return result; +} + +async function stopProcessMutation(c: any, processId: string, query?: any): Promise { + const stopped = await baseActions.stopProcess(c, processId, query); + await broadcastProcesses(c, baseActions); + return stopped; +} + +async function killProcessMutation(c: any, processId: string, query?: any): Promise { + const killed = await baseActions.killProcess(c, processId, query); + await broadcastProcesses(c, baseActions); + return killed; +} + +async function deleteProcessMutation(c: any, processId: string): Promise { + await baseActions.deleteProcess(c, processId); + await broadcastProcesses(c, baseActions); +} + +// --------------------------------------------------------------------------- +// Workflow command loop +// --------------------------------------------------------------------------- + +type SandboxWorkflowHandler = (loopCtx: any, body: any) => Promise; + +const SANDBOX_COMMAND_HANDLERS: Record = { + "sandbox.command.createSession": async (c, body) => createSessionMutation(c, body), + "sandbox.command.resumeOrCreateSession": async (c, body) => resumeOrCreateSessionMutation(c, body), + "sandbox.command.destroySession": async (c, body) => destroySessionMutation(c, body?.sessionId), + "sandbox.command.createProcess": async (c, body) => createProcessMutation(c, body), + "sandbox.command.stopProcess": async (c, body) => stopProcessMutation(c, body?.processId, body?.query), + "sandbox.command.killProcess": async (c, body) => killProcessMutation(c, body?.processId, body?.query), + "sandbox.command.deleteProcess": async (c, body) => { + await deleteProcessMutation(c, body?.processId); + return { ok: true }; + }, +}; + +async function runSandboxWorkflow(ctx: any): Promise { + await ctx.loop("sandbox-command-loop", async (loopCtx: any) => { + const msg = await loopCtx.queue.next("next-sandbox-command", { + names: [...SANDBOX_QUEUE_NAMES], + completable: true, + }); + + if (!msg) { + return Loop.continue(undefined); + } + + const handler = SANDBOX_COMMAND_HANDLERS[msg.name as SandboxQueueName]; + if (!handler) { + logActorWarning("taskSandbox", "unknown sandbox command", { command: msg.name }); + await msg.complete({ error: `Unknown command: ${msg.name}` }).catch(() => {}); + return Loop.continue(undefined); + } + + try { + // Wrap in a step so c.state and c.db are accessible inside mutation functions. + const result = await loopCtx.step({ + name: msg.name, + timeout: 10 * 60_000, + run: async () => handler(loopCtx, msg.body), + }); + try { + await msg.complete(result); + } catch (completeError) { + logActorWarning("taskSandbox", "sandbox workflow failed completing response", { + command: msg.name, + error: resolveErrorMessage(completeError), + }); + } + } catch (error) { + const message = resolveErrorMessage(error); + logActorWarning("taskSandbox", "sandbox workflow command failed", { + command: msg.name, + error: message, + }); + await msg.complete({ error: message }).catch(() => {}); + } + + return Loop.continue(undefined); + }); +} + +// --------------------------------------------------------------------------- +// Actor definition +// --------------------------------------------------------------------------- + export const taskSandbox = actor({ ...baseTaskSandbox.config, + queues: Object.fromEntries(SANDBOX_QUEUE_NAMES.map((name) => [name, queue()])), options: { ...baseTaskSandbox.config.options, actionTimeout: 10 * 60_000, }, actions: { ...baseActions, - async createSession(c: any, request: any): Promise { - const session = await baseActions.createSession(c, request); - const sessionId = typeof request?.id === "string" && request.id.length > 0 ? request.id : session?.id; - const modeId = modeIdForAgent(request?.agent); - if (sessionId && modeId) { - try { - await baseActions.rawSendSessionMethod(c, sessionId, "session/set_mode", { modeId }); - } catch { - // Session mode updates are best-effort. - } - } - return sanitizeActorResult(session); - }, + // Read actions — direct (no queue) async resumeSession(c: any, sessionId: string): Promise { return sanitizeActorResult(await baseActions.resumeSession(c, sessionId)); }, - async resumeOrCreateSession(c: any, request: any): Promise { - return sanitizeActorResult(await baseActions.resumeOrCreateSession(c, request)); - }, - async getSession(c: any, sessionId: string): Promise { return sanitizeActorResult(await baseActions.getSession(c, sessionId)); }, @@ -331,24 +464,6 @@ export const taskSandbox = actor({ return sanitizeActorResult(await baseActions.listSessions(c, query)); }, - async destroySession(c: any, sessionId: string): Promise { - return sanitizeActorResult(await baseActions.destroySession(c, sessionId)); - }, - - async sendPrompt(c: any, request: { sessionId: string; prompt: string }): Promise { - const text = typeof request?.prompt === "string" ? request.prompt.trim() : ""; - if (!text) { - return null; - } - - const session = await baseActions.resumeSession(c, request.sessionId); - if (!session || typeof session.prompt !== "function") { - throw new Error(`session '${request.sessionId}' not found`); - } - - return sanitizeActorResult(await session.prompt([{ type: "text", text }])); - }, - async listProcesses(c: any): Promise { try { return await baseActions.listProcesses(c); @@ -362,35 +477,6 @@ export const taskSandbox = actor({ } }, - async createProcess(c: any, request: any): Promise { - const created = await baseActions.createProcess(c, request); - await broadcastProcesses(c, baseActions); - return created; - }, - - async runProcess(c: any, request: any): Promise { - const result = await baseActions.runProcess(c, request); - await broadcastProcesses(c, baseActions); - return result; - }, - - async stopProcess(c: any, processId: string, query?: any): Promise { - const stopped = await baseActions.stopProcess(c, processId, query); - await broadcastProcesses(c, baseActions); - return stopped; - }, - - async killProcess(c: any, processId: string, query?: any): Promise { - const killed = await baseActions.killProcess(c, processId, query); - await broadcastProcesses(c, baseActions); - return killed; - }, - - async deleteProcess(c: any, processId: string): Promise { - await baseActions.deleteProcess(c, processId); - await broadcastProcesses(c, baseActions); - }, - async sandboxAgentConnection(c: any): Promise<{ endpoint: string; token?: string }> { const provider = await providerForConnection(c); if (!provider || !c.state.sandboxId) { @@ -445,7 +531,73 @@ export const taskSandbox = actor({ async repoCwd(): Promise<{ cwd: string }> { return { cwd: SANDBOX_REPO_CWD }; }, + + // Long-running action — kept as direct action to avoid blocking the + // workflow loop (prompt responses can take minutes). + async sendPrompt(c: any, request: { sessionId: string; prompt: string }): Promise { + const text = typeof request?.prompt === "string" ? request.prompt.trim() : ""; + if (!text) { + return null; + } + + const session = await baseActions.resumeSession(c, request.sessionId); + if (!session || typeof session.prompt !== "function") { + throw new Error(`session '${request.sessionId}' not found`); + } + + return sanitizeActorResult(await session.prompt([{ type: "text", text }])); + }, + + // Mutation actions — self-send to queue for workflow history + async createSession(c: any, request: any): Promise { + const self = selfTaskSandbox(c); + return expectQueueResponse(await self.send(sandboxWorkflowQueueName("sandbox.command.createSession"), request ?? {}, { wait: true, timeout: 10_000 })); + }, + + async resumeOrCreateSession(c: any, request: any): Promise { + const self = selfTaskSandbox(c); + return expectQueueResponse( + await self.send(sandboxWorkflowQueueName("sandbox.command.resumeOrCreateSession"), request ?? {}, { wait: true, timeout: 10_000 }), + ); + }, + + async destroySession(c: any, sessionId: string): Promise { + const self = selfTaskSandbox(c); + return expectQueueResponse(await self.send(sandboxWorkflowQueueName("sandbox.command.destroySession"), { sessionId }, { wait: true, timeout: 10_000 })); + }, + + async createProcess(c: any, request: any): Promise { + const self = selfTaskSandbox(c); + return expectQueueResponse(await self.send(sandboxWorkflowQueueName("sandbox.command.createProcess"), request ?? {}, { wait: true, timeout: 10_000 })); + }, + + // runProcess kept as direct action — response can exceed 128KB queue limit + async runProcess(c: any, request: any): Promise { + const result = await baseActions.runProcess(c, request); + await broadcastProcesses(c, baseActions); + return result; + }, + + async stopProcess(c: any, processId: string, query?: any): Promise { + const self = selfTaskSandbox(c); + return expectQueueResponse( + await self.send(sandboxWorkflowQueueName("sandbox.command.stopProcess"), { processId, query }, { wait: true, timeout: 10_000 }), + ); + }, + + async killProcess(c: any, processId: string, query?: any): Promise { + const self = selfTaskSandbox(c); + return expectQueueResponse( + await self.send(sandboxWorkflowQueueName("sandbox.command.killProcess"), { processId, query }, { wait: true, timeout: 10_000 }), + ); + }, + + async deleteProcess(c: any, processId: string): Promise { + const self = selfTaskSandbox(c); + await self.send(sandboxWorkflowQueueName("sandbox.command.deleteProcess"), { processId }, { wait: false }); + }, }, + run: workflow(runSandboxWorkflow), }); export { SANDBOX_REPO_CWD }; diff --git a/foundry/packages/backend/src/actors/task/index.ts b/foundry/packages/backend/src/actors/task/index.ts index 7e1c5e2..6305e17 100644 --- a/foundry/packages/backend/src/actors/task/index.ts +++ b/foundry/packages/backend/src/actors/task/index.ts @@ -1,9 +1,11 @@ -import { actor } from "rivetkit"; +import { actor, queue } from "rivetkit"; +import { workflow } from "rivetkit/workflow"; import type { TaskRecord } from "@sandbox-agent/foundry-shared"; import { taskDb } from "./db/db.js"; import { getCurrentRecord } from "./workflow/common.js"; import { getSessionDetail, getTaskDetail, getTaskSummary } from "./workspace.js"; -import { taskCommandActions } from "./workflow/index.js"; +import { runTaskWorkflow } from "./workflow/index.js"; +import { TASK_QUEUE_NAMES } from "./workflow/queue.js"; export interface TaskInput { organizationId: string; @@ -13,6 +15,7 @@ export interface TaskInput { export const task = actor({ db: taskDb, + queues: Object.fromEntries(TASK_QUEUE_NAMES.map((name) => [name, queue()])), options: { name: "Task", icon: "wrench", @@ -39,9 +42,8 @@ export const task = actor({ async getSessionDetail(c, input: { sessionId: string; authSessionId?: string }) { return await getSessionDetail(c, input.sessionId, input.authSessionId); }, - - ...taskCommandActions, }, + run: workflow(runTaskWorkflow), }); export { taskWorkflowQueueName } from "./workflow/index.js"; diff --git a/foundry/packages/backend/src/actors/task/workflow/index.ts b/foundry/packages/backend/src/actors/task/workflow/index.ts index de25813..29391f2 100644 --- a/foundry/packages/backend/src/actors/task/workflow/index.ts +++ b/foundry/packages/backend/src/actors/task/workflow/index.ts @@ -1,4 +1,16 @@ +// @ts-nocheck +/** + * Task workflow — queue-based command loop. + * + * Mutations are dispatched through named queues and processed inside the + * workflow command loop so that every command appears in the RivetKit + * inspector's workflow history. Read actions remain direct (no queue). + * + * Callers send commands directly via `.send(taskWorkflowQueueName(...), ...)`. + */ +import { Loop } from "rivetkit/workflow"; import { logActorWarning, resolveErrorMessage } from "../../logging.js"; +import { TASK_QUEUE_NAMES, type TaskQueueName, taskWorkflowQueueName } from "./queue.js"; import { getCurrentRecord } from "./common.js"; import { initBootstrapDbActivity, initCompleteActivity, initEnqueueProvisionActivity, initFailedActivity } from "./init.js"; import { @@ -35,241 +47,210 @@ import { export { taskWorkflowQueueName } from "./queue.js"; -/** - * Task command actions — converted from queue/workflow handlers to direct actions. - * Each export becomes an action on the task actor. - */ -export const taskCommandActions = { - async initialize(c: any, body: any) { - await initBootstrapDbActivity(c, body); - await initEnqueueProvisionActivity(c, body); - return await getCurrentRecord(c); +// --------------------------------------------------------------------------- +// Workflow command loop — runs inside `run: workflow(runTaskWorkflow)` +// --------------------------------------------------------------------------- + +type WorkflowHandler = (loopCtx: any, msg: any) => Promise; + +const COMMAND_HANDLERS: Record = { + "task.command.initialize": async (loopCtx, msg) => { + await initBootstrapDbActivity(loopCtx, msg.body); + await initEnqueueProvisionActivity(loopCtx, msg.body); + const record = await getCurrentRecord(loopCtx); + await msg.complete(record); }, - async provision(c: any, body: any) { + "task.command.provision": async (loopCtx, msg) => { try { - await initCompleteActivity(c, body); - return { ok: true }; + await initCompleteActivity(loopCtx, msg.body); + await msg.complete({ ok: true }); } catch (error) { - await initFailedActivity(c, error, body); - return { ok: false, error: resolveErrorMessage(error) }; + await initFailedActivity(loopCtx, error, msg.body); + await msg.complete({ ok: false, error: resolveErrorMessage(error) }); } }, - async attach(c: any, body: any) { - // handleAttachActivity expects msg with complete — adapt - const result = { value: undefined as any }; - const msg = { - name: "task.command.attach", - body, - complete: async (v: any) => { - result.value = v; - }, - }; - await handleAttachActivity(c, msg); - return result.value; + "task.command.attach": async (loopCtx, msg) => { + await handleAttachActivity(loopCtx, msg); }, - async switchTask(c: any, body: any) { - const result = { value: undefined as any }; - const msg = { - name: "task.command.switch", - body, - complete: async (v: any) => { - result.value = v; - }, - }; - await handleSwitchActivity(c, msg); - return result.value; + "task.command.switch": async (loopCtx, msg) => { + await handleSwitchActivity(loopCtx, msg); }, - async push(c: any, body: any) { - const result = { value: undefined as any }; - const msg = { - name: "task.command.push", - body, - complete: async (v: any) => { - result.value = v; - }, - }; - await handlePushActivity(c, msg); - return result.value; + "task.command.push": async (loopCtx, msg) => { + await handlePushActivity(loopCtx, msg); }, - async sync(c: any, body: any) { - const result = { value: undefined as any }; - const msg = { - name: "task.command.sync", - body, - complete: async (v: any) => { - result.value = v; - }, - }; - await handleSimpleCommandActivity(c, msg, "task.sync"); - return result.value; + "task.command.sync": async (loopCtx, msg) => { + await handleSimpleCommandActivity(loopCtx, msg, "task.sync"); }, - async merge(c: any, body: any) { - const result = { value: undefined as any }; - const msg = { - name: "task.command.merge", - body, - complete: async (v: any) => { - result.value = v; - }, - }; - await handleSimpleCommandActivity(c, msg, "task.merge"); - return result.value; + "task.command.merge": async (loopCtx, msg) => { + await handleSimpleCommandActivity(loopCtx, msg, "task.merge"); }, - async archive(c: any, body: any) { - const result = { value: undefined as any }; - const msg = { - name: "task.command.archive", - body, - complete: async (v: any) => { - result.value = v; - }, - }; - await handleArchiveActivity(c, msg); - return result.value; + "task.command.archive": async (loopCtx, msg) => { + await handleArchiveActivity(loopCtx, msg); }, - async kill(c: any, body: any) { - const result = { value: undefined as any }; - const msg = { - name: "task.command.kill", - body, - complete: async (v: any) => { - result.value = v; - }, - }; - await killDestroySandboxActivity(c); - await killWriteDbActivity(c, msg); - return result.value; + "task.command.kill": async (loopCtx, msg) => { + await killDestroySandboxActivity(loopCtx); + await killWriteDbActivity(loopCtx, msg); }, - async getRecord(c: any, body: any) { - const result = { value: undefined as any }; - const msg = { - name: "task.command.get", - body, - complete: async (v: any) => { - result.value = v; - }, - }; - await handleGetActivity(c, msg); - return result.value; + "task.command.get": async (loopCtx, msg) => { + await handleGetActivity(loopCtx, msg); }, - async pullRequestSync(c: any, body: any) { - await syncTaskPullRequest(c, body?.pullRequest ?? null); - return { ok: true }; + "task.command.pull_request.sync": async (loopCtx, msg) => { + await syncTaskPullRequest(loopCtx, msg.body?.pullRequest ?? null); + await msg.complete({ ok: true }); }, - async markUnread(c: any, body: any) { - await markWorkspaceUnread(c, body?.authSessionId); - return { ok: true }; + "task.command.workspace.mark_unread": async (loopCtx, msg) => { + await markWorkspaceUnread(loopCtx, msg.body?.authSessionId); + await msg.complete({ ok: true }); }, - async renameTask(c: any, body: any) { - await renameWorkspaceTask(c, body.value); - return { ok: true }; + "task.command.workspace.rename_task": async (loopCtx, msg) => { + await renameWorkspaceTask(loopCtx, msg.body.value); + await msg.complete({ ok: true }); }, - async changeOwner(c: any, body: any) { - await changeTaskOwnerManually(c, { - primaryUserId: body.primaryUserId, - primaryGithubLogin: body.primaryGithubLogin, - primaryGithubEmail: body.primaryGithubEmail, - primaryGithubAvatarUrl: body.primaryGithubAvatarUrl ?? null, - }); - return { ok: true }; + "task.command.workspace.create_session": async (loopCtx, msg) => { + const result = await createWorkspaceSession(loopCtx, msg.body?.model, msg.body?.authSessionId); + await msg.complete(result); }, - async createSession(c: any, body: any) { - return await createWorkspaceSession(c, body?.model, body?.authSessionId); - }, - - async createSessionAndSend(c: any, body: any) { + "task.command.workspace.create_session_and_send": async (loopCtx, msg) => { try { - const created = await createWorkspaceSession(c, body?.model, body?.authSessionId); - await sendWorkspaceMessage(c, created.sessionId, body.text, [], body?.authSessionId); + const created = await createWorkspaceSession(loopCtx, msg.body?.model, msg.body?.authSessionId); + await sendWorkspaceMessage(loopCtx, created.sessionId, msg.body.text, [], msg.body?.authSessionId); } catch (error) { logActorWarning("task.workflow", "create_session_and_send failed", { error: resolveErrorMessage(error), }); } - return { ok: true }; + await msg.complete({ ok: true }); }, - async ensureSession(c: any, body: any) { - await ensureWorkspaceSession(c, body.sessionId, body?.model, body?.authSessionId); - return { ok: true }; + "task.command.workspace.ensure_session": async (loopCtx, msg) => { + await ensureWorkspaceSession(loopCtx, msg.body.sessionId, msg.body?.model, msg.body?.authSessionId); + await msg.complete({ ok: true }); }, - async renameSession(c: any, body: any) { - await renameWorkspaceSession(c, body.sessionId, body.title); - return { ok: true }; + "task.command.workspace.rename_session": async (loopCtx, msg) => { + await renameWorkspaceSession(loopCtx, msg.body.sessionId, msg.body.title); + await msg.complete({ ok: true }); }, - async selectSession(c: any, body: any) { - await selectWorkspaceSession(c, body.sessionId, body?.authSessionId); - return { ok: true }; + "task.command.workspace.select_session": async (loopCtx, msg) => { + await selectWorkspaceSession(loopCtx, msg.body.sessionId, msg.body?.authSessionId); + await msg.complete({ ok: true }); }, - async setSessionUnread(c: any, body: any) { - await setWorkspaceSessionUnread(c, body.sessionId, body.unread, body?.authSessionId); - return { ok: true }; + "task.command.workspace.set_session_unread": async (loopCtx, msg) => { + await setWorkspaceSessionUnread(loopCtx, msg.body.sessionId, msg.body.unread, msg.body?.authSessionId); + await msg.complete({ ok: true }); }, - async updateDraft(c: any, body: any) { - await updateWorkspaceDraft(c, body.sessionId, body.text, body.attachments, body?.authSessionId); - return { ok: true }; + "task.command.workspace.update_draft": async (loopCtx, msg) => { + await updateWorkspaceDraft(loopCtx, msg.body.sessionId, msg.body.text, msg.body.attachments, msg.body?.authSessionId); + await msg.complete({ ok: true }); }, - async changeModel(c: any, body: any) { - await changeWorkspaceModel(c, body.sessionId, body.model, body?.authSessionId); - return { ok: true }; + "task.command.workspace.change_model": async (loopCtx, msg) => { + await changeWorkspaceModel(loopCtx, msg.body.sessionId, msg.body.model, msg.body?.authSessionId); + await msg.complete({ ok: true }); }, - async sendMessage(c: any, body: any) { - await sendWorkspaceMessage(c, body.sessionId, body.text, body.attachments, body?.authSessionId); - return { ok: true }; + "task.command.workspace.send_message": async (loopCtx, msg) => { + await sendWorkspaceMessage(loopCtx, msg.body.sessionId, msg.body.text, msg.body.attachments, msg.body?.authSessionId); + await msg.complete({ ok: true }); }, - async stopSession(c: any, body: any) { - await stopWorkspaceSession(c, body.sessionId); - return { ok: true }; + "task.command.workspace.stop_session": async (loopCtx, msg) => { + await stopWorkspaceSession(loopCtx, msg.body.sessionId); + await msg.complete({ ok: true }); }, - async syncSessionStatus(c: any, body: any) { - await syncWorkspaceSessionStatus(c, body.sessionId, body.status, body.at); - return { ok: true }; + "task.command.workspace.sync_session_status": async (loopCtx, msg) => { + await syncWorkspaceSessionStatus(loopCtx, msg.body.sessionId, msg.body.status, msg.body.at); + await msg.complete({ ok: true }); }, - async refreshDerived(c: any, _body: any) { - await refreshWorkspaceDerivedState(c); - return { ok: true }; + "task.command.workspace.refresh_derived": async (loopCtx, msg) => { + await refreshWorkspaceDerivedState(loopCtx); + await msg.complete({ ok: true }); }, - async refreshSessionTranscript(c: any, body: any) { - await refreshWorkspaceSessionTranscript(c, body.sessionId); - return { ok: true }; + "task.command.workspace.refresh_session_transcript": async (loopCtx, msg) => { + await refreshWorkspaceSessionTranscript(loopCtx, msg.body.sessionId); + await msg.complete({ ok: true }); }, - async closeSession(c: any, body: any) { - await closeWorkspaceSession(c, body.sessionId, body?.authSessionId); - return { ok: true }; + "task.command.workspace.close_session": async (loopCtx, msg) => { + await closeWorkspaceSession(loopCtx, msg.body.sessionId, msg.body?.authSessionId); + await msg.complete({ ok: true }); }, - async publishPr(c: any, _body: any) { - await publishWorkspacePr(c); - return { ok: true }; + "task.command.workspace.publish_pr": async (loopCtx, msg) => { + await publishWorkspacePr(loopCtx); + await msg.complete({ ok: true }); }, - async revertFile(c: any, body: any) { - await revertWorkspaceFile(c, body.path); - return { ok: true }; + "task.command.workspace.revert_file": async (loopCtx, msg) => { + await revertWorkspaceFile(loopCtx, msg.body.path); + await msg.complete({ ok: true }); + }, + + "task.command.workspace.change_owner": async (loopCtx, msg) => { + await changeTaskOwnerManually(loopCtx, { + primaryUserId: msg.body.primaryUserId, + primaryGithubLogin: msg.body.primaryGithubLogin, + primaryGithubEmail: msg.body.primaryGithubEmail, + primaryGithubAvatarUrl: msg.body.primaryGithubAvatarUrl ?? null, + }); + await msg.complete({ ok: true }); }, }; + +export async function runTaskWorkflow(ctx: any): Promise { + await ctx.loop("task-command-loop", async (loopCtx: any) => { + const msg = await loopCtx.queue.next("next-task-command", { + names: [...TASK_QUEUE_NAMES], + completable: true, + }); + + if (!msg) { + return Loop.continue(undefined); + } + + const handler = COMMAND_HANDLERS[msg.name as TaskQueueName]; + if (!handler) { + logActorWarning("task.workflow", "unknown task command", { command: msg.name }); + await msg.complete({ error: `Unknown command: ${msg.name}` }).catch(() => {}); + return Loop.continue(undefined); + } + + try { + // Wrap in a step so c.state and c.db are accessible inside mutation functions. + await loopCtx.step({ + name: msg.name, + timeout: 10 * 60_000, + run: async () => handler(loopCtx, msg), + }); + } catch (error) { + const message = resolveErrorMessage(error); + logActorWarning("task.workflow", "task workflow command failed", { + command: msg.name, + error: message, + }); + await msg.complete({ error: message }).catch(() => {}); + } + + return Loop.continue(undefined); + }); +} diff --git a/foundry/packages/backend/src/actors/task/workflow/init.ts b/foundry/packages/backend/src/actors/task/workflow/init.ts index 08085e8..ffdf1d4 100644 --- a/foundry/packages/backend/src/actors/task/workflow/init.ts +++ b/foundry/packages/backend/src/actors/task/workflow/init.ts @@ -3,6 +3,7 @@ import { eq } from "drizzle-orm"; import { getActorRuntimeContext } from "../../context.js"; import { selfTask } from "../../handles.js"; import { resolveErrorMessage } from "../../logging.js"; +import { taskWorkflowQueueName } from "./queue.js"; import { defaultSandboxProviderId } from "../../../sandbox-config.js"; import { task as taskTable, taskRuntime } from "../db/schema.js"; import { TASK_ROW_ID, appendAuditLog, collectErrorMessages, resolveErrorDetail, setTaskState } from "./common.js"; @@ -72,7 +73,7 @@ export async function initEnqueueProvisionActivity(loopCtx: any, body: any): Pro const self = selfTask(loopCtx); try { - void self.provision(body).catch(() => {}); + void self.send(taskWorkflowQueueName("task.command.provision"), body ?? {}, { wait: false }).catch(() => {}); } catch (error) { logActorWarning("task.init", "background provision command failed", { organizationId: loopCtx.state.organizationId, diff --git a/foundry/packages/backend/src/actors/task/workflow/queue.ts b/foundry/packages/backend/src/actors/task/workflow/queue.ts index 133a657..9594f24 100644 --- a/foundry/packages/backend/src/actors/task/workflow/queue.ts +++ b/foundry/packages/backend/src/actors/task/workflow/queue.ts @@ -28,8 +28,11 @@ export const TASK_QUEUE_NAMES = [ "task.command.workspace.close_session", "task.command.workspace.publish_pr", "task.command.workspace.revert_file", + "task.command.workspace.change_owner", ] as const; +export type TaskQueueName = (typeof TASK_QUEUE_NAMES)[number]; + export function taskWorkflowQueueName(name: string): string { return name; } diff --git a/foundry/packages/backend/src/actors/task/workspace.ts b/foundry/packages/backend/src/actors/task/workspace.ts index ac2430f..8585416 100644 --- a/foundry/packages/backend/src/actors/task/workspace.ts +++ b/foundry/packages/backend/src/actors/task/workspace.ts @@ -14,10 +14,12 @@ import { logActorWarning, resolveErrorMessage } from "../logging.js"; import { SANDBOX_REPO_CWD } from "../sandbox/index.js"; import { resolveSandboxProviderId } from "../../sandbox-config.js"; import { getBetterAuthService } from "../../services/better-auth.js"; -// expectQueueResponse removed — actions return values directly import { resolveOrganizationGithubAuth } from "../../services/github-auth.js"; import { githubRepoFullNameFromRemote } from "../../services/repo.js"; -// organization actions called directly (no queue) +import { taskWorkflowQueueName } from "./workflow/queue.js"; +import { expectQueueResponse } from "../../services/queue.js"; +import { userWorkflowQueueName } from "../user/workflow.js"; +import { organizationWorkflowQueueName } from "../organization/queues.js"; import { task as taskTable, taskOwner, taskRuntime, taskSandboxes, taskWorkspaceSessions } from "./db/schema.js"; import { getCurrentRecord } from "./workflow/common.js"; @@ -123,9 +125,7 @@ function parseGitState(value: string | null | undefined): { fileChanges: Array { @@ -446,10 +452,14 @@ async function deleteUserTaskState(c: any, authSessionId: string | null | undefi } const user = await getOrCreateUser(c, userId); - await user.taskStateDelete({ - taskId: c.state.taskId, - sessionId, - }); + await user.send( + userWorkflowQueueName("user.command.task_state.delete"), + { + taskId: c.state.taskId, + sessionId, + }, + { wait: true, timeout: 10_000 }, + ); } async function resolveDefaultModel(c: any, authSessionId?: string | null): Promise { @@ -932,17 +942,13 @@ async function enqueueWorkspaceRefresh( command: "task.command.workspace.refresh_derived" | "task.command.workspace.refresh_session_transcript", body: Record, ): Promise { - // Call directly since we're inside the task actor (no queue needed) - if (command === "task.command.workspace.refresh_derived") { - void refreshWorkspaceDerivedState(c).catch(() => {}); - } else { - void refreshWorkspaceSessionTranscript(c, body.sessionId as string).catch(() => {}); - } + const self = selfTask(c); + await self.send(taskWorkflowQueueName(command as any), body, { wait: false }); } async function enqueueWorkspaceEnsureSession(c: any, sessionId: string): Promise { - // Call directly since we're inside the task actor - void ensureWorkspaceSession(c, sessionId).catch(() => {}); + const self = selfTask(c); + await self.send(taskWorkflowQueueName("task.command.workspace.ensure_session" as any), { sessionId }, { wait: false }); } function pendingWorkspaceSessionStatus(record: any): "pending_provision" | "pending_session_create" { @@ -1166,7 +1172,11 @@ export async function getSessionDetail(c: any, sessionId: string, authSessionId? */ export async function broadcastTaskUpdate(c: any, options?: { sessionId?: string }): Promise { const organization = await getOrCreateOrganization(c, c.state.organizationId); - await organization.commandApplyTaskSummaryUpdate({ taskSummary: await buildTaskSummary(c) }); + await organization.send( + organizationWorkflowQueueName("organization.command.applyTaskSummaryUpdate"), + { taskSummary: await buildTaskSummary(c) }, + { wait: false }, + ); c.broadcast("taskUpdated", { type: "taskUpdated", detail: await buildTaskDetail(c), @@ -1307,8 +1317,9 @@ export async function enqueuePendingWorkspaceSessions(c: any): Promise { (row) => row.closed !== true && row.status !== "ready" && row.status !== "error", ); + const self = selfTask(c); for (const row of pending) { - void ensureWorkspaceSession(c, row.sessionId, row.model).catch(() => {}); + await self.send(taskWorkflowQueueName("task.command.workspace.ensure_session" as any), { sessionId: row.sessionId, model: row.model }, { wait: false }); } } diff --git a/foundry/packages/backend/src/actors/user/index.ts b/foundry/packages/backend/src/actors/user/index.ts index 8a15b58..28d48e6 100644 --- a/foundry/packages/backend/src/actors/user/index.ts +++ b/foundry/packages/backend/src/actors/user/index.ts @@ -1,21 +1,13 @@ -import { actor } from "rivetkit"; +import { actor, queue } from "rivetkit"; +import { workflow } from "rivetkit/workflow"; import { userDb } from "./db/db.js"; import { betterAuthActions } from "./actions/better-auth.js"; import { userActions } from "./actions/user.js"; -import { - createAuthRecordMutation, - updateAuthRecordMutation, - updateManyAuthRecordsMutation, - deleteAuthRecordMutation, - deleteManyAuthRecordsMutation, - upsertUserProfileMutation, - upsertSessionStateMutation, - upsertTaskStateMutation, - deleteTaskStateMutation, -} from "./workflow.js"; +import { USER_QUEUE_NAMES, runUserWorkflow } from "./workflow.js"; export const user = actor({ db: userDb, + queues: Object.fromEntries(USER_QUEUE_NAMES.map((name) => [name, queue()])), options: { name: "User", icon: "shield", @@ -27,34 +19,6 @@ export const user = actor({ actions: { ...betterAuthActions, ...userActions, - async authCreate(c, body) { - return await createAuthRecordMutation(c, body); - }, - async authUpdate(c, body) { - return await updateAuthRecordMutation(c, body); - }, - async authUpdateMany(c, body) { - return await updateManyAuthRecordsMutation(c, body); - }, - async authDelete(c, body) { - await deleteAuthRecordMutation(c, body); - return { ok: true }; - }, - async authDeleteMany(c, body) { - return await deleteManyAuthRecordsMutation(c, body); - }, - async profileUpsert(c, body) { - return await upsertUserProfileMutation(c, body); - }, - async sessionStateUpsert(c, body) { - return await upsertSessionStateMutation(c, body); - }, - async taskStateUpsert(c, body) { - return await upsertTaskStateMutation(c, body); - }, - async taskStateDelete(c, body) { - await deleteTaskStateMutation(c, body); - return { ok: true }; - }, }, + run: workflow(runUserWorkflow), }); diff --git a/foundry/packages/backend/src/actors/user/workflow.ts b/foundry/packages/backend/src/actors/user/workflow.ts index 9bf2675..3bd3118 100644 --- a/foundry/packages/backend/src/actors/user/workflow.ts +++ b/foundry/packages/backend/src/actors/user/workflow.ts @@ -1,8 +1,45 @@ +// @ts-nocheck +/** + * User workflow — queue-based command loop. + * + * Auth mutation commands are dispatched through named queues and processed + * inside the workflow command loop for observability and replay semantics. + */ import { eq, count as sqlCount, and } from "drizzle-orm"; +import { Loop } from "rivetkit/workflow"; import { DEFAULT_WORKSPACE_MODEL_ID } from "@sandbox-agent/foundry-shared"; +import { logActorWarning, resolveErrorMessage } from "../logging.js"; +import { selfUser } from "../handles.js"; +import { expectQueueResponse } from "../../services/queue.js"; import { authUsers, sessionState, userProfiles, userTaskState } from "./db/schema.js"; import { buildWhere, columnFor, materializeRow, persistInput, persistPatch, tableFor } from "./query-helpers.js"; +// --------------------------------------------------------------------------- +// Queue names +// --------------------------------------------------------------------------- + +export const USER_QUEUE_NAMES = [ + "user.command.auth.create", + "user.command.auth.update", + "user.command.auth.update_many", + "user.command.auth.delete", + "user.command.auth.delete_many", + "user.command.profile.upsert", + "user.command.session_state.upsert", + "user.command.task_state.upsert", + "user.command.task_state.delete", +] as const; + +export type UserQueueName = (typeof USER_QUEUE_NAMES)[number]; + +export function userWorkflowQueueName(name: UserQueueName): UserQueueName { + return name; +} + +// --------------------------------------------------------------------------- +// Mutation functions +// --------------------------------------------------------------------------- + export async function createAuthRecordMutation(c: any, input: { model: string; data: Record }) { const table = tableFor(input.model); const persisted = persistInput(input.model, input.data); @@ -195,3 +232,66 @@ export async function deleteTaskStateMutation(c: any, input: { taskId: string; s } await c.db.delete(userTaskState).where(eq(userTaskState.taskId, input.taskId)).run(); } + +// --------------------------------------------------------------------------- +// Workflow command loop +// --------------------------------------------------------------------------- + +type WorkflowHandler = (loopCtx: any, body: any) => Promise; + +const COMMAND_HANDLERS: Record = { + "user.command.auth.create": async (c, body) => createAuthRecordMutation(c, body), + "user.command.auth.update": async (c, body) => updateAuthRecordMutation(c, body), + "user.command.auth.update_many": async (c, body) => updateManyAuthRecordsMutation(c, body), + "user.command.auth.delete": async (c, body) => { + await deleteAuthRecordMutation(c, body); + return { ok: true }; + }, + "user.command.auth.delete_many": async (c, body) => deleteManyAuthRecordsMutation(c, body), + "user.command.profile.upsert": async (c, body) => upsertUserProfileMutation(c, body), + "user.command.session_state.upsert": async (c, body) => upsertSessionStateMutation(c, body), + "user.command.task_state.upsert": async (c, body) => upsertTaskStateMutation(c, body), + "user.command.task_state.delete": async (c, body) => { + await deleteTaskStateMutation(c, body); + return { ok: true }; + }, +}; + +export async function runUserWorkflow(ctx: any): Promise { + await ctx.loop("user-command-loop", async (loopCtx: any) => { + const msg = await loopCtx.queue.next("next-user-command", { + names: [...USER_QUEUE_NAMES], + completable: true, + }); + + if (!msg) { + return Loop.continue(undefined); + } + + const handler = COMMAND_HANDLERS[msg.name as UserQueueName]; + if (!handler) { + logActorWarning("user", "unknown user command", { command: msg.name }); + await msg.complete({ error: `Unknown command: ${msg.name}` }).catch(() => {}); + return Loop.continue(undefined); + } + + try { + // Wrap in a step so c.state and c.db are accessible inside mutation functions. + const result = await loopCtx.step({ + name: msg.name, + timeout: 60_000, + run: async () => handler(loopCtx, msg.body), + }); + await msg.complete(result); + } catch (error) { + const message = resolveErrorMessage(error); + logActorWarning("user", "user workflow command failed", { + command: msg.name, + error: message, + }); + await msg.complete({ error: message }).catch(() => {}); + } + + return Loop.continue(undefined); + }); +} diff --git a/foundry/packages/backend/src/services/better-auth.ts b/foundry/packages/backend/src/services/better-auth.ts index c36b900..1f1ae4c 100644 --- a/foundry/packages/backend/src/services/better-auth.ts +++ b/foundry/packages/backend/src/services/better-auth.ts @@ -1,11 +1,11 @@ import { betterAuth } from "better-auth"; import { createAdapterFactory } from "better-auth/adapters"; import { APP_SHELL_ORGANIZATION_ID } from "../actors/organization/constants.js"; -// organization actions are called directly (no queue) -// user actor actions are called directly (no queue) import { organizationKey, userKey } from "../actors/keys.js"; import { logger } from "../logging.js"; -// expectQueueResponse removed — actions return values directly +import { expectQueueResponse } from "./queue.js"; +import { userWorkflowQueueName } from "../actors/user/workflow.js"; +import { organizationWorkflowQueueName } from "../actors/organization/queues.js"; const AUTH_BASE_PATH = "/v1/auth"; const SESSION_COOKIE = "better-auth.session_token"; @@ -62,8 +62,6 @@ function resolveRouteUserId(organization: any, resolved: any): string | null { return null; } -// sendOrganizationCommand removed — org actions are called directly - export interface BetterAuthService { auth: any; resolveSession(headers: Headers): Promise<{ session: any; user: any } | null>; @@ -162,9 +160,9 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin return null; }; - const ensureOrganizationVerification = async (actionName: string, payload: Record) => { + const ensureOrganizationVerification = async (queueName: string, payload: Record) => { const organization = await appOrganization(); - return await (organization as any)[actionName](payload); + return expectQueueResponse(await organization.send(organizationWorkflowQueueName(queueName as any), payload, { wait: true, timeout: 10_000 })); }; return { @@ -175,7 +173,7 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin create: async ({ model, data }) => { const transformed = await transformInput(data, model, "create", true); if (model === "verification") { - return await ensureOrganizationVerification("commandBetterAuthVerificationCreate", { data: transformed }); + return await ensureOrganizationVerification("organization.command.better_auth.verification.create", { data: transformed }); } const userId = await resolveUserIdForQuery(model, undefined, transformed); @@ -184,31 +182,51 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin } const userActor = await getUser(userId); - const created = await userActor.authCreate({ model, data: transformed }); + const created = expectQueueResponse( + await userActor.send(userWorkflowQueueName("user.command.auth.create"), { model, data: transformed }, { wait: true, timeout: 10_000 }), + ); const organization = await appOrganization(); if (model === "user" && typeof transformed.email === "string" && transformed.email.length > 0) { - await organization.commandBetterAuthEmailIndexUpsert({ - email: transformed.email.toLowerCase(), - userId, - }); + expectQueueResponse( + await organization.send( + organizationWorkflowQueueName("organization.command.better_auth.email_index.upsert"), + { + email: transformed.email.toLowerCase(), + userId, + }, + { wait: true, timeout: 10_000 }, + ), + ); } if (model === "session") { - await organization.commandBetterAuthSessionIndexUpsert({ - sessionId: String(created.id), - sessionToken: String(created.token), - userId, - }); + expectQueueResponse( + await organization.send( + organizationWorkflowQueueName("organization.command.better_auth.session_index.upsert"), + { + sessionId: String(created.id), + sessionToken: String(created.token), + userId, + }, + { wait: true, timeout: 10_000 }, + ), + ); } if (model === "account") { - await organization.commandBetterAuthAccountIndexUpsert({ - id: String(created.id), - providerId: String(created.providerId), - accountId: String(created.accountId), - userId, - }); + expectQueueResponse( + await organization.send( + organizationWorkflowQueueName("organization.command.better_auth.account_index.upsert"), + { + id: String(created.id), + providerId: String(created.providerId), + accountId: String(created.accountId), + userId, + }, + { wait: true, timeout: 10_000 }, + ), + ); } return (await transformOutput(created, model)) as any; @@ -291,7 +309,7 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin const transformedWhere = transformWhereClause({ model, where, action: "update" }); const transformedUpdate = (await transformInput(update as Record, model, "update", true)) as Record; if (model === "verification") { - return await ensureOrganizationVerification("commandBetterAuthVerificationUpdate", { + return await ensureOrganizationVerification("organization.command.better_auth.verification.update", { where: transformedWhere, update: transformedUpdate, }); @@ -311,38 +329,66 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin : model === "session" ? await userActor.betterAuthFindOneRecord({ model, where: transformedWhere }) : null; - const updated = await userActor.authUpdate({ model, where: transformedWhere, update: transformedUpdate }); + const updated = expectQueueResponse( + await userActor.send( + userWorkflowQueueName("user.command.auth.update"), + { model, where: transformedWhere, update: transformedUpdate }, + { wait: true, timeout: 10_000 }, + ), + ); const organization = await appOrganization(); if (model === "user" && updated) { if (before?.email && before.email !== updated.email) { - await organization.commandBetterAuthEmailIndexDelete({ - email: before.email.toLowerCase(), - }); + await organization.send( + organizationWorkflowQueueName("organization.command.better_auth.email_index.delete"), + { + email: before.email.toLowerCase(), + }, + { wait: true, timeout: 10_000 }, + ); } if (updated.email) { - await organization.commandBetterAuthEmailIndexUpsert({ - email: updated.email.toLowerCase(), - userId, - }); + expectQueueResponse( + await organization.send( + organizationWorkflowQueueName("organization.command.better_auth.email_index.upsert"), + { + email: updated.email.toLowerCase(), + userId, + }, + { wait: true, timeout: 10_000 }, + ), + ); } } if (model === "session" && updated) { - await organization.commandBetterAuthSessionIndexUpsert({ - sessionId: String(updated.id), - sessionToken: String(updated.token), - userId, - }); + expectQueueResponse( + await organization.send( + organizationWorkflowQueueName("organization.command.better_auth.session_index.upsert"), + { + sessionId: String(updated.id), + sessionToken: String(updated.token), + userId, + }, + { wait: true, timeout: 10_000 }, + ), + ); } if (model === "account" && updated) { - await organization.commandBetterAuthAccountIndexUpsert({ - id: String(updated.id), - providerId: String(updated.providerId), - accountId: String(updated.accountId), - userId, - }); + expectQueueResponse( + await organization.send( + organizationWorkflowQueueName("organization.command.better_auth.account_index.upsert"), + { + id: String(updated.id), + providerId: String(updated.providerId), + accountId: String(updated.accountId), + userId, + }, + { wait: true, timeout: 10_000 }, + ), + ); } return updated ? ((await transformOutput(updated, model)) as any) : null; @@ -352,7 +398,7 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin const transformedWhere = transformWhereClause({ model, where, action: "updateMany" }); const transformedUpdate = (await transformInput(update as Record, model, "update", true)) as Record; if (model === "verification") { - return await ensureOrganizationVerification("commandBetterAuthVerificationUpdateMany", { + return await ensureOrganizationVerification("organization.command.better_auth.verification.update_many", { where: transformedWhere, update: transformedUpdate, }); @@ -364,14 +410,24 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin } const userActor = await getUser(userId); - return await userActor.authUpdateMany({ model, where: transformedWhere, update: transformedUpdate }); + return expectQueueResponse( + await userActor.send( + userWorkflowQueueName("user.command.auth.update_many"), + { model, where: transformedWhere, update: transformedUpdate }, + { wait: true, timeout: 10_000 }, + ), + ); }, delete: async ({ model, where }) => { const transformedWhere = transformWhereClause({ model, where, action: "delete" }); if (model === "verification") { const organization = await appOrganization(); - await organization.commandBetterAuthVerificationDelete({ where: transformedWhere }); + await organization.send( + organizationWorkflowQueueName("organization.command.better_auth.verification.delete"), + { where: transformedWhere }, + { wait: true, timeout: 10_000 }, + ); return; } @@ -383,34 +439,46 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin const userActor = await getUser(userId); const organization = await appOrganization(); const before = await userActor.betterAuthFindOneRecord({ model, where: transformedWhere }); - await userActor.authDelete({ model, where: transformedWhere }); + await userActor.send(userWorkflowQueueName("user.command.auth.delete"), { model, where: transformedWhere }, { wait: true, timeout: 10_000 }); if (model === "session" && before) { - await organization.commandBetterAuthSessionIndexDelete({ - sessionId: before.id, - sessionToken: before.token, - }); + await organization.send( + organizationWorkflowQueueName("organization.command.better_auth.session_index.delete"), + { + sessionId: before.id, + sessionToken: before.token, + }, + { wait: true, timeout: 10_000 }, + ); } if (model === "account" && before) { - await organization.commandBetterAuthAccountIndexDelete({ - id: before.id, - providerId: before.providerId, - accountId: before.accountId, - }); + await organization.send( + organizationWorkflowQueueName("organization.command.better_auth.account_index.delete"), + { + id: before.id, + providerId: before.providerId, + accountId: before.accountId, + }, + { wait: true, timeout: 10_000 }, + ); } if (model === "user" && before?.email) { - await organization.commandBetterAuthEmailIndexDelete({ - email: before.email.toLowerCase(), - }); + await organization.send( + organizationWorkflowQueueName("organization.command.better_auth.email_index.delete"), + { + email: before.email.toLowerCase(), + }, + { wait: true, timeout: 10_000 }, + ); } }, deleteMany: async ({ model, where }) => { const transformedWhere = transformWhereClause({ model, where, action: "deleteMany" }); if (model === "verification") { - return await ensureOrganizationVerification("commandBetterAuthVerificationDeleteMany", { where: transformedWhere }); + return await ensureOrganizationVerification("organization.command.better_auth.verification.delete_many", { where: transformedWhere }); } if (model === "session") { @@ -421,12 +489,18 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin const userActor = await getUser(userId); const organization = await appOrganization(); const sessions = await userActor.betterAuthFindManyRecords({ model, where: transformedWhere, limit: 5000 }); - const deleted = await userActor.authDeleteMany({ model, where: transformedWhere }); + const deleted = expectQueueResponse( + await userActor.send(userWorkflowQueueName("user.command.auth.delete_many"), { model, where: transformedWhere }, { wait: true, timeout: 10_000 }), + ); for (const session of sessions) { - await organization.commandBetterAuthSessionIndexDelete({ - sessionId: session.id, - sessionToken: session.token, - }); + await organization.send( + organizationWorkflowQueueName("organization.command.better_auth.session_index.delete"), + { + sessionId: session.id, + sessionToken: session.token, + }, + { wait: true, timeout: 10_000 }, + ); } return deleted; } @@ -437,7 +511,9 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin } const userActor = await getUser(userId); - const deleted = await userActor.authDeleteMany({ model, where: transformedWhere }); + const deleted = expectQueueResponse( + await userActor.send(userWorkflowQueueName("user.command.auth.delete_many"), { model, where: transformedWhere }, { wait: true, timeout: 10_000 }), + ); return deleted; }, @@ -509,7 +585,9 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin async upsertUserProfile(userId: string, patch: Record) { const userActor = await getUser(userId); - return await userActor.profileUpsert({ userId, patch }); + return expectQueueResponse( + await userActor.send(userWorkflowQueueName("user.command.profile.upsert"), { userId, patch }, { wait: true, timeout: 10_000 }), + ); }, async setActiveOrganization(sessionId: string, activeOrganizationId: string | null) { @@ -518,7 +596,9 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin throw new Error(`Unknown auth session ${sessionId}`); } const userActor = await getUser(authState.user.id); - return await userActor.sessionStateUpsert({ sessionId, activeOrganizationId }); + return expectQueueResponse( + await userActor.send(userWorkflowQueueName("user.command.session_state.upsert"), { sessionId, activeOrganizationId }, { wait: true, timeout: 10_000 }), + ); }, async getAccessTokenForSession(sessionId: string) {