feat(foundry): revert actions to queue/workflow pattern with direct sends

Revert actor communication from direct action calls to queue/workflow-based
patterns for better observability (workflow history in RivetKit inspector),
replay/recovery semantics, and idiomatic RivetKit usage.

- Add queue/workflow infrastructure to all actors: organization, task, user,
  github-data, sandbox, and audit-log
- Mutations route through named queues processed by workflow command loops
  with ctx.step() wrapping for c.state/c.db access and observability
- Remove command action wrappers (~460 lines) — callers use .send() directly
  to queue names with expectQueueResponse() for wait:true results
- Keep sendPrompt and runProcess as direct sandbox actions (long-running /
  large responses that would block the workflow loop or exceed 128KB limit)
- Fix workspace fire-and-forget calls (enqueueWorkspaceEnsureSession,
  enqueueWorkspaceRefresh) to self-send to task queue instead of calling
  directly outside workflow step context

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Nathan Flurry 2026-03-16 18:46:53 -07:00
parent 4111aebfce
commit a171956298
21 changed files with 1443 additions and 688 deletions

View file

@ -1,7 +1,10 @@
// @ts-nocheck
import { and, desc, eq } from "drizzle-orm";
import { actor } from "rivetkit";
import { actor, queue } from "rivetkit";
import { workflow, Loop } from "rivetkit/workflow";
import type { AuditLogEvent } from "@sandbox-agent/foundry-shared";
import { selfAuditLog } from "../handles.js";
import { logActorWarning, resolveErrorMessage } from "../logging.js";
import { auditLogDb } from "./db/db.js";
import { events } from "./db/schema.js";
@ -24,6 +27,91 @@ export interface ListAuditLogParams {
limit?: number;
}
// ---------------------------------------------------------------------------
// Queue names
// ---------------------------------------------------------------------------
const AUDIT_LOG_QUEUE_NAMES = ["auditLog.command.append"] as const;
type AuditLogQueueName = (typeof AUDIT_LOG_QUEUE_NAMES)[number];
function auditLogWorkflowQueueName(name: AuditLogQueueName): AuditLogQueueName {
return name;
}
// ---------------------------------------------------------------------------
// Mutation functions
// ---------------------------------------------------------------------------
async function appendMutation(c: any, body: AppendAuditLogCommand): Promise<{ ok: true }> {
const now = Date.now();
await c.db
.insert(events)
.values({
repoId: body.repoId ?? null,
taskId: body.taskId ?? null,
branchName: body.branchName ?? null,
kind: body.kind,
payloadJson: JSON.stringify(body.payload),
createdAt: now,
})
.run();
return { ok: true };
}
// ---------------------------------------------------------------------------
// Workflow command loop
// ---------------------------------------------------------------------------
type AuditLogWorkflowHandler = (loopCtx: any, body: any) => Promise<any>;
const AUDIT_LOG_COMMAND_HANDLERS: Record<AuditLogQueueName, AuditLogWorkflowHandler> = {
"auditLog.command.append": async (c, body) => appendMutation(c, body),
};
async function runAuditLogWorkflow(ctx: any): Promise<void> {
await ctx.loop("audit-log-command-loop", async (loopCtx: any) => {
const msg = await loopCtx.queue.next("next-audit-log-command", {
names: [...AUDIT_LOG_QUEUE_NAMES],
completable: true,
});
if (!msg) {
return Loop.continue(undefined);
}
const handler = AUDIT_LOG_COMMAND_HANDLERS[msg.name as AuditLogQueueName];
if (!handler) {
logActorWarning("auditLog", "unknown audit-log command", { command: msg.name });
await msg.complete({ error: `Unknown command: ${msg.name}` }).catch(() => {});
return Loop.continue(undefined);
}
try {
// Wrap in a step so c.state and c.db are accessible inside mutation functions.
const result = await loopCtx.step({
name: msg.name,
timeout: 60_000,
run: async () => handler(loopCtx, msg.body),
});
await msg.complete(result);
} catch (error) {
const message = resolveErrorMessage(error);
logActorWarning("auditLog", "audit-log workflow command failed", {
command: msg.name,
error: message,
});
await msg.complete({ error: message }).catch(() => {});
}
return Loop.continue(undefined);
});
}
// ---------------------------------------------------------------------------
// Actor definition
// ---------------------------------------------------------------------------
/**
* Organization-scoped audit log. One per org, not one per repo.
*
@ -35,6 +123,7 @@ export interface ListAuditLogParams {
*/
export const auditLog = actor({
db: auditLogDb,
queues: Object.fromEntries(AUDIT_LOG_QUEUE_NAMES.map((name) => [name, queue()])),
options: {
name: "Audit Log",
icon: "database",
@ -43,22 +132,14 @@ export const auditLog = actor({
organizationId: input.organizationId,
}),
actions: {
async append(c, body: AppendAuditLogCommand): Promise<{ ok: true }> {
const now = Date.now();
await c.db
.insert(events)
.values({
repoId: body.repoId ?? null,
taskId: body.taskId ?? null,
branchName: body.branchName ?? null,
kind: body.kind,
payloadJson: JSON.stringify(body.payload),
createdAt: now,
})
.run();
// Mutation — self-send to queue for workflow history
async append(c: any, body: AppendAuditLogCommand): Promise<{ ok: true }> {
const self = selfAuditLog(c);
await self.send(auditLogWorkflowQueueName("auditLog.command.append"), body, { wait: false });
return { ok: true };
},
// Read — direct action (no queue)
async list(c, params?: ListAuditLogParams): Promise<AuditLogEvent[]> {
const whereParts = [];
if (params?.repoId) {
@ -95,4 +176,5 @@ export const auditLog = actor({
}));
},
},
run: workflow(runAuditLogWorkflow),
});

View file

@ -1,15 +1,17 @@
// @ts-nocheck
import { eq, inArray } from "drizzle-orm";
import { actor } from "rivetkit";
import { actor, queue } from "rivetkit";
import { workflow, Loop } from "rivetkit/workflow";
import type { FoundryOrganization } from "@sandbox-agent/foundry-shared";
import { getActorRuntimeContext } from "../context.js";
import { getOrCreateOrganization, getTask } from "../handles.js";
import { logActorWarning, resolveErrorMessage } from "../logging.js";
import { taskWorkflowQueueName } from "../task/workflow/queue.js";
import { repoIdFromRemote } from "../../services/repo.js";
import { resolveOrganizationGithubAuth } from "../../services/github-auth.js";
// actions called directly (no queue)
import { organizationWorkflowQueueName } from "../organization/queues.js";
import { githubDataDb } from "./db/db.js";
import { githubBranches, githubMembers, githubMeta, githubPullRequests, githubRepositories } from "./db/schema.js";
// workflow.ts is no longer used — commands are actions now
const META_ROW_ID = 1;
const SYNC_REPOSITORY_BATCH_SIZE = 10;
@ -74,7 +76,19 @@ interface ClearStateInput {
label: string;
}
// sendOrganizationCommand removed — org actions called directly
// Queue names for github-data actor
export const GITHUB_DATA_QUEUE_NAMES = [
"githubData.command.syncRepos",
"githubData.command.handlePullRequestWebhook",
"githubData.command.clearState",
"githubData.command.reloadRepository",
] as const;
type GithubDataQueueName = (typeof GITHUB_DATA_QUEUE_NAMES)[number];
export function githubDataWorkflowQueueName(name: GithubDataQueueName): GithubDataQueueName {
return name;
}
interface PullRequestWebhookInput {
connectedAccount: string;
@ -209,18 +223,22 @@ async function writeMeta(c: any, patch: Partial<GithubMetaState>) {
async function publishSyncProgress(c: any, patch: Partial<GithubMetaState>): Promise<GithubMetaState> {
const meta = await writeMeta(c, patch);
const organization = await getOrCreateOrganization(c, c.state.organizationId);
await organization.commandApplyGithubSyncProgress({
connectedAccount: meta.connectedAccount,
installationStatus: meta.installationStatus,
installationId: meta.installationId,
syncStatus: meta.syncStatus,
lastSyncLabel: meta.lastSyncLabel,
lastSyncAt: meta.lastSyncAt,
syncGeneration: meta.syncGeneration,
syncPhase: meta.syncPhase,
processedRepositoryCount: meta.processedRepositoryCount,
totalRepositoryCount: meta.totalRepositoryCount,
});
await organization.send(
organizationWorkflowQueueName("organization.command.github.sync_progress.apply"),
{
connectedAccount: meta.connectedAccount,
installationStatus: meta.installationStatus,
installationId: meta.installationId,
syncStatus: meta.syncStatus,
lastSyncLabel: meta.lastSyncLabel,
lastSyncAt: meta.lastSyncAt,
syncGeneration: meta.syncGeneration,
syncPhase: meta.syncPhase,
processedRepositoryCount: meta.processedRepositoryCount,
totalRepositoryCount: meta.totalRepositoryCount,
},
{ wait: false },
);
return meta;
}
@ -424,7 +442,13 @@ async function refreshTaskSummaryForBranch(c: any, repoId: string, branchName: s
return;
}
const organization = await getOrCreateOrganization(c, c.state.organizationId);
void organization.commandRefreshTaskSummaryForBranch({ repoId, branchName, pullRequest, repoName: repositoryRecord.fullName ?? undefined }).catch(() => {});
void organization
.send(
organizationWorkflowQueueName("organization.command.refreshTaskSummaryForBranch"),
{ repoId, branchName, pullRequest, repoName: repositoryRecord.fullName ?? undefined },
{ wait: false },
)
.catch(() => {});
}
async function emitPullRequestChangeEvents(c: any, beforeRows: any[], afterRows: any[]) {
@ -472,7 +496,7 @@ async function autoArchiveTaskForClosedPullRequest(c: any, row: any) {
}
try {
const task = getTask(c, c.state.organizationId, row.repoId, match.taskId);
void task.archive({ reason: `PR ${String(row.state).toLowerCase()}` }).catch(() => {});
void task.send(taskWorkflowQueueName("task.command.archive"), { reason: `PR ${String(row.state).toLowerCase()}` }, { wait: false }).catch(() => {});
} catch {
// Best-effort only. Task summary refresh will still clear the PR state.
}
@ -877,8 +901,79 @@ export async function fullSyncError(c: any, error: unknown): Promise<void> {
});
}
// ---------------------------------------------------------------------------
// Workflow command loop
// ---------------------------------------------------------------------------
type GithubDataWorkflowHandler = (loopCtx: any, body: any) => Promise<any>;
const GITHUB_DATA_COMMAND_HANDLERS: Record<GithubDataQueueName, GithubDataWorkflowHandler> = {
"githubData.command.syncRepos": async (c, body) => {
try {
await runFullSync(c, body);
return { ok: true };
} catch (error) {
try {
await fullSyncError(c, error);
} catch {
/* best effort */
}
throw error;
}
},
"githubData.command.handlePullRequestWebhook": async (c, body) => {
await handlePullRequestWebhookMutation(c, body);
return { ok: true };
},
"githubData.command.clearState": async (c, body) => {
await clearStateMutation(c, body);
return { ok: true };
},
"githubData.command.reloadRepository": async (c, body) => reloadRepositoryMutation(c, body),
};
async function runGithubDataWorkflow(ctx: any): Promise<void> {
await ctx.loop("github-data-command-loop", async (loopCtx: any) => {
const msg = await loopCtx.queue.next("next-github-data-command", {
names: [...GITHUB_DATA_QUEUE_NAMES],
completable: true,
});
if (!msg) {
return Loop.continue(undefined);
}
const handler = GITHUB_DATA_COMMAND_HANDLERS[msg.name as GithubDataQueueName];
if (!handler) {
logActorWarning("github-data", "unknown github-data command", { command: msg.name });
await msg.complete({ error: `Unknown command: ${msg.name}` }).catch(() => {});
return Loop.continue(undefined);
}
try {
// Wrap in a step so c.state and c.db are accessible inside mutation functions.
const result = await loopCtx.step({
name: msg.name,
timeout: 10 * 60_000,
run: async () => handler(loopCtx, msg.body),
});
await msg.complete(result);
} catch (error) {
const message = resolveErrorMessage(error);
logActorWarning("github-data", "github-data workflow command failed", {
command: msg.name,
error: message,
});
await msg.complete({ error: message }).catch(() => {});
}
return Loop.continue(undefined);
});
}
export const githubData = actor({
db: githubDataDb,
queues: Object.fromEntries(GITHUB_DATA_QUEUE_NAMES.map((name) => [name, queue()])),
options: {
name: "GitHub Data",
icon: "github",
@ -945,35 +1040,8 @@ export const githubData = actor({
}))
.sort((left, right) => left.branchName.localeCompare(right.branchName));
},
async syncRepos(c, body: any) {
try {
await runFullSync(c, body);
return { ok: true };
} catch (error) {
try {
await fullSyncError(c, error);
} catch {
/* best effort */
}
throw error;
}
},
async reloadRepository(c, body: { repoId: string }) {
return await reloadRepositoryMutation(c, body);
},
async clearState(c, body: any) {
await clearStateMutation(c, body);
return { ok: true };
},
async handlePullRequestWebhook(c, body: any) {
await handlePullRequestWebhookMutation(c, body);
return { ok: true };
},
},
run: workflow(runGithubDataWorkflow),
});
export async function reloadRepositoryMutation(c: any, input: { repoId: string }) {

View file

@ -79,3 +79,7 @@ export function selfUser(c: any) {
export function selfGithubData(c: any) {
return actorClient(c).githubData.getForId(c.actorId);
}
export function selfTaskSandbox(c: any) {
return actorClient(c).taskSandbox.getForId(c.actorId);
}

View file

@ -1,6 +1,7 @@
import { desc } from "drizzle-orm";
import type { FoundryAppSnapshot } from "@sandbox-agent/foundry-shared";
import { getOrCreateGithubData, getOrCreateOrganization } from "../../handles.js";
import { githubDataWorkflowQueueName } from "../../github-data/index.js";
import { authSessionIndex } from "../db/schema.js";
import {
assertAppOrganization,
@ -11,6 +12,7 @@ import {
} from "../app-shell.js";
import { getBetterAuthService } from "../../../services/better-auth.js";
import { refreshOrganizationSnapshotMutation } from "../actions.js";
import { organizationWorkflowQueueName } from "../queues.js";
export const organizationGithubActions = {
async resolveAppGithubToken(
@ -58,21 +60,27 @@ export const organizationGithubActions = {
}
const organizationHandle = await getOrCreateOrganization(c, input.organizationId);
await organizationHandle.commandMarkSyncStarted({ label: "Importing repository catalog..." });
await organizationHandle.commandBroadcastSnapshot({});
await organizationHandle.send(
organizationWorkflowQueueName("organization.command.shell.sync_started.mark"),
{ label: "Importing repository catalog..." },
{ wait: false },
);
await organizationHandle.send(organizationWorkflowQueueName("organization.command.snapshot.broadcast"), {}, { wait: false });
void githubData.syncRepos({ label: "Importing repository catalog..." }).catch(() => {});
void githubData
.send(githubDataWorkflowQueueName("githubData.command.syncRepos"), { label: "Importing repository catalog..." }, { wait: false })
.catch(() => {});
return await buildAppSnapshot(c, input.sessionId);
},
async adminReloadGithubOrganization(c: any): Promise<void> {
const githubData = await getOrCreateGithubData(c, c.state.organizationId);
await githubData.syncRepos({ label: "Reloading GitHub organization..." });
await githubData.send(githubDataWorkflowQueueName("githubData.command.syncRepos"), { label: "Reloading GitHub organization..." }, { wait: false });
},
async adminReloadGithubRepository(c: any, input: { repoId: string }): Promise<void> {
const githubData = await getOrCreateGithubData(c, c.state.organizationId);
await githubData.reloadRepository(input);
await githubData.send(githubDataWorkflowQueueName("githubData.command.reloadRepository"), input, { wait: false });
},
};

View file

@ -1,7 +1,6 @@
import type { FoundryAppSnapshot, UpdateFoundryOrganizationProfileInput, WorkspaceModelId } from "@sandbox-agent/foundry-shared";
import { getBetterAuthService } from "../../../services/better-auth.js";
import { getOrCreateOrganization } from "../../handles.js";
// actions called directly (no queue)
import {
assertAppOrganization,
assertOrganizationShell,
@ -11,7 +10,7 @@ import {
requireEligibleOrganization,
requireSignedInSession,
} from "../app-shell.js";
// org queue names removed — using direct actions
import { organizationWorkflowQueueName } from "../queues.js";
export const organizationShellActions = {
async getAppSnapshot(c: any, input: { sessionId: string }): Promise<FoundryAppSnapshot> {
@ -35,11 +34,15 @@ export const organizationShellActions = {
const session = await requireSignedInSession(c, input.sessionId);
requireEligibleOrganization(session, input.organizationId);
const organization = await getOrCreateOrganization(c, input.organizationId);
await organization.commandUpdateShellProfile({
displayName: input.displayName,
slug: input.slug,
primaryDomain: input.primaryDomain,
});
await organization.send(
organizationWorkflowQueueName("organization.command.shell.profile.update"),
{
displayName: input.displayName,
slug: input.slug,
primaryDomain: input.primaryDomain,
},
{ wait: true, timeout: 10_000 },
);
return await buildAppSnapshot(c, input.sessionId);
},

View file

@ -17,6 +17,8 @@ import { deriveFallbackTitle, resolveCreateFlowDecision } from "../../../service
// actions return directly (no queue response unwrapping)
import { isActorNotFoundError, logActorWarning, resolveErrorMessage } from "../../logging.js";
import { defaultSandboxProviderId } from "../../../sandbox-config.js";
import { taskWorkflowQueueName } from "../../task/workflow/queue.js";
import { expectQueueResponse } from "../../../services/queue.js";
import { taskIndex, taskSummaries } from "../db/schema.js";
import { refreshOrganizationSnapshotMutation } from "../actions.js";
@ -202,12 +204,18 @@ export async function createTaskMutation(c: any, cmd: CreateTaskCommand): Promis
throw error;
}
const created = await taskHandle.initialize({
sandboxProviderId: cmd.sandboxProviderId,
branchName: initialBranchName,
title: initialTitle,
task: cmd.task,
});
const created = expectQueueResponse<TaskRecord>(
await taskHandle.send(
taskWorkflowQueueName("task.command.initialize"),
{
sandboxProviderId: cmd.sandboxProviderId,
branchName: initialBranchName,
title: initialTitle,
task: cmd.task,
},
{ wait: true, timeout: 10_000 },
),
);
try {
await upsertTaskSummary(c, await taskHandle.getTaskSummary({}));
@ -384,7 +392,7 @@ export async function refreshTaskSummaryForBranchMutation(
// Best-effort notify the task actor if it exists (fire-and-forget)
try {
const task = getTask(c, c.state.organizationId, input.repoId, row.taskId);
void task.pullRequestSync({ pullRequest }).catch(() => {});
void task.send(taskWorkflowQueueName("task.command.pull_request.sync"), { pullRequest }, { wait: false }).catch(() => {});
} catch {
// Task actor doesn't exist yet — that's fine, it's virtual
}

View file

@ -25,6 +25,8 @@ import { getActorRuntimeContext } from "../../context.js";
import { getOrCreateAuditLog, getOrCreateTask, getTask as getTaskHandle } from "../../handles.js";
import { defaultSandboxProviderId } from "../../../sandbox-config.js";
import { logActorWarning, resolveErrorMessage } from "../../logging.js";
import { taskWorkflowQueueName } from "../../task/workflow/queue.js";
import { expectQueueResponse } from "../../../services/queue.js";
import { taskIndex, taskSummaries } from "../db/schema.js";
import {
createTaskMutation,
@ -131,11 +133,15 @@ export const organizationTaskActions = {
const task = await requireWorkspaceTask(c, input.repoId, created.taskId);
void task
.createSessionAndSend({
model: input.model,
text: input.task,
authSessionId: input.authSessionId,
})
.send(
taskWorkflowQueueName("task.command.workspace.create_session_and_send"),
{
model: input.model,
text: input.task,
authSessionId: input.authSessionId,
},
{ wait: false },
)
.catch(() => {});
return { taskId: created.taskId };
@ -143,94 +149,132 @@ export const organizationTaskActions = {
async markWorkspaceUnread(c: any, input: TaskWorkspaceSelectInput): Promise<void> {
const task = await requireWorkspaceTask(c, input.repoId, input.taskId);
await task.markUnread({ authSessionId: input.authSessionId });
await task.send(taskWorkflowQueueName("task.command.workspace.mark_unread"), { authSessionId: input.authSessionId }, { wait: false });
},
async renameWorkspaceTask(c: any, input: TaskWorkspaceRenameInput): Promise<void> {
const task = await requireWorkspaceTask(c, input.repoId, input.taskId);
await task.renameTask({ value: input.value });
await task.send(taskWorkflowQueueName("task.command.workspace.rename_task"), { value: input.value }, { wait: false });
},
async createWorkspaceSession(c: any, input: TaskWorkspaceSelectInput & { model?: string }): Promise<{ sessionId: string }> {
const task = await requireWorkspaceTask(c, input.repoId, input.taskId);
return await task.createSession({
...(input.model ? { model: input.model } : {}),
...(input.authSessionId ? { authSessionId: input.authSessionId } : {}),
});
return expectQueueResponse(
await task.send(
taskWorkflowQueueName("task.command.workspace.create_session"),
{
...(input.model ? { model: input.model } : {}),
...(input.authSessionId ? { authSessionId: input.authSessionId } : {}),
},
{ wait: true, timeout: 10_000 },
),
);
},
async renameWorkspaceSession(c: any, input: TaskWorkspaceRenameSessionInput): Promise<void> {
const task = await requireWorkspaceTask(c, input.repoId, input.taskId);
await task.renameSession({ sessionId: input.sessionId, title: input.title, authSessionId: input.authSessionId });
await task.send(
taskWorkflowQueueName("task.command.workspace.rename_session"),
{ sessionId: input.sessionId, title: input.title, authSessionId: input.authSessionId },
{ wait: false },
);
},
async selectWorkspaceSession(c: any, input: TaskWorkspaceSessionInput): Promise<void> {
const task = await requireWorkspaceTask(c, input.repoId, input.taskId);
await task.selectSession({ sessionId: input.sessionId, authSessionId: input.authSessionId });
await task.send(
taskWorkflowQueueName("task.command.workspace.select_session"),
{ sessionId: input.sessionId, authSessionId: input.authSessionId },
{ wait: false },
);
},
async setWorkspaceSessionUnread(c: any, input: TaskWorkspaceSetSessionUnreadInput): Promise<void> {
const task = await requireWorkspaceTask(c, input.repoId, input.taskId);
await task.setSessionUnread({ sessionId: input.sessionId, unread: input.unread, authSessionId: input.authSessionId });
await task.send(
taskWorkflowQueueName("task.command.workspace.set_session_unread"),
{ sessionId: input.sessionId, unread: input.unread, authSessionId: input.authSessionId },
{ wait: false },
);
},
async updateWorkspaceDraft(c: any, input: TaskWorkspaceUpdateDraftInput): Promise<void> {
const task = await requireWorkspaceTask(c, input.repoId, input.taskId);
void task
.updateDraft({
sessionId: input.sessionId,
text: input.text,
attachments: input.attachments,
authSessionId: input.authSessionId,
})
.send(
taskWorkflowQueueName("task.command.workspace.update_draft"),
{
sessionId: input.sessionId,
text: input.text,
attachments: input.attachments,
authSessionId: input.authSessionId,
},
{ wait: false },
)
.catch(() => {});
},
async changeWorkspaceModel(c: any, input: TaskWorkspaceChangeModelInput): Promise<void> {
const task = await requireWorkspaceTask(c, input.repoId, input.taskId);
await task.changeModel({ sessionId: input.sessionId, model: input.model, authSessionId: input.authSessionId });
await task.send(
taskWorkflowQueueName("task.command.workspace.change_model"),
{ sessionId: input.sessionId, model: input.model, authSessionId: input.authSessionId },
{ wait: false },
);
},
async sendWorkspaceMessage(c: any, input: TaskWorkspaceSendMessageInput): Promise<void> {
const task = await requireWorkspaceTask(c, input.repoId, input.taskId);
void task
.sendMessage({
sessionId: input.sessionId,
text: input.text,
attachments: input.attachments,
authSessionId: input.authSessionId,
})
.send(
taskWorkflowQueueName("task.command.workspace.send_message"),
{
sessionId: input.sessionId,
text: input.text,
attachments: input.attachments,
authSessionId: input.authSessionId,
},
{ wait: false },
)
.catch(() => {});
},
async stopWorkspaceSession(c: any, input: TaskWorkspaceSessionInput): Promise<void> {
const task = await requireWorkspaceTask(c, input.repoId, input.taskId);
void task.stopSession({ sessionId: input.sessionId, authSessionId: input.authSessionId }).catch(() => {});
void task
.send(taskWorkflowQueueName("task.command.workspace.stop_session"), { sessionId: input.sessionId, authSessionId: input.authSessionId }, { wait: false })
.catch(() => {});
},
async closeWorkspaceSession(c: any, input: TaskWorkspaceSessionInput): Promise<void> {
const task = await requireWorkspaceTask(c, input.repoId, input.taskId);
void task.closeSession({ sessionId: input.sessionId, authSessionId: input.authSessionId }).catch(() => {});
void task
.send(taskWorkflowQueueName("task.command.workspace.close_session"), { sessionId: input.sessionId, authSessionId: input.authSessionId }, { wait: false })
.catch(() => {});
},
async publishWorkspacePr(c: any, input: TaskWorkspaceSelectInput): Promise<void> {
const task = await requireWorkspaceTask(c, input.repoId, input.taskId);
void task.publishPr({}).catch(() => {});
void task.send(taskWorkflowQueueName("task.command.workspace.publish_pr"), {}, { wait: false }).catch(() => {});
},
async changeWorkspaceTaskOwner(c: any, input: TaskWorkspaceChangeOwnerInput): Promise<void> {
const task = await requireWorkspaceTask(c, input.repoId, input.taskId);
await task.changeOwner({
primaryUserId: input.targetUserId,
primaryGithubLogin: input.targetUserName,
primaryGithubEmail: input.targetUserEmail,
primaryGithubAvatarUrl: null,
});
await task.send(
taskWorkflowQueueName("task.command.workspace.change_owner"),
{
primaryUserId: input.targetUserId,
primaryGithubLogin: input.targetUserName,
primaryGithubEmail: input.targetUserEmail,
primaryGithubAvatarUrl: null,
},
{ wait: false },
);
},
async revertWorkspaceFile(c: any, input: TaskWorkspaceDiffInput): Promise<void> {
const task = await requireWorkspaceTask(c, input.repoId, input.taskId);
void task.revertFile(input).catch(() => {});
void task.send(taskWorkflowQueueName("task.command.workspace.revert_file"), input, { wait: false }).catch(() => {});
},
async getRepoOverview(c: any, input: RepoOverviewInput): Promise<RepoOverview> {
@ -250,7 +294,9 @@ export const organizationTaskActions = {
async switchTask(c: any, input: { repoId: string; taskId: string }): Promise<SwitchResult> {
const h = getTaskHandle(c, c.state.organizationId, input.repoId, input.taskId);
const record = await h.get();
const switched = await h.switchTask({});
const switched = expectQueueResponse<{ switchTarget: string | null }>(
await h.send(taskWorkflowQueueName("task.command.switch"), {}, { wait: true, timeout: 10_000 }),
);
return {
organizationId: c.state.organizationId,
taskId: input.taskId,
@ -288,42 +334,42 @@ export const organizationTaskActions = {
assertOrganization(c, input.organizationId);
const h = getTaskHandle(c, c.state.organizationId, input.repoId, input.taskId);
return await h.attach({ reason: input.reason });
return expectQueueResponse(await h.send(taskWorkflowQueueName("task.command.attach"), { reason: input.reason }, { wait: true, timeout: 10_000 }));
},
async pushTask(c: any, input: TaskProxyActionInput): Promise<void> {
assertOrganization(c, input.organizationId);
const h = getTaskHandle(c, c.state.organizationId, input.repoId, input.taskId);
void h.push({ reason: input.reason }).catch(() => {});
void h.send(taskWorkflowQueueName("task.command.push"), { reason: input.reason }, { wait: false }).catch(() => {});
},
async syncTask(c: any, input: TaskProxyActionInput): Promise<void> {
assertOrganization(c, input.organizationId);
const h = getTaskHandle(c, c.state.organizationId, input.repoId, input.taskId);
void h.sync({ reason: input.reason }).catch(() => {});
void h.send(taskWorkflowQueueName("task.command.sync"), { reason: input.reason }, { wait: false }).catch(() => {});
},
async mergeTask(c: any, input: TaskProxyActionInput): Promise<void> {
assertOrganization(c, input.organizationId);
const h = getTaskHandle(c, c.state.organizationId, input.repoId, input.taskId);
void h.merge({ reason: input.reason }).catch(() => {});
void h.send(taskWorkflowQueueName("task.command.merge"), { reason: input.reason }, { wait: false }).catch(() => {});
},
async archiveTask(c: any, input: TaskProxyActionInput): Promise<void> {
assertOrganization(c, input.organizationId);
const h = getTaskHandle(c, c.state.organizationId, input.repoId, input.taskId);
void h.archive({ reason: input.reason }).catch(() => {});
void h.send(taskWorkflowQueueName("task.command.archive"), { reason: input.reason }, { wait: false }).catch(() => {});
},
async killTask(c: any, input: TaskProxyActionInput): Promise<void> {
assertOrganization(c, input.organizationId);
const h = getTaskHandle(c, c.state.organizationId, input.repoId, input.taskId);
void h.kill({ reason: input.reason }).catch(() => {});
void h.send(taskWorkflowQueueName("task.command.kill"), { reason: input.reason }, { wait: false }).catch(() => {});
},
async getRepositoryMetadata(c: any, input: { repoId: string }): Promise<{ defaultBranch: string | null; fullName: string | null; remoteUrl: string }> {

View file

@ -17,6 +17,8 @@ import { GitHubAppError } from "../../services/app-github.js";
import { getBetterAuthService } from "../../services/better-auth.js";
import { repoIdFromRemote, repoLabelFromRemote } from "../../services/repo.js";
import { logger } from "../../logging.js";
import { githubDataWorkflowQueueName } from "../github-data/index.js";
import { organizationWorkflowQueueName } from "./queues.js";
import { invoices, organizationMembers, organizationProfile, seatAssignments, stripeLookup } from "./db/schema.js";
import { APP_SHELL_ORGANIZATION_ID } from "./constants.js";
@ -482,19 +484,23 @@ async function syncGithubOrganizationsInternal(c: any, input: { sessionId: strin
const organizationId = organizationOrganizationId(account.kind, account.githubLogin);
const installation = installations.find((candidate) => candidate.accountLogin === account.githubLogin) ?? null;
const organization = await getOrCreateOrganization(c, organizationId);
await organization.commandSyncOrganizationShellFromGithub({
userId: githubUserId,
userName: viewer.name || viewer.login,
userEmail: viewer.email ?? `${viewer.login}@users.noreply.github.com`,
githubUserLogin: viewer.login,
githubAccountId: account.githubAccountId,
githubLogin: account.githubLogin,
githubAccountType: account.githubAccountType,
kind: account.kind,
displayName: account.displayName,
installationId: installation?.id ?? null,
appConfigured: appShell.github.isAppConfigured(),
});
await organization.send(
organizationWorkflowQueueName("organization.command.github.organization_shell.sync_from_github"),
{
userId: githubUserId,
userName: viewer.name || viewer.login,
userEmail: viewer.email ?? `${viewer.login}@users.noreply.github.com`,
githubUserLogin: viewer.login,
githubAccountId: account.githubAccountId,
githubLogin: account.githubLogin,
githubAccountType: account.githubAccountType,
kind: account.kind,
displayName: account.displayName,
installationId: installation?.id ?? null,
appConfigured: appShell.github.isAppConfigured(),
},
{ wait: true, timeout: 10_000 },
);
linkedOrganizationIds.push(organizationId);
}
@ -677,10 +683,14 @@ async function applySubscriptionState(
},
fallbackPlanId: FoundryBillingPlanId,
): Promise<void> {
await organization.commandApplyStripeSubscription({
subscription,
fallbackPlanId,
});
await organization.send(
organizationWorkflowQueueName("organization.command.billing.stripe_subscription.apply"),
{
subscription,
fallbackPlanId,
},
{ wait: true, timeout: 10_000 },
);
}
export const organizationAppActions = {
@ -693,9 +703,13 @@ export const organizationAppActions = {
const organizationState = await getOrganizationState(organizationHandle);
if (input.planId === "free") {
await organizationHandle.commandApplyFreePlan({
clearSubscription: false,
});
await organizationHandle.send(
organizationWorkflowQueueName("organization.command.billing.free_plan.apply"),
{
clearSubscription: false,
},
{ wait: true, timeout: 10_000 },
);
return {
url: `${appShell.appUrl}/organizations/${input.organizationId}/billing`,
};
@ -714,9 +728,13 @@ export const organizationAppActions = {
email: session.currentUserEmail,
})
).id;
await organizationHandle.commandApplyStripeCustomer({
customerId,
});
await organizationHandle.send(
organizationWorkflowQueueName("organization.command.billing.stripe_customer.apply"),
{
customerId,
},
{ wait: true, timeout: 10_000 },
);
await upsertStripeLookupEntries(c, input.organizationId, customerId, null);
}
@ -744,9 +762,13 @@ export const organizationAppActions = {
const completion = await appShell.stripe.retrieveCheckoutCompletion(input.checkoutSessionId);
if (completion.customerId) {
await organizationHandle.commandApplyStripeCustomer({
customerId: completion.customerId,
});
await organizationHandle.send(
organizationWorkflowQueueName("organization.command.billing.stripe_customer.apply"),
{
customerId: completion.customerId,
},
{ wait: true, timeout: 10_000 },
);
}
await upsertStripeLookupEntries(c, input.organizationId, completion.customerId, completion.subscriptionId);
@ -756,9 +778,13 @@ export const organizationAppActions = {
}
if (completion.paymentMethodLabel) {
await organizationHandle.commandSetPaymentMethod({
label: completion.paymentMethodLabel,
});
await organizationHandle.send(
organizationWorkflowQueueName("organization.command.billing.payment_method.set"),
{
label: completion.paymentMethodLabel,
},
{ wait: true, timeout: 10_000 },
);
}
return {
@ -796,9 +822,13 @@ export const organizationAppActions = {
await applySubscriptionState(organizationHandle, subscription, organizationState.billingPlanId);
await upsertStripeLookupEntries(c, input.organizationId, subscription.customerId ?? organizationState.stripeCustomerId, subscription.id);
} else {
await organizationHandle.commandSetBillingStatus({
status: "scheduled_cancel",
});
await organizationHandle.send(
organizationWorkflowQueueName("organization.command.billing.status.set"),
{
status: "scheduled_cancel",
},
{ wait: true, timeout: 10_000 },
);
}
return await buildAppSnapshot(c, input.sessionId);
@ -817,9 +847,13 @@ export const organizationAppActions = {
await applySubscriptionState(organizationHandle, subscription, organizationState.billingPlanId);
await upsertStripeLookupEntries(c, input.organizationId, subscription.customerId ?? organizationState.stripeCustomerId, subscription.id);
} else {
await organizationHandle.commandSetBillingStatus({
status: "active",
});
await organizationHandle.send(
organizationWorkflowQueueName("organization.command.billing.status.set"),
{
status: "active",
},
{ wait: true, timeout: 10_000 },
);
}
return await buildAppSnapshot(c, input.sessionId);
@ -830,9 +864,13 @@ export const organizationAppActions = {
const session = await requireSignedInSession(c, input.sessionId);
requireEligibleOrganization(session, input.organizationId);
const organization = await getOrCreateOrganization(c, input.organizationId);
await organization.commandRecordSeatUsage({
email: session.currentUserEmail,
});
await organization.send(
organizationWorkflowQueueName("organization.command.billing.seat_usage.record"),
{
email: session.currentUserEmail,
},
{ wait: true, timeout: 10_000 },
);
return await buildAppSnapshot(c, input.sessionId);
},
@ -853,9 +891,13 @@ export const organizationAppActions = {
if (organizationId) {
const organization = await getOrCreateOrganization(c, organizationId);
if (typeof object.customer === "string") {
await organization.commandApplyStripeCustomer({
customerId: object.customer,
});
await organization.send(
organizationWorkflowQueueName("organization.command.billing.stripe_customer.apply"),
{
customerId: object.customer,
},
{ wait: true, timeout: 10_000 },
);
}
await upsertStripeLookupEntries(
c,
@ -888,9 +930,13 @@ export const organizationAppActions = {
const organizationId = await findOrganizationIdForStripeEvent(c, subscription.customerId, subscription.id);
if (organizationId) {
const organization = await getOrCreateOrganization(c, organizationId);
await organization.commandApplyFreePlan({
clearSubscription: true,
});
await organization.send(
organizationWorkflowQueueName("organization.command.billing.free_plan.apply"),
{
clearSubscription: true,
},
{ wait: true, timeout: 10_000 },
);
}
return { ok: true };
}
@ -902,13 +948,17 @@ export const organizationAppActions = {
const organization = await getOrCreateOrganization(c, organizationId);
const rawAmount = typeof invoice.amount_paid === "number" ? invoice.amount_paid : invoice.amount_due;
const amountUsd = Math.round((typeof rawAmount === "number" ? rawAmount : 0) / 100);
await organization.commandUpsertInvoice({
id: String(invoice.id),
label: typeof invoice.number === "string" ? `Invoice ${invoice.number}` : "Stripe invoice",
issuedAt: formatUnixDate(typeof invoice.created === "number" ? invoice.created : Math.floor(Date.now() / 1000)),
amountUsd: Number.isFinite(amountUsd) ? amountUsd : 0,
status: event.type === "invoice.paid" ? "paid" : "open",
});
await organization.send(
organizationWorkflowQueueName("organization.command.billing.invoice.upsert"),
{
id: String(invoice.id),
label: typeof invoice.number === "string" ? `Invoice ${invoice.number}` : "Stripe invoice",
issuedAt: formatUnixDate(typeof invoice.created === "number" ? invoice.created : Math.floor(Date.now() / 1000)),
amountUsd: Number.isFinite(amountUsd) ? amountUsd : 0,
status: event.type === "invoice.paid" ? "paid" : "open",
},
{ wait: true, timeout: 10_000 },
);
}
}
@ -938,12 +988,16 @@ export const organizationAppActions = {
const organizationId = organizationOrganizationId(kind, accountLogin);
const receivedAt = Date.now();
const organization = await getOrCreateOrganization(c, organizationId);
await organization.commandRecordGithubWebhookReceipt({
organizationId: organizationId,
event,
action: body.action ?? null,
receivedAt,
});
await organization.send(
organizationWorkflowQueueName("organization.command.github.webhook_receipt.record"),
{
organizationId: organizationId,
event,
action: body.action ?? null,
receivedAt,
},
{ wait: false },
);
const githubData = await getOrCreateGithubData(c, organizationId);
if (event === "installation" && (body.action === "created" || body.action === "deleted" || body.action === "suspend" || body.action === "unsuspend")) {
@ -957,40 +1011,56 @@ export const organizationAppActions = {
"installation_event",
);
if (body.action === "deleted") {
await githubData.clearState({
connectedAccount: accountLogin,
installationStatus: "install_required",
installationId: null,
label: "GitHub App installation removed",
});
await githubData.send(
githubDataWorkflowQueueName("githubData.command.clearState"),
{
connectedAccount: accountLogin,
installationStatus: "install_required",
installationId: null,
label: "GitHub App installation removed",
},
{ wait: false },
);
} else if (body.action === "created") {
void githubData
.syncRepos({
connectedAccount: accountLogin,
installationStatus: "connected",
installationId: body.installation?.id ?? null,
githubLogin: accountLogin,
kind,
label: "Syncing GitHub data from installation webhook...",
})
.send(
githubDataWorkflowQueueName("githubData.command.syncRepos"),
{
connectedAccount: accountLogin,
installationStatus: "connected",
installationId: body.installation?.id ?? null,
githubLogin: accountLogin,
kind,
label: "Syncing GitHub data from installation webhook...",
},
{ wait: false },
)
.catch(() => {});
} else if (body.action === "suspend") {
await githubData.clearState({
connectedAccount: accountLogin,
installationStatus: "reconnect_required",
installationId: body.installation?.id ?? null,
label: "GitHub App installation suspended",
});
await githubData.send(
githubDataWorkflowQueueName("githubData.command.clearState"),
{
connectedAccount: accountLogin,
installationStatus: "reconnect_required",
installationId: body.installation?.id ?? null,
label: "GitHub App installation suspended",
},
{ wait: false },
);
} else if (body.action === "unsuspend") {
void githubData
.syncRepos({
connectedAccount: accountLogin,
installationStatus: "connected",
installationId: body.installation?.id ?? null,
githubLogin: accountLogin,
kind,
label: "Resyncing GitHub data after unsuspend...",
})
.send(
githubDataWorkflowQueueName("githubData.command.syncRepos"),
{
connectedAccount: accountLogin,
installationStatus: "connected",
installationId: body.installation?.id ?? null,
githubLogin: accountLogin,
kind,
label: "Resyncing GitHub data after unsuspend...",
},
{ wait: false },
)
.catch(() => {});
}
return { ok: true };
@ -1009,14 +1079,18 @@ export const organizationAppActions = {
"repository_membership_changed",
);
void githubData
.syncRepos({
connectedAccount: accountLogin,
installationStatus: "connected",
installationId: body.installation?.id ?? null,
githubLogin: accountLogin,
kind,
label: "Resyncing GitHub data after repository access change...",
})
.send(
githubDataWorkflowQueueName("githubData.command.syncRepos"),
{
connectedAccount: accountLogin,
installationStatus: "connected",
installationId: body.installation?.id ?? null,
githubLogin: accountLogin,
kind,
label: "Resyncing GitHub data after repository access change...",
},
{ wait: false },
)
.catch(() => {});
return { ok: true };
}
@ -1045,35 +1119,39 @@ export const organizationAppActions = {
"repository_event",
);
if (event === "pull_request" && body.repository?.clone_url && body.pull_request) {
await githubData.handlePullRequestWebhook({
connectedAccount: accountLogin,
installationStatus: "connected",
installationId: body.installation?.id ?? null,
repository: {
fullName: body.repository.full_name,
cloneUrl: body.repository.clone_url,
private: Boolean(body.repository.private),
await githubData.send(
githubDataWorkflowQueueName("githubData.command.handlePullRequestWebhook"),
{
connectedAccount: accountLogin,
installationStatus: "connected",
installationId: body.installation?.id ?? null,
repository: {
fullName: body.repository.full_name,
cloneUrl: body.repository.clone_url,
private: Boolean(body.repository.private),
},
pullRequest: {
number: body.pull_request.number,
status: body.pull_request.draft ? "draft" : "ready",
title: body.pull_request.title ?? "",
body: body.pull_request.body ?? null,
state: body.pull_request.state ?? "open",
url: body.pull_request.html_url ?? `https://github.com/${body.repository.full_name}/pull/${body.pull_request.number}`,
headRefName: body.pull_request.head?.ref ?? "",
baseRefName: body.pull_request.base?.ref ?? "",
authorLogin: body.pull_request.user?.login ?? null,
isDraft: Boolean(body.pull_request.draft),
merged: Boolean(body.pull_request.merged),
},
},
pullRequest: {
number: body.pull_request.number,
status: body.pull_request.draft ? "draft" : "ready",
title: body.pull_request.title ?? "",
body: body.pull_request.body ?? null,
state: body.pull_request.state ?? "open",
url: body.pull_request.html_url ?? `https://github.com/${body.repository.full_name}/pull/${body.pull_request.number}`,
headRefName: body.pull_request.head?.ref ?? "",
baseRefName: body.pull_request.base?.ref ?? "",
authorLogin: body.pull_request.user?.login ?? null,
isDraft: Boolean(body.pull_request.draft),
merged: Boolean(body.pull_request.merged),
},
});
{ wait: false },
);
}
if ((event === "push" || event === "create" || event === "delete") && body.repository?.clone_url) {
const repoId = repoIdFromRemote(body.repository.clone_url);
const knownRepository = await githubData.getRepository({ repoId });
if (knownRepository) {
await githubData.reloadRepository({ repoId });
await githubData.send(githubDataWorkflowQueueName("githubData.command.reloadRepository"), { repoId }, { wait: false });
}
}
}
@ -1232,14 +1310,18 @@ export async function syncOrganizationShellFromGithubMutation(
if (needsInitialSync) {
const githubData = await getOrCreateGithubData(c, organizationId);
void githubData
.syncRepos({
connectedAccount: input.githubLogin,
installationStatus: "connected",
installationId: input.installationId,
githubLogin: input.githubLogin,
kind: input.kind,
label: "Initial repository sync...",
})
.send(
githubDataWorkflowQueueName("githubData.command.syncRepos"),
{
connectedAccount: input.githubLogin,
installationStatus: "connected",
installationId: input.installationId,
githubLogin: input.githubLogin,
kind: input.kind,
label: "Initial repository sync...",
},
{ wait: false },
)
.catch(() => {});
}

View file

@ -1,10 +1,13 @@
import { actor } from "rivetkit";
import { actor, queue } from "rivetkit";
import { workflow } from "rivetkit/workflow";
import { organizationDb } from "./db/db.js";
import { organizationActions } from "./actions.js";
import { organizationCommandActions } from "./workflow.js";
import { runOrganizationWorkflow } from "./workflow.js";
import { ORGANIZATION_QUEUE_NAMES } from "./queues.js";
export const organization = actor({
db: organizationDb,
queues: Object.fromEntries(ORGANIZATION_QUEUE_NAMES.map((name) => [name, queue()])),
options: {
name: "Organization",
icon: "compass",
@ -15,6 +18,6 @@ export const organization = actor({
}),
actions: {
...organizationActions,
...organizationCommandActions,
},
run: workflow(runOrganizationWorkflow),
});

View file

@ -1,8 +1,17 @@
// @ts-nocheck
/**
* Organization command actions converted from queue handlers to direct actions.
* Each export becomes an action on the organization actor.
* Organization workflow queue-based command loop.
*
* Mutations are dispatched through named queues and processed inside workflow
* steps so that every command appears in the RivetKit inspector's workflow
* history. Read actions remain direct (no queue).
*
* Callers send commands directly via `.send()` to the appropriate queue name.
*/
import { Loop } from "rivetkit/workflow";
import { logActorWarning, resolveErrorMessage } from "../logging.js";
import { ORGANIZATION_QUEUE_NAMES, type OrganizationQueueName } from "./queues.js";
import { applyGithubSyncProgressMutation, recordGithubWebhookReceiptMutation, refreshOrganizationSnapshotMutation } from "./actions.js";
import {
applyTaskSummaryUpdateMutation,
@ -37,127 +46,164 @@ import {
upsertOrganizationInvoiceMutation,
} from "./app-shell.js";
export const organizationCommandActions = {
async commandCreateTask(c: any, body: any) {
return await createTaskMutation(c, body);
},
async commandMaterializeTask(c: any, body: any) {
return await createTaskMutation(c, body);
},
async commandRegisterTaskBranch(c: any, body: any) {
return await registerTaskBranchMutation(c, body);
},
async commandApplyTaskSummaryUpdate(c: any, body: any) {
// ---------------------------------------------------------------------------
// Workflow command loop — runs inside `run: workflow(runOrganizationWorkflow)`
// ---------------------------------------------------------------------------
type WorkflowHandler = (loopCtx: any, body: any) => Promise<any>;
/**
* Maps queue names to their mutation handlers.
* Each handler receives the workflow loop context and the message body,
* executes the mutation, and returns the result (which is sent back via
* msg.complete).
*/
const COMMAND_HANDLERS: Record<OrganizationQueueName, WorkflowHandler> = {
// Task mutations
"organization.command.createTask": async (c, body) => createTaskMutation(c, body),
"organization.command.materializeTask": async (c, body) => createTaskMutation(c, body),
"organization.command.registerTaskBranch": async (c, body) => registerTaskBranchMutation(c, body),
"organization.command.applyTaskSummaryUpdate": async (c, body) => {
await applyTaskSummaryUpdateMutation(c, body);
return { ok: true };
},
async commandRemoveTaskSummary(c: any, body: any) {
"organization.command.removeTaskSummary": async (c, body) => {
await removeTaskSummaryMutation(c, body);
return { ok: true };
},
async commandRefreshTaskSummaryForBranch(c: any, body: any) {
"organization.command.refreshTaskSummaryForBranch": async (c, body) => {
await refreshTaskSummaryForBranchMutation(c, body);
return { ok: true };
},
async commandBroadcastSnapshot(c: any, _body: any) {
"organization.command.snapshot.broadcast": async (c, _body) => {
await refreshOrganizationSnapshotMutation(c);
return { ok: true };
},
async commandSyncGithubSession(c: any, body: any) {
"organization.command.syncGithubSession": async (c, body) => {
const { syncGithubOrganizations } = await import("./app-shell.js");
await syncGithubOrganizations(c, body);
return { ok: true };
},
// Better Auth index actions
async commandBetterAuthSessionIndexUpsert(c: any, body: any) {
return await betterAuthUpsertSessionIndexMutation(c, body);
},
async commandBetterAuthSessionIndexDelete(c: any, body: any) {
// Better Auth index mutations
"organization.command.better_auth.session_index.upsert": async (c, body) => betterAuthUpsertSessionIndexMutation(c, body),
"organization.command.better_auth.session_index.delete": async (c, body) => {
await betterAuthDeleteSessionIndexMutation(c, body);
return { ok: true };
},
async commandBetterAuthEmailIndexUpsert(c: any, body: any) {
return await betterAuthUpsertEmailIndexMutation(c, body);
},
async commandBetterAuthEmailIndexDelete(c: any, body: any) {
"organization.command.better_auth.email_index.upsert": async (c, body) => betterAuthUpsertEmailIndexMutation(c, body),
"organization.command.better_auth.email_index.delete": async (c, body) => {
await betterAuthDeleteEmailIndexMutation(c, body);
return { ok: true };
},
async commandBetterAuthAccountIndexUpsert(c: any, body: any) {
return await betterAuthUpsertAccountIndexMutation(c, body);
},
async commandBetterAuthAccountIndexDelete(c: any, body: any) {
"organization.command.better_auth.account_index.upsert": async (c, body) => betterAuthUpsertAccountIndexMutation(c, body),
"organization.command.better_auth.account_index.delete": async (c, body) => {
await betterAuthDeleteAccountIndexMutation(c, body);
return { ok: true };
},
async commandBetterAuthVerificationCreate(c: any, body: any) {
return await betterAuthCreateVerificationMutation(c, body);
},
async commandBetterAuthVerificationUpdate(c: any, body: any) {
return await betterAuthUpdateVerificationMutation(c, body);
},
async commandBetterAuthVerificationUpdateMany(c: any, body: any) {
return await betterAuthUpdateManyVerificationMutation(c, body);
},
async commandBetterAuthVerificationDelete(c: any, body: any) {
"organization.command.better_auth.verification.create": async (c, body) => betterAuthCreateVerificationMutation(c, body),
"organization.command.better_auth.verification.update": async (c, body) => betterAuthUpdateVerificationMutation(c, body),
"organization.command.better_auth.verification.update_many": async (c, body) => betterAuthUpdateManyVerificationMutation(c, body),
"organization.command.better_auth.verification.delete": async (c, body) => {
await betterAuthDeleteVerificationMutation(c, body);
return { ok: true };
},
async commandBetterAuthVerificationDeleteMany(c: any, body: any) {
return await betterAuthDeleteManyVerificationMutation(c, body);
},
"organization.command.better_auth.verification.delete_many": async (c, body) => betterAuthDeleteManyVerificationMutation(c, body),
// GitHub sync actions
async commandApplyGithubSyncProgress(c: any, body: any) {
// GitHub sync mutations
"organization.command.github.sync_progress.apply": async (c, body) => {
await applyGithubSyncProgressMutation(c, body);
return { ok: true };
},
async commandRecordGithubWebhookReceipt(c: any, body: any) {
"organization.command.github.webhook_receipt.record": async (c, body) => {
await recordGithubWebhookReceiptMutation(c, body);
return { ok: true };
},
async commandSyncOrganizationShellFromGithub(c: any, body: any) {
return await syncOrganizationShellFromGithubMutation(c, body);
},
"organization.command.github.organization_shell.sync_from_github": async (c, body) => syncOrganizationShellFromGithubMutation(c, body),
// Shell/profile actions
async commandUpdateShellProfile(c: any, body: any) {
// Shell/profile mutations
"organization.command.shell.profile.update": async (c, body) => {
await updateOrganizationShellProfileMutation(c, body);
return { ok: true };
},
async commandMarkSyncStarted(c: any, body: any) {
"organization.command.shell.sync_started.mark": async (c, body) => {
await markOrganizationSyncStartedMutation(c, body);
return { ok: true };
},
// Billing actions
async commandApplyStripeCustomer(c: any, body: any) {
// Billing mutations
"organization.command.billing.stripe_customer.apply": async (c, body) => {
await applyOrganizationStripeCustomerMutation(c, body);
return { ok: true };
},
async commandApplyStripeSubscription(c: any, body: any) {
"organization.command.billing.stripe_subscription.apply": async (c, body) => {
await applyOrganizationStripeSubscriptionMutation(c, body);
return { ok: true };
},
async commandApplyFreePlan(c: any, body: any) {
"organization.command.billing.free_plan.apply": async (c, body) => {
await applyOrganizationFreePlanMutation(c, body);
return { ok: true };
},
async commandSetPaymentMethod(c: any, body: any) {
"organization.command.billing.payment_method.set": async (c, body) => {
await setOrganizationBillingPaymentMethodMutation(c, body);
return { ok: true };
},
async commandSetBillingStatus(c: any, body: any) {
"organization.command.billing.status.set": async (c, body) => {
await setOrganizationBillingStatusMutation(c, body);
return { ok: true };
},
async commandUpsertInvoice(c: any, body: any) {
"organization.command.billing.invoice.upsert": async (c, body) => {
await upsertOrganizationInvoiceMutation(c, body);
return { ok: true };
},
async commandRecordSeatUsage(c: any, body: any) {
"organization.command.billing.seat_usage.record": async (c, body) => {
await recordOrganizationSeatUsageMutation(c, body);
return { ok: true };
},
};
export async function runOrganizationWorkflow(ctx: any): Promise<void> {
await ctx.loop("organization-command-loop", async (loopCtx: any) => {
const msg = await loopCtx.queue.next("next-organization-command", {
names: [...ORGANIZATION_QUEUE_NAMES],
completable: true,
});
if (!msg) {
return Loop.continue(undefined);
}
const handler = COMMAND_HANDLERS[msg.name as OrganizationQueueName];
if (!handler) {
logActorWarning("organization", "unknown organization command", { command: msg.name });
await msg.complete({ error: `Unknown command: ${msg.name}` }).catch(() => {});
return Loop.continue(undefined);
}
try {
// Wrap in a step so c.state and c.db are accessible inside mutation functions.
const result = await loopCtx.step({
name: msg.name,
timeout: 10 * 60_000,
run: async () => handler(loopCtx, msg.body),
});
try {
await msg.complete(result);
} catch (completeError) {
logActorWarning("organization", "organization workflow failed completing response", {
command: msg.name,
error: resolveErrorMessage(completeError),
});
}
} catch (error) {
const message = resolveErrorMessage(error);
logActorWarning("organization", "organization workflow command failed", {
command: msg.name,
error: message,
});
await msg.complete({ error: message }).catch(() => {});
}
return Loop.continue(undefined);
});
}

View file

@ -1,4 +1,6 @@
import { actor } from "rivetkit";
// @ts-nocheck
import { actor, queue } from "rivetkit";
import { workflow, Loop } from "rivetkit/workflow";
import { e2b, sandboxActor } from "rivetkit/sandbox";
import { existsSync } from "node:fs";
import Dockerode from "dockerode";
@ -6,7 +8,9 @@ import { DEFAULT_WORKSPACE_MODEL_GROUPS, workspaceModelGroupsFromSandboxAgents,
import { SandboxAgent } from "sandbox-agent";
import { getActorRuntimeContext } from "../context.js";
import { organizationKey } from "../keys.js";
import { selfTaskSandbox } from "../handles.js";
import { logActorWarning, resolveErrorMessage } from "../logging.js";
import { expectQueueResponse } from "../../services/queue.js";
import { resolveSandboxProviderId } from "../../sandbox-config.js";
const SANDBOX_REPO_CWD = "/home/user/repo";
@ -293,36 +297,165 @@ async function listWorkspaceModelGroupsForSandbox(c: any): Promise<WorkspaceMode
const baseActions = baseTaskSandbox.config.actions as Record<string, (c: any, ...args: any[]) => Promise<any>>;
// ---------------------------------------------------------------------------
// Queue names for sandbox actor
// ---------------------------------------------------------------------------
const SANDBOX_QUEUE_NAMES = [
"sandbox.command.createSession",
"sandbox.command.resumeOrCreateSession",
"sandbox.command.destroySession",
"sandbox.command.createProcess",
"sandbox.command.stopProcess",
"sandbox.command.killProcess",
"sandbox.command.deleteProcess",
] as const;
type SandboxQueueName = (typeof SANDBOX_QUEUE_NAMES)[number];
function sandboxWorkflowQueueName(name: SandboxQueueName): SandboxQueueName {
return name;
}
// ---------------------------------------------------------------------------
// Mutation handlers — executed inside the workflow command loop
// ---------------------------------------------------------------------------
async function createSessionMutation(c: any, request: any): Promise<any> {
const session = await baseActions.createSession(c, request);
const sessionId = typeof request?.id === "string" && request.id.length > 0 ? request.id : session?.id;
const modeId = modeIdForAgent(request?.agent);
if (sessionId && modeId) {
try {
await baseActions.rawSendSessionMethod(c, sessionId, "session/set_mode", { modeId });
} catch {
// Session mode updates are best-effort.
}
}
return sanitizeActorResult(session);
}
async function resumeOrCreateSessionMutation(c: any, request: any): Promise<any> {
return sanitizeActorResult(await baseActions.resumeOrCreateSession(c, request));
}
async function destroySessionMutation(c: any, sessionId: string): Promise<any> {
return sanitizeActorResult(await baseActions.destroySession(c, sessionId));
}
async function createProcessMutation(c: any, request: any): Promise<any> {
const created = await baseActions.createProcess(c, request);
await broadcastProcesses(c, baseActions);
return created;
}
async function runProcessMutation(c: any, request: any): Promise<any> {
const result = await baseActions.runProcess(c, request);
await broadcastProcesses(c, baseActions);
return result;
}
async function stopProcessMutation(c: any, processId: string, query?: any): Promise<any> {
const stopped = await baseActions.stopProcess(c, processId, query);
await broadcastProcesses(c, baseActions);
return stopped;
}
async function killProcessMutation(c: any, processId: string, query?: any): Promise<any> {
const killed = await baseActions.killProcess(c, processId, query);
await broadcastProcesses(c, baseActions);
return killed;
}
async function deleteProcessMutation(c: any, processId: string): Promise<void> {
await baseActions.deleteProcess(c, processId);
await broadcastProcesses(c, baseActions);
}
// ---------------------------------------------------------------------------
// Workflow command loop
// ---------------------------------------------------------------------------
type SandboxWorkflowHandler = (loopCtx: any, body: any) => Promise<any>;
const SANDBOX_COMMAND_HANDLERS: Record<SandboxQueueName, SandboxWorkflowHandler> = {
"sandbox.command.createSession": async (c, body) => createSessionMutation(c, body),
"sandbox.command.resumeOrCreateSession": async (c, body) => resumeOrCreateSessionMutation(c, body),
"sandbox.command.destroySession": async (c, body) => destroySessionMutation(c, body?.sessionId),
"sandbox.command.createProcess": async (c, body) => createProcessMutation(c, body),
"sandbox.command.stopProcess": async (c, body) => stopProcessMutation(c, body?.processId, body?.query),
"sandbox.command.killProcess": async (c, body) => killProcessMutation(c, body?.processId, body?.query),
"sandbox.command.deleteProcess": async (c, body) => {
await deleteProcessMutation(c, body?.processId);
return { ok: true };
},
};
async function runSandboxWorkflow(ctx: any): Promise<void> {
await ctx.loop("sandbox-command-loop", async (loopCtx: any) => {
const msg = await loopCtx.queue.next("next-sandbox-command", {
names: [...SANDBOX_QUEUE_NAMES],
completable: true,
});
if (!msg) {
return Loop.continue(undefined);
}
const handler = SANDBOX_COMMAND_HANDLERS[msg.name as SandboxQueueName];
if (!handler) {
logActorWarning("taskSandbox", "unknown sandbox command", { command: msg.name });
await msg.complete({ error: `Unknown command: ${msg.name}` }).catch(() => {});
return Loop.continue(undefined);
}
try {
// Wrap in a step so c.state and c.db are accessible inside mutation functions.
const result = await loopCtx.step({
name: msg.name,
timeout: 10 * 60_000,
run: async () => handler(loopCtx, msg.body),
});
try {
await msg.complete(result);
} catch (completeError) {
logActorWarning("taskSandbox", "sandbox workflow failed completing response", {
command: msg.name,
error: resolveErrorMessage(completeError),
});
}
} catch (error) {
const message = resolveErrorMessage(error);
logActorWarning("taskSandbox", "sandbox workflow command failed", {
command: msg.name,
error: message,
});
await msg.complete({ error: message }).catch(() => {});
}
return Loop.continue(undefined);
});
}
// ---------------------------------------------------------------------------
// Actor definition
// ---------------------------------------------------------------------------
export const taskSandbox = actor({
...baseTaskSandbox.config,
queues: Object.fromEntries(SANDBOX_QUEUE_NAMES.map((name) => [name, queue()])),
options: {
...baseTaskSandbox.config.options,
actionTimeout: 10 * 60_000,
},
actions: {
...baseActions,
async createSession(c: any, request: any): Promise<any> {
const session = await baseActions.createSession(c, request);
const sessionId = typeof request?.id === "string" && request.id.length > 0 ? request.id : session?.id;
const modeId = modeIdForAgent(request?.agent);
if (sessionId && modeId) {
try {
await baseActions.rawSendSessionMethod(c, sessionId, "session/set_mode", { modeId });
} catch {
// Session mode updates are best-effort.
}
}
return sanitizeActorResult(session);
},
// Read actions — direct (no queue)
async resumeSession(c: any, sessionId: string): Promise<any> {
return sanitizeActorResult(await baseActions.resumeSession(c, sessionId));
},
async resumeOrCreateSession(c: any, request: any): Promise<any> {
return sanitizeActorResult(await baseActions.resumeOrCreateSession(c, request));
},
async getSession(c: any, sessionId: string): Promise<any> {
return sanitizeActorResult(await baseActions.getSession(c, sessionId));
},
@ -331,24 +464,6 @@ export const taskSandbox = actor({
return sanitizeActorResult(await baseActions.listSessions(c, query));
},
async destroySession(c: any, sessionId: string): Promise<any> {
return sanitizeActorResult(await baseActions.destroySession(c, sessionId));
},
async sendPrompt(c: any, request: { sessionId: string; prompt: string }): Promise<any> {
const text = typeof request?.prompt === "string" ? request.prompt.trim() : "";
if (!text) {
return null;
}
const session = await baseActions.resumeSession(c, request.sessionId);
if (!session || typeof session.prompt !== "function") {
throw new Error(`session '${request.sessionId}' not found`);
}
return sanitizeActorResult(await session.prompt([{ type: "text", text }]));
},
async listProcesses(c: any): Promise<any> {
try {
return await baseActions.listProcesses(c);
@ -362,35 +477,6 @@ export const taskSandbox = actor({
}
},
async createProcess(c: any, request: any): Promise<any> {
const created = await baseActions.createProcess(c, request);
await broadcastProcesses(c, baseActions);
return created;
},
async runProcess(c: any, request: any): Promise<any> {
const result = await baseActions.runProcess(c, request);
await broadcastProcesses(c, baseActions);
return result;
},
async stopProcess(c: any, processId: string, query?: any): Promise<any> {
const stopped = await baseActions.stopProcess(c, processId, query);
await broadcastProcesses(c, baseActions);
return stopped;
},
async killProcess(c: any, processId: string, query?: any): Promise<any> {
const killed = await baseActions.killProcess(c, processId, query);
await broadcastProcesses(c, baseActions);
return killed;
},
async deleteProcess(c: any, processId: string): Promise<void> {
await baseActions.deleteProcess(c, processId);
await broadcastProcesses(c, baseActions);
},
async sandboxAgentConnection(c: any): Promise<{ endpoint: string; token?: string }> {
const provider = await providerForConnection(c);
if (!provider || !c.state.sandboxId) {
@ -445,7 +531,73 @@ export const taskSandbox = actor({
async repoCwd(): Promise<{ cwd: string }> {
return { cwd: SANDBOX_REPO_CWD };
},
// Long-running action — kept as direct action to avoid blocking the
// workflow loop (prompt responses can take minutes).
async sendPrompt(c: any, request: { sessionId: string; prompt: string }): Promise<any> {
const text = typeof request?.prompt === "string" ? request.prompt.trim() : "";
if (!text) {
return null;
}
const session = await baseActions.resumeSession(c, request.sessionId);
if (!session || typeof session.prompt !== "function") {
throw new Error(`session '${request.sessionId}' not found`);
}
return sanitizeActorResult(await session.prompt([{ type: "text", text }]));
},
// Mutation actions — self-send to queue for workflow history
async createSession(c: any, request: any): Promise<any> {
const self = selfTaskSandbox(c);
return expectQueueResponse(await self.send(sandboxWorkflowQueueName("sandbox.command.createSession"), request ?? {}, { wait: true, timeout: 10_000 }));
},
async resumeOrCreateSession(c: any, request: any): Promise<any> {
const self = selfTaskSandbox(c);
return expectQueueResponse(
await self.send(sandboxWorkflowQueueName("sandbox.command.resumeOrCreateSession"), request ?? {}, { wait: true, timeout: 10_000 }),
);
},
async destroySession(c: any, sessionId: string): Promise<any> {
const self = selfTaskSandbox(c);
return expectQueueResponse(await self.send(sandboxWorkflowQueueName("sandbox.command.destroySession"), { sessionId }, { wait: true, timeout: 10_000 }));
},
async createProcess(c: any, request: any): Promise<any> {
const self = selfTaskSandbox(c);
return expectQueueResponse(await self.send(sandboxWorkflowQueueName("sandbox.command.createProcess"), request ?? {}, { wait: true, timeout: 10_000 }));
},
// runProcess kept as direct action — response can exceed 128KB queue limit
async runProcess(c: any, request: any): Promise<any> {
const result = await baseActions.runProcess(c, request);
await broadcastProcesses(c, baseActions);
return result;
},
async stopProcess(c: any, processId: string, query?: any): Promise<any> {
const self = selfTaskSandbox(c);
return expectQueueResponse(
await self.send(sandboxWorkflowQueueName("sandbox.command.stopProcess"), { processId, query }, { wait: true, timeout: 10_000 }),
);
},
async killProcess(c: any, processId: string, query?: any): Promise<any> {
const self = selfTaskSandbox(c);
return expectQueueResponse(
await self.send(sandboxWorkflowQueueName("sandbox.command.killProcess"), { processId, query }, { wait: true, timeout: 10_000 }),
);
},
async deleteProcess(c: any, processId: string): Promise<void> {
const self = selfTaskSandbox(c);
await self.send(sandboxWorkflowQueueName("sandbox.command.deleteProcess"), { processId }, { wait: false });
},
},
run: workflow(runSandboxWorkflow),
});
export { SANDBOX_REPO_CWD };

View file

@ -1,9 +1,11 @@
import { actor } from "rivetkit";
import { actor, queue } from "rivetkit";
import { workflow } from "rivetkit/workflow";
import type { TaskRecord } from "@sandbox-agent/foundry-shared";
import { taskDb } from "./db/db.js";
import { getCurrentRecord } from "./workflow/common.js";
import { getSessionDetail, getTaskDetail, getTaskSummary } from "./workspace.js";
import { taskCommandActions } from "./workflow/index.js";
import { runTaskWorkflow } from "./workflow/index.js";
import { TASK_QUEUE_NAMES } from "./workflow/queue.js";
export interface TaskInput {
organizationId: string;
@ -13,6 +15,7 @@ export interface TaskInput {
export const task = actor({
db: taskDb,
queues: Object.fromEntries(TASK_QUEUE_NAMES.map((name) => [name, queue()])),
options: {
name: "Task",
icon: "wrench",
@ -39,9 +42,8 @@ export const task = actor({
async getSessionDetail(c, input: { sessionId: string; authSessionId?: string }) {
return await getSessionDetail(c, input.sessionId, input.authSessionId);
},
...taskCommandActions,
},
run: workflow(runTaskWorkflow),
});
export { taskWorkflowQueueName } from "./workflow/index.js";

View file

@ -1,4 +1,16 @@
// @ts-nocheck
/**
* Task workflow queue-based command loop.
*
* Mutations are dispatched through named queues and processed inside the
* workflow command loop so that every command appears in the RivetKit
* inspector's workflow history. Read actions remain direct (no queue).
*
* Callers send commands directly via `.send(taskWorkflowQueueName(...), ...)`.
*/
import { Loop } from "rivetkit/workflow";
import { logActorWarning, resolveErrorMessage } from "../../logging.js";
import { TASK_QUEUE_NAMES, type TaskQueueName, taskWorkflowQueueName } from "./queue.js";
import { getCurrentRecord } from "./common.js";
import { initBootstrapDbActivity, initCompleteActivity, initEnqueueProvisionActivity, initFailedActivity } from "./init.js";
import {
@ -35,241 +47,210 @@ import {
export { taskWorkflowQueueName } from "./queue.js";
/**
* Task command actions converted from queue/workflow handlers to direct actions.
* Each export becomes an action on the task actor.
*/
export const taskCommandActions = {
async initialize(c: any, body: any) {
await initBootstrapDbActivity(c, body);
await initEnqueueProvisionActivity(c, body);
return await getCurrentRecord(c);
// ---------------------------------------------------------------------------
// Workflow command loop — runs inside `run: workflow(runTaskWorkflow)`
// ---------------------------------------------------------------------------
type WorkflowHandler = (loopCtx: any, msg: any) => Promise<void>;
const COMMAND_HANDLERS: Record<TaskQueueName, WorkflowHandler> = {
"task.command.initialize": async (loopCtx, msg) => {
await initBootstrapDbActivity(loopCtx, msg.body);
await initEnqueueProvisionActivity(loopCtx, msg.body);
const record = await getCurrentRecord(loopCtx);
await msg.complete(record);
},
async provision(c: any, body: any) {
"task.command.provision": async (loopCtx, msg) => {
try {
await initCompleteActivity(c, body);
return { ok: true };
await initCompleteActivity(loopCtx, msg.body);
await msg.complete({ ok: true });
} catch (error) {
await initFailedActivity(c, error, body);
return { ok: false, error: resolveErrorMessage(error) };
await initFailedActivity(loopCtx, error, msg.body);
await msg.complete({ ok: false, error: resolveErrorMessage(error) });
}
},
async attach(c: any, body: any) {
// handleAttachActivity expects msg with complete — adapt
const result = { value: undefined as any };
const msg = {
name: "task.command.attach",
body,
complete: async (v: any) => {
result.value = v;
},
};
await handleAttachActivity(c, msg);
return result.value;
"task.command.attach": async (loopCtx, msg) => {
await handleAttachActivity(loopCtx, msg);
},
async switchTask(c: any, body: any) {
const result = { value: undefined as any };
const msg = {
name: "task.command.switch",
body,
complete: async (v: any) => {
result.value = v;
},
};
await handleSwitchActivity(c, msg);
return result.value;
"task.command.switch": async (loopCtx, msg) => {
await handleSwitchActivity(loopCtx, msg);
},
async push(c: any, body: any) {
const result = { value: undefined as any };
const msg = {
name: "task.command.push",
body,
complete: async (v: any) => {
result.value = v;
},
};
await handlePushActivity(c, msg);
return result.value;
"task.command.push": async (loopCtx, msg) => {
await handlePushActivity(loopCtx, msg);
},
async sync(c: any, body: any) {
const result = { value: undefined as any };
const msg = {
name: "task.command.sync",
body,
complete: async (v: any) => {
result.value = v;
},
};
await handleSimpleCommandActivity(c, msg, "task.sync");
return result.value;
"task.command.sync": async (loopCtx, msg) => {
await handleSimpleCommandActivity(loopCtx, msg, "task.sync");
},
async merge(c: any, body: any) {
const result = { value: undefined as any };
const msg = {
name: "task.command.merge",
body,
complete: async (v: any) => {
result.value = v;
},
};
await handleSimpleCommandActivity(c, msg, "task.merge");
return result.value;
"task.command.merge": async (loopCtx, msg) => {
await handleSimpleCommandActivity(loopCtx, msg, "task.merge");
},
async archive(c: any, body: any) {
const result = { value: undefined as any };
const msg = {
name: "task.command.archive",
body,
complete: async (v: any) => {
result.value = v;
},
};
await handleArchiveActivity(c, msg);
return result.value;
"task.command.archive": async (loopCtx, msg) => {
await handleArchiveActivity(loopCtx, msg);
},
async kill(c: any, body: any) {
const result = { value: undefined as any };
const msg = {
name: "task.command.kill",
body,
complete: async (v: any) => {
result.value = v;
},
};
await killDestroySandboxActivity(c);
await killWriteDbActivity(c, msg);
return result.value;
"task.command.kill": async (loopCtx, msg) => {
await killDestroySandboxActivity(loopCtx);
await killWriteDbActivity(loopCtx, msg);
},
async getRecord(c: any, body: any) {
const result = { value: undefined as any };
const msg = {
name: "task.command.get",
body,
complete: async (v: any) => {
result.value = v;
},
};
await handleGetActivity(c, msg);
return result.value;
"task.command.get": async (loopCtx, msg) => {
await handleGetActivity(loopCtx, msg);
},
async pullRequestSync(c: any, body: any) {
await syncTaskPullRequest(c, body?.pullRequest ?? null);
return { ok: true };
"task.command.pull_request.sync": async (loopCtx, msg) => {
await syncTaskPullRequest(loopCtx, msg.body?.pullRequest ?? null);
await msg.complete({ ok: true });
},
async markUnread(c: any, body: any) {
await markWorkspaceUnread(c, body?.authSessionId);
return { ok: true };
"task.command.workspace.mark_unread": async (loopCtx, msg) => {
await markWorkspaceUnread(loopCtx, msg.body?.authSessionId);
await msg.complete({ ok: true });
},
async renameTask(c: any, body: any) {
await renameWorkspaceTask(c, body.value);
return { ok: true };
"task.command.workspace.rename_task": async (loopCtx, msg) => {
await renameWorkspaceTask(loopCtx, msg.body.value);
await msg.complete({ ok: true });
},
async changeOwner(c: any, body: any) {
await changeTaskOwnerManually(c, {
primaryUserId: body.primaryUserId,
primaryGithubLogin: body.primaryGithubLogin,
primaryGithubEmail: body.primaryGithubEmail,
primaryGithubAvatarUrl: body.primaryGithubAvatarUrl ?? null,
});
return { ok: true };
"task.command.workspace.create_session": async (loopCtx, msg) => {
const result = await createWorkspaceSession(loopCtx, msg.body?.model, msg.body?.authSessionId);
await msg.complete(result);
},
async createSession(c: any, body: any) {
return await createWorkspaceSession(c, body?.model, body?.authSessionId);
},
async createSessionAndSend(c: any, body: any) {
"task.command.workspace.create_session_and_send": async (loopCtx, msg) => {
try {
const created = await createWorkspaceSession(c, body?.model, body?.authSessionId);
await sendWorkspaceMessage(c, created.sessionId, body.text, [], body?.authSessionId);
const created = await createWorkspaceSession(loopCtx, msg.body?.model, msg.body?.authSessionId);
await sendWorkspaceMessage(loopCtx, created.sessionId, msg.body.text, [], msg.body?.authSessionId);
} catch (error) {
logActorWarning("task.workflow", "create_session_and_send failed", {
error: resolveErrorMessage(error),
});
}
return { ok: true };
await msg.complete({ ok: true });
},
async ensureSession(c: any, body: any) {
await ensureWorkspaceSession(c, body.sessionId, body?.model, body?.authSessionId);
return { ok: true };
"task.command.workspace.ensure_session": async (loopCtx, msg) => {
await ensureWorkspaceSession(loopCtx, msg.body.sessionId, msg.body?.model, msg.body?.authSessionId);
await msg.complete({ ok: true });
},
async renameSession(c: any, body: any) {
await renameWorkspaceSession(c, body.sessionId, body.title);
return { ok: true };
"task.command.workspace.rename_session": async (loopCtx, msg) => {
await renameWorkspaceSession(loopCtx, msg.body.sessionId, msg.body.title);
await msg.complete({ ok: true });
},
async selectSession(c: any, body: any) {
await selectWorkspaceSession(c, body.sessionId, body?.authSessionId);
return { ok: true };
"task.command.workspace.select_session": async (loopCtx, msg) => {
await selectWorkspaceSession(loopCtx, msg.body.sessionId, msg.body?.authSessionId);
await msg.complete({ ok: true });
},
async setSessionUnread(c: any, body: any) {
await setWorkspaceSessionUnread(c, body.sessionId, body.unread, body?.authSessionId);
return { ok: true };
"task.command.workspace.set_session_unread": async (loopCtx, msg) => {
await setWorkspaceSessionUnread(loopCtx, msg.body.sessionId, msg.body.unread, msg.body?.authSessionId);
await msg.complete({ ok: true });
},
async updateDraft(c: any, body: any) {
await updateWorkspaceDraft(c, body.sessionId, body.text, body.attachments, body?.authSessionId);
return { ok: true };
"task.command.workspace.update_draft": async (loopCtx, msg) => {
await updateWorkspaceDraft(loopCtx, msg.body.sessionId, msg.body.text, msg.body.attachments, msg.body?.authSessionId);
await msg.complete({ ok: true });
},
async changeModel(c: any, body: any) {
await changeWorkspaceModel(c, body.sessionId, body.model, body?.authSessionId);
return { ok: true };
"task.command.workspace.change_model": async (loopCtx, msg) => {
await changeWorkspaceModel(loopCtx, msg.body.sessionId, msg.body.model, msg.body?.authSessionId);
await msg.complete({ ok: true });
},
async sendMessage(c: any, body: any) {
await sendWorkspaceMessage(c, body.sessionId, body.text, body.attachments, body?.authSessionId);
return { ok: true };
"task.command.workspace.send_message": async (loopCtx, msg) => {
await sendWorkspaceMessage(loopCtx, msg.body.sessionId, msg.body.text, msg.body.attachments, msg.body?.authSessionId);
await msg.complete({ ok: true });
},
async stopSession(c: any, body: any) {
await stopWorkspaceSession(c, body.sessionId);
return { ok: true };
"task.command.workspace.stop_session": async (loopCtx, msg) => {
await stopWorkspaceSession(loopCtx, msg.body.sessionId);
await msg.complete({ ok: true });
},
async syncSessionStatus(c: any, body: any) {
await syncWorkspaceSessionStatus(c, body.sessionId, body.status, body.at);
return { ok: true };
"task.command.workspace.sync_session_status": async (loopCtx, msg) => {
await syncWorkspaceSessionStatus(loopCtx, msg.body.sessionId, msg.body.status, msg.body.at);
await msg.complete({ ok: true });
},
async refreshDerived(c: any, _body: any) {
await refreshWorkspaceDerivedState(c);
return { ok: true };
"task.command.workspace.refresh_derived": async (loopCtx, msg) => {
await refreshWorkspaceDerivedState(loopCtx);
await msg.complete({ ok: true });
},
async refreshSessionTranscript(c: any, body: any) {
await refreshWorkspaceSessionTranscript(c, body.sessionId);
return { ok: true };
"task.command.workspace.refresh_session_transcript": async (loopCtx, msg) => {
await refreshWorkspaceSessionTranscript(loopCtx, msg.body.sessionId);
await msg.complete({ ok: true });
},
async closeSession(c: any, body: any) {
await closeWorkspaceSession(c, body.sessionId, body?.authSessionId);
return { ok: true };
"task.command.workspace.close_session": async (loopCtx, msg) => {
await closeWorkspaceSession(loopCtx, msg.body.sessionId, msg.body?.authSessionId);
await msg.complete({ ok: true });
},
async publishPr(c: any, _body: any) {
await publishWorkspacePr(c);
return { ok: true };
"task.command.workspace.publish_pr": async (loopCtx, msg) => {
await publishWorkspacePr(loopCtx);
await msg.complete({ ok: true });
},
async revertFile(c: any, body: any) {
await revertWorkspaceFile(c, body.path);
return { ok: true };
"task.command.workspace.revert_file": async (loopCtx, msg) => {
await revertWorkspaceFile(loopCtx, msg.body.path);
await msg.complete({ ok: true });
},
"task.command.workspace.change_owner": async (loopCtx, msg) => {
await changeTaskOwnerManually(loopCtx, {
primaryUserId: msg.body.primaryUserId,
primaryGithubLogin: msg.body.primaryGithubLogin,
primaryGithubEmail: msg.body.primaryGithubEmail,
primaryGithubAvatarUrl: msg.body.primaryGithubAvatarUrl ?? null,
});
await msg.complete({ ok: true });
},
};
export async function runTaskWorkflow(ctx: any): Promise<void> {
await ctx.loop("task-command-loop", async (loopCtx: any) => {
const msg = await loopCtx.queue.next("next-task-command", {
names: [...TASK_QUEUE_NAMES],
completable: true,
});
if (!msg) {
return Loop.continue(undefined);
}
const handler = COMMAND_HANDLERS[msg.name as TaskQueueName];
if (!handler) {
logActorWarning("task.workflow", "unknown task command", { command: msg.name });
await msg.complete({ error: `Unknown command: ${msg.name}` }).catch(() => {});
return Loop.continue(undefined);
}
try {
// Wrap in a step so c.state and c.db are accessible inside mutation functions.
await loopCtx.step({
name: msg.name,
timeout: 10 * 60_000,
run: async () => handler(loopCtx, msg),
});
} catch (error) {
const message = resolveErrorMessage(error);
logActorWarning("task.workflow", "task workflow command failed", {
command: msg.name,
error: message,
});
await msg.complete({ error: message }).catch(() => {});
}
return Loop.continue(undefined);
});
}

View file

@ -3,6 +3,7 @@ import { eq } from "drizzle-orm";
import { getActorRuntimeContext } from "../../context.js";
import { selfTask } from "../../handles.js";
import { resolveErrorMessage } from "../../logging.js";
import { taskWorkflowQueueName } from "./queue.js";
import { defaultSandboxProviderId } from "../../../sandbox-config.js";
import { task as taskTable, taskRuntime } from "../db/schema.js";
import { TASK_ROW_ID, appendAuditLog, collectErrorMessages, resolveErrorDetail, setTaskState } from "./common.js";
@ -72,7 +73,7 @@ export async function initEnqueueProvisionActivity(loopCtx: any, body: any): Pro
const self = selfTask(loopCtx);
try {
void self.provision(body).catch(() => {});
void self.send(taskWorkflowQueueName("task.command.provision"), body ?? {}, { wait: false }).catch(() => {});
} catch (error) {
logActorWarning("task.init", "background provision command failed", {
organizationId: loopCtx.state.organizationId,

View file

@ -28,8 +28,11 @@ export const TASK_QUEUE_NAMES = [
"task.command.workspace.close_session",
"task.command.workspace.publish_pr",
"task.command.workspace.revert_file",
"task.command.workspace.change_owner",
] as const;
export type TaskQueueName = (typeof TASK_QUEUE_NAMES)[number];
export function taskWorkflowQueueName(name: string): string {
return name;
}

View file

@ -14,10 +14,12 @@ import { logActorWarning, resolveErrorMessage } from "../logging.js";
import { SANDBOX_REPO_CWD } from "../sandbox/index.js";
import { resolveSandboxProviderId } from "../../sandbox-config.js";
import { getBetterAuthService } from "../../services/better-auth.js";
// expectQueueResponse removed — actions return values directly
import { resolveOrganizationGithubAuth } from "../../services/github-auth.js";
import { githubRepoFullNameFromRemote } from "../../services/repo.js";
// organization actions called directly (no queue)
import { taskWorkflowQueueName } from "./workflow/queue.js";
import { expectQueueResponse } from "../../services/queue.js";
import { userWorkflowQueueName } from "../user/workflow.js";
import { organizationWorkflowQueueName } from "../organization/queues.js";
import { task as taskTable, taskOwner, taskRuntime, taskSandboxes, taskWorkspaceSessions } from "./db/schema.js";
import { getCurrentRecord } from "./workflow/common.js";
@ -123,9 +125,7 @@ function parseGitState(value: string | null | undefined): { fileChanges: Array<a
}
}
async function readTaskOwner(
c: any,
): Promise<{
async function readTaskOwner(c: any): Promise<{
primaryUserId: string | null;
primaryGithubLogin: string | null;
primaryGithubEmail: string | null;
@ -427,11 +427,17 @@ async function upsertUserTaskState(c: any, authSessionId: string | null | undefi
}
const user = await getOrCreateUser(c, userId);
await user.taskStateUpsert({
taskId: c.state.taskId,
sessionId,
patch,
});
expectQueueResponse(
await user.send(
userWorkflowQueueName("user.command.task_state.upsert"),
{
taskId: c.state.taskId,
sessionId,
patch,
},
{ wait: true, timeout: 10_000 },
),
);
}
async function deleteUserTaskState(c: any, authSessionId: string | null | undefined, sessionId: string): Promise<void> {
@ -446,10 +452,14 @@ async function deleteUserTaskState(c: any, authSessionId: string | null | undefi
}
const user = await getOrCreateUser(c, userId);
await user.taskStateDelete({
taskId: c.state.taskId,
sessionId,
});
await user.send(
userWorkflowQueueName("user.command.task_state.delete"),
{
taskId: c.state.taskId,
sessionId,
},
{ wait: true, timeout: 10_000 },
);
}
async function resolveDefaultModel(c: any, authSessionId?: string | null): Promise<string> {
@ -932,17 +942,13 @@ async function enqueueWorkspaceRefresh(
command: "task.command.workspace.refresh_derived" | "task.command.workspace.refresh_session_transcript",
body: Record<string, unknown>,
): Promise<void> {
// Call directly since we're inside the task actor (no queue needed)
if (command === "task.command.workspace.refresh_derived") {
void refreshWorkspaceDerivedState(c).catch(() => {});
} else {
void refreshWorkspaceSessionTranscript(c, body.sessionId as string).catch(() => {});
}
const self = selfTask(c);
await self.send(taskWorkflowQueueName(command as any), body, { wait: false });
}
async function enqueueWorkspaceEnsureSession(c: any, sessionId: string): Promise<void> {
// Call directly since we're inside the task actor
void ensureWorkspaceSession(c, sessionId).catch(() => {});
const self = selfTask(c);
await self.send(taskWorkflowQueueName("task.command.workspace.ensure_session" as any), { sessionId }, { wait: false });
}
function pendingWorkspaceSessionStatus(record: any): "pending_provision" | "pending_session_create" {
@ -1166,7 +1172,11 @@ export async function getSessionDetail(c: any, sessionId: string, authSessionId?
*/
export async function broadcastTaskUpdate(c: any, options?: { sessionId?: string }): Promise<void> {
const organization = await getOrCreateOrganization(c, c.state.organizationId);
await organization.commandApplyTaskSummaryUpdate({ taskSummary: await buildTaskSummary(c) });
await organization.send(
organizationWorkflowQueueName("organization.command.applyTaskSummaryUpdate"),
{ taskSummary: await buildTaskSummary(c) },
{ wait: false },
);
c.broadcast("taskUpdated", {
type: "taskUpdated",
detail: await buildTaskDetail(c),
@ -1307,8 +1317,9 @@ export async function enqueuePendingWorkspaceSessions(c: any): Promise<void> {
(row) => row.closed !== true && row.status !== "ready" && row.status !== "error",
);
const self = selfTask(c);
for (const row of pending) {
void ensureWorkspaceSession(c, row.sessionId, row.model).catch(() => {});
await self.send(taskWorkflowQueueName("task.command.workspace.ensure_session" as any), { sessionId: row.sessionId, model: row.model }, { wait: false });
}
}

View file

@ -1,21 +1,13 @@
import { actor } from "rivetkit";
import { actor, queue } from "rivetkit";
import { workflow } from "rivetkit/workflow";
import { userDb } from "./db/db.js";
import { betterAuthActions } from "./actions/better-auth.js";
import { userActions } from "./actions/user.js";
import {
createAuthRecordMutation,
updateAuthRecordMutation,
updateManyAuthRecordsMutation,
deleteAuthRecordMutation,
deleteManyAuthRecordsMutation,
upsertUserProfileMutation,
upsertSessionStateMutation,
upsertTaskStateMutation,
deleteTaskStateMutation,
} from "./workflow.js";
import { USER_QUEUE_NAMES, runUserWorkflow } from "./workflow.js";
export const user = actor({
db: userDb,
queues: Object.fromEntries(USER_QUEUE_NAMES.map((name) => [name, queue()])),
options: {
name: "User",
icon: "shield",
@ -27,34 +19,6 @@ export const user = actor({
actions: {
...betterAuthActions,
...userActions,
async authCreate(c, body) {
return await createAuthRecordMutation(c, body);
},
async authUpdate(c, body) {
return await updateAuthRecordMutation(c, body);
},
async authUpdateMany(c, body) {
return await updateManyAuthRecordsMutation(c, body);
},
async authDelete(c, body) {
await deleteAuthRecordMutation(c, body);
return { ok: true };
},
async authDeleteMany(c, body) {
return await deleteManyAuthRecordsMutation(c, body);
},
async profileUpsert(c, body) {
return await upsertUserProfileMutation(c, body);
},
async sessionStateUpsert(c, body) {
return await upsertSessionStateMutation(c, body);
},
async taskStateUpsert(c, body) {
return await upsertTaskStateMutation(c, body);
},
async taskStateDelete(c, body) {
await deleteTaskStateMutation(c, body);
return { ok: true };
},
},
run: workflow(runUserWorkflow),
});

View file

@ -1,8 +1,45 @@
// @ts-nocheck
/**
* User workflow queue-based command loop.
*
* Auth mutation commands are dispatched through named queues and processed
* inside the workflow command loop for observability and replay semantics.
*/
import { eq, count as sqlCount, and } from "drizzle-orm";
import { Loop } from "rivetkit/workflow";
import { DEFAULT_WORKSPACE_MODEL_ID } from "@sandbox-agent/foundry-shared";
import { logActorWarning, resolveErrorMessage } from "../logging.js";
import { selfUser } from "../handles.js";
import { expectQueueResponse } from "../../services/queue.js";
import { authUsers, sessionState, userProfiles, userTaskState } from "./db/schema.js";
import { buildWhere, columnFor, materializeRow, persistInput, persistPatch, tableFor } from "./query-helpers.js";
// ---------------------------------------------------------------------------
// Queue names
// ---------------------------------------------------------------------------
export const USER_QUEUE_NAMES = [
"user.command.auth.create",
"user.command.auth.update",
"user.command.auth.update_many",
"user.command.auth.delete",
"user.command.auth.delete_many",
"user.command.profile.upsert",
"user.command.session_state.upsert",
"user.command.task_state.upsert",
"user.command.task_state.delete",
] as const;
export type UserQueueName = (typeof USER_QUEUE_NAMES)[number];
export function userWorkflowQueueName(name: UserQueueName): UserQueueName {
return name;
}
// ---------------------------------------------------------------------------
// Mutation functions
// ---------------------------------------------------------------------------
export async function createAuthRecordMutation(c: any, input: { model: string; data: Record<string, unknown> }) {
const table = tableFor(input.model);
const persisted = persistInput(input.model, input.data);
@ -195,3 +232,66 @@ export async function deleteTaskStateMutation(c: any, input: { taskId: string; s
}
await c.db.delete(userTaskState).where(eq(userTaskState.taskId, input.taskId)).run();
}
// ---------------------------------------------------------------------------
// Workflow command loop
// ---------------------------------------------------------------------------
type WorkflowHandler = (loopCtx: any, body: any) => Promise<any>;
const COMMAND_HANDLERS: Record<UserQueueName, WorkflowHandler> = {
"user.command.auth.create": async (c, body) => createAuthRecordMutation(c, body),
"user.command.auth.update": async (c, body) => updateAuthRecordMutation(c, body),
"user.command.auth.update_many": async (c, body) => updateManyAuthRecordsMutation(c, body),
"user.command.auth.delete": async (c, body) => {
await deleteAuthRecordMutation(c, body);
return { ok: true };
},
"user.command.auth.delete_many": async (c, body) => deleteManyAuthRecordsMutation(c, body),
"user.command.profile.upsert": async (c, body) => upsertUserProfileMutation(c, body),
"user.command.session_state.upsert": async (c, body) => upsertSessionStateMutation(c, body),
"user.command.task_state.upsert": async (c, body) => upsertTaskStateMutation(c, body),
"user.command.task_state.delete": async (c, body) => {
await deleteTaskStateMutation(c, body);
return { ok: true };
},
};
export async function runUserWorkflow(ctx: any): Promise<void> {
await ctx.loop("user-command-loop", async (loopCtx: any) => {
const msg = await loopCtx.queue.next("next-user-command", {
names: [...USER_QUEUE_NAMES],
completable: true,
});
if (!msg) {
return Loop.continue(undefined);
}
const handler = COMMAND_HANDLERS[msg.name as UserQueueName];
if (!handler) {
logActorWarning("user", "unknown user command", { command: msg.name });
await msg.complete({ error: `Unknown command: ${msg.name}` }).catch(() => {});
return Loop.continue(undefined);
}
try {
// Wrap in a step so c.state and c.db are accessible inside mutation functions.
const result = await loopCtx.step({
name: msg.name,
timeout: 60_000,
run: async () => handler(loopCtx, msg.body),
});
await msg.complete(result);
} catch (error) {
const message = resolveErrorMessage(error);
logActorWarning("user", "user workflow command failed", {
command: msg.name,
error: message,
});
await msg.complete({ error: message }).catch(() => {});
}
return Loop.continue(undefined);
});
}