mirror of
https://github.com/harivansh-afk/sandbox-agent.git
synced 2026-04-15 03:00:48 +00:00
Merge pull request #265 from rivet-dev/revert-actions-to-queues
feat(foundry): revert actions to queue/workflow pattern
This commit is contained in:
commit
84a80d59d7
21 changed files with 1443 additions and 688 deletions
|
|
@ -7,10 +7,10 @@ We converted all actors from queue/workflow-based communication to direct action
|
|||
## Reference branches
|
||||
|
||||
- **`main`** at commit `32f3c6c3` — the original queue/workflow code BEFORE the actions refactor
|
||||
- **`queues-to-actions`** — the current refactored code using direct actions
|
||||
- **`task-owner-git-auth`** at commit `f45a4674` — the merged PR #262 that introduced the actions pattern
|
||||
- **`queues-to-actions`** — the actions refactor code with bug fixes (E2B, lazy tasks, etc.)
|
||||
- **`task-owner-git-auth`** at commit `3684e2e5` — the CURRENT branch with all work including task owner system, lazy tasks, and actions refactor
|
||||
|
||||
Use `main` as the reference for the queue/workflow communication patterns. Use `queues-to-actions` as the reference for bug fixes and new features that MUST be preserved.
|
||||
Use `main` as the reference for the queue/workflow communication patterns. Use `task-owner-git-auth` (current HEAD) as the authoritative source for ALL features and bug fixes that MUST be preserved — it has everything from `queues-to-actions` plus the task owner system.
|
||||
|
||||
## What to KEEP (do NOT revert these)
|
||||
|
||||
|
|
@ -60,6 +60,32 @@ These are bug fixes and improvements made during the actions refactor that are i
|
|||
- The audit-log actor was simplified to a single `append` action
|
||||
- Keep this simplification — audit-log doesn't need a workflow
|
||||
|
||||
### 11. Task owner (primary user) system
|
||||
- New `task_owner` single-row table in task actor DB schema (`foundry/packages/backend/src/actors/task/db/schema.ts`) — stores `primaryUserId`, `primaryGithubLogin`, `primaryGithubEmail`, `primaryGithubAvatarUrl`
|
||||
- New migration in `foundry/packages/backend/src/actors/task/db/migrations.ts` creating the `task_owner` table
|
||||
- `primaryUserLogin` and `primaryUserAvatarUrl` columns added to org's `taskSummaries` table (`foundry/packages/backend/src/actors/organization/db/schema.ts`) + corresponding migration
|
||||
- `readTaskOwner()`, `upsertTaskOwner()` helpers in `workspace.ts`
|
||||
- `maybeSwapTaskOwner()` — called from `sendWorkspaceMessage()`, checks if a different user is sending and swaps owner + injects git credentials into sandbox
|
||||
- `changeTaskOwnerManually()` — called from the new `changeOwner` action on the task actor, updates owner without injecting credentials (credentials injected on next message from that user)
|
||||
- `injectGitCredentials()` — pushes `git config user.name/email` + credential store file into the sandbox via `runProcess`
|
||||
- `resolveGithubIdentity()` — resolves user's GitHub login/email/avatar/accessToken from their auth session
|
||||
- `buildTaskSummary()` now includes `primaryUserLogin` and `primaryUserAvatarUrl` in the summary pushed to org coordinator
|
||||
- New `changeOwner` action on task actor in `workflow/index.ts`
|
||||
- New `changeWorkspaceTaskOwner` action on org actor in `actions/tasks.ts`
|
||||
- New `TaskWorkspaceChangeOwnerInput` type in shared types (`foundry/packages/shared/src/workspace.ts`)
|
||||
- `TaskSummary` type extended with `primaryUserLogin` and `primaryUserAvatarUrl`
|
||||
|
||||
### 12. Task owner UI
|
||||
- New "Overview" tab in right sidebar (`foundry/packages/frontend/src/components/mock-layout/right-sidebar.tsx`) — shows current owner with avatar, click to open dropdown of org members to change owner
|
||||
- `onChangeOwner` and `members` props added to `RightSidebar` component
|
||||
- Primary user login shown in green in left sidebar task items (`foundry/packages/frontend/src/components/mock-layout/sidebar.tsx`)
|
||||
- `changeWorkspaceTaskOwner` method added to backend client and workspace client interfaces
|
||||
|
||||
### 13. Client changes for task owner
|
||||
- `changeWorkspaceTaskOwner()` added to `backend-client.ts` and all workspace client implementations (mock, remote)
|
||||
- Mock workspace client implements the owner change
|
||||
- Subscription manager test updated for new task summary shape
|
||||
|
||||
## What to REVERT (communication pattern only)
|
||||
|
||||
For each actor, revert from direct action calls back to queue sends with `expectQueueResponse` / fire-and-forget patterns. The reference for the queue patterns is `main` at `32f3c6c3`.
|
||||
|
|
@ -86,6 +112,7 @@ For each actor, revert from direct action calls back to queue sends with `expect
|
|||
- Keep `requireWorkspaceTask` using `getOrCreate`
|
||||
- Keep `getTask` using `getOrCreate` with `resolveTaskRepoId`
|
||||
- Keep `getTaskIndexEntry`
|
||||
- Keep `changeWorkspaceTaskOwner` (new action — delegates to task actor's `changeOwner`)
|
||||
- Revert task actor calls from direct actions to queue sends where applicable
|
||||
|
||||
**`actions/task-mutations.ts`:**
|
||||
|
|
@ -109,9 +136,14 @@ For each actor, revert from direct action calls back to queue sends with `expect
|
|||
**`workflow/index.ts`:**
|
||||
- Restore `taskCommandActions` as queue handlers in the workflow command loop
|
||||
- Restore `TASK_QUEUE_NAMES` and dispatch map
|
||||
- Add `changeOwner` to the queue dispatch map (new command, not in `main` — add as `task.command.changeOwner`)
|
||||
|
||||
**`workspace.ts`:**
|
||||
- Revert sandbox/org action calls back to queue sends where they were queue-based before
|
||||
- Keep ALL task owner code: `readTaskOwner`, `upsertTaskOwner`, `maybeSwapTaskOwner`, `changeTaskOwnerManually`, `injectGitCredentials`, `resolveGithubIdentity`
|
||||
- Keep the `authSessionId` param added to `ensureSandboxRepo`
|
||||
- Keep the `maybeSwapTaskOwner` call in `sendWorkspaceMessage`
|
||||
- Keep `primaryUserLogin`/`primaryUserAvatarUrl` in `buildTaskSummary`
|
||||
|
||||
### 3. User actor (`foundry/packages/backend/src/actors/user/`)
|
||||
|
||||
|
|
@ -163,3 +195,8 @@ For each actor, revert from direct action calls back to queue sends with `expect
|
|||
- [ ] No 500 errors in backend logs (except expected E2B sandbox expiry)
|
||||
- [ ] Workflow history visible in RivetKit inspector for org, task, user actors
|
||||
- [ ] CLAUDE.md constraints still documented and respected
|
||||
- [ ] Task owner shows in right sidebar "Overview" tab
|
||||
- [ ] Owner dropdown shows org members and allows switching
|
||||
- [ ] Sending a message as a different user swaps the owner
|
||||
- [ ] Primary user login shown in green on sidebar task items
|
||||
- [ ] Git credentials injected into sandbox on owner swap (check `/home/user/.git-token` exists)
|
||||
|
|
|
|||
|
|
@ -198,6 +198,80 @@ curl -s -X POST 'http://127.0.0.1:6420/gateway/<actor-id>/inspector/action/<acti
|
|||
- `GET /inspector/queue` is reliable for checking pending messages.
|
||||
- `GET /inspector/state` is reliable for checking actor state.
|
||||
|
||||
## Inbox & Notification System
|
||||
|
||||
The user actor owns two per-user systems: a **task feed** (sidebar ordering) and **notifications** (discrete events). These are distinct concepts that share a common "bump" mechanism.
|
||||
|
||||
### Core distinction: bumps vs. notifications
|
||||
|
||||
A **bump** updates the task's position in the user's sidebar feed. A **notification** is a discrete event entry shown in the notification panel. Every notification also triggers a bump, but not every bump creates a notification.
|
||||
|
||||
| Event | Bumps task? | Creates notification? |
|
||||
|-------|-------------|----------------------|
|
||||
| User sends a message | Yes | No |
|
||||
| User opens/clicks a task | Yes | No |
|
||||
| User creates a session | Yes | No |
|
||||
| Agent finishes responding | Yes | Yes |
|
||||
| PR review requested | Yes | Yes |
|
||||
| PR merged | Yes | Yes |
|
||||
| PR comment added | Yes | Yes |
|
||||
| Agent error/needs input | Yes | Yes |
|
||||
|
||||
### Recipient resolution
|
||||
|
||||
Notifications and bumps go to the **task owner** only. Each task has exactly one owner at a time (the user who last sent a message or explicitly took ownership). This is an acceptable race condition — it rarely makes sense for two users to work on the same task simultaneously, and ownership transfer is explicit.
|
||||
|
||||
The system supports multiplayer (multiple users can view the same task), but the notification/bump target is always the single current owner. Each user has their own independent notification and unread state on their own user actor.
|
||||
|
||||
### Tables (on user actor)
|
||||
|
||||
Two new tables:
|
||||
|
||||
- **`userTaskFeed`** — one row per task. Tracks `bumpedAtMs` and `bumpReason` for sidebar sort order. Does NOT denormalize task content (title, repo, etc.) — the frontend queries the org actor for task content and uses the feed only for ordering/filtering.
|
||||
- **`userNotifications`** — discrete notification entries with `type`, `message`, `read` state, and optional `sessionId`. Retention: notifications are retained for a configurable number of days after being marked read, then cleaned up.
|
||||
|
||||
### Queue commands (user actor workflow)
|
||||
|
||||
- `user.bump_task` — upserts `userTaskFeed` row, no notification created. Used for user-initiated actions (send message, open task, create session).
|
||||
- `user.notify` — inserts `userNotifications` row AND upserts `userTaskFeed` (auto-bump). Used for system events (agent finished, PR review requested).
|
||||
- `user.mark_read` — marks notifications read for a given `(taskId, sessionId?)`. Also updates `userTaskState.unread` for the session.
|
||||
|
||||
### Data flow
|
||||
|
||||
Task actor (or org actor) resolves the current task owner, then sends to the owner's user actor queue:
|
||||
1. `user.notify(...)` for notification-worthy events (auto-bumps the feed)
|
||||
2. `user.bump_task(...)` for non-notification bumps (send message, open task)
|
||||
|
||||
The user actor processes the queue message, writes to its local tables, and broadcasts a `userFeedUpdated` event to connected clients.
|
||||
|
||||
### Sidebar architecture change
|
||||
|
||||
The left sidebar changes from showing the repo/PR tree to showing **recent tasks** ordered by `userTaskFeed.bumpedAtMs`. Two new buttons at the top of the sidebar:
|
||||
- **All Repositories** — navigates to a page showing the current repo + PR list (preserving existing functionality)
|
||||
- **Notifications** — navigates to a page showing the full notification list
|
||||
|
||||
The sidebar reads from two sources:
|
||||
- **User actor** (`userTaskFeed`) — provides sort order and "which tasks are relevant to this user"
|
||||
- **Org actor** (`taskSummaries`) — provides task content (title, status, branch, PR state, session summaries)
|
||||
|
||||
The frontend merges these: org snapshot gives task data, user feed gives sort order. Uses the existing subscription system (`useSubscription`) for both initial state fetch and streaming updates.
|
||||
|
||||
### `updatedAtMs` column semantics
|
||||
|
||||
The org actor's `taskSummaries.updatedAtMs` and the user actor's `userTaskFeed.bumpedAtMs` serve different purposes:
|
||||
- `taskSummaries.updatedAtMs` — updated by task actor push. Reflects the last time the task's global state changed (any mutation, any user). Used for "All Repositories" / "All Tasks" views.
|
||||
- `userTaskFeed.bumpedAtMs` — updated by bump/notify commands. Reflects the last time this specific user's attention was drawn to this task. Used for the per-user sidebar sort.
|
||||
|
||||
Add doc comments on both columns clarifying the update source.
|
||||
|
||||
### Unread semantics
|
||||
|
||||
Each user has independent unread state. The existing `userTaskState` table tracks per-`(taskId, sessionId)` unread state. When the user clicks a session:
|
||||
1. `userTaskState.unread` is set to 0 for that session
|
||||
2. All `userNotifications` rows matching `(taskId, sessionId)` are marked `read = 1`
|
||||
|
||||
These two unread systems must stay in sync via the `user.mark_read` queue command.
|
||||
|
||||
## Maintenance
|
||||
|
||||
- Keep this file up to date whenever actor ownership, hierarchy, or lifecycle responsibilities change.
|
||||
|
|
|
|||
|
|
@ -1,7 +1,10 @@
|
|||
// @ts-nocheck
|
||||
import { and, desc, eq } from "drizzle-orm";
|
||||
import { actor } from "rivetkit";
|
||||
import { actor, queue } from "rivetkit";
|
||||
import { workflow, Loop } from "rivetkit/workflow";
|
||||
import type { AuditLogEvent } from "@sandbox-agent/foundry-shared";
|
||||
import { selfAuditLog } from "../handles.js";
|
||||
import { logActorWarning, resolveErrorMessage } from "../logging.js";
|
||||
import { auditLogDb } from "./db/db.js";
|
||||
import { events } from "./db/schema.js";
|
||||
|
||||
|
|
@ -24,6 +27,91 @@ export interface ListAuditLogParams {
|
|||
limit?: number;
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Queue names
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
const AUDIT_LOG_QUEUE_NAMES = ["auditLog.command.append"] as const;
|
||||
|
||||
type AuditLogQueueName = (typeof AUDIT_LOG_QUEUE_NAMES)[number];
|
||||
|
||||
function auditLogWorkflowQueueName(name: AuditLogQueueName): AuditLogQueueName {
|
||||
return name;
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Mutation functions
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
async function appendMutation(c: any, body: AppendAuditLogCommand): Promise<{ ok: true }> {
|
||||
const now = Date.now();
|
||||
await c.db
|
||||
.insert(events)
|
||||
.values({
|
||||
repoId: body.repoId ?? null,
|
||||
taskId: body.taskId ?? null,
|
||||
branchName: body.branchName ?? null,
|
||||
kind: body.kind,
|
||||
payloadJson: JSON.stringify(body.payload),
|
||||
createdAt: now,
|
||||
})
|
||||
.run();
|
||||
return { ok: true };
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Workflow command loop
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
type AuditLogWorkflowHandler = (loopCtx: any, body: any) => Promise<any>;
|
||||
|
||||
const AUDIT_LOG_COMMAND_HANDLERS: Record<AuditLogQueueName, AuditLogWorkflowHandler> = {
|
||||
"auditLog.command.append": async (c, body) => appendMutation(c, body),
|
||||
};
|
||||
|
||||
async function runAuditLogWorkflow(ctx: any): Promise<void> {
|
||||
await ctx.loop("audit-log-command-loop", async (loopCtx: any) => {
|
||||
const msg = await loopCtx.queue.next("next-audit-log-command", {
|
||||
names: [...AUDIT_LOG_QUEUE_NAMES],
|
||||
completable: true,
|
||||
});
|
||||
|
||||
if (!msg) {
|
||||
return Loop.continue(undefined);
|
||||
}
|
||||
|
||||
const handler = AUDIT_LOG_COMMAND_HANDLERS[msg.name as AuditLogQueueName];
|
||||
if (!handler) {
|
||||
logActorWarning("auditLog", "unknown audit-log command", { command: msg.name });
|
||||
await msg.complete({ error: `Unknown command: ${msg.name}` }).catch(() => {});
|
||||
return Loop.continue(undefined);
|
||||
}
|
||||
|
||||
try {
|
||||
// Wrap in a step so c.state and c.db are accessible inside mutation functions.
|
||||
const result = await loopCtx.step({
|
||||
name: msg.name,
|
||||
timeout: 60_000,
|
||||
run: async () => handler(loopCtx, msg.body),
|
||||
});
|
||||
await msg.complete(result);
|
||||
} catch (error) {
|
||||
const message = resolveErrorMessage(error);
|
||||
logActorWarning("auditLog", "audit-log workflow command failed", {
|
||||
command: msg.name,
|
||||
error: message,
|
||||
});
|
||||
await msg.complete({ error: message }).catch(() => {});
|
||||
}
|
||||
|
||||
return Loop.continue(undefined);
|
||||
});
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Actor definition
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/**
|
||||
* Organization-scoped audit log. One per org, not one per repo.
|
||||
*
|
||||
|
|
@ -35,6 +123,7 @@ export interface ListAuditLogParams {
|
|||
*/
|
||||
export const auditLog = actor({
|
||||
db: auditLogDb,
|
||||
queues: Object.fromEntries(AUDIT_LOG_QUEUE_NAMES.map((name) => [name, queue()])),
|
||||
options: {
|
||||
name: "Audit Log",
|
||||
icon: "database",
|
||||
|
|
@ -43,22 +132,14 @@ export const auditLog = actor({
|
|||
organizationId: input.organizationId,
|
||||
}),
|
||||
actions: {
|
||||
async append(c, body: AppendAuditLogCommand): Promise<{ ok: true }> {
|
||||
const now = Date.now();
|
||||
await c.db
|
||||
.insert(events)
|
||||
.values({
|
||||
repoId: body.repoId ?? null,
|
||||
taskId: body.taskId ?? null,
|
||||
branchName: body.branchName ?? null,
|
||||
kind: body.kind,
|
||||
payloadJson: JSON.stringify(body.payload),
|
||||
createdAt: now,
|
||||
})
|
||||
.run();
|
||||
// Mutation — self-send to queue for workflow history
|
||||
async append(c: any, body: AppendAuditLogCommand): Promise<{ ok: true }> {
|
||||
const self = selfAuditLog(c);
|
||||
await self.send(auditLogWorkflowQueueName("auditLog.command.append"), body, { wait: false });
|
||||
return { ok: true };
|
||||
},
|
||||
|
||||
// Read — direct action (no queue)
|
||||
async list(c, params?: ListAuditLogParams): Promise<AuditLogEvent[]> {
|
||||
const whereParts = [];
|
||||
if (params?.repoId) {
|
||||
|
|
@ -95,4 +176,5 @@ export const auditLog = actor({
|
|||
}));
|
||||
},
|
||||
},
|
||||
run: workflow(runAuditLogWorkflow),
|
||||
});
|
||||
|
|
|
|||
|
|
@ -1,15 +1,17 @@
|
|||
// @ts-nocheck
|
||||
import { eq, inArray } from "drizzle-orm";
|
||||
import { actor } from "rivetkit";
|
||||
import { actor, queue } from "rivetkit";
|
||||
import { workflow, Loop } from "rivetkit/workflow";
|
||||
import type { FoundryOrganization } from "@sandbox-agent/foundry-shared";
|
||||
import { getActorRuntimeContext } from "../context.js";
|
||||
import { getOrCreateOrganization, getTask } from "../handles.js";
|
||||
import { logActorWarning, resolveErrorMessage } from "../logging.js";
|
||||
import { taskWorkflowQueueName } from "../task/workflow/queue.js";
|
||||
import { repoIdFromRemote } from "../../services/repo.js";
|
||||
import { resolveOrganizationGithubAuth } from "../../services/github-auth.js";
|
||||
// actions called directly (no queue)
|
||||
import { organizationWorkflowQueueName } from "../organization/queues.js";
|
||||
import { githubDataDb } from "./db/db.js";
|
||||
import { githubBranches, githubMembers, githubMeta, githubPullRequests, githubRepositories } from "./db/schema.js";
|
||||
// workflow.ts is no longer used — commands are actions now
|
||||
|
||||
const META_ROW_ID = 1;
|
||||
const SYNC_REPOSITORY_BATCH_SIZE = 10;
|
||||
|
|
@ -74,7 +76,19 @@ interface ClearStateInput {
|
|||
label: string;
|
||||
}
|
||||
|
||||
// sendOrganizationCommand removed — org actions called directly
|
||||
// Queue names for github-data actor
|
||||
export const GITHUB_DATA_QUEUE_NAMES = [
|
||||
"githubData.command.syncRepos",
|
||||
"githubData.command.handlePullRequestWebhook",
|
||||
"githubData.command.clearState",
|
||||
"githubData.command.reloadRepository",
|
||||
] as const;
|
||||
|
||||
type GithubDataQueueName = (typeof GITHUB_DATA_QUEUE_NAMES)[number];
|
||||
|
||||
export function githubDataWorkflowQueueName(name: GithubDataQueueName): GithubDataQueueName {
|
||||
return name;
|
||||
}
|
||||
|
||||
interface PullRequestWebhookInput {
|
||||
connectedAccount: string;
|
||||
|
|
@ -209,18 +223,22 @@ async function writeMeta(c: any, patch: Partial<GithubMetaState>) {
|
|||
async function publishSyncProgress(c: any, patch: Partial<GithubMetaState>): Promise<GithubMetaState> {
|
||||
const meta = await writeMeta(c, patch);
|
||||
const organization = await getOrCreateOrganization(c, c.state.organizationId);
|
||||
await organization.commandApplyGithubSyncProgress({
|
||||
connectedAccount: meta.connectedAccount,
|
||||
installationStatus: meta.installationStatus,
|
||||
installationId: meta.installationId,
|
||||
syncStatus: meta.syncStatus,
|
||||
lastSyncLabel: meta.lastSyncLabel,
|
||||
lastSyncAt: meta.lastSyncAt,
|
||||
syncGeneration: meta.syncGeneration,
|
||||
syncPhase: meta.syncPhase,
|
||||
processedRepositoryCount: meta.processedRepositoryCount,
|
||||
totalRepositoryCount: meta.totalRepositoryCount,
|
||||
});
|
||||
await organization.send(
|
||||
organizationWorkflowQueueName("organization.command.github.sync_progress.apply"),
|
||||
{
|
||||
connectedAccount: meta.connectedAccount,
|
||||
installationStatus: meta.installationStatus,
|
||||
installationId: meta.installationId,
|
||||
syncStatus: meta.syncStatus,
|
||||
lastSyncLabel: meta.lastSyncLabel,
|
||||
lastSyncAt: meta.lastSyncAt,
|
||||
syncGeneration: meta.syncGeneration,
|
||||
syncPhase: meta.syncPhase,
|
||||
processedRepositoryCount: meta.processedRepositoryCount,
|
||||
totalRepositoryCount: meta.totalRepositoryCount,
|
||||
},
|
||||
{ wait: false },
|
||||
);
|
||||
return meta;
|
||||
}
|
||||
|
||||
|
|
@ -424,7 +442,13 @@ async function refreshTaskSummaryForBranch(c: any, repoId: string, branchName: s
|
|||
return;
|
||||
}
|
||||
const organization = await getOrCreateOrganization(c, c.state.organizationId);
|
||||
void organization.commandRefreshTaskSummaryForBranch({ repoId, branchName, pullRequest, repoName: repositoryRecord.fullName ?? undefined }).catch(() => {});
|
||||
void organization
|
||||
.send(
|
||||
organizationWorkflowQueueName("organization.command.refreshTaskSummaryForBranch"),
|
||||
{ repoId, branchName, pullRequest, repoName: repositoryRecord.fullName ?? undefined },
|
||||
{ wait: false },
|
||||
)
|
||||
.catch(() => {});
|
||||
}
|
||||
|
||||
async function emitPullRequestChangeEvents(c: any, beforeRows: any[], afterRows: any[]) {
|
||||
|
|
@ -472,7 +496,7 @@ async function autoArchiveTaskForClosedPullRequest(c: any, row: any) {
|
|||
}
|
||||
try {
|
||||
const task = getTask(c, c.state.organizationId, row.repoId, match.taskId);
|
||||
void task.archive({ reason: `PR ${String(row.state).toLowerCase()}` }).catch(() => {});
|
||||
void task.send(taskWorkflowQueueName("task.command.archive"), { reason: `PR ${String(row.state).toLowerCase()}` }, { wait: false }).catch(() => {});
|
||||
} catch {
|
||||
// Best-effort only. Task summary refresh will still clear the PR state.
|
||||
}
|
||||
|
|
@ -877,8 +901,79 @@ export async function fullSyncError(c: any, error: unknown): Promise<void> {
|
|||
});
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Workflow command loop
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
type GithubDataWorkflowHandler = (loopCtx: any, body: any) => Promise<any>;
|
||||
|
||||
const GITHUB_DATA_COMMAND_HANDLERS: Record<GithubDataQueueName, GithubDataWorkflowHandler> = {
|
||||
"githubData.command.syncRepos": async (c, body) => {
|
||||
try {
|
||||
await runFullSync(c, body);
|
||||
return { ok: true };
|
||||
} catch (error) {
|
||||
try {
|
||||
await fullSyncError(c, error);
|
||||
} catch {
|
||||
/* best effort */
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
},
|
||||
"githubData.command.handlePullRequestWebhook": async (c, body) => {
|
||||
await handlePullRequestWebhookMutation(c, body);
|
||||
return { ok: true };
|
||||
},
|
||||
"githubData.command.clearState": async (c, body) => {
|
||||
await clearStateMutation(c, body);
|
||||
return { ok: true };
|
||||
},
|
||||
"githubData.command.reloadRepository": async (c, body) => reloadRepositoryMutation(c, body),
|
||||
};
|
||||
|
||||
async function runGithubDataWorkflow(ctx: any): Promise<void> {
|
||||
await ctx.loop("github-data-command-loop", async (loopCtx: any) => {
|
||||
const msg = await loopCtx.queue.next("next-github-data-command", {
|
||||
names: [...GITHUB_DATA_QUEUE_NAMES],
|
||||
completable: true,
|
||||
});
|
||||
|
||||
if (!msg) {
|
||||
return Loop.continue(undefined);
|
||||
}
|
||||
|
||||
const handler = GITHUB_DATA_COMMAND_HANDLERS[msg.name as GithubDataQueueName];
|
||||
if (!handler) {
|
||||
logActorWarning("github-data", "unknown github-data command", { command: msg.name });
|
||||
await msg.complete({ error: `Unknown command: ${msg.name}` }).catch(() => {});
|
||||
return Loop.continue(undefined);
|
||||
}
|
||||
|
||||
try {
|
||||
// Wrap in a step so c.state and c.db are accessible inside mutation functions.
|
||||
const result = await loopCtx.step({
|
||||
name: msg.name,
|
||||
timeout: 10 * 60_000,
|
||||
run: async () => handler(loopCtx, msg.body),
|
||||
});
|
||||
await msg.complete(result);
|
||||
} catch (error) {
|
||||
const message = resolveErrorMessage(error);
|
||||
logActorWarning("github-data", "github-data workflow command failed", {
|
||||
command: msg.name,
|
||||
error: message,
|
||||
});
|
||||
await msg.complete({ error: message }).catch(() => {});
|
||||
}
|
||||
|
||||
return Loop.continue(undefined);
|
||||
});
|
||||
}
|
||||
|
||||
export const githubData = actor({
|
||||
db: githubDataDb,
|
||||
queues: Object.fromEntries(GITHUB_DATA_QUEUE_NAMES.map((name) => [name, queue()])),
|
||||
options: {
|
||||
name: "GitHub Data",
|
||||
icon: "github",
|
||||
|
|
@ -945,35 +1040,8 @@ export const githubData = actor({
|
|||
}))
|
||||
.sort((left, right) => left.branchName.localeCompare(right.branchName));
|
||||
},
|
||||
|
||||
async syncRepos(c, body: any) {
|
||||
try {
|
||||
await runFullSync(c, body);
|
||||
return { ok: true };
|
||||
} catch (error) {
|
||||
try {
|
||||
await fullSyncError(c, error);
|
||||
} catch {
|
||||
/* best effort */
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
},
|
||||
|
||||
async reloadRepository(c, body: { repoId: string }) {
|
||||
return await reloadRepositoryMutation(c, body);
|
||||
},
|
||||
|
||||
async clearState(c, body: any) {
|
||||
await clearStateMutation(c, body);
|
||||
return { ok: true };
|
||||
},
|
||||
|
||||
async handlePullRequestWebhook(c, body: any) {
|
||||
await handlePullRequestWebhookMutation(c, body);
|
||||
return { ok: true };
|
||||
},
|
||||
},
|
||||
run: workflow(runGithubDataWorkflow),
|
||||
});
|
||||
|
||||
export async function reloadRepositoryMutation(c: any, input: { repoId: string }) {
|
||||
|
|
|
|||
|
|
@ -79,3 +79,7 @@ export function selfUser(c: any) {
|
|||
export function selfGithubData(c: any) {
|
||||
return actorClient(c).githubData.getForId(c.actorId);
|
||||
}
|
||||
|
||||
export function selfTaskSandbox(c: any) {
|
||||
return actorClient(c).taskSandbox.getForId(c.actorId);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
import { desc } from "drizzle-orm";
|
||||
import type { FoundryAppSnapshot } from "@sandbox-agent/foundry-shared";
|
||||
import { getOrCreateGithubData, getOrCreateOrganization } from "../../handles.js";
|
||||
import { githubDataWorkflowQueueName } from "../../github-data/index.js";
|
||||
import { authSessionIndex } from "../db/schema.js";
|
||||
import {
|
||||
assertAppOrganization,
|
||||
|
|
@ -11,6 +12,7 @@ import {
|
|||
} from "../app-shell.js";
|
||||
import { getBetterAuthService } from "../../../services/better-auth.js";
|
||||
import { refreshOrganizationSnapshotMutation } from "../actions.js";
|
||||
import { organizationWorkflowQueueName } from "../queues.js";
|
||||
|
||||
export const organizationGithubActions = {
|
||||
async resolveAppGithubToken(
|
||||
|
|
@ -58,21 +60,27 @@ export const organizationGithubActions = {
|
|||
}
|
||||
|
||||
const organizationHandle = await getOrCreateOrganization(c, input.organizationId);
|
||||
await organizationHandle.commandMarkSyncStarted({ label: "Importing repository catalog..." });
|
||||
await organizationHandle.commandBroadcastSnapshot({});
|
||||
await organizationHandle.send(
|
||||
organizationWorkflowQueueName("organization.command.shell.sync_started.mark"),
|
||||
{ label: "Importing repository catalog..." },
|
||||
{ wait: false },
|
||||
);
|
||||
await organizationHandle.send(organizationWorkflowQueueName("organization.command.snapshot.broadcast"), {}, { wait: false });
|
||||
|
||||
void githubData.syncRepos({ label: "Importing repository catalog..." }).catch(() => {});
|
||||
void githubData
|
||||
.send(githubDataWorkflowQueueName("githubData.command.syncRepos"), { label: "Importing repository catalog..." }, { wait: false })
|
||||
.catch(() => {});
|
||||
|
||||
return await buildAppSnapshot(c, input.sessionId);
|
||||
},
|
||||
|
||||
async adminReloadGithubOrganization(c: any): Promise<void> {
|
||||
const githubData = await getOrCreateGithubData(c, c.state.organizationId);
|
||||
await githubData.syncRepos({ label: "Reloading GitHub organization..." });
|
||||
await githubData.send(githubDataWorkflowQueueName("githubData.command.syncRepos"), { label: "Reloading GitHub organization..." }, { wait: false });
|
||||
},
|
||||
|
||||
async adminReloadGithubRepository(c: any, input: { repoId: string }): Promise<void> {
|
||||
const githubData = await getOrCreateGithubData(c, c.state.organizationId);
|
||||
await githubData.reloadRepository(input);
|
||||
await githubData.send(githubDataWorkflowQueueName("githubData.command.reloadRepository"), input, { wait: false });
|
||||
},
|
||||
};
|
||||
|
|
|
|||
|
|
@ -1,7 +1,6 @@
|
|||
import type { FoundryAppSnapshot, UpdateFoundryOrganizationProfileInput, WorkspaceModelId } from "@sandbox-agent/foundry-shared";
|
||||
import { getBetterAuthService } from "../../../services/better-auth.js";
|
||||
import { getOrCreateOrganization } from "../../handles.js";
|
||||
// actions called directly (no queue)
|
||||
import {
|
||||
assertAppOrganization,
|
||||
assertOrganizationShell,
|
||||
|
|
@ -11,7 +10,7 @@ import {
|
|||
requireEligibleOrganization,
|
||||
requireSignedInSession,
|
||||
} from "../app-shell.js";
|
||||
// org queue names removed — using direct actions
|
||||
import { organizationWorkflowQueueName } from "../queues.js";
|
||||
|
||||
export const organizationShellActions = {
|
||||
async getAppSnapshot(c: any, input: { sessionId: string }): Promise<FoundryAppSnapshot> {
|
||||
|
|
@ -35,11 +34,15 @@ export const organizationShellActions = {
|
|||
const session = await requireSignedInSession(c, input.sessionId);
|
||||
requireEligibleOrganization(session, input.organizationId);
|
||||
const organization = await getOrCreateOrganization(c, input.organizationId);
|
||||
await organization.commandUpdateShellProfile({
|
||||
displayName: input.displayName,
|
||||
slug: input.slug,
|
||||
primaryDomain: input.primaryDomain,
|
||||
});
|
||||
await organization.send(
|
||||
organizationWorkflowQueueName("organization.command.shell.profile.update"),
|
||||
{
|
||||
displayName: input.displayName,
|
||||
slug: input.slug,
|
||||
primaryDomain: input.primaryDomain,
|
||||
},
|
||||
{ wait: true, timeout: 10_000 },
|
||||
);
|
||||
return await buildAppSnapshot(c, input.sessionId);
|
||||
},
|
||||
|
||||
|
|
|
|||
|
|
@ -17,6 +17,8 @@ import { deriveFallbackTitle, resolveCreateFlowDecision } from "../../../service
|
|||
// actions return directly (no queue response unwrapping)
|
||||
import { isActorNotFoundError, logActorWarning, resolveErrorMessage } from "../../logging.js";
|
||||
import { defaultSandboxProviderId } from "../../../sandbox-config.js";
|
||||
import { taskWorkflowQueueName } from "../../task/workflow/queue.js";
|
||||
import { expectQueueResponse } from "../../../services/queue.js";
|
||||
import { taskIndex, taskSummaries } from "../db/schema.js";
|
||||
import { refreshOrganizationSnapshotMutation } from "../actions.js";
|
||||
|
||||
|
|
@ -202,12 +204,18 @@ export async function createTaskMutation(c: any, cmd: CreateTaskCommand): Promis
|
|||
throw error;
|
||||
}
|
||||
|
||||
const created = await taskHandle.initialize({
|
||||
sandboxProviderId: cmd.sandboxProviderId,
|
||||
branchName: initialBranchName,
|
||||
title: initialTitle,
|
||||
task: cmd.task,
|
||||
});
|
||||
const created = expectQueueResponse<TaskRecord>(
|
||||
await taskHandle.send(
|
||||
taskWorkflowQueueName("task.command.initialize"),
|
||||
{
|
||||
sandboxProviderId: cmd.sandboxProviderId,
|
||||
branchName: initialBranchName,
|
||||
title: initialTitle,
|
||||
task: cmd.task,
|
||||
},
|
||||
{ wait: true, timeout: 10_000 },
|
||||
),
|
||||
);
|
||||
|
||||
try {
|
||||
await upsertTaskSummary(c, await taskHandle.getTaskSummary({}));
|
||||
|
|
@ -384,7 +392,7 @@ export async function refreshTaskSummaryForBranchMutation(
|
|||
// Best-effort notify the task actor if it exists (fire-and-forget)
|
||||
try {
|
||||
const task = getTask(c, c.state.organizationId, input.repoId, row.taskId);
|
||||
void task.pullRequestSync({ pullRequest }).catch(() => {});
|
||||
void task.send(taskWorkflowQueueName("task.command.pull_request.sync"), { pullRequest }, { wait: false }).catch(() => {});
|
||||
} catch {
|
||||
// Task actor doesn't exist yet — that's fine, it's virtual
|
||||
}
|
||||
|
|
|
|||
|
|
@ -25,6 +25,8 @@ import { getActorRuntimeContext } from "../../context.js";
|
|||
import { getOrCreateAuditLog, getOrCreateTask, getTask as getTaskHandle } from "../../handles.js";
|
||||
import { defaultSandboxProviderId } from "../../../sandbox-config.js";
|
||||
import { logActorWarning, resolveErrorMessage } from "../../logging.js";
|
||||
import { taskWorkflowQueueName } from "../../task/workflow/queue.js";
|
||||
import { expectQueueResponse } from "../../../services/queue.js";
|
||||
import { taskIndex, taskSummaries } from "../db/schema.js";
|
||||
import {
|
||||
createTaskMutation,
|
||||
|
|
@ -131,11 +133,15 @@ export const organizationTaskActions = {
|
|||
|
||||
const task = await requireWorkspaceTask(c, input.repoId, created.taskId);
|
||||
void task
|
||||
.createSessionAndSend({
|
||||
model: input.model,
|
||||
text: input.task,
|
||||
authSessionId: input.authSessionId,
|
||||
})
|
||||
.send(
|
||||
taskWorkflowQueueName("task.command.workspace.create_session_and_send"),
|
||||
{
|
||||
model: input.model,
|
||||
text: input.task,
|
||||
authSessionId: input.authSessionId,
|
||||
},
|
||||
{ wait: false },
|
||||
)
|
||||
.catch(() => {});
|
||||
|
||||
return { taskId: created.taskId };
|
||||
|
|
@ -143,94 +149,132 @@ export const organizationTaskActions = {
|
|||
|
||||
async markWorkspaceUnread(c: any, input: TaskWorkspaceSelectInput): Promise<void> {
|
||||
const task = await requireWorkspaceTask(c, input.repoId, input.taskId);
|
||||
await task.markUnread({ authSessionId: input.authSessionId });
|
||||
await task.send(taskWorkflowQueueName("task.command.workspace.mark_unread"), { authSessionId: input.authSessionId }, { wait: false });
|
||||
},
|
||||
|
||||
async renameWorkspaceTask(c: any, input: TaskWorkspaceRenameInput): Promise<void> {
|
||||
const task = await requireWorkspaceTask(c, input.repoId, input.taskId);
|
||||
await task.renameTask({ value: input.value });
|
||||
await task.send(taskWorkflowQueueName("task.command.workspace.rename_task"), { value: input.value }, { wait: false });
|
||||
},
|
||||
|
||||
async createWorkspaceSession(c: any, input: TaskWorkspaceSelectInput & { model?: string }): Promise<{ sessionId: string }> {
|
||||
const task = await requireWorkspaceTask(c, input.repoId, input.taskId);
|
||||
return await task.createSession({
|
||||
...(input.model ? { model: input.model } : {}),
|
||||
...(input.authSessionId ? { authSessionId: input.authSessionId } : {}),
|
||||
});
|
||||
return expectQueueResponse(
|
||||
await task.send(
|
||||
taskWorkflowQueueName("task.command.workspace.create_session"),
|
||||
{
|
||||
...(input.model ? { model: input.model } : {}),
|
||||
...(input.authSessionId ? { authSessionId: input.authSessionId } : {}),
|
||||
},
|
||||
{ wait: true, timeout: 10_000 },
|
||||
),
|
||||
);
|
||||
},
|
||||
|
||||
async renameWorkspaceSession(c: any, input: TaskWorkspaceRenameSessionInput): Promise<void> {
|
||||
const task = await requireWorkspaceTask(c, input.repoId, input.taskId);
|
||||
await task.renameSession({ sessionId: input.sessionId, title: input.title, authSessionId: input.authSessionId });
|
||||
await task.send(
|
||||
taskWorkflowQueueName("task.command.workspace.rename_session"),
|
||||
{ sessionId: input.sessionId, title: input.title, authSessionId: input.authSessionId },
|
||||
{ wait: false },
|
||||
);
|
||||
},
|
||||
|
||||
async selectWorkspaceSession(c: any, input: TaskWorkspaceSessionInput): Promise<void> {
|
||||
const task = await requireWorkspaceTask(c, input.repoId, input.taskId);
|
||||
await task.selectSession({ sessionId: input.sessionId, authSessionId: input.authSessionId });
|
||||
await task.send(
|
||||
taskWorkflowQueueName("task.command.workspace.select_session"),
|
||||
{ sessionId: input.sessionId, authSessionId: input.authSessionId },
|
||||
{ wait: false },
|
||||
);
|
||||
},
|
||||
|
||||
async setWorkspaceSessionUnread(c: any, input: TaskWorkspaceSetSessionUnreadInput): Promise<void> {
|
||||
const task = await requireWorkspaceTask(c, input.repoId, input.taskId);
|
||||
await task.setSessionUnread({ sessionId: input.sessionId, unread: input.unread, authSessionId: input.authSessionId });
|
||||
await task.send(
|
||||
taskWorkflowQueueName("task.command.workspace.set_session_unread"),
|
||||
{ sessionId: input.sessionId, unread: input.unread, authSessionId: input.authSessionId },
|
||||
{ wait: false },
|
||||
);
|
||||
},
|
||||
|
||||
async updateWorkspaceDraft(c: any, input: TaskWorkspaceUpdateDraftInput): Promise<void> {
|
||||
const task = await requireWorkspaceTask(c, input.repoId, input.taskId);
|
||||
void task
|
||||
.updateDraft({
|
||||
sessionId: input.sessionId,
|
||||
text: input.text,
|
||||
attachments: input.attachments,
|
||||
authSessionId: input.authSessionId,
|
||||
})
|
||||
.send(
|
||||
taskWorkflowQueueName("task.command.workspace.update_draft"),
|
||||
{
|
||||
sessionId: input.sessionId,
|
||||
text: input.text,
|
||||
attachments: input.attachments,
|
||||
authSessionId: input.authSessionId,
|
||||
},
|
||||
{ wait: false },
|
||||
)
|
||||
.catch(() => {});
|
||||
},
|
||||
|
||||
async changeWorkspaceModel(c: any, input: TaskWorkspaceChangeModelInput): Promise<void> {
|
||||
const task = await requireWorkspaceTask(c, input.repoId, input.taskId);
|
||||
await task.changeModel({ sessionId: input.sessionId, model: input.model, authSessionId: input.authSessionId });
|
||||
await task.send(
|
||||
taskWorkflowQueueName("task.command.workspace.change_model"),
|
||||
{ sessionId: input.sessionId, model: input.model, authSessionId: input.authSessionId },
|
||||
{ wait: false },
|
||||
);
|
||||
},
|
||||
|
||||
async sendWorkspaceMessage(c: any, input: TaskWorkspaceSendMessageInput): Promise<void> {
|
||||
const task = await requireWorkspaceTask(c, input.repoId, input.taskId);
|
||||
void task
|
||||
.sendMessage({
|
||||
sessionId: input.sessionId,
|
||||
text: input.text,
|
||||
attachments: input.attachments,
|
||||
authSessionId: input.authSessionId,
|
||||
})
|
||||
.send(
|
||||
taskWorkflowQueueName("task.command.workspace.send_message"),
|
||||
{
|
||||
sessionId: input.sessionId,
|
||||
text: input.text,
|
||||
attachments: input.attachments,
|
||||
authSessionId: input.authSessionId,
|
||||
},
|
||||
{ wait: false },
|
||||
)
|
||||
.catch(() => {});
|
||||
},
|
||||
|
||||
async stopWorkspaceSession(c: any, input: TaskWorkspaceSessionInput): Promise<void> {
|
||||
const task = await requireWorkspaceTask(c, input.repoId, input.taskId);
|
||||
void task.stopSession({ sessionId: input.sessionId, authSessionId: input.authSessionId }).catch(() => {});
|
||||
void task
|
||||
.send(taskWorkflowQueueName("task.command.workspace.stop_session"), { sessionId: input.sessionId, authSessionId: input.authSessionId }, { wait: false })
|
||||
.catch(() => {});
|
||||
},
|
||||
|
||||
async closeWorkspaceSession(c: any, input: TaskWorkspaceSessionInput): Promise<void> {
|
||||
const task = await requireWorkspaceTask(c, input.repoId, input.taskId);
|
||||
void task.closeSession({ sessionId: input.sessionId, authSessionId: input.authSessionId }).catch(() => {});
|
||||
void task
|
||||
.send(taskWorkflowQueueName("task.command.workspace.close_session"), { sessionId: input.sessionId, authSessionId: input.authSessionId }, { wait: false })
|
||||
.catch(() => {});
|
||||
},
|
||||
|
||||
async publishWorkspacePr(c: any, input: TaskWorkspaceSelectInput): Promise<void> {
|
||||
const task = await requireWorkspaceTask(c, input.repoId, input.taskId);
|
||||
void task.publishPr({}).catch(() => {});
|
||||
void task.send(taskWorkflowQueueName("task.command.workspace.publish_pr"), {}, { wait: false }).catch(() => {});
|
||||
},
|
||||
|
||||
async changeWorkspaceTaskOwner(c: any, input: TaskWorkspaceChangeOwnerInput): Promise<void> {
|
||||
const task = await requireWorkspaceTask(c, input.repoId, input.taskId);
|
||||
await task.changeOwner({
|
||||
primaryUserId: input.targetUserId,
|
||||
primaryGithubLogin: input.targetUserName,
|
||||
primaryGithubEmail: input.targetUserEmail,
|
||||
primaryGithubAvatarUrl: null,
|
||||
});
|
||||
await task.send(
|
||||
taskWorkflowQueueName("task.command.workspace.change_owner"),
|
||||
{
|
||||
primaryUserId: input.targetUserId,
|
||||
primaryGithubLogin: input.targetUserName,
|
||||
primaryGithubEmail: input.targetUserEmail,
|
||||
primaryGithubAvatarUrl: null,
|
||||
},
|
||||
{ wait: false },
|
||||
);
|
||||
},
|
||||
|
||||
async revertWorkspaceFile(c: any, input: TaskWorkspaceDiffInput): Promise<void> {
|
||||
const task = await requireWorkspaceTask(c, input.repoId, input.taskId);
|
||||
void task.revertFile(input).catch(() => {});
|
||||
void task.send(taskWorkflowQueueName("task.command.workspace.revert_file"), input, { wait: false }).catch(() => {});
|
||||
},
|
||||
|
||||
async getRepoOverview(c: any, input: RepoOverviewInput): Promise<RepoOverview> {
|
||||
|
|
@ -250,7 +294,9 @@ export const organizationTaskActions = {
|
|||
async switchTask(c: any, input: { repoId: string; taskId: string }): Promise<SwitchResult> {
|
||||
const h = getTaskHandle(c, c.state.organizationId, input.repoId, input.taskId);
|
||||
const record = await h.get();
|
||||
const switched = await h.switchTask({});
|
||||
const switched = expectQueueResponse<{ switchTarget: string | null }>(
|
||||
await h.send(taskWorkflowQueueName("task.command.switch"), {}, { wait: true, timeout: 10_000 }),
|
||||
);
|
||||
return {
|
||||
organizationId: c.state.organizationId,
|
||||
taskId: input.taskId,
|
||||
|
|
@ -288,42 +334,42 @@ export const organizationTaskActions = {
|
|||
assertOrganization(c, input.organizationId);
|
||||
|
||||
const h = getTaskHandle(c, c.state.organizationId, input.repoId, input.taskId);
|
||||
return await h.attach({ reason: input.reason });
|
||||
return expectQueueResponse(await h.send(taskWorkflowQueueName("task.command.attach"), { reason: input.reason }, { wait: true, timeout: 10_000 }));
|
||||
},
|
||||
|
||||
async pushTask(c: any, input: TaskProxyActionInput): Promise<void> {
|
||||
assertOrganization(c, input.organizationId);
|
||||
|
||||
const h = getTaskHandle(c, c.state.organizationId, input.repoId, input.taskId);
|
||||
void h.push({ reason: input.reason }).catch(() => {});
|
||||
void h.send(taskWorkflowQueueName("task.command.push"), { reason: input.reason }, { wait: false }).catch(() => {});
|
||||
},
|
||||
|
||||
async syncTask(c: any, input: TaskProxyActionInput): Promise<void> {
|
||||
assertOrganization(c, input.organizationId);
|
||||
|
||||
const h = getTaskHandle(c, c.state.organizationId, input.repoId, input.taskId);
|
||||
void h.sync({ reason: input.reason }).catch(() => {});
|
||||
void h.send(taskWorkflowQueueName("task.command.sync"), { reason: input.reason }, { wait: false }).catch(() => {});
|
||||
},
|
||||
|
||||
async mergeTask(c: any, input: TaskProxyActionInput): Promise<void> {
|
||||
assertOrganization(c, input.organizationId);
|
||||
|
||||
const h = getTaskHandle(c, c.state.organizationId, input.repoId, input.taskId);
|
||||
void h.merge({ reason: input.reason }).catch(() => {});
|
||||
void h.send(taskWorkflowQueueName("task.command.merge"), { reason: input.reason }, { wait: false }).catch(() => {});
|
||||
},
|
||||
|
||||
async archiveTask(c: any, input: TaskProxyActionInput): Promise<void> {
|
||||
assertOrganization(c, input.organizationId);
|
||||
|
||||
const h = getTaskHandle(c, c.state.organizationId, input.repoId, input.taskId);
|
||||
void h.archive({ reason: input.reason }).catch(() => {});
|
||||
void h.send(taskWorkflowQueueName("task.command.archive"), { reason: input.reason }, { wait: false }).catch(() => {});
|
||||
},
|
||||
|
||||
async killTask(c: any, input: TaskProxyActionInput): Promise<void> {
|
||||
assertOrganization(c, input.organizationId);
|
||||
|
||||
const h = getTaskHandle(c, c.state.organizationId, input.repoId, input.taskId);
|
||||
void h.kill({ reason: input.reason }).catch(() => {});
|
||||
void h.send(taskWorkflowQueueName("task.command.kill"), { reason: input.reason }, { wait: false }).catch(() => {});
|
||||
},
|
||||
|
||||
async getRepositoryMetadata(c: any, input: { repoId: string }): Promise<{ defaultBranch: string | null; fullName: string | null; remoteUrl: string }> {
|
||||
|
|
|
|||
|
|
@ -17,6 +17,8 @@ import { GitHubAppError } from "../../services/app-github.js";
|
|||
import { getBetterAuthService } from "../../services/better-auth.js";
|
||||
import { repoIdFromRemote, repoLabelFromRemote } from "../../services/repo.js";
|
||||
import { logger } from "../../logging.js";
|
||||
import { githubDataWorkflowQueueName } from "../github-data/index.js";
|
||||
import { organizationWorkflowQueueName } from "./queues.js";
|
||||
import { invoices, organizationMembers, organizationProfile, seatAssignments, stripeLookup } from "./db/schema.js";
|
||||
import { APP_SHELL_ORGANIZATION_ID } from "./constants.js";
|
||||
|
||||
|
|
@ -482,19 +484,23 @@ async function syncGithubOrganizationsInternal(c: any, input: { sessionId: strin
|
|||
const organizationId = organizationOrganizationId(account.kind, account.githubLogin);
|
||||
const installation = installations.find((candidate) => candidate.accountLogin === account.githubLogin) ?? null;
|
||||
const organization = await getOrCreateOrganization(c, organizationId);
|
||||
await organization.commandSyncOrganizationShellFromGithub({
|
||||
userId: githubUserId,
|
||||
userName: viewer.name || viewer.login,
|
||||
userEmail: viewer.email ?? `${viewer.login}@users.noreply.github.com`,
|
||||
githubUserLogin: viewer.login,
|
||||
githubAccountId: account.githubAccountId,
|
||||
githubLogin: account.githubLogin,
|
||||
githubAccountType: account.githubAccountType,
|
||||
kind: account.kind,
|
||||
displayName: account.displayName,
|
||||
installationId: installation?.id ?? null,
|
||||
appConfigured: appShell.github.isAppConfigured(),
|
||||
});
|
||||
await organization.send(
|
||||
organizationWorkflowQueueName("organization.command.github.organization_shell.sync_from_github"),
|
||||
{
|
||||
userId: githubUserId,
|
||||
userName: viewer.name || viewer.login,
|
||||
userEmail: viewer.email ?? `${viewer.login}@users.noreply.github.com`,
|
||||
githubUserLogin: viewer.login,
|
||||
githubAccountId: account.githubAccountId,
|
||||
githubLogin: account.githubLogin,
|
||||
githubAccountType: account.githubAccountType,
|
||||
kind: account.kind,
|
||||
displayName: account.displayName,
|
||||
installationId: installation?.id ?? null,
|
||||
appConfigured: appShell.github.isAppConfigured(),
|
||||
},
|
||||
{ wait: true, timeout: 10_000 },
|
||||
);
|
||||
linkedOrganizationIds.push(organizationId);
|
||||
}
|
||||
|
||||
|
|
@ -677,10 +683,14 @@ async function applySubscriptionState(
|
|||
},
|
||||
fallbackPlanId: FoundryBillingPlanId,
|
||||
): Promise<void> {
|
||||
await organization.commandApplyStripeSubscription({
|
||||
subscription,
|
||||
fallbackPlanId,
|
||||
});
|
||||
await organization.send(
|
||||
organizationWorkflowQueueName("organization.command.billing.stripe_subscription.apply"),
|
||||
{
|
||||
subscription,
|
||||
fallbackPlanId,
|
||||
},
|
||||
{ wait: true, timeout: 10_000 },
|
||||
);
|
||||
}
|
||||
|
||||
export const organizationAppActions = {
|
||||
|
|
@ -693,9 +703,13 @@ export const organizationAppActions = {
|
|||
const organizationState = await getOrganizationState(organizationHandle);
|
||||
|
||||
if (input.planId === "free") {
|
||||
await organizationHandle.commandApplyFreePlan({
|
||||
clearSubscription: false,
|
||||
});
|
||||
await organizationHandle.send(
|
||||
organizationWorkflowQueueName("organization.command.billing.free_plan.apply"),
|
||||
{
|
||||
clearSubscription: false,
|
||||
},
|
||||
{ wait: true, timeout: 10_000 },
|
||||
);
|
||||
return {
|
||||
url: `${appShell.appUrl}/organizations/${input.organizationId}/billing`,
|
||||
};
|
||||
|
|
@ -714,9 +728,13 @@ export const organizationAppActions = {
|
|||
email: session.currentUserEmail,
|
||||
})
|
||||
).id;
|
||||
await organizationHandle.commandApplyStripeCustomer({
|
||||
customerId,
|
||||
});
|
||||
await organizationHandle.send(
|
||||
organizationWorkflowQueueName("organization.command.billing.stripe_customer.apply"),
|
||||
{
|
||||
customerId,
|
||||
},
|
||||
{ wait: true, timeout: 10_000 },
|
||||
);
|
||||
await upsertStripeLookupEntries(c, input.organizationId, customerId, null);
|
||||
}
|
||||
|
||||
|
|
@ -744,9 +762,13 @@ export const organizationAppActions = {
|
|||
const completion = await appShell.stripe.retrieveCheckoutCompletion(input.checkoutSessionId);
|
||||
|
||||
if (completion.customerId) {
|
||||
await organizationHandle.commandApplyStripeCustomer({
|
||||
customerId: completion.customerId,
|
||||
});
|
||||
await organizationHandle.send(
|
||||
organizationWorkflowQueueName("organization.command.billing.stripe_customer.apply"),
|
||||
{
|
||||
customerId: completion.customerId,
|
||||
},
|
||||
{ wait: true, timeout: 10_000 },
|
||||
);
|
||||
}
|
||||
await upsertStripeLookupEntries(c, input.organizationId, completion.customerId, completion.subscriptionId);
|
||||
|
||||
|
|
@ -756,9 +778,13 @@ export const organizationAppActions = {
|
|||
}
|
||||
|
||||
if (completion.paymentMethodLabel) {
|
||||
await organizationHandle.commandSetPaymentMethod({
|
||||
label: completion.paymentMethodLabel,
|
||||
});
|
||||
await organizationHandle.send(
|
||||
organizationWorkflowQueueName("organization.command.billing.payment_method.set"),
|
||||
{
|
||||
label: completion.paymentMethodLabel,
|
||||
},
|
||||
{ wait: true, timeout: 10_000 },
|
||||
);
|
||||
}
|
||||
|
||||
return {
|
||||
|
|
@ -796,9 +822,13 @@ export const organizationAppActions = {
|
|||
await applySubscriptionState(organizationHandle, subscription, organizationState.billingPlanId);
|
||||
await upsertStripeLookupEntries(c, input.organizationId, subscription.customerId ?? organizationState.stripeCustomerId, subscription.id);
|
||||
} else {
|
||||
await organizationHandle.commandSetBillingStatus({
|
||||
status: "scheduled_cancel",
|
||||
});
|
||||
await organizationHandle.send(
|
||||
organizationWorkflowQueueName("organization.command.billing.status.set"),
|
||||
{
|
||||
status: "scheduled_cancel",
|
||||
},
|
||||
{ wait: true, timeout: 10_000 },
|
||||
);
|
||||
}
|
||||
|
||||
return await buildAppSnapshot(c, input.sessionId);
|
||||
|
|
@ -817,9 +847,13 @@ export const organizationAppActions = {
|
|||
await applySubscriptionState(organizationHandle, subscription, organizationState.billingPlanId);
|
||||
await upsertStripeLookupEntries(c, input.organizationId, subscription.customerId ?? organizationState.stripeCustomerId, subscription.id);
|
||||
} else {
|
||||
await organizationHandle.commandSetBillingStatus({
|
||||
status: "active",
|
||||
});
|
||||
await organizationHandle.send(
|
||||
organizationWorkflowQueueName("organization.command.billing.status.set"),
|
||||
{
|
||||
status: "active",
|
||||
},
|
||||
{ wait: true, timeout: 10_000 },
|
||||
);
|
||||
}
|
||||
|
||||
return await buildAppSnapshot(c, input.sessionId);
|
||||
|
|
@ -830,9 +864,13 @@ export const organizationAppActions = {
|
|||
const session = await requireSignedInSession(c, input.sessionId);
|
||||
requireEligibleOrganization(session, input.organizationId);
|
||||
const organization = await getOrCreateOrganization(c, input.organizationId);
|
||||
await organization.commandRecordSeatUsage({
|
||||
email: session.currentUserEmail,
|
||||
});
|
||||
await organization.send(
|
||||
organizationWorkflowQueueName("organization.command.billing.seat_usage.record"),
|
||||
{
|
||||
email: session.currentUserEmail,
|
||||
},
|
||||
{ wait: true, timeout: 10_000 },
|
||||
);
|
||||
return await buildAppSnapshot(c, input.sessionId);
|
||||
},
|
||||
|
||||
|
|
@ -853,9 +891,13 @@ export const organizationAppActions = {
|
|||
if (organizationId) {
|
||||
const organization = await getOrCreateOrganization(c, organizationId);
|
||||
if (typeof object.customer === "string") {
|
||||
await organization.commandApplyStripeCustomer({
|
||||
customerId: object.customer,
|
||||
});
|
||||
await organization.send(
|
||||
organizationWorkflowQueueName("organization.command.billing.stripe_customer.apply"),
|
||||
{
|
||||
customerId: object.customer,
|
||||
},
|
||||
{ wait: true, timeout: 10_000 },
|
||||
);
|
||||
}
|
||||
await upsertStripeLookupEntries(
|
||||
c,
|
||||
|
|
@ -888,9 +930,13 @@ export const organizationAppActions = {
|
|||
const organizationId = await findOrganizationIdForStripeEvent(c, subscription.customerId, subscription.id);
|
||||
if (organizationId) {
|
||||
const organization = await getOrCreateOrganization(c, organizationId);
|
||||
await organization.commandApplyFreePlan({
|
||||
clearSubscription: true,
|
||||
});
|
||||
await organization.send(
|
||||
organizationWorkflowQueueName("organization.command.billing.free_plan.apply"),
|
||||
{
|
||||
clearSubscription: true,
|
||||
},
|
||||
{ wait: true, timeout: 10_000 },
|
||||
);
|
||||
}
|
||||
return { ok: true };
|
||||
}
|
||||
|
|
@ -902,13 +948,17 @@ export const organizationAppActions = {
|
|||
const organization = await getOrCreateOrganization(c, organizationId);
|
||||
const rawAmount = typeof invoice.amount_paid === "number" ? invoice.amount_paid : invoice.amount_due;
|
||||
const amountUsd = Math.round((typeof rawAmount === "number" ? rawAmount : 0) / 100);
|
||||
await organization.commandUpsertInvoice({
|
||||
id: String(invoice.id),
|
||||
label: typeof invoice.number === "string" ? `Invoice ${invoice.number}` : "Stripe invoice",
|
||||
issuedAt: formatUnixDate(typeof invoice.created === "number" ? invoice.created : Math.floor(Date.now() / 1000)),
|
||||
amountUsd: Number.isFinite(amountUsd) ? amountUsd : 0,
|
||||
status: event.type === "invoice.paid" ? "paid" : "open",
|
||||
});
|
||||
await organization.send(
|
||||
organizationWorkflowQueueName("organization.command.billing.invoice.upsert"),
|
||||
{
|
||||
id: String(invoice.id),
|
||||
label: typeof invoice.number === "string" ? `Invoice ${invoice.number}` : "Stripe invoice",
|
||||
issuedAt: formatUnixDate(typeof invoice.created === "number" ? invoice.created : Math.floor(Date.now() / 1000)),
|
||||
amountUsd: Number.isFinite(amountUsd) ? amountUsd : 0,
|
||||
status: event.type === "invoice.paid" ? "paid" : "open",
|
||||
},
|
||||
{ wait: true, timeout: 10_000 },
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -938,12 +988,16 @@ export const organizationAppActions = {
|
|||
const organizationId = organizationOrganizationId(kind, accountLogin);
|
||||
const receivedAt = Date.now();
|
||||
const organization = await getOrCreateOrganization(c, organizationId);
|
||||
await organization.commandRecordGithubWebhookReceipt({
|
||||
organizationId: organizationId,
|
||||
event,
|
||||
action: body.action ?? null,
|
||||
receivedAt,
|
||||
});
|
||||
await organization.send(
|
||||
organizationWorkflowQueueName("organization.command.github.webhook_receipt.record"),
|
||||
{
|
||||
organizationId: organizationId,
|
||||
event,
|
||||
action: body.action ?? null,
|
||||
receivedAt,
|
||||
},
|
||||
{ wait: false },
|
||||
);
|
||||
const githubData = await getOrCreateGithubData(c, organizationId);
|
||||
|
||||
if (event === "installation" && (body.action === "created" || body.action === "deleted" || body.action === "suspend" || body.action === "unsuspend")) {
|
||||
|
|
@ -957,40 +1011,56 @@ export const organizationAppActions = {
|
|||
"installation_event",
|
||||
);
|
||||
if (body.action === "deleted") {
|
||||
await githubData.clearState({
|
||||
connectedAccount: accountLogin,
|
||||
installationStatus: "install_required",
|
||||
installationId: null,
|
||||
label: "GitHub App installation removed",
|
||||
});
|
||||
await githubData.send(
|
||||
githubDataWorkflowQueueName("githubData.command.clearState"),
|
||||
{
|
||||
connectedAccount: accountLogin,
|
||||
installationStatus: "install_required",
|
||||
installationId: null,
|
||||
label: "GitHub App installation removed",
|
||||
},
|
||||
{ wait: false },
|
||||
);
|
||||
} else if (body.action === "created") {
|
||||
void githubData
|
||||
.syncRepos({
|
||||
connectedAccount: accountLogin,
|
||||
installationStatus: "connected",
|
||||
installationId: body.installation?.id ?? null,
|
||||
githubLogin: accountLogin,
|
||||
kind,
|
||||
label: "Syncing GitHub data from installation webhook...",
|
||||
})
|
||||
.send(
|
||||
githubDataWorkflowQueueName("githubData.command.syncRepos"),
|
||||
{
|
||||
connectedAccount: accountLogin,
|
||||
installationStatus: "connected",
|
||||
installationId: body.installation?.id ?? null,
|
||||
githubLogin: accountLogin,
|
||||
kind,
|
||||
label: "Syncing GitHub data from installation webhook...",
|
||||
},
|
||||
{ wait: false },
|
||||
)
|
||||
.catch(() => {});
|
||||
} else if (body.action === "suspend") {
|
||||
await githubData.clearState({
|
||||
connectedAccount: accountLogin,
|
||||
installationStatus: "reconnect_required",
|
||||
installationId: body.installation?.id ?? null,
|
||||
label: "GitHub App installation suspended",
|
||||
});
|
||||
await githubData.send(
|
||||
githubDataWorkflowQueueName("githubData.command.clearState"),
|
||||
{
|
||||
connectedAccount: accountLogin,
|
||||
installationStatus: "reconnect_required",
|
||||
installationId: body.installation?.id ?? null,
|
||||
label: "GitHub App installation suspended",
|
||||
},
|
||||
{ wait: false },
|
||||
);
|
||||
} else if (body.action === "unsuspend") {
|
||||
void githubData
|
||||
.syncRepos({
|
||||
connectedAccount: accountLogin,
|
||||
installationStatus: "connected",
|
||||
installationId: body.installation?.id ?? null,
|
||||
githubLogin: accountLogin,
|
||||
kind,
|
||||
label: "Resyncing GitHub data after unsuspend...",
|
||||
})
|
||||
.send(
|
||||
githubDataWorkflowQueueName("githubData.command.syncRepos"),
|
||||
{
|
||||
connectedAccount: accountLogin,
|
||||
installationStatus: "connected",
|
||||
installationId: body.installation?.id ?? null,
|
||||
githubLogin: accountLogin,
|
||||
kind,
|
||||
label: "Resyncing GitHub data after unsuspend...",
|
||||
},
|
||||
{ wait: false },
|
||||
)
|
||||
.catch(() => {});
|
||||
}
|
||||
return { ok: true };
|
||||
|
|
@ -1009,14 +1079,18 @@ export const organizationAppActions = {
|
|||
"repository_membership_changed",
|
||||
);
|
||||
void githubData
|
||||
.syncRepos({
|
||||
connectedAccount: accountLogin,
|
||||
installationStatus: "connected",
|
||||
installationId: body.installation?.id ?? null,
|
||||
githubLogin: accountLogin,
|
||||
kind,
|
||||
label: "Resyncing GitHub data after repository access change...",
|
||||
})
|
||||
.send(
|
||||
githubDataWorkflowQueueName("githubData.command.syncRepos"),
|
||||
{
|
||||
connectedAccount: accountLogin,
|
||||
installationStatus: "connected",
|
||||
installationId: body.installation?.id ?? null,
|
||||
githubLogin: accountLogin,
|
||||
kind,
|
||||
label: "Resyncing GitHub data after repository access change...",
|
||||
},
|
||||
{ wait: false },
|
||||
)
|
||||
.catch(() => {});
|
||||
return { ok: true };
|
||||
}
|
||||
|
|
@ -1045,35 +1119,39 @@ export const organizationAppActions = {
|
|||
"repository_event",
|
||||
);
|
||||
if (event === "pull_request" && body.repository?.clone_url && body.pull_request) {
|
||||
await githubData.handlePullRequestWebhook({
|
||||
connectedAccount: accountLogin,
|
||||
installationStatus: "connected",
|
||||
installationId: body.installation?.id ?? null,
|
||||
repository: {
|
||||
fullName: body.repository.full_name,
|
||||
cloneUrl: body.repository.clone_url,
|
||||
private: Boolean(body.repository.private),
|
||||
await githubData.send(
|
||||
githubDataWorkflowQueueName("githubData.command.handlePullRequestWebhook"),
|
||||
{
|
||||
connectedAccount: accountLogin,
|
||||
installationStatus: "connected",
|
||||
installationId: body.installation?.id ?? null,
|
||||
repository: {
|
||||
fullName: body.repository.full_name,
|
||||
cloneUrl: body.repository.clone_url,
|
||||
private: Boolean(body.repository.private),
|
||||
},
|
||||
pullRequest: {
|
||||
number: body.pull_request.number,
|
||||
status: body.pull_request.draft ? "draft" : "ready",
|
||||
title: body.pull_request.title ?? "",
|
||||
body: body.pull_request.body ?? null,
|
||||
state: body.pull_request.state ?? "open",
|
||||
url: body.pull_request.html_url ?? `https://github.com/${body.repository.full_name}/pull/${body.pull_request.number}`,
|
||||
headRefName: body.pull_request.head?.ref ?? "",
|
||||
baseRefName: body.pull_request.base?.ref ?? "",
|
||||
authorLogin: body.pull_request.user?.login ?? null,
|
||||
isDraft: Boolean(body.pull_request.draft),
|
||||
merged: Boolean(body.pull_request.merged),
|
||||
},
|
||||
},
|
||||
pullRequest: {
|
||||
number: body.pull_request.number,
|
||||
status: body.pull_request.draft ? "draft" : "ready",
|
||||
title: body.pull_request.title ?? "",
|
||||
body: body.pull_request.body ?? null,
|
||||
state: body.pull_request.state ?? "open",
|
||||
url: body.pull_request.html_url ?? `https://github.com/${body.repository.full_name}/pull/${body.pull_request.number}`,
|
||||
headRefName: body.pull_request.head?.ref ?? "",
|
||||
baseRefName: body.pull_request.base?.ref ?? "",
|
||||
authorLogin: body.pull_request.user?.login ?? null,
|
||||
isDraft: Boolean(body.pull_request.draft),
|
||||
merged: Boolean(body.pull_request.merged),
|
||||
},
|
||||
});
|
||||
{ wait: false },
|
||||
);
|
||||
}
|
||||
if ((event === "push" || event === "create" || event === "delete") && body.repository?.clone_url) {
|
||||
const repoId = repoIdFromRemote(body.repository.clone_url);
|
||||
const knownRepository = await githubData.getRepository({ repoId });
|
||||
if (knownRepository) {
|
||||
await githubData.reloadRepository({ repoId });
|
||||
await githubData.send(githubDataWorkflowQueueName("githubData.command.reloadRepository"), { repoId }, { wait: false });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1232,14 +1310,18 @@ export async function syncOrganizationShellFromGithubMutation(
|
|||
if (needsInitialSync) {
|
||||
const githubData = await getOrCreateGithubData(c, organizationId);
|
||||
void githubData
|
||||
.syncRepos({
|
||||
connectedAccount: input.githubLogin,
|
||||
installationStatus: "connected",
|
||||
installationId: input.installationId,
|
||||
githubLogin: input.githubLogin,
|
||||
kind: input.kind,
|
||||
label: "Initial repository sync...",
|
||||
})
|
||||
.send(
|
||||
githubDataWorkflowQueueName("githubData.command.syncRepos"),
|
||||
{
|
||||
connectedAccount: input.githubLogin,
|
||||
installationStatus: "connected",
|
||||
installationId: input.installationId,
|
||||
githubLogin: input.githubLogin,
|
||||
kind: input.kind,
|
||||
label: "Initial repository sync...",
|
||||
},
|
||||
{ wait: false },
|
||||
)
|
||||
.catch(() => {});
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,10 +1,13 @@
|
|||
import { actor } from "rivetkit";
|
||||
import { actor, queue } from "rivetkit";
|
||||
import { workflow } from "rivetkit/workflow";
|
||||
import { organizationDb } from "./db/db.js";
|
||||
import { organizationActions } from "./actions.js";
|
||||
import { organizationCommandActions } from "./workflow.js";
|
||||
import { runOrganizationWorkflow } from "./workflow.js";
|
||||
import { ORGANIZATION_QUEUE_NAMES } from "./queues.js";
|
||||
|
||||
export const organization = actor({
|
||||
db: organizationDb,
|
||||
queues: Object.fromEntries(ORGANIZATION_QUEUE_NAMES.map((name) => [name, queue()])),
|
||||
options: {
|
||||
name: "Organization",
|
||||
icon: "compass",
|
||||
|
|
@ -15,6 +18,6 @@ export const organization = actor({
|
|||
}),
|
||||
actions: {
|
||||
...organizationActions,
|
||||
...organizationCommandActions,
|
||||
},
|
||||
run: workflow(runOrganizationWorkflow),
|
||||
});
|
||||
|
|
|
|||
|
|
@ -1,8 +1,17 @@
|
|||
// @ts-nocheck
|
||||
/**
|
||||
* Organization command actions — converted from queue handlers to direct actions.
|
||||
* Each export becomes an action on the organization actor.
|
||||
* Organization workflow — queue-based command loop.
|
||||
*
|
||||
* Mutations are dispatched through named queues and processed inside workflow
|
||||
* steps so that every command appears in the RivetKit inspector's workflow
|
||||
* history. Read actions remain direct (no queue).
|
||||
*
|
||||
* Callers send commands directly via `.send()` to the appropriate queue name.
|
||||
*/
|
||||
import { Loop } from "rivetkit/workflow";
|
||||
import { logActorWarning, resolveErrorMessage } from "../logging.js";
|
||||
import { ORGANIZATION_QUEUE_NAMES, type OrganizationQueueName } from "./queues.js";
|
||||
|
||||
import { applyGithubSyncProgressMutation, recordGithubWebhookReceiptMutation, refreshOrganizationSnapshotMutation } from "./actions.js";
|
||||
import {
|
||||
applyTaskSummaryUpdateMutation,
|
||||
|
|
@ -37,127 +46,164 @@ import {
|
|||
upsertOrganizationInvoiceMutation,
|
||||
} from "./app-shell.js";
|
||||
|
||||
export const organizationCommandActions = {
|
||||
async commandCreateTask(c: any, body: any) {
|
||||
return await createTaskMutation(c, body);
|
||||
},
|
||||
async commandMaterializeTask(c: any, body: any) {
|
||||
return await createTaskMutation(c, body);
|
||||
},
|
||||
async commandRegisterTaskBranch(c: any, body: any) {
|
||||
return await registerTaskBranchMutation(c, body);
|
||||
},
|
||||
async commandApplyTaskSummaryUpdate(c: any, body: any) {
|
||||
// ---------------------------------------------------------------------------
|
||||
// Workflow command loop — runs inside `run: workflow(runOrganizationWorkflow)`
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
type WorkflowHandler = (loopCtx: any, body: any) => Promise<any>;
|
||||
|
||||
/**
|
||||
* Maps queue names to their mutation handlers.
|
||||
* Each handler receives the workflow loop context and the message body,
|
||||
* executes the mutation, and returns the result (which is sent back via
|
||||
* msg.complete).
|
||||
*/
|
||||
const COMMAND_HANDLERS: Record<OrganizationQueueName, WorkflowHandler> = {
|
||||
// Task mutations
|
||||
"organization.command.createTask": async (c, body) => createTaskMutation(c, body),
|
||||
"organization.command.materializeTask": async (c, body) => createTaskMutation(c, body),
|
||||
"organization.command.registerTaskBranch": async (c, body) => registerTaskBranchMutation(c, body),
|
||||
"organization.command.applyTaskSummaryUpdate": async (c, body) => {
|
||||
await applyTaskSummaryUpdateMutation(c, body);
|
||||
return { ok: true };
|
||||
},
|
||||
async commandRemoveTaskSummary(c: any, body: any) {
|
||||
"organization.command.removeTaskSummary": async (c, body) => {
|
||||
await removeTaskSummaryMutation(c, body);
|
||||
return { ok: true };
|
||||
},
|
||||
async commandRefreshTaskSummaryForBranch(c: any, body: any) {
|
||||
"organization.command.refreshTaskSummaryForBranch": async (c, body) => {
|
||||
await refreshTaskSummaryForBranchMutation(c, body);
|
||||
return { ok: true };
|
||||
},
|
||||
async commandBroadcastSnapshot(c: any, _body: any) {
|
||||
"organization.command.snapshot.broadcast": async (c, _body) => {
|
||||
await refreshOrganizationSnapshotMutation(c);
|
||||
return { ok: true };
|
||||
},
|
||||
async commandSyncGithubSession(c: any, body: any) {
|
||||
"organization.command.syncGithubSession": async (c, body) => {
|
||||
const { syncGithubOrganizations } = await import("./app-shell.js");
|
||||
await syncGithubOrganizations(c, body);
|
||||
return { ok: true };
|
||||
},
|
||||
|
||||
// Better Auth index actions
|
||||
async commandBetterAuthSessionIndexUpsert(c: any, body: any) {
|
||||
return await betterAuthUpsertSessionIndexMutation(c, body);
|
||||
},
|
||||
async commandBetterAuthSessionIndexDelete(c: any, body: any) {
|
||||
// Better Auth index mutations
|
||||
"organization.command.better_auth.session_index.upsert": async (c, body) => betterAuthUpsertSessionIndexMutation(c, body),
|
||||
"organization.command.better_auth.session_index.delete": async (c, body) => {
|
||||
await betterAuthDeleteSessionIndexMutation(c, body);
|
||||
return { ok: true };
|
||||
},
|
||||
async commandBetterAuthEmailIndexUpsert(c: any, body: any) {
|
||||
return await betterAuthUpsertEmailIndexMutation(c, body);
|
||||
},
|
||||
async commandBetterAuthEmailIndexDelete(c: any, body: any) {
|
||||
"organization.command.better_auth.email_index.upsert": async (c, body) => betterAuthUpsertEmailIndexMutation(c, body),
|
||||
"organization.command.better_auth.email_index.delete": async (c, body) => {
|
||||
await betterAuthDeleteEmailIndexMutation(c, body);
|
||||
return { ok: true };
|
||||
},
|
||||
async commandBetterAuthAccountIndexUpsert(c: any, body: any) {
|
||||
return await betterAuthUpsertAccountIndexMutation(c, body);
|
||||
},
|
||||
async commandBetterAuthAccountIndexDelete(c: any, body: any) {
|
||||
"organization.command.better_auth.account_index.upsert": async (c, body) => betterAuthUpsertAccountIndexMutation(c, body),
|
||||
"organization.command.better_auth.account_index.delete": async (c, body) => {
|
||||
await betterAuthDeleteAccountIndexMutation(c, body);
|
||||
return { ok: true };
|
||||
},
|
||||
async commandBetterAuthVerificationCreate(c: any, body: any) {
|
||||
return await betterAuthCreateVerificationMutation(c, body);
|
||||
},
|
||||
async commandBetterAuthVerificationUpdate(c: any, body: any) {
|
||||
return await betterAuthUpdateVerificationMutation(c, body);
|
||||
},
|
||||
async commandBetterAuthVerificationUpdateMany(c: any, body: any) {
|
||||
return await betterAuthUpdateManyVerificationMutation(c, body);
|
||||
},
|
||||
async commandBetterAuthVerificationDelete(c: any, body: any) {
|
||||
"organization.command.better_auth.verification.create": async (c, body) => betterAuthCreateVerificationMutation(c, body),
|
||||
"organization.command.better_auth.verification.update": async (c, body) => betterAuthUpdateVerificationMutation(c, body),
|
||||
"organization.command.better_auth.verification.update_many": async (c, body) => betterAuthUpdateManyVerificationMutation(c, body),
|
||||
"organization.command.better_auth.verification.delete": async (c, body) => {
|
||||
await betterAuthDeleteVerificationMutation(c, body);
|
||||
return { ok: true };
|
||||
},
|
||||
async commandBetterAuthVerificationDeleteMany(c: any, body: any) {
|
||||
return await betterAuthDeleteManyVerificationMutation(c, body);
|
||||
},
|
||||
"organization.command.better_auth.verification.delete_many": async (c, body) => betterAuthDeleteManyVerificationMutation(c, body),
|
||||
|
||||
// GitHub sync actions
|
||||
async commandApplyGithubSyncProgress(c: any, body: any) {
|
||||
// GitHub sync mutations
|
||||
"organization.command.github.sync_progress.apply": async (c, body) => {
|
||||
await applyGithubSyncProgressMutation(c, body);
|
||||
return { ok: true };
|
||||
},
|
||||
async commandRecordGithubWebhookReceipt(c: any, body: any) {
|
||||
"organization.command.github.webhook_receipt.record": async (c, body) => {
|
||||
await recordGithubWebhookReceiptMutation(c, body);
|
||||
return { ok: true };
|
||||
},
|
||||
async commandSyncOrganizationShellFromGithub(c: any, body: any) {
|
||||
return await syncOrganizationShellFromGithubMutation(c, body);
|
||||
},
|
||||
"organization.command.github.organization_shell.sync_from_github": async (c, body) => syncOrganizationShellFromGithubMutation(c, body),
|
||||
|
||||
// Shell/profile actions
|
||||
async commandUpdateShellProfile(c: any, body: any) {
|
||||
// Shell/profile mutations
|
||||
"organization.command.shell.profile.update": async (c, body) => {
|
||||
await updateOrganizationShellProfileMutation(c, body);
|
||||
return { ok: true };
|
||||
},
|
||||
async commandMarkSyncStarted(c: any, body: any) {
|
||||
"organization.command.shell.sync_started.mark": async (c, body) => {
|
||||
await markOrganizationSyncStartedMutation(c, body);
|
||||
return { ok: true };
|
||||
},
|
||||
|
||||
// Billing actions
|
||||
async commandApplyStripeCustomer(c: any, body: any) {
|
||||
// Billing mutations
|
||||
"organization.command.billing.stripe_customer.apply": async (c, body) => {
|
||||
await applyOrganizationStripeCustomerMutation(c, body);
|
||||
return { ok: true };
|
||||
},
|
||||
async commandApplyStripeSubscription(c: any, body: any) {
|
||||
"organization.command.billing.stripe_subscription.apply": async (c, body) => {
|
||||
await applyOrganizationStripeSubscriptionMutation(c, body);
|
||||
return { ok: true };
|
||||
},
|
||||
async commandApplyFreePlan(c: any, body: any) {
|
||||
"organization.command.billing.free_plan.apply": async (c, body) => {
|
||||
await applyOrganizationFreePlanMutation(c, body);
|
||||
return { ok: true };
|
||||
},
|
||||
async commandSetPaymentMethod(c: any, body: any) {
|
||||
"organization.command.billing.payment_method.set": async (c, body) => {
|
||||
await setOrganizationBillingPaymentMethodMutation(c, body);
|
||||
return { ok: true };
|
||||
},
|
||||
async commandSetBillingStatus(c: any, body: any) {
|
||||
"organization.command.billing.status.set": async (c, body) => {
|
||||
await setOrganizationBillingStatusMutation(c, body);
|
||||
return { ok: true };
|
||||
},
|
||||
async commandUpsertInvoice(c: any, body: any) {
|
||||
"organization.command.billing.invoice.upsert": async (c, body) => {
|
||||
await upsertOrganizationInvoiceMutation(c, body);
|
||||
return { ok: true };
|
||||
},
|
||||
async commandRecordSeatUsage(c: any, body: any) {
|
||||
"organization.command.billing.seat_usage.record": async (c, body) => {
|
||||
await recordOrganizationSeatUsageMutation(c, body);
|
||||
return { ok: true };
|
||||
},
|
||||
};
|
||||
|
||||
export async function runOrganizationWorkflow(ctx: any): Promise<void> {
|
||||
await ctx.loop("organization-command-loop", async (loopCtx: any) => {
|
||||
const msg = await loopCtx.queue.next("next-organization-command", {
|
||||
names: [...ORGANIZATION_QUEUE_NAMES],
|
||||
completable: true,
|
||||
});
|
||||
|
||||
if (!msg) {
|
||||
return Loop.continue(undefined);
|
||||
}
|
||||
|
||||
const handler = COMMAND_HANDLERS[msg.name as OrganizationQueueName];
|
||||
if (!handler) {
|
||||
logActorWarning("organization", "unknown organization command", { command: msg.name });
|
||||
await msg.complete({ error: `Unknown command: ${msg.name}` }).catch(() => {});
|
||||
return Loop.continue(undefined);
|
||||
}
|
||||
|
||||
try {
|
||||
// Wrap in a step so c.state and c.db are accessible inside mutation functions.
|
||||
const result = await loopCtx.step({
|
||||
name: msg.name,
|
||||
timeout: 10 * 60_000,
|
||||
run: async () => handler(loopCtx, msg.body),
|
||||
});
|
||||
try {
|
||||
await msg.complete(result);
|
||||
} catch (completeError) {
|
||||
logActorWarning("organization", "organization workflow failed completing response", {
|
||||
command: msg.name,
|
||||
error: resolveErrorMessage(completeError),
|
||||
});
|
||||
}
|
||||
} catch (error) {
|
||||
const message = resolveErrorMessage(error);
|
||||
logActorWarning("organization", "organization workflow command failed", {
|
||||
command: msg.name,
|
||||
error: message,
|
||||
});
|
||||
await msg.complete({ error: message }).catch(() => {});
|
||||
}
|
||||
|
||||
return Loop.continue(undefined);
|
||||
});
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,4 +1,6 @@
|
|||
import { actor } from "rivetkit";
|
||||
// @ts-nocheck
|
||||
import { actor, queue } from "rivetkit";
|
||||
import { workflow, Loop } from "rivetkit/workflow";
|
||||
import { e2b, sandboxActor } from "rivetkit/sandbox";
|
||||
import { existsSync } from "node:fs";
|
||||
import Dockerode from "dockerode";
|
||||
|
|
@ -6,7 +8,9 @@ import { DEFAULT_WORKSPACE_MODEL_GROUPS, workspaceModelGroupsFromSandboxAgents,
|
|||
import { SandboxAgent } from "sandbox-agent";
|
||||
import { getActorRuntimeContext } from "../context.js";
|
||||
import { organizationKey } from "../keys.js";
|
||||
import { selfTaskSandbox } from "../handles.js";
|
||||
import { logActorWarning, resolveErrorMessage } from "../logging.js";
|
||||
import { expectQueueResponse } from "../../services/queue.js";
|
||||
import { resolveSandboxProviderId } from "../../sandbox-config.js";
|
||||
|
||||
const SANDBOX_REPO_CWD = "/home/user/repo";
|
||||
|
|
@ -293,36 +297,165 @@ async function listWorkspaceModelGroupsForSandbox(c: any): Promise<WorkspaceMode
|
|||
|
||||
const baseActions = baseTaskSandbox.config.actions as Record<string, (c: any, ...args: any[]) => Promise<any>>;
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Queue names for sandbox actor
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
const SANDBOX_QUEUE_NAMES = [
|
||||
"sandbox.command.createSession",
|
||||
"sandbox.command.resumeOrCreateSession",
|
||||
"sandbox.command.destroySession",
|
||||
"sandbox.command.createProcess",
|
||||
"sandbox.command.stopProcess",
|
||||
"sandbox.command.killProcess",
|
||||
"sandbox.command.deleteProcess",
|
||||
] as const;
|
||||
|
||||
type SandboxQueueName = (typeof SANDBOX_QUEUE_NAMES)[number];
|
||||
|
||||
function sandboxWorkflowQueueName(name: SandboxQueueName): SandboxQueueName {
|
||||
return name;
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Mutation handlers — executed inside the workflow command loop
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
async function createSessionMutation(c: any, request: any): Promise<any> {
|
||||
const session = await baseActions.createSession(c, request);
|
||||
const sessionId = typeof request?.id === "string" && request.id.length > 0 ? request.id : session?.id;
|
||||
const modeId = modeIdForAgent(request?.agent);
|
||||
if (sessionId && modeId) {
|
||||
try {
|
||||
await baseActions.rawSendSessionMethod(c, sessionId, "session/set_mode", { modeId });
|
||||
} catch {
|
||||
// Session mode updates are best-effort.
|
||||
}
|
||||
}
|
||||
return sanitizeActorResult(session);
|
||||
}
|
||||
|
||||
async function resumeOrCreateSessionMutation(c: any, request: any): Promise<any> {
|
||||
return sanitizeActorResult(await baseActions.resumeOrCreateSession(c, request));
|
||||
}
|
||||
|
||||
async function destroySessionMutation(c: any, sessionId: string): Promise<any> {
|
||||
return sanitizeActorResult(await baseActions.destroySession(c, sessionId));
|
||||
}
|
||||
|
||||
async function createProcessMutation(c: any, request: any): Promise<any> {
|
||||
const created = await baseActions.createProcess(c, request);
|
||||
await broadcastProcesses(c, baseActions);
|
||||
return created;
|
||||
}
|
||||
|
||||
async function runProcessMutation(c: any, request: any): Promise<any> {
|
||||
const result = await baseActions.runProcess(c, request);
|
||||
await broadcastProcesses(c, baseActions);
|
||||
return result;
|
||||
}
|
||||
|
||||
async function stopProcessMutation(c: any, processId: string, query?: any): Promise<any> {
|
||||
const stopped = await baseActions.stopProcess(c, processId, query);
|
||||
await broadcastProcesses(c, baseActions);
|
||||
return stopped;
|
||||
}
|
||||
|
||||
async function killProcessMutation(c: any, processId: string, query?: any): Promise<any> {
|
||||
const killed = await baseActions.killProcess(c, processId, query);
|
||||
await broadcastProcesses(c, baseActions);
|
||||
return killed;
|
||||
}
|
||||
|
||||
async function deleteProcessMutation(c: any, processId: string): Promise<void> {
|
||||
await baseActions.deleteProcess(c, processId);
|
||||
await broadcastProcesses(c, baseActions);
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Workflow command loop
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
type SandboxWorkflowHandler = (loopCtx: any, body: any) => Promise<any>;
|
||||
|
||||
const SANDBOX_COMMAND_HANDLERS: Record<SandboxQueueName, SandboxWorkflowHandler> = {
|
||||
"sandbox.command.createSession": async (c, body) => createSessionMutation(c, body),
|
||||
"sandbox.command.resumeOrCreateSession": async (c, body) => resumeOrCreateSessionMutation(c, body),
|
||||
"sandbox.command.destroySession": async (c, body) => destroySessionMutation(c, body?.sessionId),
|
||||
"sandbox.command.createProcess": async (c, body) => createProcessMutation(c, body),
|
||||
"sandbox.command.stopProcess": async (c, body) => stopProcessMutation(c, body?.processId, body?.query),
|
||||
"sandbox.command.killProcess": async (c, body) => killProcessMutation(c, body?.processId, body?.query),
|
||||
"sandbox.command.deleteProcess": async (c, body) => {
|
||||
await deleteProcessMutation(c, body?.processId);
|
||||
return { ok: true };
|
||||
},
|
||||
};
|
||||
|
||||
async function runSandboxWorkflow(ctx: any): Promise<void> {
|
||||
await ctx.loop("sandbox-command-loop", async (loopCtx: any) => {
|
||||
const msg = await loopCtx.queue.next("next-sandbox-command", {
|
||||
names: [...SANDBOX_QUEUE_NAMES],
|
||||
completable: true,
|
||||
});
|
||||
|
||||
if (!msg) {
|
||||
return Loop.continue(undefined);
|
||||
}
|
||||
|
||||
const handler = SANDBOX_COMMAND_HANDLERS[msg.name as SandboxQueueName];
|
||||
if (!handler) {
|
||||
logActorWarning("taskSandbox", "unknown sandbox command", { command: msg.name });
|
||||
await msg.complete({ error: `Unknown command: ${msg.name}` }).catch(() => {});
|
||||
return Loop.continue(undefined);
|
||||
}
|
||||
|
||||
try {
|
||||
// Wrap in a step so c.state and c.db are accessible inside mutation functions.
|
||||
const result = await loopCtx.step({
|
||||
name: msg.name,
|
||||
timeout: 10 * 60_000,
|
||||
run: async () => handler(loopCtx, msg.body),
|
||||
});
|
||||
try {
|
||||
await msg.complete(result);
|
||||
} catch (completeError) {
|
||||
logActorWarning("taskSandbox", "sandbox workflow failed completing response", {
|
||||
command: msg.name,
|
||||
error: resolveErrorMessage(completeError),
|
||||
});
|
||||
}
|
||||
} catch (error) {
|
||||
const message = resolveErrorMessage(error);
|
||||
logActorWarning("taskSandbox", "sandbox workflow command failed", {
|
||||
command: msg.name,
|
||||
error: message,
|
||||
});
|
||||
await msg.complete({ error: message }).catch(() => {});
|
||||
}
|
||||
|
||||
return Loop.continue(undefined);
|
||||
});
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Actor definition
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
export const taskSandbox = actor({
|
||||
...baseTaskSandbox.config,
|
||||
queues: Object.fromEntries(SANDBOX_QUEUE_NAMES.map((name) => [name, queue()])),
|
||||
options: {
|
||||
...baseTaskSandbox.config.options,
|
||||
actionTimeout: 10 * 60_000,
|
||||
},
|
||||
actions: {
|
||||
...baseActions,
|
||||
async createSession(c: any, request: any): Promise<any> {
|
||||
const session = await baseActions.createSession(c, request);
|
||||
const sessionId = typeof request?.id === "string" && request.id.length > 0 ? request.id : session?.id;
|
||||
const modeId = modeIdForAgent(request?.agent);
|
||||
if (sessionId && modeId) {
|
||||
try {
|
||||
await baseActions.rawSendSessionMethod(c, sessionId, "session/set_mode", { modeId });
|
||||
} catch {
|
||||
// Session mode updates are best-effort.
|
||||
}
|
||||
}
|
||||
return sanitizeActorResult(session);
|
||||
},
|
||||
|
||||
// Read actions — direct (no queue)
|
||||
async resumeSession(c: any, sessionId: string): Promise<any> {
|
||||
return sanitizeActorResult(await baseActions.resumeSession(c, sessionId));
|
||||
},
|
||||
|
||||
async resumeOrCreateSession(c: any, request: any): Promise<any> {
|
||||
return sanitizeActorResult(await baseActions.resumeOrCreateSession(c, request));
|
||||
},
|
||||
|
||||
async getSession(c: any, sessionId: string): Promise<any> {
|
||||
return sanitizeActorResult(await baseActions.getSession(c, sessionId));
|
||||
},
|
||||
|
|
@ -331,24 +464,6 @@ export const taskSandbox = actor({
|
|||
return sanitizeActorResult(await baseActions.listSessions(c, query));
|
||||
},
|
||||
|
||||
async destroySession(c: any, sessionId: string): Promise<any> {
|
||||
return sanitizeActorResult(await baseActions.destroySession(c, sessionId));
|
||||
},
|
||||
|
||||
async sendPrompt(c: any, request: { sessionId: string; prompt: string }): Promise<any> {
|
||||
const text = typeof request?.prompt === "string" ? request.prompt.trim() : "";
|
||||
if (!text) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const session = await baseActions.resumeSession(c, request.sessionId);
|
||||
if (!session || typeof session.prompt !== "function") {
|
||||
throw new Error(`session '${request.sessionId}' not found`);
|
||||
}
|
||||
|
||||
return sanitizeActorResult(await session.prompt([{ type: "text", text }]));
|
||||
},
|
||||
|
||||
async listProcesses(c: any): Promise<any> {
|
||||
try {
|
||||
return await baseActions.listProcesses(c);
|
||||
|
|
@ -362,35 +477,6 @@ export const taskSandbox = actor({
|
|||
}
|
||||
},
|
||||
|
||||
async createProcess(c: any, request: any): Promise<any> {
|
||||
const created = await baseActions.createProcess(c, request);
|
||||
await broadcastProcesses(c, baseActions);
|
||||
return created;
|
||||
},
|
||||
|
||||
async runProcess(c: any, request: any): Promise<any> {
|
||||
const result = await baseActions.runProcess(c, request);
|
||||
await broadcastProcesses(c, baseActions);
|
||||
return result;
|
||||
},
|
||||
|
||||
async stopProcess(c: any, processId: string, query?: any): Promise<any> {
|
||||
const stopped = await baseActions.stopProcess(c, processId, query);
|
||||
await broadcastProcesses(c, baseActions);
|
||||
return stopped;
|
||||
},
|
||||
|
||||
async killProcess(c: any, processId: string, query?: any): Promise<any> {
|
||||
const killed = await baseActions.killProcess(c, processId, query);
|
||||
await broadcastProcesses(c, baseActions);
|
||||
return killed;
|
||||
},
|
||||
|
||||
async deleteProcess(c: any, processId: string): Promise<void> {
|
||||
await baseActions.deleteProcess(c, processId);
|
||||
await broadcastProcesses(c, baseActions);
|
||||
},
|
||||
|
||||
async sandboxAgentConnection(c: any): Promise<{ endpoint: string; token?: string }> {
|
||||
const provider = await providerForConnection(c);
|
||||
if (!provider || !c.state.sandboxId) {
|
||||
|
|
@ -445,7 +531,73 @@ export const taskSandbox = actor({
|
|||
async repoCwd(): Promise<{ cwd: string }> {
|
||||
return { cwd: SANDBOX_REPO_CWD };
|
||||
},
|
||||
|
||||
// Long-running action — kept as direct action to avoid blocking the
|
||||
// workflow loop (prompt responses can take minutes).
|
||||
async sendPrompt(c: any, request: { sessionId: string; prompt: string }): Promise<any> {
|
||||
const text = typeof request?.prompt === "string" ? request.prompt.trim() : "";
|
||||
if (!text) {
|
||||
return null;
|
||||
}
|
||||
|
||||
const session = await baseActions.resumeSession(c, request.sessionId);
|
||||
if (!session || typeof session.prompt !== "function") {
|
||||
throw new Error(`session '${request.sessionId}' not found`);
|
||||
}
|
||||
|
||||
return sanitizeActorResult(await session.prompt([{ type: "text", text }]));
|
||||
},
|
||||
|
||||
// Mutation actions — self-send to queue for workflow history
|
||||
async createSession(c: any, request: any): Promise<any> {
|
||||
const self = selfTaskSandbox(c);
|
||||
return expectQueueResponse(await self.send(sandboxWorkflowQueueName("sandbox.command.createSession"), request ?? {}, { wait: true, timeout: 10_000 }));
|
||||
},
|
||||
|
||||
async resumeOrCreateSession(c: any, request: any): Promise<any> {
|
||||
const self = selfTaskSandbox(c);
|
||||
return expectQueueResponse(
|
||||
await self.send(sandboxWorkflowQueueName("sandbox.command.resumeOrCreateSession"), request ?? {}, { wait: true, timeout: 10_000 }),
|
||||
);
|
||||
},
|
||||
|
||||
async destroySession(c: any, sessionId: string): Promise<any> {
|
||||
const self = selfTaskSandbox(c);
|
||||
return expectQueueResponse(await self.send(sandboxWorkflowQueueName("sandbox.command.destroySession"), { sessionId }, { wait: true, timeout: 10_000 }));
|
||||
},
|
||||
|
||||
async createProcess(c: any, request: any): Promise<any> {
|
||||
const self = selfTaskSandbox(c);
|
||||
return expectQueueResponse(await self.send(sandboxWorkflowQueueName("sandbox.command.createProcess"), request ?? {}, { wait: true, timeout: 10_000 }));
|
||||
},
|
||||
|
||||
// runProcess kept as direct action — response can exceed 128KB queue limit
|
||||
async runProcess(c: any, request: any): Promise<any> {
|
||||
const result = await baseActions.runProcess(c, request);
|
||||
await broadcastProcesses(c, baseActions);
|
||||
return result;
|
||||
},
|
||||
|
||||
async stopProcess(c: any, processId: string, query?: any): Promise<any> {
|
||||
const self = selfTaskSandbox(c);
|
||||
return expectQueueResponse(
|
||||
await self.send(sandboxWorkflowQueueName("sandbox.command.stopProcess"), { processId, query }, { wait: true, timeout: 10_000 }),
|
||||
);
|
||||
},
|
||||
|
||||
async killProcess(c: any, processId: string, query?: any): Promise<any> {
|
||||
const self = selfTaskSandbox(c);
|
||||
return expectQueueResponse(
|
||||
await self.send(sandboxWorkflowQueueName("sandbox.command.killProcess"), { processId, query }, { wait: true, timeout: 10_000 }),
|
||||
);
|
||||
},
|
||||
|
||||
async deleteProcess(c: any, processId: string): Promise<void> {
|
||||
const self = selfTaskSandbox(c);
|
||||
await self.send(sandboxWorkflowQueueName("sandbox.command.deleteProcess"), { processId }, { wait: false });
|
||||
},
|
||||
},
|
||||
run: workflow(runSandboxWorkflow),
|
||||
});
|
||||
|
||||
export { SANDBOX_REPO_CWD };
|
||||
|
|
|
|||
|
|
@ -1,9 +1,11 @@
|
|||
import { actor } from "rivetkit";
|
||||
import { actor, queue } from "rivetkit";
|
||||
import { workflow } from "rivetkit/workflow";
|
||||
import type { TaskRecord } from "@sandbox-agent/foundry-shared";
|
||||
import { taskDb } from "./db/db.js";
|
||||
import { getCurrentRecord } from "./workflow/common.js";
|
||||
import { getSessionDetail, getTaskDetail, getTaskSummary } from "./workspace.js";
|
||||
import { taskCommandActions } from "./workflow/index.js";
|
||||
import { runTaskWorkflow } from "./workflow/index.js";
|
||||
import { TASK_QUEUE_NAMES } from "./workflow/queue.js";
|
||||
|
||||
export interface TaskInput {
|
||||
organizationId: string;
|
||||
|
|
@ -13,6 +15,7 @@ export interface TaskInput {
|
|||
|
||||
export const task = actor({
|
||||
db: taskDb,
|
||||
queues: Object.fromEntries(TASK_QUEUE_NAMES.map((name) => [name, queue()])),
|
||||
options: {
|
||||
name: "Task",
|
||||
icon: "wrench",
|
||||
|
|
@ -39,9 +42,8 @@ export const task = actor({
|
|||
async getSessionDetail(c, input: { sessionId: string; authSessionId?: string }) {
|
||||
return await getSessionDetail(c, input.sessionId, input.authSessionId);
|
||||
},
|
||||
|
||||
...taskCommandActions,
|
||||
},
|
||||
run: workflow(runTaskWorkflow),
|
||||
});
|
||||
|
||||
export { taskWorkflowQueueName } from "./workflow/index.js";
|
||||
|
|
|
|||
|
|
@ -1,4 +1,16 @@
|
|||
// @ts-nocheck
|
||||
/**
|
||||
* Task workflow — queue-based command loop.
|
||||
*
|
||||
* Mutations are dispatched through named queues and processed inside the
|
||||
* workflow command loop so that every command appears in the RivetKit
|
||||
* inspector's workflow history. Read actions remain direct (no queue).
|
||||
*
|
||||
* Callers send commands directly via `.send(taskWorkflowQueueName(...), ...)`.
|
||||
*/
|
||||
import { Loop } from "rivetkit/workflow";
|
||||
import { logActorWarning, resolveErrorMessage } from "../../logging.js";
|
||||
import { TASK_QUEUE_NAMES, type TaskQueueName, taskWorkflowQueueName } from "./queue.js";
|
||||
import { getCurrentRecord } from "./common.js";
|
||||
import { initBootstrapDbActivity, initCompleteActivity, initEnqueueProvisionActivity, initFailedActivity } from "./init.js";
|
||||
import {
|
||||
|
|
@ -35,241 +47,210 @@ import {
|
|||
|
||||
export { taskWorkflowQueueName } from "./queue.js";
|
||||
|
||||
/**
|
||||
* Task command actions — converted from queue/workflow handlers to direct actions.
|
||||
* Each export becomes an action on the task actor.
|
||||
*/
|
||||
export const taskCommandActions = {
|
||||
async initialize(c: any, body: any) {
|
||||
await initBootstrapDbActivity(c, body);
|
||||
await initEnqueueProvisionActivity(c, body);
|
||||
return await getCurrentRecord(c);
|
||||
// ---------------------------------------------------------------------------
|
||||
// Workflow command loop — runs inside `run: workflow(runTaskWorkflow)`
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
type WorkflowHandler = (loopCtx: any, msg: any) => Promise<void>;
|
||||
|
||||
const COMMAND_HANDLERS: Record<TaskQueueName, WorkflowHandler> = {
|
||||
"task.command.initialize": async (loopCtx, msg) => {
|
||||
await initBootstrapDbActivity(loopCtx, msg.body);
|
||||
await initEnqueueProvisionActivity(loopCtx, msg.body);
|
||||
const record = await getCurrentRecord(loopCtx);
|
||||
await msg.complete(record);
|
||||
},
|
||||
|
||||
async provision(c: any, body: any) {
|
||||
"task.command.provision": async (loopCtx, msg) => {
|
||||
try {
|
||||
await initCompleteActivity(c, body);
|
||||
return { ok: true };
|
||||
await initCompleteActivity(loopCtx, msg.body);
|
||||
await msg.complete({ ok: true });
|
||||
} catch (error) {
|
||||
await initFailedActivity(c, error, body);
|
||||
return { ok: false, error: resolveErrorMessage(error) };
|
||||
await initFailedActivity(loopCtx, error, msg.body);
|
||||
await msg.complete({ ok: false, error: resolveErrorMessage(error) });
|
||||
}
|
||||
},
|
||||
|
||||
async attach(c: any, body: any) {
|
||||
// handleAttachActivity expects msg with complete — adapt
|
||||
const result = { value: undefined as any };
|
||||
const msg = {
|
||||
name: "task.command.attach",
|
||||
body,
|
||||
complete: async (v: any) => {
|
||||
result.value = v;
|
||||
},
|
||||
};
|
||||
await handleAttachActivity(c, msg);
|
||||
return result.value;
|
||||
"task.command.attach": async (loopCtx, msg) => {
|
||||
await handleAttachActivity(loopCtx, msg);
|
||||
},
|
||||
|
||||
async switchTask(c: any, body: any) {
|
||||
const result = { value: undefined as any };
|
||||
const msg = {
|
||||
name: "task.command.switch",
|
||||
body,
|
||||
complete: async (v: any) => {
|
||||
result.value = v;
|
||||
},
|
||||
};
|
||||
await handleSwitchActivity(c, msg);
|
||||
return result.value;
|
||||
"task.command.switch": async (loopCtx, msg) => {
|
||||
await handleSwitchActivity(loopCtx, msg);
|
||||
},
|
||||
|
||||
async push(c: any, body: any) {
|
||||
const result = { value: undefined as any };
|
||||
const msg = {
|
||||
name: "task.command.push",
|
||||
body,
|
||||
complete: async (v: any) => {
|
||||
result.value = v;
|
||||
},
|
||||
};
|
||||
await handlePushActivity(c, msg);
|
||||
return result.value;
|
||||
"task.command.push": async (loopCtx, msg) => {
|
||||
await handlePushActivity(loopCtx, msg);
|
||||
},
|
||||
|
||||
async sync(c: any, body: any) {
|
||||
const result = { value: undefined as any };
|
||||
const msg = {
|
||||
name: "task.command.sync",
|
||||
body,
|
||||
complete: async (v: any) => {
|
||||
result.value = v;
|
||||
},
|
||||
};
|
||||
await handleSimpleCommandActivity(c, msg, "task.sync");
|
||||
return result.value;
|
||||
"task.command.sync": async (loopCtx, msg) => {
|
||||
await handleSimpleCommandActivity(loopCtx, msg, "task.sync");
|
||||
},
|
||||
|
||||
async merge(c: any, body: any) {
|
||||
const result = { value: undefined as any };
|
||||
const msg = {
|
||||
name: "task.command.merge",
|
||||
body,
|
||||
complete: async (v: any) => {
|
||||
result.value = v;
|
||||
},
|
||||
};
|
||||
await handleSimpleCommandActivity(c, msg, "task.merge");
|
||||
return result.value;
|
||||
"task.command.merge": async (loopCtx, msg) => {
|
||||
await handleSimpleCommandActivity(loopCtx, msg, "task.merge");
|
||||
},
|
||||
|
||||
async archive(c: any, body: any) {
|
||||
const result = { value: undefined as any };
|
||||
const msg = {
|
||||
name: "task.command.archive",
|
||||
body,
|
||||
complete: async (v: any) => {
|
||||
result.value = v;
|
||||
},
|
||||
};
|
||||
await handleArchiveActivity(c, msg);
|
||||
return result.value;
|
||||
"task.command.archive": async (loopCtx, msg) => {
|
||||
await handleArchiveActivity(loopCtx, msg);
|
||||
},
|
||||
|
||||
async kill(c: any, body: any) {
|
||||
const result = { value: undefined as any };
|
||||
const msg = {
|
||||
name: "task.command.kill",
|
||||
body,
|
||||
complete: async (v: any) => {
|
||||
result.value = v;
|
||||
},
|
||||
};
|
||||
await killDestroySandboxActivity(c);
|
||||
await killWriteDbActivity(c, msg);
|
||||
return result.value;
|
||||
"task.command.kill": async (loopCtx, msg) => {
|
||||
await killDestroySandboxActivity(loopCtx);
|
||||
await killWriteDbActivity(loopCtx, msg);
|
||||
},
|
||||
|
||||
async getRecord(c: any, body: any) {
|
||||
const result = { value: undefined as any };
|
||||
const msg = {
|
||||
name: "task.command.get",
|
||||
body,
|
||||
complete: async (v: any) => {
|
||||
result.value = v;
|
||||
},
|
||||
};
|
||||
await handleGetActivity(c, msg);
|
||||
return result.value;
|
||||
"task.command.get": async (loopCtx, msg) => {
|
||||
await handleGetActivity(loopCtx, msg);
|
||||
},
|
||||
|
||||
async pullRequestSync(c: any, body: any) {
|
||||
await syncTaskPullRequest(c, body?.pullRequest ?? null);
|
||||
return { ok: true };
|
||||
"task.command.pull_request.sync": async (loopCtx, msg) => {
|
||||
await syncTaskPullRequest(loopCtx, msg.body?.pullRequest ?? null);
|
||||
await msg.complete({ ok: true });
|
||||
},
|
||||
|
||||
async markUnread(c: any, body: any) {
|
||||
await markWorkspaceUnread(c, body?.authSessionId);
|
||||
return { ok: true };
|
||||
"task.command.workspace.mark_unread": async (loopCtx, msg) => {
|
||||
await markWorkspaceUnread(loopCtx, msg.body?.authSessionId);
|
||||
await msg.complete({ ok: true });
|
||||
},
|
||||
|
||||
async renameTask(c: any, body: any) {
|
||||
await renameWorkspaceTask(c, body.value);
|
||||
return { ok: true };
|
||||
"task.command.workspace.rename_task": async (loopCtx, msg) => {
|
||||
await renameWorkspaceTask(loopCtx, msg.body.value);
|
||||
await msg.complete({ ok: true });
|
||||
},
|
||||
|
||||
async changeOwner(c: any, body: any) {
|
||||
await changeTaskOwnerManually(c, {
|
||||
primaryUserId: body.primaryUserId,
|
||||
primaryGithubLogin: body.primaryGithubLogin,
|
||||
primaryGithubEmail: body.primaryGithubEmail,
|
||||
primaryGithubAvatarUrl: body.primaryGithubAvatarUrl ?? null,
|
||||
});
|
||||
return { ok: true };
|
||||
"task.command.workspace.create_session": async (loopCtx, msg) => {
|
||||
const result = await createWorkspaceSession(loopCtx, msg.body?.model, msg.body?.authSessionId);
|
||||
await msg.complete(result);
|
||||
},
|
||||
|
||||
async createSession(c: any, body: any) {
|
||||
return await createWorkspaceSession(c, body?.model, body?.authSessionId);
|
||||
},
|
||||
|
||||
async createSessionAndSend(c: any, body: any) {
|
||||
"task.command.workspace.create_session_and_send": async (loopCtx, msg) => {
|
||||
try {
|
||||
const created = await createWorkspaceSession(c, body?.model, body?.authSessionId);
|
||||
await sendWorkspaceMessage(c, created.sessionId, body.text, [], body?.authSessionId);
|
||||
const created = await createWorkspaceSession(loopCtx, msg.body?.model, msg.body?.authSessionId);
|
||||
await sendWorkspaceMessage(loopCtx, created.sessionId, msg.body.text, [], msg.body?.authSessionId);
|
||||
} catch (error) {
|
||||
logActorWarning("task.workflow", "create_session_and_send failed", {
|
||||
error: resolveErrorMessage(error),
|
||||
});
|
||||
}
|
||||
return { ok: true };
|
||||
await msg.complete({ ok: true });
|
||||
},
|
||||
|
||||
async ensureSession(c: any, body: any) {
|
||||
await ensureWorkspaceSession(c, body.sessionId, body?.model, body?.authSessionId);
|
||||
return { ok: true };
|
||||
"task.command.workspace.ensure_session": async (loopCtx, msg) => {
|
||||
await ensureWorkspaceSession(loopCtx, msg.body.sessionId, msg.body?.model, msg.body?.authSessionId);
|
||||
await msg.complete({ ok: true });
|
||||
},
|
||||
|
||||
async renameSession(c: any, body: any) {
|
||||
await renameWorkspaceSession(c, body.sessionId, body.title);
|
||||
return { ok: true };
|
||||
"task.command.workspace.rename_session": async (loopCtx, msg) => {
|
||||
await renameWorkspaceSession(loopCtx, msg.body.sessionId, msg.body.title);
|
||||
await msg.complete({ ok: true });
|
||||
},
|
||||
|
||||
async selectSession(c: any, body: any) {
|
||||
await selectWorkspaceSession(c, body.sessionId, body?.authSessionId);
|
||||
return { ok: true };
|
||||
"task.command.workspace.select_session": async (loopCtx, msg) => {
|
||||
await selectWorkspaceSession(loopCtx, msg.body.sessionId, msg.body?.authSessionId);
|
||||
await msg.complete({ ok: true });
|
||||
},
|
||||
|
||||
async setSessionUnread(c: any, body: any) {
|
||||
await setWorkspaceSessionUnread(c, body.sessionId, body.unread, body?.authSessionId);
|
||||
return { ok: true };
|
||||
"task.command.workspace.set_session_unread": async (loopCtx, msg) => {
|
||||
await setWorkspaceSessionUnread(loopCtx, msg.body.sessionId, msg.body.unread, msg.body?.authSessionId);
|
||||
await msg.complete({ ok: true });
|
||||
},
|
||||
|
||||
async updateDraft(c: any, body: any) {
|
||||
await updateWorkspaceDraft(c, body.sessionId, body.text, body.attachments, body?.authSessionId);
|
||||
return { ok: true };
|
||||
"task.command.workspace.update_draft": async (loopCtx, msg) => {
|
||||
await updateWorkspaceDraft(loopCtx, msg.body.sessionId, msg.body.text, msg.body.attachments, msg.body?.authSessionId);
|
||||
await msg.complete({ ok: true });
|
||||
},
|
||||
|
||||
async changeModel(c: any, body: any) {
|
||||
await changeWorkspaceModel(c, body.sessionId, body.model, body?.authSessionId);
|
||||
return { ok: true };
|
||||
"task.command.workspace.change_model": async (loopCtx, msg) => {
|
||||
await changeWorkspaceModel(loopCtx, msg.body.sessionId, msg.body.model, msg.body?.authSessionId);
|
||||
await msg.complete({ ok: true });
|
||||
},
|
||||
|
||||
async sendMessage(c: any, body: any) {
|
||||
await sendWorkspaceMessage(c, body.sessionId, body.text, body.attachments, body?.authSessionId);
|
||||
return { ok: true };
|
||||
"task.command.workspace.send_message": async (loopCtx, msg) => {
|
||||
await sendWorkspaceMessage(loopCtx, msg.body.sessionId, msg.body.text, msg.body.attachments, msg.body?.authSessionId);
|
||||
await msg.complete({ ok: true });
|
||||
},
|
||||
|
||||
async stopSession(c: any, body: any) {
|
||||
await stopWorkspaceSession(c, body.sessionId);
|
||||
return { ok: true };
|
||||
"task.command.workspace.stop_session": async (loopCtx, msg) => {
|
||||
await stopWorkspaceSession(loopCtx, msg.body.sessionId);
|
||||
await msg.complete({ ok: true });
|
||||
},
|
||||
|
||||
async syncSessionStatus(c: any, body: any) {
|
||||
await syncWorkspaceSessionStatus(c, body.sessionId, body.status, body.at);
|
||||
return { ok: true };
|
||||
"task.command.workspace.sync_session_status": async (loopCtx, msg) => {
|
||||
await syncWorkspaceSessionStatus(loopCtx, msg.body.sessionId, msg.body.status, msg.body.at);
|
||||
await msg.complete({ ok: true });
|
||||
},
|
||||
|
||||
async refreshDerived(c: any, _body: any) {
|
||||
await refreshWorkspaceDerivedState(c);
|
||||
return { ok: true };
|
||||
"task.command.workspace.refresh_derived": async (loopCtx, msg) => {
|
||||
await refreshWorkspaceDerivedState(loopCtx);
|
||||
await msg.complete({ ok: true });
|
||||
},
|
||||
|
||||
async refreshSessionTranscript(c: any, body: any) {
|
||||
await refreshWorkspaceSessionTranscript(c, body.sessionId);
|
||||
return { ok: true };
|
||||
"task.command.workspace.refresh_session_transcript": async (loopCtx, msg) => {
|
||||
await refreshWorkspaceSessionTranscript(loopCtx, msg.body.sessionId);
|
||||
await msg.complete({ ok: true });
|
||||
},
|
||||
|
||||
async closeSession(c: any, body: any) {
|
||||
await closeWorkspaceSession(c, body.sessionId, body?.authSessionId);
|
||||
return { ok: true };
|
||||
"task.command.workspace.close_session": async (loopCtx, msg) => {
|
||||
await closeWorkspaceSession(loopCtx, msg.body.sessionId, msg.body?.authSessionId);
|
||||
await msg.complete({ ok: true });
|
||||
},
|
||||
|
||||
async publishPr(c: any, _body: any) {
|
||||
await publishWorkspacePr(c);
|
||||
return { ok: true };
|
||||
"task.command.workspace.publish_pr": async (loopCtx, msg) => {
|
||||
await publishWorkspacePr(loopCtx);
|
||||
await msg.complete({ ok: true });
|
||||
},
|
||||
|
||||
async revertFile(c: any, body: any) {
|
||||
await revertWorkspaceFile(c, body.path);
|
||||
return { ok: true };
|
||||
"task.command.workspace.revert_file": async (loopCtx, msg) => {
|
||||
await revertWorkspaceFile(loopCtx, msg.body.path);
|
||||
await msg.complete({ ok: true });
|
||||
},
|
||||
|
||||
"task.command.workspace.change_owner": async (loopCtx, msg) => {
|
||||
await changeTaskOwnerManually(loopCtx, {
|
||||
primaryUserId: msg.body.primaryUserId,
|
||||
primaryGithubLogin: msg.body.primaryGithubLogin,
|
||||
primaryGithubEmail: msg.body.primaryGithubEmail,
|
||||
primaryGithubAvatarUrl: msg.body.primaryGithubAvatarUrl ?? null,
|
||||
});
|
||||
await msg.complete({ ok: true });
|
||||
},
|
||||
};
|
||||
|
||||
export async function runTaskWorkflow(ctx: any): Promise<void> {
|
||||
await ctx.loop("task-command-loop", async (loopCtx: any) => {
|
||||
const msg = await loopCtx.queue.next("next-task-command", {
|
||||
names: [...TASK_QUEUE_NAMES],
|
||||
completable: true,
|
||||
});
|
||||
|
||||
if (!msg) {
|
||||
return Loop.continue(undefined);
|
||||
}
|
||||
|
||||
const handler = COMMAND_HANDLERS[msg.name as TaskQueueName];
|
||||
if (!handler) {
|
||||
logActorWarning("task.workflow", "unknown task command", { command: msg.name });
|
||||
await msg.complete({ error: `Unknown command: ${msg.name}` }).catch(() => {});
|
||||
return Loop.continue(undefined);
|
||||
}
|
||||
|
||||
try {
|
||||
// Wrap in a step so c.state and c.db are accessible inside mutation functions.
|
||||
await loopCtx.step({
|
||||
name: msg.name,
|
||||
timeout: 10 * 60_000,
|
||||
run: async () => handler(loopCtx, msg),
|
||||
});
|
||||
} catch (error) {
|
||||
const message = resolveErrorMessage(error);
|
||||
logActorWarning("task.workflow", "task workflow command failed", {
|
||||
command: msg.name,
|
||||
error: message,
|
||||
});
|
||||
await msg.complete({ error: message }).catch(() => {});
|
||||
}
|
||||
|
||||
return Loop.continue(undefined);
|
||||
});
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@ import { eq } from "drizzle-orm";
|
|||
import { getActorRuntimeContext } from "../../context.js";
|
||||
import { selfTask } from "../../handles.js";
|
||||
import { resolveErrorMessage } from "../../logging.js";
|
||||
import { taskWorkflowQueueName } from "./queue.js";
|
||||
import { defaultSandboxProviderId } from "../../../sandbox-config.js";
|
||||
import { task as taskTable, taskRuntime } from "../db/schema.js";
|
||||
import { TASK_ROW_ID, appendAuditLog, collectErrorMessages, resolveErrorDetail, setTaskState } from "./common.js";
|
||||
|
|
@ -72,7 +73,7 @@ export async function initEnqueueProvisionActivity(loopCtx: any, body: any): Pro
|
|||
|
||||
const self = selfTask(loopCtx);
|
||||
try {
|
||||
void self.provision(body).catch(() => {});
|
||||
void self.send(taskWorkflowQueueName("task.command.provision"), body ?? {}, { wait: false }).catch(() => {});
|
||||
} catch (error) {
|
||||
logActorWarning("task.init", "background provision command failed", {
|
||||
organizationId: loopCtx.state.organizationId,
|
||||
|
|
|
|||
|
|
@ -28,8 +28,11 @@ export const TASK_QUEUE_NAMES = [
|
|||
"task.command.workspace.close_session",
|
||||
"task.command.workspace.publish_pr",
|
||||
"task.command.workspace.revert_file",
|
||||
"task.command.workspace.change_owner",
|
||||
] as const;
|
||||
|
||||
export type TaskQueueName = (typeof TASK_QUEUE_NAMES)[number];
|
||||
|
||||
export function taskWorkflowQueueName(name: string): string {
|
||||
return name;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -14,10 +14,12 @@ import { logActorWarning, resolveErrorMessage } from "../logging.js";
|
|||
import { SANDBOX_REPO_CWD } from "../sandbox/index.js";
|
||||
import { resolveSandboxProviderId } from "../../sandbox-config.js";
|
||||
import { getBetterAuthService } from "../../services/better-auth.js";
|
||||
// expectQueueResponse removed — actions return values directly
|
||||
import { resolveOrganizationGithubAuth } from "../../services/github-auth.js";
|
||||
import { githubRepoFullNameFromRemote } from "../../services/repo.js";
|
||||
// organization actions called directly (no queue)
|
||||
import { taskWorkflowQueueName } from "./workflow/queue.js";
|
||||
import { expectQueueResponse } from "../../services/queue.js";
|
||||
import { userWorkflowQueueName } from "../user/workflow.js";
|
||||
import { organizationWorkflowQueueName } from "../organization/queues.js";
|
||||
|
||||
import { task as taskTable, taskOwner, taskRuntime, taskSandboxes, taskWorkspaceSessions } from "./db/schema.js";
|
||||
import { getCurrentRecord } from "./workflow/common.js";
|
||||
|
|
@ -123,9 +125,7 @@ function parseGitState(value: string | null | undefined): { fileChanges: Array<a
|
|||
}
|
||||
}
|
||||
|
||||
async function readTaskOwner(
|
||||
c: any,
|
||||
): Promise<{
|
||||
async function readTaskOwner(c: any): Promise<{
|
||||
primaryUserId: string | null;
|
||||
primaryGithubLogin: string | null;
|
||||
primaryGithubEmail: string | null;
|
||||
|
|
@ -427,11 +427,17 @@ async function upsertUserTaskState(c: any, authSessionId: string | null | undefi
|
|||
}
|
||||
|
||||
const user = await getOrCreateUser(c, userId);
|
||||
await user.taskStateUpsert({
|
||||
taskId: c.state.taskId,
|
||||
sessionId,
|
||||
patch,
|
||||
});
|
||||
expectQueueResponse(
|
||||
await user.send(
|
||||
userWorkflowQueueName("user.command.task_state.upsert"),
|
||||
{
|
||||
taskId: c.state.taskId,
|
||||
sessionId,
|
||||
patch,
|
||||
},
|
||||
{ wait: true, timeout: 10_000 },
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
async function deleteUserTaskState(c: any, authSessionId: string | null | undefined, sessionId: string): Promise<void> {
|
||||
|
|
@ -446,10 +452,14 @@ async function deleteUserTaskState(c: any, authSessionId: string | null | undefi
|
|||
}
|
||||
|
||||
const user = await getOrCreateUser(c, userId);
|
||||
await user.taskStateDelete({
|
||||
taskId: c.state.taskId,
|
||||
sessionId,
|
||||
});
|
||||
await user.send(
|
||||
userWorkflowQueueName("user.command.task_state.delete"),
|
||||
{
|
||||
taskId: c.state.taskId,
|
||||
sessionId,
|
||||
},
|
||||
{ wait: true, timeout: 10_000 },
|
||||
);
|
||||
}
|
||||
|
||||
async function resolveDefaultModel(c: any, authSessionId?: string | null): Promise<string> {
|
||||
|
|
@ -932,17 +942,13 @@ async function enqueueWorkspaceRefresh(
|
|||
command: "task.command.workspace.refresh_derived" | "task.command.workspace.refresh_session_transcript",
|
||||
body: Record<string, unknown>,
|
||||
): Promise<void> {
|
||||
// Call directly since we're inside the task actor (no queue needed)
|
||||
if (command === "task.command.workspace.refresh_derived") {
|
||||
void refreshWorkspaceDerivedState(c).catch(() => {});
|
||||
} else {
|
||||
void refreshWorkspaceSessionTranscript(c, body.sessionId as string).catch(() => {});
|
||||
}
|
||||
const self = selfTask(c);
|
||||
await self.send(taskWorkflowQueueName(command as any), body, { wait: false });
|
||||
}
|
||||
|
||||
async function enqueueWorkspaceEnsureSession(c: any, sessionId: string): Promise<void> {
|
||||
// Call directly since we're inside the task actor
|
||||
void ensureWorkspaceSession(c, sessionId).catch(() => {});
|
||||
const self = selfTask(c);
|
||||
await self.send(taskWorkflowQueueName("task.command.workspace.ensure_session" as any), { sessionId }, { wait: false });
|
||||
}
|
||||
|
||||
function pendingWorkspaceSessionStatus(record: any): "pending_provision" | "pending_session_create" {
|
||||
|
|
@ -1166,7 +1172,11 @@ export async function getSessionDetail(c: any, sessionId: string, authSessionId?
|
|||
*/
|
||||
export async function broadcastTaskUpdate(c: any, options?: { sessionId?: string }): Promise<void> {
|
||||
const organization = await getOrCreateOrganization(c, c.state.organizationId);
|
||||
await organization.commandApplyTaskSummaryUpdate({ taskSummary: await buildTaskSummary(c) });
|
||||
await organization.send(
|
||||
organizationWorkflowQueueName("organization.command.applyTaskSummaryUpdate"),
|
||||
{ taskSummary: await buildTaskSummary(c) },
|
||||
{ wait: false },
|
||||
);
|
||||
c.broadcast("taskUpdated", {
|
||||
type: "taskUpdated",
|
||||
detail: await buildTaskDetail(c),
|
||||
|
|
@ -1307,8 +1317,9 @@ export async function enqueuePendingWorkspaceSessions(c: any): Promise<void> {
|
|||
(row) => row.closed !== true && row.status !== "ready" && row.status !== "error",
|
||||
);
|
||||
|
||||
const self = selfTask(c);
|
||||
for (const row of pending) {
|
||||
void ensureWorkspaceSession(c, row.sessionId, row.model).catch(() => {});
|
||||
await self.send(taskWorkflowQueueName("task.command.workspace.ensure_session" as any), { sessionId: row.sessionId, model: row.model }, { wait: false });
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,21 +1,13 @@
|
|||
import { actor } from "rivetkit";
|
||||
import { actor, queue } from "rivetkit";
|
||||
import { workflow } from "rivetkit/workflow";
|
||||
import { userDb } from "./db/db.js";
|
||||
import { betterAuthActions } from "./actions/better-auth.js";
|
||||
import { userActions } from "./actions/user.js";
|
||||
import {
|
||||
createAuthRecordMutation,
|
||||
updateAuthRecordMutation,
|
||||
updateManyAuthRecordsMutation,
|
||||
deleteAuthRecordMutation,
|
||||
deleteManyAuthRecordsMutation,
|
||||
upsertUserProfileMutation,
|
||||
upsertSessionStateMutation,
|
||||
upsertTaskStateMutation,
|
||||
deleteTaskStateMutation,
|
||||
} from "./workflow.js";
|
||||
import { USER_QUEUE_NAMES, runUserWorkflow } from "./workflow.js";
|
||||
|
||||
export const user = actor({
|
||||
db: userDb,
|
||||
queues: Object.fromEntries(USER_QUEUE_NAMES.map((name) => [name, queue()])),
|
||||
options: {
|
||||
name: "User",
|
||||
icon: "shield",
|
||||
|
|
@ -27,34 +19,6 @@ export const user = actor({
|
|||
actions: {
|
||||
...betterAuthActions,
|
||||
...userActions,
|
||||
async authCreate(c, body) {
|
||||
return await createAuthRecordMutation(c, body);
|
||||
},
|
||||
async authUpdate(c, body) {
|
||||
return await updateAuthRecordMutation(c, body);
|
||||
},
|
||||
async authUpdateMany(c, body) {
|
||||
return await updateManyAuthRecordsMutation(c, body);
|
||||
},
|
||||
async authDelete(c, body) {
|
||||
await deleteAuthRecordMutation(c, body);
|
||||
return { ok: true };
|
||||
},
|
||||
async authDeleteMany(c, body) {
|
||||
return await deleteManyAuthRecordsMutation(c, body);
|
||||
},
|
||||
async profileUpsert(c, body) {
|
||||
return await upsertUserProfileMutation(c, body);
|
||||
},
|
||||
async sessionStateUpsert(c, body) {
|
||||
return await upsertSessionStateMutation(c, body);
|
||||
},
|
||||
async taskStateUpsert(c, body) {
|
||||
return await upsertTaskStateMutation(c, body);
|
||||
},
|
||||
async taskStateDelete(c, body) {
|
||||
await deleteTaskStateMutation(c, body);
|
||||
return { ok: true };
|
||||
},
|
||||
},
|
||||
run: workflow(runUserWorkflow),
|
||||
});
|
||||
|
|
|
|||
|
|
@ -1,8 +1,45 @@
|
|||
// @ts-nocheck
|
||||
/**
|
||||
* User workflow — queue-based command loop.
|
||||
*
|
||||
* Auth mutation commands are dispatched through named queues and processed
|
||||
* inside the workflow command loop for observability and replay semantics.
|
||||
*/
|
||||
import { eq, count as sqlCount, and } from "drizzle-orm";
|
||||
import { Loop } from "rivetkit/workflow";
|
||||
import { DEFAULT_WORKSPACE_MODEL_ID } from "@sandbox-agent/foundry-shared";
|
||||
import { logActorWarning, resolveErrorMessage } from "../logging.js";
|
||||
import { selfUser } from "../handles.js";
|
||||
import { expectQueueResponse } from "../../services/queue.js";
|
||||
import { authUsers, sessionState, userProfiles, userTaskState } from "./db/schema.js";
|
||||
import { buildWhere, columnFor, materializeRow, persistInput, persistPatch, tableFor } from "./query-helpers.js";
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Queue names
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
export const USER_QUEUE_NAMES = [
|
||||
"user.command.auth.create",
|
||||
"user.command.auth.update",
|
||||
"user.command.auth.update_many",
|
||||
"user.command.auth.delete",
|
||||
"user.command.auth.delete_many",
|
||||
"user.command.profile.upsert",
|
||||
"user.command.session_state.upsert",
|
||||
"user.command.task_state.upsert",
|
||||
"user.command.task_state.delete",
|
||||
] as const;
|
||||
|
||||
export type UserQueueName = (typeof USER_QUEUE_NAMES)[number];
|
||||
|
||||
export function userWorkflowQueueName(name: UserQueueName): UserQueueName {
|
||||
return name;
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Mutation functions
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
export async function createAuthRecordMutation(c: any, input: { model: string; data: Record<string, unknown> }) {
|
||||
const table = tableFor(input.model);
|
||||
const persisted = persistInput(input.model, input.data);
|
||||
|
|
@ -195,3 +232,66 @@ export async function deleteTaskStateMutation(c: any, input: { taskId: string; s
|
|||
}
|
||||
await c.db.delete(userTaskState).where(eq(userTaskState.taskId, input.taskId)).run();
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Workflow command loop
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
type WorkflowHandler = (loopCtx: any, body: any) => Promise<any>;
|
||||
|
||||
const COMMAND_HANDLERS: Record<UserQueueName, WorkflowHandler> = {
|
||||
"user.command.auth.create": async (c, body) => createAuthRecordMutation(c, body),
|
||||
"user.command.auth.update": async (c, body) => updateAuthRecordMutation(c, body),
|
||||
"user.command.auth.update_many": async (c, body) => updateManyAuthRecordsMutation(c, body),
|
||||
"user.command.auth.delete": async (c, body) => {
|
||||
await deleteAuthRecordMutation(c, body);
|
||||
return { ok: true };
|
||||
},
|
||||
"user.command.auth.delete_many": async (c, body) => deleteManyAuthRecordsMutation(c, body),
|
||||
"user.command.profile.upsert": async (c, body) => upsertUserProfileMutation(c, body),
|
||||
"user.command.session_state.upsert": async (c, body) => upsertSessionStateMutation(c, body),
|
||||
"user.command.task_state.upsert": async (c, body) => upsertTaskStateMutation(c, body),
|
||||
"user.command.task_state.delete": async (c, body) => {
|
||||
await deleteTaskStateMutation(c, body);
|
||||
return { ok: true };
|
||||
},
|
||||
};
|
||||
|
||||
export async function runUserWorkflow(ctx: any): Promise<void> {
|
||||
await ctx.loop("user-command-loop", async (loopCtx: any) => {
|
||||
const msg = await loopCtx.queue.next("next-user-command", {
|
||||
names: [...USER_QUEUE_NAMES],
|
||||
completable: true,
|
||||
});
|
||||
|
||||
if (!msg) {
|
||||
return Loop.continue(undefined);
|
||||
}
|
||||
|
||||
const handler = COMMAND_HANDLERS[msg.name as UserQueueName];
|
||||
if (!handler) {
|
||||
logActorWarning("user", "unknown user command", { command: msg.name });
|
||||
await msg.complete({ error: `Unknown command: ${msg.name}` }).catch(() => {});
|
||||
return Loop.continue(undefined);
|
||||
}
|
||||
|
||||
try {
|
||||
// Wrap in a step so c.state and c.db are accessible inside mutation functions.
|
||||
const result = await loopCtx.step({
|
||||
name: msg.name,
|
||||
timeout: 60_000,
|
||||
run: async () => handler(loopCtx, msg.body),
|
||||
});
|
||||
await msg.complete(result);
|
||||
} catch (error) {
|
||||
const message = resolveErrorMessage(error);
|
||||
logActorWarning("user", "user workflow command failed", {
|
||||
command: msg.name,
|
||||
error: message,
|
||||
});
|
||||
await msg.complete({ error: message }).catch(() => {});
|
||||
}
|
||||
|
||||
return Loop.continue(undefined);
|
||||
});
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,11 +1,11 @@
|
|||
import { betterAuth } from "better-auth";
|
||||
import { createAdapterFactory } from "better-auth/adapters";
|
||||
import { APP_SHELL_ORGANIZATION_ID } from "../actors/organization/constants.js";
|
||||
// organization actions are called directly (no queue)
|
||||
// user actor actions are called directly (no queue)
|
||||
import { organizationKey, userKey } from "../actors/keys.js";
|
||||
import { logger } from "../logging.js";
|
||||
// expectQueueResponse removed — actions return values directly
|
||||
import { expectQueueResponse } from "./queue.js";
|
||||
import { userWorkflowQueueName } from "../actors/user/workflow.js";
|
||||
import { organizationWorkflowQueueName } from "../actors/organization/queues.js";
|
||||
|
||||
const AUTH_BASE_PATH = "/v1/auth";
|
||||
const SESSION_COOKIE = "better-auth.session_token";
|
||||
|
|
@ -62,8 +62,6 @@ function resolveRouteUserId(organization: any, resolved: any): string | null {
|
|||
return null;
|
||||
}
|
||||
|
||||
// sendOrganizationCommand removed — org actions are called directly
|
||||
|
||||
export interface BetterAuthService {
|
||||
auth: any;
|
||||
resolveSession(headers: Headers): Promise<{ session: any; user: any } | null>;
|
||||
|
|
@ -162,9 +160,9 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin
|
|||
return null;
|
||||
};
|
||||
|
||||
const ensureOrganizationVerification = async (actionName: string, payload: Record<string, unknown>) => {
|
||||
const ensureOrganizationVerification = async (queueName: string, payload: Record<string, unknown>) => {
|
||||
const organization = await appOrganization();
|
||||
return await (organization as any)[actionName](payload);
|
||||
return expectQueueResponse(await organization.send(organizationWorkflowQueueName(queueName as any), payload, { wait: true, timeout: 10_000 }));
|
||||
};
|
||||
|
||||
return {
|
||||
|
|
@ -175,7 +173,7 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin
|
|||
create: async ({ model, data }) => {
|
||||
const transformed = await transformInput(data, model, "create", true);
|
||||
if (model === "verification") {
|
||||
return await ensureOrganizationVerification("commandBetterAuthVerificationCreate", { data: transformed });
|
||||
return await ensureOrganizationVerification("organization.command.better_auth.verification.create", { data: transformed });
|
||||
}
|
||||
|
||||
const userId = await resolveUserIdForQuery(model, undefined, transformed);
|
||||
|
|
@ -184,31 +182,51 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin
|
|||
}
|
||||
|
||||
const userActor = await getUser(userId);
|
||||
const created = await userActor.authCreate({ model, data: transformed });
|
||||
const created = expectQueueResponse(
|
||||
await userActor.send(userWorkflowQueueName("user.command.auth.create"), { model, data: transformed }, { wait: true, timeout: 10_000 }),
|
||||
);
|
||||
const organization = await appOrganization();
|
||||
|
||||
if (model === "user" && typeof transformed.email === "string" && transformed.email.length > 0) {
|
||||
await organization.commandBetterAuthEmailIndexUpsert({
|
||||
email: transformed.email.toLowerCase(),
|
||||
userId,
|
||||
});
|
||||
expectQueueResponse(
|
||||
await organization.send(
|
||||
organizationWorkflowQueueName("organization.command.better_auth.email_index.upsert"),
|
||||
{
|
||||
email: transformed.email.toLowerCase(),
|
||||
userId,
|
||||
},
|
||||
{ wait: true, timeout: 10_000 },
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
if (model === "session") {
|
||||
await organization.commandBetterAuthSessionIndexUpsert({
|
||||
sessionId: String(created.id),
|
||||
sessionToken: String(created.token),
|
||||
userId,
|
||||
});
|
||||
expectQueueResponse(
|
||||
await organization.send(
|
||||
organizationWorkflowQueueName("organization.command.better_auth.session_index.upsert"),
|
||||
{
|
||||
sessionId: String(created.id),
|
||||
sessionToken: String(created.token),
|
||||
userId,
|
||||
},
|
||||
{ wait: true, timeout: 10_000 },
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
if (model === "account") {
|
||||
await organization.commandBetterAuthAccountIndexUpsert({
|
||||
id: String(created.id),
|
||||
providerId: String(created.providerId),
|
||||
accountId: String(created.accountId),
|
||||
userId,
|
||||
});
|
||||
expectQueueResponse(
|
||||
await organization.send(
|
||||
organizationWorkflowQueueName("organization.command.better_auth.account_index.upsert"),
|
||||
{
|
||||
id: String(created.id),
|
||||
providerId: String(created.providerId),
|
||||
accountId: String(created.accountId),
|
||||
userId,
|
||||
},
|
||||
{ wait: true, timeout: 10_000 },
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
return (await transformOutput(created, model)) as any;
|
||||
|
|
@ -291,7 +309,7 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin
|
|||
const transformedWhere = transformWhereClause({ model, where, action: "update" });
|
||||
const transformedUpdate = (await transformInput(update as Record<string, unknown>, model, "update", true)) as Record<string, unknown>;
|
||||
if (model === "verification") {
|
||||
return await ensureOrganizationVerification("commandBetterAuthVerificationUpdate", {
|
||||
return await ensureOrganizationVerification("organization.command.better_auth.verification.update", {
|
||||
where: transformedWhere,
|
||||
update: transformedUpdate,
|
||||
});
|
||||
|
|
@ -311,38 +329,66 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin
|
|||
: model === "session"
|
||||
? await userActor.betterAuthFindOneRecord({ model, where: transformedWhere })
|
||||
: null;
|
||||
const updated = await userActor.authUpdate({ model, where: transformedWhere, update: transformedUpdate });
|
||||
const updated = expectQueueResponse(
|
||||
await userActor.send(
|
||||
userWorkflowQueueName("user.command.auth.update"),
|
||||
{ model, where: transformedWhere, update: transformedUpdate },
|
||||
{ wait: true, timeout: 10_000 },
|
||||
),
|
||||
);
|
||||
const organization = await appOrganization();
|
||||
|
||||
if (model === "user" && updated) {
|
||||
if (before?.email && before.email !== updated.email) {
|
||||
await organization.commandBetterAuthEmailIndexDelete({
|
||||
email: before.email.toLowerCase(),
|
||||
});
|
||||
await organization.send(
|
||||
organizationWorkflowQueueName("organization.command.better_auth.email_index.delete"),
|
||||
{
|
||||
email: before.email.toLowerCase(),
|
||||
},
|
||||
{ wait: true, timeout: 10_000 },
|
||||
);
|
||||
}
|
||||
if (updated.email) {
|
||||
await organization.commandBetterAuthEmailIndexUpsert({
|
||||
email: updated.email.toLowerCase(),
|
||||
userId,
|
||||
});
|
||||
expectQueueResponse(
|
||||
await organization.send(
|
||||
organizationWorkflowQueueName("organization.command.better_auth.email_index.upsert"),
|
||||
{
|
||||
email: updated.email.toLowerCase(),
|
||||
userId,
|
||||
},
|
||||
{ wait: true, timeout: 10_000 },
|
||||
),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
if (model === "session" && updated) {
|
||||
await organization.commandBetterAuthSessionIndexUpsert({
|
||||
sessionId: String(updated.id),
|
||||
sessionToken: String(updated.token),
|
||||
userId,
|
||||
});
|
||||
expectQueueResponse(
|
||||
await organization.send(
|
||||
organizationWorkflowQueueName("organization.command.better_auth.session_index.upsert"),
|
||||
{
|
||||
sessionId: String(updated.id),
|
||||
sessionToken: String(updated.token),
|
||||
userId,
|
||||
},
|
||||
{ wait: true, timeout: 10_000 },
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
if (model === "account" && updated) {
|
||||
await organization.commandBetterAuthAccountIndexUpsert({
|
||||
id: String(updated.id),
|
||||
providerId: String(updated.providerId),
|
||||
accountId: String(updated.accountId),
|
||||
userId,
|
||||
});
|
||||
expectQueueResponse(
|
||||
await organization.send(
|
||||
organizationWorkflowQueueName("organization.command.better_auth.account_index.upsert"),
|
||||
{
|
||||
id: String(updated.id),
|
||||
providerId: String(updated.providerId),
|
||||
accountId: String(updated.accountId),
|
||||
userId,
|
||||
},
|
||||
{ wait: true, timeout: 10_000 },
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
return updated ? ((await transformOutput(updated, model)) as any) : null;
|
||||
|
|
@ -352,7 +398,7 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin
|
|||
const transformedWhere = transformWhereClause({ model, where, action: "updateMany" });
|
||||
const transformedUpdate = (await transformInput(update as Record<string, unknown>, model, "update", true)) as Record<string, unknown>;
|
||||
if (model === "verification") {
|
||||
return await ensureOrganizationVerification("commandBetterAuthVerificationUpdateMany", {
|
||||
return await ensureOrganizationVerification("organization.command.better_auth.verification.update_many", {
|
||||
where: transformedWhere,
|
||||
update: transformedUpdate,
|
||||
});
|
||||
|
|
@ -364,14 +410,24 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin
|
|||
}
|
||||
|
||||
const userActor = await getUser(userId);
|
||||
return await userActor.authUpdateMany({ model, where: transformedWhere, update: transformedUpdate });
|
||||
return expectQueueResponse(
|
||||
await userActor.send(
|
||||
userWorkflowQueueName("user.command.auth.update_many"),
|
||||
{ model, where: transformedWhere, update: transformedUpdate },
|
||||
{ wait: true, timeout: 10_000 },
|
||||
),
|
||||
);
|
||||
},
|
||||
|
||||
delete: async ({ model, where }) => {
|
||||
const transformedWhere = transformWhereClause({ model, where, action: "delete" });
|
||||
if (model === "verification") {
|
||||
const organization = await appOrganization();
|
||||
await organization.commandBetterAuthVerificationDelete({ where: transformedWhere });
|
||||
await organization.send(
|
||||
organizationWorkflowQueueName("organization.command.better_auth.verification.delete"),
|
||||
{ where: transformedWhere },
|
||||
{ wait: true, timeout: 10_000 },
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
@ -383,34 +439,46 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin
|
|||
const userActor = await getUser(userId);
|
||||
const organization = await appOrganization();
|
||||
const before = await userActor.betterAuthFindOneRecord({ model, where: transformedWhere });
|
||||
await userActor.authDelete({ model, where: transformedWhere });
|
||||
await userActor.send(userWorkflowQueueName("user.command.auth.delete"), { model, where: transformedWhere }, { wait: true, timeout: 10_000 });
|
||||
|
||||
if (model === "session" && before) {
|
||||
await organization.commandBetterAuthSessionIndexDelete({
|
||||
sessionId: before.id,
|
||||
sessionToken: before.token,
|
||||
});
|
||||
await organization.send(
|
||||
organizationWorkflowQueueName("organization.command.better_auth.session_index.delete"),
|
||||
{
|
||||
sessionId: before.id,
|
||||
sessionToken: before.token,
|
||||
},
|
||||
{ wait: true, timeout: 10_000 },
|
||||
);
|
||||
}
|
||||
|
||||
if (model === "account" && before) {
|
||||
await organization.commandBetterAuthAccountIndexDelete({
|
||||
id: before.id,
|
||||
providerId: before.providerId,
|
||||
accountId: before.accountId,
|
||||
});
|
||||
await organization.send(
|
||||
organizationWorkflowQueueName("organization.command.better_auth.account_index.delete"),
|
||||
{
|
||||
id: before.id,
|
||||
providerId: before.providerId,
|
||||
accountId: before.accountId,
|
||||
},
|
||||
{ wait: true, timeout: 10_000 },
|
||||
);
|
||||
}
|
||||
|
||||
if (model === "user" && before?.email) {
|
||||
await organization.commandBetterAuthEmailIndexDelete({
|
||||
email: before.email.toLowerCase(),
|
||||
});
|
||||
await organization.send(
|
||||
organizationWorkflowQueueName("organization.command.better_auth.email_index.delete"),
|
||||
{
|
||||
email: before.email.toLowerCase(),
|
||||
},
|
||||
{ wait: true, timeout: 10_000 },
|
||||
);
|
||||
}
|
||||
},
|
||||
|
||||
deleteMany: async ({ model, where }) => {
|
||||
const transformedWhere = transformWhereClause({ model, where, action: "deleteMany" });
|
||||
if (model === "verification") {
|
||||
return await ensureOrganizationVerification("commandBetterAuthVerificationDeleteMany", { where: transformedWhere });
|
||||
return await ensureOrganizationVerification("organization.command.better_auth.verification.delete_many", { where: transformedWhere });
|
||||
}
|
||||
|
||||
if (model === "session") {
|
||||
|
|
@ -421,12 +489,18 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin
|
|||
const userActor = await getUser(userId);
|
||||
const organization = await appOrganization();
|
||||
const sessions = await userActor.betterAuthFindManyRecords({ model, where: transformedWhere, limit: 5000 });
|
||||
const deleted = await userActor.authDeleteMany({ model, where: transformedWhere });
|
||||
const deleted = expectQueueResponse(
|
||||
await userActor.send(userWorkflowQueueName("user.command.auth.delete_many"), { model, where: transformedWhere }, { wait: true, timeout: 10_000 }),
|
||||
);
|
||||
for (const session of sessions) {
|
||||
await organization.commandBetterAuthSessionIndexDelete({
|
||||
sessionId: session.id,
|
||||
sessionToken: session.token,
|
||||
});
|
||||
await organization.send(
|
||||
organizationWorkflowQueueName("organization.command.better_auth.session_index.delete"),
|
||||
{
|
||||
sessionId: session.id,
|
||||
sessionToken: session.token,
|
||||
},
|
||||
{ wait: true, timeout: 10_000 },
|
||||
);
|
||||
}
|
||||
return deleted;
|
||||
}
|
||||
|
|
@ -437,7 +511,9 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin
|
|||
}
|
||||
|
||||
const userActor = await getUser(userId);
|
||||
const deleted = await userActor.authDeleteMany({ model, where: transformedWhere });
|
||||
const deleted = expectQueueResponse(
|
||||
await userActor.send(userWorkflowQueueName("user.command.auth.delete_many"), { model, where: transformedWhere }, { wait: true, timeout: 10_000 }),
|
||||
);
|
||||
return deleted;
|
||||
},
|
||||
|
||||
|
|
@ -509,7 +585,9 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin
|
|||
|
||||
async upsertUserProfile(userId: string, patch: Record<string, unknown>) {
|
||||
const userActor = await getUser(userId);
|
||||
return await userActor.profileUpsert({ userId, patch });
|
||||
return expectQueueResponse(
|
||||
await userActor.send(userWorkflowQueueName("user.command.profile.upsert"), { userId, patch }, { wait: true, timeout: 10_000 }),
|
||||
);
|
||||
},
|
||||
|
||||
async setActiveOrganization(sessionId: string, activeOrganizationId: string | null) {
|
||||
|
|
@ -518,7 +596,9 @@ export function initBetterAuthService(actorClient: any, options: { apiUrl: strin
|
|||
throw new Error(`Unknown auth session ${sessionId}`);
|
||||
}
|
||||
const userActor = await getUser(authState.user.id);
|
||||
return await userActor.sessionStateUpsert({ sessionId, activeOrganizationId });
|
||||
return expectQueueResponse(
|
||||
await userActor.send(userWorkflowQueueName("user.command.session_state.upsert"), { sessionId, activeOrganizationId }, { wait: true, timeout: 10_000 }),
|
||||
);
|
||||
},
|
||||
|
||||
async getAccessTokenForSession(sessionId: string) {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue