diff --git a/foundry/CLAUDE.md b/foundry/CLAUDE.md index 067dfda..ab51fc6 100644 --- a/foundry/CLAUDE.md +++ b/foundry/CLAUDE.md @@ -99,7 +99,7 @@ Do not use polling (`refetchInterval`), empty "go re-fetch" broadcast events, or - **Organization actor** materializes sidebar-level data in its own SQLite: repo catalog, task summaries (title, status, branch, PR, updatedAt), repo summaries (overview/branch state), and session summaries (id, name, status, unread, model — no transcript). Task actors push summary changes to the organization actor when they mutate. The organization actor broadcasts the updated entity to connected clients. `getOrganizationSummary` reads from local tables only — no fan-out to child actors. - **Task actor** materializes its own detail state (session summaries, sandbox info, diffs, file tree). `getTaskDetail` reads from the task actor's own SQLite. The task actor broadcasts updates directly to clients connected to it. - **Session data** lives on the task actor but is a separate subscription topic. The task topic includes `sessions_summary` (list without content). The `session` topic provides full transcript and draft state. Clients subscribe to the `session` topic for whichever session is active, and filter `sessionUpdated` events by session ID (ignoring events for other sessions on the same actor). -- The expensive fan-out (querying every repository/task actor) only exists as a background reconciliation/rebuild path, never on the hot read path. +- There is no fan-out on the read path. The organization actor owns all task summaries locally. ### Subscription manager @@ -240,11 +240,11 @@ All `wait: true` sends must have an explicit `timeout`. Maximum timeout for any ### Task creation: resolve metadata before creating the actor -When creating a task, all deterministic metadata (title, branch name) must be resolved synchronously in the parent actor (repository) *before* the task actor is created. The task actor must never be created with null `branchName` or `title`. +When creating a task, all deterministic metadata (title, branch name) must be resolved synchronously in the organization actor *before* the task actor is created. The task actor must never be created with null `branchName` or `title`. - Title is derived from the task description via `deriveFallbackTitle()` — pure string manipulation, no external I/O. - Branch name is derived from the title via `sanitizeBranchName()` + conflict checking against the repository's task index. -- The repository actor already has the task index and GitHub-backed default branch metadata. Resolve the branch name there without local git fetches. +- The organization actor owns the task index and reads GitHub-backed default branch metadata from the github-data actor. Resolve the branch name there without local git fetches. - Do not defer naming to a background provision workflow. Do not poll for names to become available. - The `onBranch` path (attaching to an existing branch) and the new-task path should both produce a fully-named task record on return. - Actor handle policy: diff --git a/foundry/docker/frontend.dev.Dockerfile b/foundry/docker/frontend.dev.Dockerfile index 3b0d8e4..dd74dd0 100644 --- a/foundry/docker/frontend.dev.Dockerfile +++ b/foundry/docker/frontend.dev.Dockerfile @@ -8,4 +8,4 @@ RUN npm install -g pnpm@10.28.2 WORKDIR /app -CMD ["bash", "-lc", "pnpm install --force --frozen-lockfile --filter @sandbox-agent/foundry-frontend... && cd foundry/packages/frontend && exec pnpm vite --host 0.0.0.0 --port 4173"] +CMD ["bash", "-lc", "pnpm install --frozen-lockfile --filter @sandbox-agent/foundry-frontend... && cd foundry/packages/frontend && exec pnpm vite --host 0.0.0.0 --port 4173"] diff --git a/foundry/packages/backend/CLAUDE.md b/foundry/packages/backend/CLAUDE.md index 245788e..7018642 100644 --- a/foundry/packages/backend/CLAUDE.md +++ b/foundry/packages/backend/CLAUDE.md @@ -5,13 +5,12 @@ Keep the backend actor tree aligned with this shape unless we explicitly decide to change it: ```text -OrganizationActor -├─ AuditLogActor(organization-scoped global feed) +OrganizationActor (direct coordinator for tasks) +├─ AuditLogActor (organization-scoped global feed) ├─ GithubDataActor -├─ RepositoryActor(repo) -│ └─ TaskActor(task) -│ ├─ taskSessions → session metadata/transcripts -│ └─ taskSandboxes → sandbox instance index +├─ TaskActor(task) +│ ├─ taskSessions → session metadata/transcripts +│ └─ taskSandboxes → sandbox instance index └─ SandboxInstanceActor(sandboxProviderId, sandboxId) × N ``` @@ -27,27 +26,23 @@ Children push updates **up** to their direct coordinator only. Coordinators broa ### Coordinator hierarchy and index tables ```text -OrganizationActor (coordinator for repos + auth users) +OrganizationActor (coordinator for tasks + auth users) │ │ Index tables: -│ ├─ repos → RepositoryActor index (repo catalog) +│ ├─ repos → Repository catalog (GitHub sync) +│ ├─ taskIndex → TaskActor index (taskId → repoId + branchName) +│ ├─ taskSummaries → TaskActor materialized sidebar projection │ ├─ authSessionIndex → UserActor index (session token → userId) │ ├─ authEmailIndex → UserActor index (email → userId) │ └─ authAccountIndex → UserActor index (OAuth account → userId) │ -├─ RepositoryActor (coordinator for tasks) +├─ TaskActor (coordinator for sessions + sandboxes) │ │ │ │ Index tables: -│ │ ├─ taskIndex → TaskActor index (taskId → branchName) -│ │ └─ tasks → TaskActor materialized sidebar projection +│ │ ├─ taskWorkspaceSessions → Session index (session metadata + transcript) +│ │ └─ taskSandboxes → SandboxInstanceActor index (sandbox history) │ │ -│ └─ TaskActor (coordinator for sessions + sandboxes) -│ │ -│ │ Index tables: -│ │ ├─ taskWorkspaceSessions → Session index (session metadata + transcript) -│ │ └─ taskSandboxes → SandboxInstanceActor index (sandbox history) -│ │ -│ └─ SandboxInstanceActor (leaf) +│ └─ SandboxInstanceActor (leaf) │ ├─ AuditLogActor (organization-scoped audit log, not a coordinator) └─ GithubDataActor (GitHub API cache, not a coordinator) @@ -57,9 +52,8 @@ When adding a new index table, annotate it in the schema file with a doc comment ## Ownership Rules -- `OrganizationActor` is the organization coordinator and lookup/index owner. +- `OrganizationActor` is the organization coordinator, direct coordinator for tasks, and lookup/index owner. It owns the task index, task summaries, and repo catalog. - `AuditLogActor` is organization-scoped. There is one organization-level audit log feed. -- `RepositoryActor` is the repo coordinator and owns repo-local caches/indexes. - `TaskActor` is one branch. Treat `1 task = 1 branch` once branch assignment is finalized. - `TaskActor` can have many sessions. - `TaskActor` can reference many sandbox instances historically, but should have only one active sandbox/session at a time. @@ -69,10 +63,47 @@ When adding a new index table, annotate it in the schema file with a doc comment - The backend stores no local git state. No clones, no refs, no working trees, and no git-spice. Repository metadata comes from GitHub API data and webhook events. Any working-tree git operation runs inside a sandbox via `executeInSandbox()`. - When a backend request path must aggregate multiple independent actor calls or reads, prefer bounded parallelism over sequential fan-out when correctness permits. Do not serialize independent work by default. - Only a coordinator creates/destroys its children. Do not create child actors from outside the coordinator. -- Children push state changes up to their direct coordinator only — never skip levels (e.g., task pushes to repo, not directly to org, unless org is the direct coordinator for that index). +- Children push state changes up to their direct coordinator only. Task actors push summary updates directly to the organization actor. - Read paths must use the coordinator's local index tables. Do not fan out to child actors on the hot read path. - Never build "enriched" read actions that chain through multiple actors (e.g., coordinator → child actor → sibling actor). If data from multiple actors is needed for a read, it should already be materialized in the coordinator's index tables via push updates. If it's not there, fix the write path to push it — do not add a fan-out read path. +## Drizzle Migration Maintenance + +After changing any actor's `db/schema.ts`, you **must** regenerate the corresponding migration so the runtime creates the tables that match the schema. Forgetting this step causes `no such table` errors at runtime. + +1. **Generate a new drizzle migration.** Run from `packages/backend`: + ```bash + npx drizzle-kit generate --config=./src/actors//db/drizzle.config.ts + ``` + If the interactive prompt is unavailable (e.g. in a non-TTY), manually create a new `.sql` file under `./src/actors//db/drizzle/` and add the corresponding entry to `meta/_journal.json`. + +2. **Regenerate the compiled `migrations.ts`.** Run from the foundry root: + ```bash + npx tsx packages/backend/src/actors/_scripts/generate-actor-migrations.ts + ``` + +3. **Verify insert/upsert calls.** Every column with `.notNull()` (and no `.default(...)`) must be provided a value in all `insert()` and `onConflictDoUpdate()` calls. Missing a NOT NULL column causes a runtime constraint violation, not a type error. + +4. **Nuke RivetKit state in dev** after migration changes to start fresh: + ```bash + docker compose -f compose.dev.yaml down + docker volume rm foundry_foundry_rivetkit_storage + docker compose -f compose.dev.yaml up -d + ``` + +Actors with drizzle migrations: `organization`, `audit-log`, `task`. Other actors (`user`, `github-data`) use inline migrations without drizzle. + +## Workflow Step Nesting — FORBIDDEN + +**Never call `c.step()` / `ctx.step()` from inside another step's `run` callback.** RivetKit workflow steps cannot be nested. Doing so causes the runtime error: *"Cannot start a new workflow entry while another is in progress."* + +This means: +- Functions called from within a step `run` callback must NOT use `c.step()`, `c.loop()`, `c.sleep()`, or `c.queue.next()`. +- If a mutation function needs to be called both from a step and standalone, it must only do plain DB/API work — no workflow primitives. The workflow step wrapping belongs in the workflow file, not in the mutation. +- Helper wrappers that conditionally call `c.step()` (like a `runSyncStep` pattern) are dangerous — if the caller is already inside a step, the nested `c.step()` will crash at runtime with no compile-time warning. + +**Rule of thumb:** Workflow primitives (`step`, `loop`, `sleep`, `queue.next`) may only appear at the top level of a workflow function or inside a `loop` callback — never inside a step's `run`. + ## SQLite Constraints - Single-row tables must use an integer primary key with `CHECK (id = 1)` to enforce the singleton invariant at the database level. @@ -92,6 +123,45 @@ Do not store per-user preferences, selections, or ephemeral UI state on shared a Every new action or command handler that represents a user-visible or workflow-significant event must append to the audit log actor. The audit log must remain a comprehensive record of significant operations. +## Debugging Actors + +### RivetKit Inspector UI + +The RivetKit inspector UI at `http://localhost:6420/ui/` is the most reliable way to debug actor state in local development. The inspector HTTP API (`/inspector/workflow-history`) has a known bug where it returns empty `{}` even when the workflow has entries — always cross-check with the UI. + +**Useful inspector URL pattern:** +``` +http://localhost:6420/ui/?u=http%3A%2F%2F127.0.0.1%3A6420&ns=default&r=default&n=[%22%22]&actorId=&tab= +``` + +Tabs: `workflow`, `database`, `state`, `queue`, `connections`, `metadata`. + +**To find actor IDs:** +```bash +curl -s 'http://127.0.0.1:6420/actors?name=organization' +``` + +**To query actor DB via bun (inside container):** +```bash +docker compose -f compose.dev.yaml exec -T backend bun -e ' + var Database = require("bun:sqlite"); + var db = new Database("/root/.local/share/foundry/rivetkit/databases/.db", { readonly: true }); + console.log(JSON.stringify(db.query("SELECT name FROM sqlite_master WHERE type=?").all("table"))); +' +``` + +**To call actor actions via inspector:** +```bash +curl -s -X POST 'http://127.0.0.1:6420/gateway//inspector/action/' \ + -H 'Content-Type: application/json' -d '{"args":[{}]}' +``` + +### Known inspector API bugs + +- `GET /inspector/workflow-history` may return `{"history":{}}` even when workflow has run. Use the UI's Workflow tab instead. +- `GET /inspector/queue` is reliable for checking pending messages. +- `GET /inspector/state` is reliable for checking actor state. + ## Maintenance - Keep this file up to date whenever actor ownership, hierarchy, or lifecycle responsibilities change. diff --git a/foundry/packages/backend/src/actors/audit-log/db/drizzle/0001_add_repo_id.sql b/foundry/packages/backend/src/actors/audit-log/db/drizzle/0001_add_repo_id.sql new file mode 100644 index 0000000..9ada559 --- /dev/null +++ b/foundry/packages/backend/src/actors/audit-log/db/drizzle/0001_add_repo_id.sql @@ -0,0 +1 @@ +ALTER TABLE `events` ADD COLUMN `repo_id` text; diff --git a/foundry/packages/backend/src/actors/repository/db/drizzle/meta/0000_snapshot.json b/foundry/packages/backend/src/actors/audit-log/db/drizzle/meta/0001_snapshot.json similarity index 64% rename from foundry/packages/backend/src/actors/repository/db/drizzle/meta/0000_snapshot.json rename to foundry/packages/backend/src/actors/audit-log/db/drizzle/meta/0001_snapshot.json index 940b4e6..cf2910c 100644 --- a/foundry/packages/backend/src/actors/repository/db/drizzle/meta/0000_snapshot.json +++ b/foundry/packages/backend/src/actors/audit-log/db/drizzle/meta/0001_snapshot.json @@ -1,48 +1,31 @@ { "version": "6", "dialect": "sqlite", - "id": "6ffd6acb-e737-46ee-a8fe-fcfddcdd6ea9", - "prevId": "00000000-0000-0000-0000-000000000000", + "id": "a1b2c3d4-0001-4000-8000-000000000001", + "prevId": "e592c829-141f-4740-88b7-09cf957a4405", "tables": { - "repo_meta": { - "name": "repo_meta", + "events": { + "name": "events", "columns": { "id": { "name": "id", "type": "integer", "primaryKey": true, "notNull": true, - "autoincrement": false + "autoincrement": true }, - "remote_url": { - "name": "remote_url", + "repo_id": { + "name": "repo_id", "type": "text", "primaryKey": false, - "notNull": true, + "notNull": false, "autoincrement": false }, - "updated_at": { - "name": "updated_at", - "type": "integer", - "primaryKey": false, - "notNull": true, - "autoincrement": false - } - }, - "indexes": {}, - "foreignKeys": {}, - "compositePrimaryKeys": {}, - "uniqueConstraints": {}, - "checkConstraints": {} - }, - "task_index": { - "name": "task_index", - "columns": { "task_id": { "name": "task_id", "type": "text", - "primaryKey": true, - "notNull": true, + "primaryKey": false, + "notNull": false, "autoincrement": false }, "branch_name": { @@ -52,15 +35,22 @@ "notNull": false, "autoincrement": false }, - "created_at": { - "name": "created_at", - "type": "integer", + "kind": { + "name": "kind", + "type": "text", "primaryKey": false, "notNull": true, "autoincrement": false }, - "updated_at": { - "name": "updated_at", + "payload_json": { + "name": "payload_json", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "created_at": { + "name": "created_at", "type": "integer", "primaryKey": false, "notNull": true, diff --git a/foundry/packages/backend/src/actors/audit-log/db/drizzle/meta/_journal.json b/foundry/packages/backend/src/actors/audit-log/db/drizzle/meta/_journal.json index 93cf8ce..0393be2 100644 --- a/foundry/packages/backend/src/actors/audit-log/db/drizzle/meta/_journal.json +++ b/foundry/packages/backend/src/actors/audit-log/db/drizzle/meta/_journal.json @@ -8,6 +8,13 @@ "when": 1773376223815, "tag": "0000_fluffy_kid_colt", "breakpoints": true + }, + { + "idx": 1, + "version": "6", + "when": 1773376223816, + "tag": "0001_add_repo_id", + "breakpoints": true } ] } diff --git a/foundry/packages/backend/src/actors/audit-log/db/migrations.ts b/foundry/packages/backend/src/actors/audit-log/db/migrations.ts index 766c225..5bf9b5a 100644 --- a/foundry/packages/backend/src/actors/audit-log/db/migrations.ts +++ b/foundry/packages/backend/src/actors/audit-log/db/migrations.ts @@ -10,6 +10,12 @@ const journal = { tag: "0000_fluffy_kid_colt", breakpoints: true, }, + { + idx: 1, + when: 1773376223816, + tag: "0001_add_repo_id", + breakpoints: true, + }, ], } as const; @@ -24,6 +30,8 @@ export default { \`payload_json\` text NOT NULL, \`created_at\` integer NOT NULL ); +`, + m0001: `ALTER TABLE \`events\` ADD COLUMN \`repo_id\` text; `, } as const, }; diff --git a/foundry/packages/backend/src/actors/audit-log/db/schema.ts b/foundry/packages/backend/src/actors/audit-log/db/schema.ts index 7ba2fa8..d275dd4 100644 --- a/foundry/packages/backend/src/actors/audit-log/db/schema.ts +++ b/foundry/packages/backend/src/actors/audit-log/db/schema.ts @@ -2,6 +2,7 @@ import { integer, sqliteTable, text } from "rivetkit/db/drizzle"; export const events = sqliteTable("events", { id: integer("id").primaryKey({ autoIncrement: true }), + repoId: text("repo_id"), taskId: text("task_id"), branchName: text("branch_name"), kind: text("kind").notNull(), diff --git a/foundry/packages/backend/src/actors/audit-log/index.ts b/foundry/packages/backend/src/actors/audit-log/index.ts index 0889040..ce0a654 100644 --- a/foundry/packages/backend/src/actors/audit-log/index.ts +++ b/foundry/packages/backend/src/actors/audit-log/index.ts @@ -9,22 +9,32 @@ import { AUDIT_LOG_QUEUE_NAMES, runAuditLogWorkflow } from "./workflow.js"; export interface AuditLogInput { organizationId: string; - repoId: string; } export interface AppendAuditLogCommand { kind: string; + repoId?: string; taskId?: string; branchName?: string; payload: Record; } export interface ListAuditLogParams { + repoId?: string; branch?: string; taskId?: string; limit?: number; } +/** + * Organization-scoped audit log. One per org, not one per repo. + * + * The org is the coordinator for all tasks across repos, and we frequently need + * to query the full audit trail across repos (e.g. org-wide activity feed, + * compliance). A per-repo audit log would require fan-out reads every time. + * Keeping it org-scoped gives us a single queryable feed with optional repoId + * filtering when callers want a narrower view. + */ export const auditLog = actor({ db: auditLogDb, queues: Object.fromEntries(AUDIT_LOG_QUEUE_NAMES.map((name) => [name, queue()])), @@ -34,11 +44,13 @@ export const auditLog = actor({ }, createState: (_c, input: AuditLogInput) => ({ organizationId: input.organizationId, - repoId: input.repoId, }), actions: { async list(c, params?: ListAuditLogParams): Promise { const whereParts = []; + if (params?.repoId) { + whereParts.push(eq(events.repoId, params.repoId)); + } if (params?.taskId) { whereParts.push(eq(events.taskId, params.taskId)); } @@ -49,6 +61,7 @@ export const auditLog = actor({ const base = c.db .select({ id: events.id, + repoId: events.repoId, taskId: events.taskId, branchName: events.branchName, kind: events.kind, @@ -65,7 +78,7 @@ export const auditLog = actor({ return rows.map((row) => ({ ...row, organizationId: c.state.organizationId, - repoId: c.state.repoId, + repoId: row.repoId ?? null, })); }, }, diff --git a/foundry/packages/backend/src/actors/audit-log/workflow.ts b/foundry/packages/backend/src/actors/audit-log/workflow.ts index dc87ad5..3c2437a 100644 --- a/foundry/packages/backend/src/actors/audit-log/workflow.ts +++ b/foundry/packages/backend/src/actors/audit-log/workflow.ts @@ -10,6 +10,7 @@ async function appendAuditLogRow(loopCtx: any, body: AppendAuditLogCommand): Pro await loopCtx.db .insert(events) .values({ + repoId: body.repoId ?? null, taskId: body.taskId ?? null, branchName: body.branchName ?? null, kind: body.kind, diff --git a/foundry/packages/backend/src/actors/github-data/index.ts b/foundry/packages/backend/src/actors/github-data/index.ts index 1fd1a64..d477c81 100644 --- a/foundry/packages/backend/src/actors/github-data/index.ts +++ b/foundry/packages/backend/src/actors/github-data/index.ts @@ -4,12 +4,11 @@ import { actor, queue } from "rivetkit"; import { workflow } from "rivetkit/workflow"; import type { FoundryOrganization } from "@sandbox-agent/foundry-shared"; import { getActorRuntimeContext } from "../context.js"; -import { getOrCreateOrganization, getOrCreateRepository, getTask } from "../handles.js"; +import { getOrCreateOrganization, getTask } from "../handles.js"; import { repoIdFromRemote } from "../../services/repo.js"; import { resolveOrganizationGithubAuth } from "../../services/github-auth.js"; import { expectQueueResponse } from "../../services/queue.js"; import { organizationWorkflowQueueName } from "../organization/queues.js"; -import { repositoryWorkflowQueueName } from "../repository/workflow.js"; import { taskWorkflowQueueName } from "../task/workflow/index.js"; import { githubDataDb } from "./db/db.js"; import { githubBranches, githubMembers, githubMeta, githubPullRequests, githubRepositories } from "./db/schema.js"; @@ -18,12 +17,7 @@ import { GITHUB_DATA_QUEUE_NAMES, runGithubDataWorkflow } from "./workflow.js"; const META_ROW_ID = 1; const SYNC_REPOSITORY_BATCH_SIZE = 10; -type GithubSyncPhase = - | "discovering_repositories" - | "syncing_repositories" - | "syncing_branches" - | "syncing_members" - | "syncing_pull_requests"; +type GithubSyncPhase = "discovering_repositories" | "syncing_repositories" | "syncing_branches" | "syncing_members" | "syncing_pull_requests"; interface GithubDataInput { organizationId: string; @@ -84,9 +78,7 @@ interface ClearStateInput { } async function sendOrganizationCommand(organization: any, name: Parameters[0], body: unknown): Promise { - await expectQueueResponse<{ ok: true }>( - await organization.send(organizationWorkflowQueueName(name), body, { wait: true, timeout: 60_000 }), - ); + await expectQueueResponse<{ ok: true }>(await organization.send(organizationWorkflowQueueName(name), body, { wait: true, timeout: 60_000 })); } interface PullRequestWebhookInput { @@ -237,17 +229,6 @@ async function publishSyncProgress(c: any, patch: Partial): Pro return meta; } -async function runSyncStep(c: any, name: string, run: () => Promise): Promise { - if (typeof c.step !== "function") { - return await run(); - } - return await c.step({ - name, - timeout: 90_000, - run, - }); -} - async function getOrganizationContext(c: any, overrides?: FullSyncInput) { const organizationHandle = await getOrCreateOrganization(c, c.state.organizationId); const organizationState = await organizationHandle.getOrganizationShellStateIfInitialized({}); @@ -439,13 +420,11 @@ async function refreshTaskSummaryForBranch(c: any, repoId: string, branchName: s if (!repositoryRecord) { return; } - const repository = await getOrCreateRepository(c, c.state.organizationId, repoId); - await expectQueueResponse<{ ok: true }>( - await repository.send( - repositoryWorkflowQueueName("repository.command.refreshTaskSummaryForBranch"), - { branchName, pullRequest }, - { wait: true, timeout: 10_000 }, - ), + const organization = await getOrCreateOrganization(c, c.state.organizationId); + await organization.send( + organizationWorkflowQueueName("organization.command.refreshTaskSummaryForBranch"), + { repoId, branchName, pullRequest }, + { wait: false }, ); } @@ -484,8 +463,9 @@ async function autoArchiveTaskForClosedPullRequest(c: any, row: any) { if (!repositoryRecord) { return; } - const repository = await getOrCreateRepository(c, c.state.organizationId, row.repoId); - const match = await repository.findTaskForBranch({ + const organization = await getOrCreateOrganization(c, c.state.organizationId); + const match = await organization.findTaskForBranch({ + repoId: row.repoId, branchName: row.headRefName, }); if (!match?.taskId) { @@ -628,54 +608,6 @@ async function listRepositoryBranchesForContext( })); } -async function resolveBranches( - c: any, - context: Awaited>, - repositories: GithubRepositoryRecord[], - onBatch?: (branches: GithubBranchRecord[]) => Promise, - onProgress?: (processedRepositoryCount: number, totalRepositoryCount: number) => Promise, -): Promise { - const batches = chunkItems(repositories, SYNC_REPOSITORY_BATCH_SIZE); - let processedRepositoryCount = 0; - - for (const batch of batches) { - const batchBranches = await runSyncStep(c, `github-sync-branches-${processedRepositoryCount / SYNC_REPOSITORY_BATCH_SIZE + 1}`, async () => - (await Promise.all(batch.map((repository) => listRepositoryBranchesForContext(context, repository)))).flat(), - ); - if (onBatch) { - await onBatch(batchBranches); - } - processedRepositoryCount += batch.length; - if (onProgress) { - await onProgress(processedRepositoryCount, repositories.length); - } - } -} - -async function resolvePullRequests( - c: any, - context: Awaited>, - repositories: GithubRepositoryRecord[], - onBatch?: (pullRequests: GithubPullRequestRecord[]) => Promise, - onProgress?: (processedRepositoryCount: number, totalRepositoryCount: number) => Promise, -): Promise { - const batches = chunkItems(repositories, SYNC_REPOSITORY_BATCH_SIZE); - let processedRepositoryCount = 0; - - for (const batch of batches) { - const batchPullRequests = await runSyncStep(c, `github-sync-pull-requests-${processedRepositoryCount / SYNC_REPOSITORY_BATCH_SIZE + 1}`, async () => - listPullRequestsForRepositories(context, batch), - ); - if (onBatch) { - await onBatch(batchPullRequests); - } - processedRepositoryCount += batch.length; - if (onProgress) { - await onProgress(processedRepositoryCount, repositories.length); - } - } -} - async function refreshRepositoryBranches( c: any, context: Awaited>, @@ -708,176 +640,261 @@ async function readAllPullRequestRows(c: any) { return await c.db.select().from(githubPullRequests).all(); } -export async function runFullSync(c: any, input: FullSyncInput = {}) { +/** Config returned by fullSyncSetup, passed to subsequent sync phases. */ +export interface FullSyncConfig { + syncGeneration: number; + startedAt: number; + totalRepositoryCount: number; + connectedAccount: string; + installationStatus: string; + installationId: number | null; + beforePrRows: any[]; +} + +async function readRepositoriesFromDb(c: any): Promise { + const rows = await c.db.select().from(githubRepositories).all(); + return rows.map((r: any) => ({ + fullName: r.fullName, + cloneUrl: r.cloneUrl, + private: Boolean(r.private), + defaultBranch: r.defaultBranch, + })); +} + +/** + * Phase 1: Discover repositories and persist them. + * Returns the config needed by all subsequent phases, or null if nothing to do. + */ +export async function fullSyncSetup(c: any, input: FullSyncInput = {}): Promise { const startedAt = Date.now(); - const beforeRows = await readAllPullRequestRows(c); + const beforePrRows = await readAllPullRequestRows(c); const currentMeta = await readMeta(c); - let context: Awaited> | null = null; - let syncGeneration = currentMeta.syncGeneration + 1; + const context = await getOrganizationContext(c, input); + const syncGeneration = currentMeta.syncGeneration + 1; - try { - context = await getOrganizationContext(c, input); - syncGeneration = currentMeta.syncGeneration + 1; + await publishSyncProgress(c, { + connectedAccount: context.connectedAccount, + installationStatus: context.installationStatus, + installationId: context.installationId, + syncStatus: "syncing", + lastSyncLabel: input.label?.trim() || "Syncing GitHub data...", + syncGeneration, + syncPhase: "discovering_repositories", + processedRepositoryCount: 0, + totalRepositoryCount: 0, + }); - await publishSyncProgress(c, { - connectedAccount: context.connectedAccount, - installationStatus: context.installationStatus, - installationId: context.installationId, - syncStatus: "syncing", - lastSyncLabel: input.label?.trim() || "Syncing GitHub data...", - syncGeneration, - syncPhase: "discovering_repositories", - processedRepositoryCount: 0, - totalRepositoryCount: 0, - }); + const repositories = await resolveRepositories(c, context); + const totalRepositoryCount = repositories.length; - const repositories = await runSyncStep(c, "github-sync-repositories", async () => resolveRepositories(c, context)); - const totalRepositoryCount = repositories.length; + await publishSyncProgress(c, { + connectedAccount: context.connectedAccount, + installationStatus: context.installationStatus, + installationId: context.installationId, + syncStatus: "syncing", + lastSyncLabel: totalRepositoryCount > 0 ? `Importing ${totalRepositoryCount} repositories...` : "No repositories available", + syncGeneration, + syncPhase: "syncing_repositories", + processedRepositoryCount: totalRepositoryCount, + totalRepositoryCount, + }); - await publishSyncProgress(c, { - connectedAccount: context.connectedAccount, - installationStatus: context.installationStatus, - installationId: context.installationId, - syncStatus: "syncing", - lastSyncLabel: totalRepositoryCount > 0 ? `Importing ${totalRepositoryCount} repositories...` : "No repositories available", - syncGeneration, - syncPhase: "syncing_repositories", - processedRepositoryCount: totalRepositoryCount, - totalRepositoryCount, - }); + await upsertRepositories(c, repositories, startedAt, syncGeneration); - await upsertRepositories(c, repositories, startedAt, syncGeneration); + const organization = await getOrCreateOrganization(c, c.state.organizationId); + await sendOrganizationCommand(organization, "organization.command.github.data_projection.apply", { + connectedAccount: context.connectedAccount, + installationStatus: context.installationStatus, + installationId: context.installationId, + syncStatus: "syncing", + lastSyncLabel: totalRepositoryCount > 0 ? `Imported ${totalRepositoryCount} repositories` : "No repositories available", + lastSyncAt: currentMeta.lastSyncAt, + syncGeneration, + syncPhase: totalRepositoryCount > 0 ? "syncing_branches" : null, + processedRepositoryCount: 0, + totalRepositoryCount, + repositories, + }); - const organization = await getOrCreateOrganization(c, c.state.organizationId); - await sendOrganizationCommand(organization, "organization.command.github.data_projection.apply", { - connectedAccount: context.connectedAccount, - installationStatus: context.installationStatus, - installationId: context.installationId, - syncStatus: "syncing", - lastSyncLabel: totalRepositoryCount > 0 ? `Imported ${totalRepositoryCount} repositories` : "No repositories available", - lastSyncAt: currentMeta.lastSyncAt, - syncGeneration, - syncPhase: totalRepositoryCount > 0 ? "syncing_branches" : null, - processedRepositoryCount: 0, - totalRepositoryCount, - repositories, - }); + return { + syncGeneration, + startedAt, + totalRepositoryCount, + connectedAccount: context.connectedAccount, + installationStatus: context.installationStatus, + installationId: context.installationId, + beforePrRows, + }; +} - await resolveBranches( - c, - context, - repositories, - async (batchBranches) => { - await upsertBranches(c, batchBranches, startedAt, syncGeneration); - }, - async (processedRepositoryCount, repositoryCount) => { - await publishSyncProgress(c, { - connectedAccount: context.connectedAccount, - installationStatus: context.installationStatus, - installationId: context.installationId, - syncStatus: "syncing", - lastSyncLabel: `Synced branches for ${processedRepositoryCount} of ${repositoryCount} repositories`, - syncGeneration, - syncPhase: "syncing_branches", - processedRepositoryCount, - totalRepositoryCount: repositoryCount, - }); - }, - ); +/** + * Phase 2 (per-batch): Fetch and upsert branches for one batch of repos. + * Returns true when all batches have been processed. + */ +export async function fullSyncBranchBatch(c: any, config: FullSyncConfig, batchIndex: number): Promise { + const repos = await readRepositoriesFromDb(c); + const batches = chunkItems(repos, SYNC_REPOSITORY_BATCH_SIZE); + if (batchIndex >= batches.length) return true; - await publishSyncProgress(c, { - connectedAccount: context.connectedAccount, - installationStatus: context.installationStatus, - installationId: context.installationId, - syncStatus: "syncing", - lastSyncLabel: "Syncing GitHub members...", - syncGeneration, - syncPhase: "syncing_members", - processedRepositoryCount: totalRepositoryCount, - totalRepositoryCount, - }); + const batch = batches[batchIndex]!; + const context = await getOrganizationContext(c); + const batchBranches = (await Promise.all(batch.map((repo) => listRepositoryBranchesForContext(context, repo)))).flat(); + await upsertBranches(c, batchBranches, config.startedAt, config.syncGeneration); - const members = await runSyncStep(c, "github-sync-members", async () => resolveMembers(c, context)); - await upsertMembers(c, members, startedAt, syncGeneration); - await sweepMembers(c, syncGeneration); + const processedCount = Math.min((batchIndex + 1) * SYNC_REPOSITORY_BATCH_SIZE, repos.length); + await publishSyncProgress(c, { + connectedAccount: config.connectedAccount, + installationStatus: config.installationStatus, + installationId: config.installationId, + syncStatus: "syncing", + lastSyncLabel: `Synced branches for ${processedCount} of ${repos.length} repositories`, + syncGeneration: config.syncGeneration, + syncPhase: "syncing_branches", + processedRepositoryCount: processedCount, + totalRepositoryCount: repos.length, + }); - await resolvePullRequests( - c, - context, - repositories, - async (batchPullRequests) => { - await upsertPullRequests(c, batchPullRequests, syncGeneration); - }, - async (processedRepositoryCount, repositoryCount) => { - await publishSyncProgress(c, { - connectedAccount: context.connectedAccount, - installationStatus: context.installationStatus, - installationId: context.installationId, - syncStatus: "syncing", - lastSyncLabel: `Synced pull requests for ${processedRepositoryCount} of ${repositoryCount} repositories`, - syncGeneration, - syncPhase: "syncing_pull_requests", - processedRepositoryCount, - totalRepositoryCount: repositoryCount, - }); - }, - ); + return false; +} - await sweepBranches(c, syncGeneration); - await sweepPullRequests(c, syncGeneration); - await sweepRepositories(c, syncGeneration); +/** + * Phase 3: Resolve, upsert, and sweep members. + */ +export async function fullSyncMembers(c: any, config: FullSyncConfig): Promise { + await publishSyncProgress(c, { + connectedAccount: config.connectedAccount, + installationStatus: config.installationStatus, + installationId: config.installationId, + syncStatus: "syncing", + lastSyncLabel: "Syncing GitHub members...", + syncGeneration: config.syncGeneration, + syncPhase: "syncing_members", + processedRepositoryCount: config.totalRepositoryCount, + totalRepositoryCount: config.totalRepositoryCount, + }); - await sendOrganizationCommand(organization, "organization.command.github.data_projection.apply", { - connectedAccount: context.connectedAccount, - installationStatus: context.installationStatus, - installationId: context.installationId, - syncStatus: "synced", - lastSyncLabel: totalRepositoryCount > 0 ? `Synced ${totalRepositoryCount} repositories` : "No repositories available", - lastSyncAt: startedAt, - syncGeneration, - syncPhase: null, - processedRepositoryCount: totalRepositoryCount, - totalRepositoryCount, - repositories, - }); + const context = await getOrganizationContext(c); + const members = await resolveMembers(c, context); + await upsertMembers(c, members, config.startedAt, config.syncGeneration); + await sweepMembers(c, config.syncGeneration); +} - const meta = await writeMeta(c, { - connectedAccount: context.connectedAccount, - installationStatus: context.installationStatus, - installationId: context.installationId, - syncStatus: "synced", - lastSyncLabel: totalRepositoryCount > 0 ? `Synced ${totalRepositoryCount} repositories` : "No repositories available", - lastSyncAt: startedAt, - syncGeneration, - syncPhase: null, - processedRepositoryCount: totalRepositoryCount, - totalRepositoryCount, - }); +/** + * Phase 4 (per-batch): Fetch and upsert pull requests for one batch of repos. + * Returns true when all batches have been processed. + */ +export async function fullSyncPullRequestBatch(c: any, config: FullSyncConfig, batchIndex: number): Promise { + const repos = readRepositoriesFromDb(c); + const batches = chunkItems(repos, SYNC_REPOSITORY_BATCH_SIZE); + if (batchIndex >= batches.length) return true; - const afterRows = await readAllPullRequestRows(c); - await emitPullRequestChangeEvents(c, beforeRows, afterRows); + const batch = batches[batchIndex]!; + const context = await getOrganizationContext(c); + const batchPRs = await listPullRequestsForRepositories(context, batch); + await upsertPullRequests(c, batchPRs, config.syncGeneration); - return { - ...meta, - repositoryCount: repositories.length, - memberCount: members.length, - pullRequestCount: afterRows.length, - }; - } catch (error) { - const message = error instanceof Error ? error.message : "GitHub import failed"; - await publishSyncProgress(c, { - connectedAccount: context?.connectedAccount ?? currentMeta.connectedAccount, - installationStatus: context?.installationStatus ?? currentMeta.installationStatus, - installationId: context?.installationId ?? currentMeta.installationId, - syncStatus: "error", - lastSyncLabel: message, - syncGeneration, - syncPhase: null, - processedRepositoryCount: 0, - totalRepositoryCount: 0, - }); - throw error; + const processedCount = Math.min((batchIndex + 1) * SYNC_REPOSITORY_BATCH_SIZE, repos.length); + await publishSyncProgress(c, { + connectedAccount: config.connectedAccount, + installationStatus: config.installationStatus, + installationId: config.installationId, + syncStatus: "syncing", + lastSyncLabel: `Synced pull requests for ${processedCount} of ${repos.length} repositories`, + syncGeneration: config.syncGeneration, + syncPhase: "syncing_pull_requests", + processedRepositoryCount: processedCount, + totalRepositoryCount: repos.length, + }); + + return false; +} + +/** + * Phase 5: Sweep stale data, publish final state, emit PR change events. + */ +export async function fullSyncFinalize(c: any, config: FullSyncConfig): Promise { + await sweepBranches(c, config.syncGeneration); + await sweepPullRequests(c, config.syncGeneration); + await sweepRepositories(c, config.syncGeneration); + + const repos = readRepositoriesFromDb(c); + const organization = await getOrCreateOrganization(c, c.state.organizationId); + await sendOrganizationCommand(organization, "organization.command.github.data_projection.apply", { + connectedAccount: config.connectedAccount, + installationStatus: config.installationStatus, + installationId: config.installationId, + syncStatus: "synced", + lastSyncLabel: config.totalRepositoryCount > 0 ? `Synced ${config.totalRepositoryCount} repositories` : "No repositories available", + lastSyncAt: config.startedAt, + syncGeneration: config.syncGeneration, + syncPhase: null, + processedRepositoryCount: config.totalRepositoryCount, + totalRepositoryCount: config.totalRepositoryCount, + repositories: repos, + }); + + await writeMeta(c, { + connectedAccount: config.connectedAccount, + installationStatus: config.installationStatus, + installationId: config.installationId, + syncStatus: "synced", + lastSyncLabel: config.totalRepositoryCount > 0 ? `Synced ${config.totalRepositoryCount} repositories` : "No repositories available", + lastSyncAt: config.startedAt, + syncGeneration: config.syncGeneration, + syncPhase: null, + processedRepositoryCount: config.totalRepositoryCount, + totalRepositoryCount: config.totalRepositoryCount, + }); + + const afterRows = await readAllPullRequestRows(c); + await emitPullRequestChangeEvents(c, config.beforePrRows, afterRows); +} + +/** + * Error handler: publish error sync state when a full sync fails. + */ +/** + * Single-shot full sync: runs all phases (setup, branches, members, PRs, finalize) + * using native JS loops. This must NOT use workflow primitives (step/loop/sleep) + * because it runs inside a workflow step. See workflow.ts for context on why + * sub-loops cause HistoryDivergedError. + */ +export async function runFullSync(c: any, input: FullSyncInput = {}): Promise { + const config = await fullSyncSetup(c, input); + + // Branches — native loop over batches + for (let i = 0; ; i++) { + const done = await fullSyncBranchBatch(c, config, i); + if (done) break; } + + // Members + await fullSyncMembers(c, config); + + // Pull requests — native loop over batches + for (let i = 0; ; i++) { + const done = await fullSyncPullRequestBatch(c, config, i); + if (done) break; + } + + // Finalize + await fullSyncFinalize(c, config); +} + +export async function fullSyncError(c: any, error: unknown): Promise { + const currentMeta = await readMeta(c); + const message = error instanceof Error ? error.message : "GitHub import failed"; + await publishSyncProgress(c, { + connectedAccount: currentMeta.connectedAccount, + installationStatus: currentMeta.installationStatus, + installationId: currentMeta.installationId, + syncStatus: "error", + lastSyncLabel: message, + syncGeneration: currentMeta.syncGeneration, + syncPhase: null, + processedRepositoryCount: 0, + totalRepositoryCount: 0, + }); } export const githubData = actor({ @@ -941,214 +958,213 @@ export const githubData = actor({ })) .sort((left, right) => left.branchName.localeCompare(right.branchName)); }, - }, }); export async function reloadRepositoryMutation(c: any, input: { repoId: string }) { - const context = await getOrganizationContext(c); - const current = await c.db.select().from(githubRepositories).where(eq(githubRepositories.repoId, input.repoId)).get(); - if (!current) { - throw new Error(`Unknown GitHub repository: ${input.repoId}`); - } - const { appShell } = getActorRuntimeContext(); - const repository = - context.installationId != null - ? await appShell.github.getInstallationRepository(context.installationId, current.fullName) - : context.accessToken - ? await appShell.github.getUserRepository(context.accessToken, current.fullName) - : null; - if (!repository) { - throw new Error(`Unable to reload repository: ${current.fullName}`); - } + const context = await getOrganizationContext(c); + const current = await c.db.select().from(githubRepositories).where(eq(githubRepositories.repoId, input.repoId)).get(); + if (!current) { + throw new Error(`Unknown GitHub repository: ${input.repoId}`); + } + const { appShell } = getActorRuntimeContext(); + const repository = + context.installationId != null + ? await appShell.github.getInstallationRepository(context.installationId, current.fullName) + : context.accessToken + ? await appShell.github.getUserRepository(context.accessToken, current.fullName) + : null; + if (!repository) { + throw new Error(`Unable to reload repository: ${current.fullName}`); + } - const updatedAt = Date.now(); - const currentMeta = await readMeta(c); - await c.db - .insert(githubRepositories) - .values({ - repoId: input.repoId, - fullName: repository.fullName, - cloneUrl: repository.cloneUrl, - private: repository.private ? 1 : 0, - defaultBranch: repository.defaultBranch, - syncGeneration: currentMeta.syncGeneration, - updatedAt, - }) - .onConflictDoUpdate({ - target: githubRepositories.repoId, - set: { - fullName: repository.fullName, - cloneUrl: repository.cloneUrl, - private: repository.private ? 1 : 0, - defaultBranch: repository.defaultBranch, - syncGeneration: currentMeta.syncGeneration, - updatedAt, - }, - }) - .run(); - await refreshRepositoryBranches( - c, - context, - { - fullName: repository.fullName, - cloneUrl: repository.cloneUrl, - private: repository.private, - defaultBranch: repository.defaultBranch, - }, - updatedAt, - ); - - const organization = await getOrCreateOrganization(c, c.state.organizationId); - await sendOrganizationCommand(organization, "organization.command.github.repository_projection.apply", { - repoId: input.repoId, - remoteUrl: repository.cloneUrl, - }); - return { - repoId: input.repoId, + const updatedAt = Date.now(); + const currentMeta = await readMeta(c); + await c.db + .insert(githubRepositories) + .values({ + repoId: input.repoId, + fullName: repository.fullName, + cloneUrl: repository.cloneUrl, + private: repository.private ? 1 : 0, + defaultBranch: repository.defaultBranch, + syncGeneration: currentMeta.syncGeneration, + updatedAt, + }) + .onConflictDoUpdate({ + target: githubRepositories.repoId, + set: { fullName: repository.fullName, cloneUrl: repository.cloneUrl, - private: repository.private, + private: repository.private ? 1 : 0, defaultBranch: repository.defaultBranch, - }; + syncGeneration: currentMeta.syncGeneration, + updatedAt, + }, + }) + .run(); + await refreshRepositoryBranches( + c, + context, + { + fullName: repository.fullName, + cloneUrl: repository.cloneUrl, + private: repository.private, + defaultBranch: repository.defaultBranch, + }, + updatedAt, + ); + + const organization = await getOrCreateOrganization(c, c.state.organizationId); + await sendOrganizationCommand(organization, "organization.command.github.repository_projection.apply", { + repoId: input.repoId, + remoteUrl: repository.cloneUrl, + }); + return { + repoId: input.repoId, + fullName: repository.fullName, + cloneUrl: repository.cloneUrl, + private: repository.private, + defaultBranch: repository.defaultBranch, + }; } export async function clearStateMutation(c: any, input: ClearStateInput) { - const beforeRows = await readAllPullRequestRows(c); - const currentMeta = await readMeta(c); - await c.db.delete(githubPullRequests).run(); - await c.db.delete(githubBranches).run(); - await c.db.delete(githubRepositories).run(); - await c.db.delete(githubMembers).run(); - await writeMeta(c, { - connectedAccount: input.connectedAccount, - installationStatus: input.installationStatus, - installationId: input.installationId, - syncStatus: "pending", - lastSyncLabel: input.label, - lastSyncAt: null, - syncGeneration: currentMeta.syncGeneration, - syncPhase: null, - processedRepositoryCount: 0, - totalRepositoryCount: 0, - }); + const beforeRows = await readAllPullRequestRows(c); + const currentMeta = await readMeta(c); + await c.db.delete(githubPullRequests).run(); + await c.db.delete(githubBranches).run(); + await c.db.delete(githubRepositories).run(); + await c.db.delete(githubMembers).run(); + await writeMeta(c, { + connectedAccount: input.connectedAccount, + installationStatus: input.installationStatus, + installationId: input.installationId, + syncStatus: "pending", + lastSyncLabel: input.label, + lastSyncAt: null, + syncGeneration: currentMeta.syncGeneration, + syncPhase: null, + processedRepositoryCount: 0, + totalRepositoryCount: 0, + }); - const organization = await getOrCreateOrganization(c, c.state.organizationId); - await sendOrganizationCommand(organization, "organization.command.github.data_projection.apply", { - connectedAccount: input.connectedAccount, - installationStatus: input.installationStatus, - installationId: input.installationId, - syncStatus: "pending", - lastSyncLabel: input.label, - lastSyncAt: null, - syncGeneration: currentMeta.syncGeneration, - syncPhase: null, - processedRepositoryCount: 0, - totalRepositoryCount: 0, - repositories: [], - }); - await emitPullRequestChangeEvents(c, beforeRows, []); + const organization = await getOrCreateOrganization(c, c.state.organizationId); + await sendOrganizationCommand(organization, "organization.command.github.data_projection.apply", { + connectedAccount: input.connectedAccount, + installationStatus: input.installationStatus, + installationId: input.installationId, + syncStatus: "pending", + lastSyncLabel: input.label, + lastSyncAt: null, + syncGeneration: currentMeta.syncGeneration, + syncPhase: null, + processedRepositoryCount: 0, + totalRepositoryCount: 0, + repositories: [], + }); + await emitPullRequestChangeEvents(c, beforeRows, []); } export async function handlePullRequestWebhookMutation(c: any, input: PullRequestWebhookInput) { - const beforeRows = await readAllPullRequestRows(c); - const repoId = repoIdFromRemote(input.repository.cloneUrl); - const currentRepository = await c.db.select().from(githubRepositories).where(eq(githubRepositories.repoId, repoId)).get(); - const updatedAt = Date.now(); - const currentMeta = await readMeta(c); - const state = normalizePrStatus(input.pullRequest); - const prId = `${repoId}#${input.pullRequest.number}`; + const beforeRows = await readAllPullRequestRows(c); + const repoId = repoIdFromRemote(input.repository.cloneUrl); + const currentRepository = await c.db.select().from(githubRepositories).where(eq(githubRepositories.repoId, repoId)).get(); + const updatedAt = Date.now(); + const currentMeta = await readMeta(c); + const state = normalizePrStatus(input.pullRequest); + const prId = `${repoId}#${input.pullRequest.number}`; - await c.db - .insert(githubRepositories) - .values({ - repoId, - fullName: input.repository.fullName, - cloneUrl: input.repository.cloneUrl, - private: input.repository.private ? 1 : 0, - defaultBranch: currentRepository?.defaultBranch ?? input.pullRequest.baseRefName ?? "main", + await c.db + .insert(githubRepositories) + .values({ + repoId, + fullName: input.repository.fullName, + cloneUrl: input.repository.cloneUrl, + private: input.repository.private ? 1 : 0, + defaultBranch: currentRepository?.defaultBranch ?? input.pullRequest.baseRefName ?? "main", + syncGeneration: currentMeta.syncGeneration, + updatedAt, + }) + .onConflictDoUpdate({ + target: githubRepositories.repoId, + set: { + fullName: input.repository.fullName, + cloneUrl: input.repository.cloneUrl, + private: input.repository.private ? 1 : 0, + defaultBranch: currentRepository?.defaultBranch ?? input.pullRequest.baseRefName ?? "main", + syncGeneration: currentMeta.syncGeneration, + updatedAt, + }, + }) + .run(); + + if (state === "CLOSED" || state === "MERGED") { + await c.db.delete(githubPullRequests).where(eq(githubPullRequests.prId, prId)).run(); + } else { + await c.db + .insert(githubPullRequests) + .values({ + prId, + repoId, + repoFullName: input.repository.fullName, + number: input.pullRequest.number, + title: input.pullRequest.title, + body: input.pullRequest.body ?? null, + state, + url: input.pullRequest.url, + headRefName: input.pullRequest.headRefName, + baseRefName: input.pullRequest.baseRefName, + authorLogin: input.pullRequest.authorLogin ?? null, + isDraft: input.pullRequest.isDraft ? 1 : 0, + syncGeneration: currentMeta.syncGeneration, + updatedAt, + }) + .onConflictDoUpdate({ + target: githubPullRequests.prId, + set: { + title: input.pullRequest.title, + body: input.pullRequest.body ?? null, + state, + url: input.pullRequest.url, + headRefName: input.pullRequest.headRefName, + baseRefName: input.pullRequest.baseRefName, + authorLogin: input.pullRequest.authorLogin ?? null, + isDraft: input.pullRequest.isDraft ? 1 : 0, syncGeneration: currentMeta.syncGeneration, updatedAt, - }) - .onConflictDoUpdate({ - target: githubRepositories.repoId, - set: { - fullName: input.repository.fullName, - cloneUrl: input.repository.cloneUrl, - private: input.repository.private ? 1 : 0, - defaultBranch: currentRepository?.defaultBranch ?? input.pullRequest.baseRefName ?? "main", - syncGeneration: currentMeta.syncGeneration, - updatedAt, - }, - }) - .run(); + }, + }) + .run(); + } - if (state === "CLOSED" || state === "MERGED") { - await c.db.delete(githubPullRequests).where(eq(githubPullRequests.prId, prId)).run(); - } else { - await c.db - .insert(githubPullRequests) - .values({ - prId, - repoId, - repoFullName: input.repository.fullName, - number: input.pullRequest.number, - title: input.pullRequest.title, - body: input.pullRequest.body ?? null, - state, - url: input.pullRequest.url, - headRefName: input.pullRequest.headRefName, - baseRefName: input.pullRequest.baseRefName, - authorLogin: input.pullRequest.authorLogin ?? null, - isDraft: input.pullRequest.isDraft ? 1 : 0, - syncGeneration: currentMeta.syncGeneration, - updatedAt, - }) - .onConflictDoUpdate({ - target: githubPullRequests.prId, - set: { - title: input.pullRequest.title, - body: input.pullRequest.body ?? null, - state, - url: input.pullRequest.url, - headRefName: input.pullRequest.headRefName, - baseRefName: input.pullRequest.baseRefName, - authorLogin: input.pullRequest.authorLogin ?? null, - isDraft: input.pullRequest.isDraft ? 1 : 0, - syncGeneration: currentMeta.syncGeneration, - updatedAt, - }, - }) - .run(); - } + await publishSyncProgress(c, { + connectedAccount: input.connectedAccount, + installationStatus: input.installationStatus, + installationId: input.installationId, + syncStatus: "synced", + lastSyncLabel: "GitHub webhook received", + lastSyncAt: updatedAt, + syncPhase: null, + processedRepositoryCount: 0, + totalRepositoryCount: 0, + }); - await publishSyncProgress(c, { - connectedAccount: input.connectedAccount, - installationStatus: input.installationStatus, - installationId: input.installationId, - syncStatus: "synced", - lastSyncLabel: "GitHub webhook received", - lastSyncAt: updatedAt, - syncPhase: null, - processedRepositoryCount: 0, - totalRepositoryCount: 0, + const organization = await getOrCreateOrganization(c, c.state.organizationId); + await sendOrganizationCommand(organization, "organization.command.github.repository_projection.apply", { + repoId, + remoteUrl: input.repository.cloneUrl, + }); + + const afterRows = await readAllPullRequestRows(c); + await emitPullRequestChangeEvents(c, beforeRows, afterRows); + if (state === "CLOSED" || state === "MERGED") { + const previous = beforeRows.find((row) => row.prId === prId); + if (previous) { + await autoArchiveTaskForClosedPullRequest(c, { + ...previous, + state, }); - - const organization = await getOrCreateOrganization(c, c.state.organizationId); - await sendOrganizationCommand(organization, "organization.command.github.repository_projection.apply", { - repoId, - remoteUrl: input.repository.cloneUrl, - }); - - const afterRows = await readAllPullRequestRows(c); - await emitPullRequestChangeEvents(c, beforeRows, afterRows); - if (state === "CLOSED" || state === "MERGED") { - const previous = beforeRows.find((row) => row.prId === prId); - if (previous) { - await autoArchiveTaskForClosedPullRequest(c, { - ...previous, - state, - }); - } - } + } + } } diff --git a/foundry/packages/backend/src/actors/github-data/workflow.ts b/foundry/packages/backend/src/actors/github-data/workflow.ts index ded4412..5e7c2cb 100644 --- a/foundry/packages/backend/src/actors/github-data/workflow.ts +++ b/foundry/packages/backend/src/actors/github-data/workflow.ts @@ -1,6 +1,6 @@ // @ts-nocheck import { Loop } from "rivetkit/workflow"; -import { clearStateMutation, handlePullRequestWebhookMutation, reloadRepositoryMutation, runFullSync } from "./index.js"; +import { clearStateMutation, handlePullRequestWebhookMutation, reloadRepositoryMutation, runFullSync, fullSyncError } from "./index.js"; export const GITHUB_DATA_QUEUE_NAMES = [ "githubData.command.syncRepos", @@ -16,22 +16,14 @@ export function githubDataWorkflowQueueName(name: GithubDataQueueName): GithubDa } export async function runGithubDataWorkflow(ctx: any): Promise { - const meta = await ctx.step({ - name: "github-data-read-meta", - timeout: 30_000, - run: async () => { - const { readMeta } = await import("./index.js"); - return await readMeta(ctx); - }, - }); - - if (meta.syncStatus === "pending") { - try { - await runFullSync(ctx, { label: "Importing repository catalog..." }); - } catch { - // Best-effort initial sync. runFullSync persists the failure state. - } - } + // The org actor sends a "githubData.command.syncRepos" queue message when it + // creates this actor, so the command loop below handles the initial sync. + // + // IMPORTANT: Do NOT use workflow sub-loops (ctx.loop) inside command handlers. + // RivetKit workflow sub-loops inside a parent loop cause HistoryDivergedError + // on the second iteration because entries from the first iteration's sub-loop + // are still in history but not visited during replay of iteration 2. Use native + // JS loops inside a single step instead. See .context/rivetkit-subloop-bug.md. await ctx.loop("github-data-command-loop", async (loopCtx: any) => { const msg = await loopCtx.queue.next("next-github-data-command", { @@ -44,25 +36,49 @@ export async function runGithubDataWorkflow(ctx: any): Promise { try { if (msg.name === "githubData.command.syncRepos") { - await runFullSync(loopCtx, msg.body); - await msg.complete({ ok: true }); + try { + // Single opaque step for the entire sync. Do NOT decompose into + // sub-loops/sub-steps — see comment at top of function. + await loopCtx.step({ + name: "github-data-sync-repos", + timeout: 5 * 60_000, + run: async () => runFullSync(loopCtx, msg.body), + }); + await msg.complete({ ok: true }); + } catch (error) { + await loopCtx.step("sync-repos-error", async () => fullSyncError(loopCtx, error)); + const message = error instanceof Error ? error.message : String(error); + await msg.complete({ error: message }).catch(() => {}); + } return Loop.continue(undefined); } if (msg.name === "githubData.command.reloadRepository") { - const result = await reloadRepositoryMutation(loopCtx, msg.body); + const result = await loopCtx.step({ + name: "github-data-reload-repository", + timeout: 5 * 60_000, + run: async () => reloadRepositoryMutation(loopCtx, msg.body), + }); await msg.complete(result); return Loop.continue(undefined); } if (msg.name === "githubData.command.clearState") { - await clearStateMutation(loopCtx, msg.body); + await loopCtx.step({ + name: "github-data-clear-state", + timeout: 60_000, + run: async () => clearStateMutation(loopCtx, msg.body), + }); await msg.complete({ ok: true }); return Loop.continue(undefined); } if (msg.name === "githubData.command.handlePullRequestWebhook") { - await handlePullRequestWebhookMutation(loopCtx, msg.body); + await loopCtx.step({ + name: "github-data-handle-pull-request-webhook", + timeout: 60_000, + run: async () => handlePullRequestWebhookMutation(loopCtx, msg.body), + }); await msg.complete({ ok: true }); return Loop.continue(undefined); } diff --git a/foundry/packages/backend/src/actors/handles.ts b/foundry/packages/backend/src/actors/handles.ts index 0020f77..2cc83d9 100644 --- a/foundry/packages/backend/src/actors/handles.ts +++ b/foundry/packages/backend/src/actors/handles.ts @@ -1,4 +1,4 @@ -import { auditLogKey, githubDataKey, organizationKey, repositoryKey, taskKey, taskSandboxKey, userKey } from "./keys.js"; +import { auditLogKey, githubDataKey, organizationKey, taskKey, taskSandboxKey, userKey } from "./keys.js"; export function actorClient(c: any) { return c.client(); @@ -20,19 +20,6 @@ export function getUser(c: any, userId: string) { return actorClient(c).user.get(userKey(userId)); } -export async function getOrCreateRepository(c: any, organizationId: string, repoId: string) { - return await actorClient(c).repository.getOrCreate(repositoryKey(organizationId, repoId), { - createWithInput: { - organizationId, - repoId, - }, - }); -} - -export function getRepository(c: any, organizationId: string, repoId: string) { - return actorClient(c).repository.get(repositoryKey(organizationId, repoId)); -} - export function getTask(c: any, organizationId: string, repoId: string, taskId: string) { return actorClient(c).task.get(taskKey(organizationId, repoId, taskId)); } @@ -43,11 +30,10 @@ export async function getOrCreateTask(c: any, organizationId: string, repoId: st }); } -export async function getOrCreateAuditLog(c: any, organizationId: string, repoId: string) { - return await actorClient(c).auditLog.getOrCreate(auditLogKey(organizationId, repoId), { +export async function getOrCreateAuditLog(c: any, organizationId: string) { + return await actorClient(c).auditLog.getOrCreate(auditLogKey(organizationId), { createWithInput: { organizationId, - repoId, }, }); } @@ -86,10 +72,6 @@ export function selfOrganization(c: any) { return actorClient(c).organization.getForId(c.actorId); } -export function selfRepository(c: any) { - return actorClient(c).repository.getForId(c.actorId); -} - export function selfUser(c: any) { return actorClient(c).user.getForId(c.actorId); } diff --git a/foundry/packages/backend/src/actors/index.ts b/foundry/packages/backend/src/actors/index.ts index 7717d05..52bb914 100644 --- a/foundry/packages/backend/src/actors/index.ts +++ b/foundry/packages/backend/src/actors/index.ts @@ -3,7 +3,6 @@ import { setup } from "rivetkit"; import { githubData } from "./github-data/index.js"; import { task } from "./task/index.js"; import { auditLog } from "./audit-log/index.js"; -import { repository } from "./repository/index.js"; import { taskSandbox } from "./sandbox/index.js"; import { organization } from "./organization/index.js"; import { logger } from "../logging.js"; @@ -23,7 +22,6 @@ export const registry = setup({ use: { user, organization, - repository, task, taskSandbox, auditLog, @@ -37,6 +35,5 @@ export * from "./user/index.js"; export * from "./github-data/index.js"; export * from "./task/index.js"; export * from "./keys.js"; -export * from "./repository/index.js"; export * from "./sandbox/index.js"; export * from "./organization/index.js"; diff --git a/foundry/packages/backend/src/actors/keys.ts b/foundry/packages/backend/src/actors/keys.ts index 537f3b2..03bd014 100644 --- a/foundry/packages/backend/src/actors/keys.ts +++ b/foundry/packages/backend/src/actors/keys.ts @@ -8,20 +8,17 @@ export function userKey(userId: string): ActorKey { return ["org", "app", "user", userId]; } -export function repositoryKey(organizationId: string, repoId: string): ActorKey { - return ["org", organizationId, "repository", repoId]; -} - export function taskKey(organizationId: string, repoId: string, taskId: string): ActorKey { - return ["org", organizationId, "repository", repoId, "task", taskId]; + return ["org", organizationId, "task", repoId, taskId]; } export function taskSandboxKey(organizationId: string, sandboxId: string): ActorKey { return ["org", organizationId, "sandbox", sandboxId]; } -export function auditLogKey(organizationId: string, repoId: string): ActorKey { - return ["org", organizationId, "repository", repoId, "audit-log"]; +/** One audit log per org (not per repo) — see audit-log/index.ts for rationale. */ +export function auditLogKey(organizationId: string): ActorKey { + return ["org", organizationId, "audit-log"]; } export function githubDataKey(organizationId: string): ActorKey { diff --git a/foundry/packages/backend/src/actors/organization/actions.ts b/foundry/packages/backend/src/actors/organization/actions.ts index 40756d4..860fdc9 100644 --- a/foundry/packages/backend/src/actors/organization/actions.ts +++ b/foundry/packages/backend/src/actors/organization/actions.ts @@ -9,10 +9,9 @@ import type { OrganizationSummarySnapshot, OrganizationUseInput, } from "@sandbox-agent/foundry-shared"; -import { getOrCreateRepository } from "../handles.js"; import { logActorWarning, resolveErrorMessage } from "../logging.js"; import { repoIdFromRemote } from "../../services/repo.js"; -import { organizationProfile, repos } from "./db/schema.js"; +import { organizationProfile, repos, taskSummaries } from "./db/schema.js"; import { organizationAppActions } from "./actions/app.js"; import { organizationBetterAuthActions } from "./actions/better-auth.js"; import { organizationOnboardingActions } from "./actions/onboarding.js"; @@ -20,8 +19,6 @@ import { organizationGithubActions } from "./actions/github.js"; import { organizationShellActions } from "./actions/organization.js"; import { organizationTaskActions } from "./actions/tasks.js"; -export { createTaskMutation } from "./actions/tasks.js"; - interface OrganizationState { organizationId: string; } @@ -78,9 +75,9 @@ function buildGithubSummary(profile: any, importedRepoCount: number): Organizati } /** - * Reads the organization sidebar snapshot by fanning out one level to the - * repository coordinators. Task summaries are repository-owned; organization - * only aggregates them. + * Reads the organization sidebar snapshot from local tables only — no fan-out + * to child actors. Task summaries are organization-owned and updated via push + * from task actors. */ async function getOrganizationSummarySnapshot(c: any): Promise { const profile = await c.db.select().from(organizationProfile).where(eq(organizationProfile.id, ORGANIZATION_PROFILE_ROW_ID)).get(); @@ -93,20 +90,35 @@ async function getOrganizationSummarySnapshot(c: any): Promise right.updatedAtMs - left.updatedAtMs); + + const summaryRows = await c.db.select().from(taskSummaries).orderBy(desc(taskSummaries.updatedAtMs)).all(); + const summaries: WorkspaceTaskSummary[] = summaryRows.map((row) => ({ + id: row.taskId, + repoId: row.repoId, + title: row.title, + status: row.status, + repoName: row.repoName, + updatedAtMs: row.updatedAtMs, + branch: row.branch ?? null, + pullRequest: row.pullRequestJson + ? (() => { + try { + return JSON.parse(row.pullRequestJson); + } catch { + return null; + } + })() + : null, + sessionsSummary: row.sessionsSummaryJson + ? (() => { + try { + return JSON.parse(row.sessionsSummaryJson); + } catch { + return []; + } + })() + : [], + })); return { organizationId: c.state.organizationId, diff --git a/foundry/packages/backend/src/actors/repository/actions.ts b/foundry/packages/backend/src/actors/organization/actions/task-mutations.ts similarity index 52% rename from foundry/packages/backend/src/actors/repository/actions.ts rename to foundry/packages/backend/src/actors/organization/actions/task-mutations.ts index 739112d..72edf92 100644 --- a/foundry/packages/backend/src/actors/repository/actions.ts +++ b/foundry/packages/backend/src/actors/organization/actions/task-mutations.ts @@ -10,17 +10,18 @@ import type { WorkspaceSessionSummary, WorkspaceTaskSummary, } from "@sandbox-agent/foundry-shared"; -import { getActorRuntimeContext } from "../context.js"; -import { getOrCreateAuditLog, getOrCreateOrganization, getOrCreateTask, getTask } from "../handles.js"; -import { organizationWorkflowQueueName } from "../organization/queues.js"; -import { taskWorkflowQueueName } from "../task/workflow/index.js"; -import { deriveFallbackTitle, resolveCreateFlowDecision } from "../../services/create-flow.js"; -import { expectQueueResponse } from "../../services/queue.js"; -import { isActorNotFoundError, logActorWarning, resolveErrorMessage } from "../logging.js"; -import { defaultSandboxProviderId } from "../../sandbox-config.js"; -import { repoMeta, taskIndex, tasks } from "./db/schema.js"; +import { getActorRuntimeContext } from "../../context.js"; +import { getGithubData, getOrCreateAuditLog, getOrCreateTask, getTask } from "../../handles.js"; +import { taskWorkflowQueueName } from "../../task/workflow/index.js"; +import { deriveFallbackTitle, resolveCreateFlowDecision } from "../../../services/create-flow.js"; +import { expectQueueResponse } from "../../../services/queue.js"; +import { isActorNotFoundError, logActorWarning, resolveErrorMessage } from "../../logging.js"; +import { defaultSandboxProviderId } from "../../../sandbox-config.js"; +import { taskIndex, taskSummaries, repos } from "../db/schema.js"; +import { refreshOrganizationSnapshotMutation } from "../actions.js"; interface CreateTaskCommand { + repoId: string; task: string; sandboxProviderId: SandboxProviderId; explicitTitle: string | null; @@ -29,19 +30,12 @@ interface CreateTaskCommand { } interface RegisterTaskBranchCommand { + repoId: string; taskId: string; branchName: string; requireExistingRemote?: boolean; } -interface ListTaskSummariesCommand { - includeArchived?: boolean; -} - -interface GetProjectedTaskSummaryCommand { - taskId: string; -} - function isStaleTaskReferenceError(error: unknown): boolean { const message = resolveErrorMessage(error); return isActorNotFoundError(error) || message.startsWith("Task not found:"); @@ -62,6 +56,7 @@ function parseJsonValue(value: string | null | undefined, fallback: T): T { function taskSummaryRowFromSummary(taskSummary: WorkspaceTaskSummary) { return { taskId: taskSummary.id, + repoId: taskSummary.repoId, title: taskSummary.title, status: taskSummary.status, repoName: taskSummary.repoName, @@ -72,10 +67,10 @@ function taskSummaryRowFromSummary(taskSummary: WorkspaceTaskSummary) { }; } -function taskSummaryFromRow(c: any, row: any): WorkspaceTaskSummary { +export function taskSummaryFromRow(repoId: string, row: any): WorkspaceTaskSummary { return { id: row.taskId, - repoId: c.state.repoId, + repoId, title: row.title, status: row.status, repoName: row.repoName, @@ -86,29 +81,17 @@ function taskSummaryFromRow(c: any, row: any): WorkspaceTaskSummary { }; } -async function upsertTaskSummary(c: any, taskSummary: WorkspaceTaskSummary): Promise { +export async function upsertTaskSummary(c: any, taskSummary: WorkspaceTaskSummary): Promise { await c.db - .insert(tasks) + .insert(taskSummaries) .values(taskSummaryRowFromSummary(taskSummary)) .onConflictDoUpdate({ - target: tasks.taskId, + target: taskSummaries.taskId, set: taskSummaryRowFromSummary(taskSummary), }) .run(); } -async function notifyOrganizationSnapshotChanged(c: any): Promise { - const organization = await getOrCreateOrganization(c, c.state.organizationId); - await expectQueueResponse<{ ok: true }>( - await organization.send(organizationWorkflowQueueName("organization.command.snapshot.broadcast"), {}, { wait: true, timeout: 10_000 }), - ); -} - -async function readStoredRemoteUrl(c: any): Promise { - const row = await c.db.select({ remoteUrl: repoMeta.remoteUrl }).from(repoMeta).where(eq(repoMeta.id, 1)).get(); - return row?.remoteUrl ?? null; -} - async function deleteStaleTaskIndexRow(c: any, taskId: string): Promise { try { await c.db.delete(taskIndex).where(eq(taskIndex.taskId, taskId)).run(); @@ -117,59 +100,43 @@ async function deleteStaleTaskIndexRow(c: any, taskId: string): Promise { } } -async function reinsertTaskIndexRow(c: any, taskId: string, branchName: string | null, updatedAt: number): Promise { - const now = Date.now(); - await c.db - .insert(taskIndex) - .values({ - taskId, - branchName, - createdAt: updatedAt || now, - updatedAt: now, - }) - .onConflictDoUpdate({ - target: taskIndex.taskId, - set: { - branchName, - updatedAt: now, - }, - }) - .run(); -} - -async function listKnownTaskBranches(c: any): Promise { - const rows = await c.db.select({ branchName: taskIndex.branchName }).from(taskIndex).where(isNotNull(taskIndex.branchName)).all(); +async function listKnownTaskBranches(c: any, repoId: string): Promise { + const rows = await c.db + .select({ branchName: taskIndex.branchName }) + .from(taskIndex) + .where(and(eq(taskIndex.repoId, repoId), isNotNull(taskIndex.branchName))) + .all(); return rows.map((row) => row.branchName).filter((value): value is string => typeof value === "string" && value.trim().length > 0); } -async function resolveGitHubRepository(c: any) { +async function resolveGitHubRepository(c: any, repoId: string) { const githubData = getGithubData(c, c.state.organizationId); - return await githubData.getRepository({ repoId: c.state.repoId }).catch(() => null); + return await githubData.getRepository({ repoId }).catch(() => null); } -async function listGitHubBranches(c: any): Promise> { +async function listGitHubBranches(c: any, repoId: string): Promise> { const githubData = getGithubData(c, c.state.organizationId); - return await githubData.listBranchesForRepository({ repoId: c.state.repoId }).catch(() => []); + return await githubData.listBranchesForRepository({ repoId }).catch(() => []); } -async function resolveRepositoryRemoteUrl(c: any): Promise { - const storedRemoteUrl = await readStoredRemoteUrl(c); - if (storedRemoteUrl) { - return storedRemoteUrl; +async function resolveRepositoryRemoteUrl(c: any, repoId: string): Promise { + const repoRow = await c.db.select({ remoteUrl: repos.remoteUrl }).from(repos).where(eq(repos.repoId, repoId)).get(); + if (repoRow?.remoteUrl) { + return repoRow.remoteUrl; } - const repository = await resolveGitHubRepository(c); + const repository = await resolveGitHubRepository(c, repoId); const remoteUrl = repository?.cloneUrl?.trim(); if (!remoteUrl) { - throw new Error(`Missing remote URL for repo ${c.state.repoId}`); + throw new Error(`Missing remote URL for repo ${repoId}`); } return remoteUrl; } export async function createTaskMutation(c: any, cmd: CreateTaskCommand): Promise { const organizationId = c.state.organizationId; - const repoId = c.state.repoId; - await resolveRepositoryRemoteUrl(c); + const repoId = cmd.repoId; + await resolveRepositoryRemoteUrl(c, repoId); const onBranch = cmd.onBranch?.trim() || null; const taskId = randomUUID(); let initialBranchName: string | null = null; @@ -180,12 +147,13 @@ export async function createTaskMutation(c: any, cmd: CreateTaskCommand): Promis initialTitle = deriveFallbackTitle(cmd.task, cmd.explicitTitle ?? undefined); await registerTaskBranchMutation(c, { + repoId, taskId, branchName: onBranch, requireExistingRemote: true, }); } else { - const reservedBranches = await listKnownTaskBranches(c); + const reservedBranches = await listKnownTaskBranches(c, repoId); const resolved = resolveCreateFlowDecision({ task: cmd.task, explicitTitle: cmd.explicitTitle ?? undefined, @@ -202,6 +170,7 @@ export async function createTaskMutation(c: any, cmd: CreateTaskCommand): Promis .insert(taskIndex) .values({ taskId, + repoId, branchName: resolved.branchName, createdAt: now, updatedAt: now, @@ -242,9 +211,9 @@ export async function createTaskMutation(c: any, cmd: CreateTaskCommand): Promis try { await upsertTaskSummary(c, await taskHandle.getTaskSummary({})); - await notifyOrganizationSnapshotChanged(c); + await refreshOrganizationSnapshotMutation(c); } catch (error) { - logActorWarning("repository", "failed seeding task summary after task creation", { + logActorWarning("organization", "failed seeding task summary after task creation", { organizationId, repoId, taskId, @@ -252,11 +221,12 @@ export async function createTaskMutation(c: any, cmd: CreateTaskCommand): Promis }); } - const auditLog = await getOrCreateAuditLog(c, organizationId, repoId); + const auditLog = await getOrCreateAuditLog(c, organizationId); await auditLog.send( "auditLog.command.append", { kind: "task.created", + repoId, taskId, payload: { repoId, @@ -272,7 +242,7 @@ export async function createTaskMutation(c: any, cmd: CreateTaskCommand): Promis const taskSummary = await taskHandle.getTaskSummary({}); await upsertTaskSummary(c, taskSummary); } catch (error) { - logActorWarning("repository", "failed seeding repository task projection", { + logActorWarning("organization", "failed seeding organization task projection", { organizationId, repoId, taskId, @@ -292,13 +262,13 @@ export async function registerTaskBranchMutation(c: any, cmd: RegisterTaskBranch const existingOwner = await c.db .select({ taskId: taskIndex.taskId }) .from(taskIndex) - .where(and(eq(taskIndex.branchName, branchName), ne(taskIndex.taskId, cmd.taskId))) + .where(and(eq(taskIndex.branchName, branchName), eq(taskIndex.repoId, cmd.repoId), ne(taskIndex.taskId, cmd.taskId))) .get(); if (existingOwner) { let ownerMissing = false; try { - await getTask(c, c.state.organizationId, c.state.repoId, existingOwner.taskId).get(); + await getTask(c, c.state.organizationId, cmd.repoId, existingOwner.taskId).get(); } catch (error) { if (isStaleTaskReferenceError(error)) { ownerMissing = true; @@ -312,13 +282,13 @@ export async function registerTaskBranchMutation(c: any, cmd: RegisterTaskBranch } } - const branches = await listGitHubBranches(c); + const branches = await listGitHubBranches(c, cmd.repoId); const branchMatch = branches.find((branch) => branch.branchName === branchName) ?? null; if (cmd.requireExistingRemote && !branchMatch) { throw new Error(`Remote branch not found: ${branchName}`); } - const repository = await resolveGitHubRepository(c); + const repository = await resolveGitHubRepository(c, cmd.repoId); const defaultBranch = repository?.defaultBranch ?? "main"; const headSha = branchMatch?.commitSha ?? branches.find((branch) => branch.branchName === defaultBranch)?.commitSha ?? ""; @@ -327,6 +297,7 @@ export async function registerTaskBranchMutation(c: any, cmd: RegisterTaskBranch .insert(taskIndex) .values({ taskId: cmd.taskId, + repoId: cmd.repoId, branchName, createdAt: now, updatedAt: now, @@ -343,28 +314,59 @@ export async function registerTaskBranchMutation(c: any, cmd: RegisterTaskBranch return { branchName, headSha }; } -async function listTaskSummaries(c: any, includeArchived = false): Promise { - const rows = await c.db.select().from(tasks).orderBy(desc(tasks.updatedAtMs)).all(); - return rows - .map((row) => ({ - organizationId: c.state.organizationId, - repoId: c.state.repoId, - taskId: row.taskId, - branchName: row.branch ?? null, - title: row.title, - status: row.status, - updatedAt: row.updatedAtMs, - pullRequest: parseJsonValue(row.pullRequestJson, null), - })) - .filter((row) => includeArchived || row.status !== "archived"); +export async function applyTaskSummaryUpdateMutation(c: any, input: { taskSummary: WorkspaceTaskSummary }): Promise { + await upsertTaskSummary(c, input.taskSummary); + await refreshOrganizationSnapshotMutation(c); } -async function listWorkspaceTaskSummaries(c: any): Promise { - const rows = await c.db.select().from(tasks).orderBy(desc(tasks.updatedAtMs)).all(); - return rows.map((row) => taskSummaryFromRow(c, row)); +export async function removeTaskSummaryMutation(c: any, input: { taskId: string }): Promise { + await c.db.delete(taskSummaries).where(eq(taskSummaries.taskId, input.taskId)).run(); + await refreshOrganizationSnapshotMutation(c); } -function sortOverviewBranches( +export async function refreshTaskSummaryForBranchMutation( + c: any, + input: { repoId: string; branchName: string; pullRequest?: WorkspacePullRequestSummary | null }, +): Promise { + const pullRequest = input.pullRequest ?? null; + let rows = await c.db + .select({ taskId: taskSummaries.taskId }) + .from(taskSummaries) + .where(and(eq(taskSummaries.branch, input.branchName), eq(taskSummaries.repoId, input.repoId))) + .all(); + + if (rows.length === 0 && pullRequest) { + const { config } = getActorRuntimeContext(); + const created = await createTaskMutation(c, { + repoId: input.repoId, + task: pullRequest.title?.trim() || `Review ${input.branchName}`, + sandboxProviderId: defaultSandboxProviderId(config), + explicitTitle: pullRequest.title?.trim() || input.branchName, + explicitBranchName: null, + onBranch: input.branchName, + }); + rows = [{ taskId: created.taskId }]; + } + + for (const row of rows) { + try { + const task = getTask(c, c.state.organizationId, input.repoId, row.taskId); + await expectQueueResponse<{ ok: true }>( + await task.send(taskWorkflowQueueName("task.command.pull_request.sync"), { pullRequest }, { wait: true, timeout: 10_000 }), + ); + } catch (error) { + logActorWarning("organization", "failed refreshing task summary for branch", { + organizationId: c.state.organizationId, + repoId: input.repoId, + branchName: input.branchName, + taskId: row.taskId, + error: resolveErrorMessage(error), + }); + } + } +} + +export function sortOverviewBranches( branches: Array<{ branchName: string; commitSha: string; @@ -392,157 +394,126 @@ function sortOverviewBranches( }); } -export async function applyTaskSummaryUpdateMutation(c: any, input: { taskSummary: WorkspaceTaskSummary }): Promise { - await upsertTaskSummary(c, input.taskSummary); - await notifyOrganizationSnapshotChanged(c); -} - -export async function removeTaskSummaryMutation(c: any, input: { taskId: string }): Promise { - await c.db.delete(tasks).where(eq(tasks.taskId, input.taskId)).run(); - await notifyOrganizationSnapshotChanged(c); -} - -export async function refreshTaskSummaryForBranchMutation( - c: any, - input: { branchName: string; pullRequest?: WorkspacePullRequestSummary | null }, -): Promise { - const pullRequest = input.pullRequest ?? null; - let rows = await c.db.select({ taskId: tasks.taskId }).from(tasks).where(eq(tasks.branch, input.branchName)).all(); - - if (rows.length === 0 && pullRequest) { - const { config } = getActorRuntimeContext(); - const created = await createTaskMutation(c, { - task: pullRequest.title?.trim() || `Review ${input.branchName}`, - sandboxProviderId: defaultSandboxProviderId(config), - explicitTitle: pullRequest.title?.trim() || input.branchName, - explicitBranchName: null, - onBranch: input.branchName, - }); - rows = [{ taskId: created.taskId }]; - } - - for (const row of rows) { - try { - const task = getTask(c, c.state.organizationId, c.state.repoId, row.taskId); - await expectQueueResponse<{ ok: true }>( - await task.send( - taskWorkflowQueueName("task.command.pull_request.sync"), - { pullRequest }, - { wait: true, timeout: 10_000 }, - ), - ); - } catch (error) { - logActorWarning("repository", "failed refreshing task summary for branch", { - organizationId: c.state.organizationId, - repoId: c.state.repoId, - branchName: input.branchName, - taskId: row.taskId, - error: resolveErrorMessage(error), - }); - } - } - -} - -export const repositoryActions = { - async listReservedBranches(c: any): Promise { - return await listKnownTaskBranches(c); - }, - - async listTaskSummaries(c: any, cmd?: ListTaskSummariesCommand): Promise { - return await listTaskSummaries(c, cmd?.includeArchived === true); - }, - - async listWorkspaceTaskSummaries(c: any): Promise { - return await listWorkspaceTaskSummaries(c); - }, - - async getRepositoryMetadata(c: any): Promise<{ defaultBranch: string | null; fullName: string | null; remoteUrl: string }> { - const repository = await resolveGitHubRepository(c); - const remoteUrl = await resolveRepositoryRemoteUrl(c); - return { - defaultBranch: repository?.defaultBranch ?? null, - fullName: repository?.fullName ?? null, - remoteUrl, - }; - }, - - async getRepoOverview(c: any): Promise { - const now = Date.now(); - const repository = await resolveGitHubRepository(c); - const remoteUrl = await resolveRepositoryRemoteUrl(c); - const githubBranches = await listGitHubBranches(c).catch(() => []); - const taskRows = await c.db.select().from(tasks).all(); - - const taskMetaByBranch = new Map< - string, - { taskId: string; title: string | null; status: TaskRecord["status"] | null; updatedAt: number; pullRequest: WorkspacePullRequestSummary | null } - >(); - for (const row of taskRows) { - if (!row.branch) { - continue; - } - taskMetaByBranch.set(row.branch, { - taskId: row.taskId, - title: row.title ?? null, - status: row.status, - updatedAt: row.updatedAtMs, - pullRequest: parseJsonValue(row.pullRequestJson, null), - }); - } - - const branchMap = new Map(); - for (const branch of githubBranches) { - branchMap.set(branch.branchName, branch); - } - for (const branchName of taskMetaByBranch.keys()) { - if (!branchMap.has(branchName)) { - branchMap.set(branchName, { branchName, commitSha: "" }); - } - } - if (repository?.defaultBranch && !branchMap.has(repository.defaultBranch)) { - branchMap.set(repository.defaultBranch, { branchName: repository.defaultBranch, commitSha: "" }); - } - - const branches = sortOverviewBranches( - [...branchMap.values()].map((branch) => { - const taskMeta = taskMetaByBranch.get(branch.branchName); - const pr = taskMeta?.pullRequest ?? null; - return { - branchName: branch.branchName, - commitSha: branch.commitSha, - taskId: taskMeta?.taskId ?? null, - taskTitle: taskMeta?.title ?? null, - taskStatus: taskMeta?.status ?? null, - pullRequest: pr, - ciStatus: null, - updatedAt: Math.max(taskMeta?.updatedAt ?? 0, pr?.updatedAtMs ?? 0, now), - }; - }), - repository?.defaultBranch ?? null, - ); - - return { +export async function listTaskSummariesForRepo(c: any, repoId: string, includeArchived = false): Promise { + const rows = await c.db.select().from(taskSummaries).where(eq(taskSummaries.repoId, repoId)).orderBy(desc(taskSummaries.updatedAtMs)).all(); + return rows + .map((row) => ({ organizationId: c.state.organizationId, - repoId: c.state.repoId, - remoteUrl, - baseRef: repository?.defaultBranch ?? null, - fetchedAt: now, - branches, - }; - }, + repoId, + taskId: row.taskId, + branchName: row.branch ?? null, + title: row.title, + status: row.status, + updatedAt: row.updatedAtMs, + pullRequest: parseJsonValue(row.pullRequestJson, null), + })) + .filter((row) => includeArchived || row.status !== "archived"); +} - async findTaskForBranch(c: any, input: { branchName: string }): Promise<{ taskId: string | null }> { - const row = await c.db.select({ taskId: tasks.taskId }).from(tasks).where(eq(tasks.branch, input.branchName)).get(); - return { taskId: row?.taskId ?? null }; - }, +export async function listAllTaskSummaries(c: any, includeArchived = false): Promise { + const rows = await c.db.select().from(taskSummaries).orderBy(desc(taskSummaries.updatedAtMs)).all(); + return rows + .map((row) => ({ + organizationId: c.state.organizationId, + repoId: row.repoId, + taskId: row.taskId, + branchName: row.branch ?? null, + title: row.title, + status: row.status, + updatedAt: row.updatedAtMs, + pullRequest: parseJsonValue(row.pullRequestJson, null), + })) + .filter((row) => includeArchived || row.status !== "archived"); +} - async getProjectedTaskSummary(c: any, input: GetProjectedTaskSummaryCommand): Promise { - const taskId = input.taskId?.trim(); - if (!taskId) { - return null; +export async function listWorkspaceTaskSummaries(c: any): Promise { + const rows = await c.db.select().from(taskSummaries).orderBy(desc(taskSummaries.updatedAtMs)).all(); + return rows.map((row) => taskSummaryFromRow(row.repoId, row)); +} + +export async function getRepoOverviewFromOrg(c: any, repoId: string): Promise { + const now = Date.now(); + const repository = await resolveGitHubRepository(c, repoId); + const remoteUrl = await resolveRepositoryRemoteUrl(c, repoId); + const githubBranches = await listGitHubBranches(c, repoId).catch(() => []); + const taskRows = await c.db.select().from(taskSummaries).where(eq(taskSummaries.repoId, repoId)).all(); + + const taskMetaByBranch = new Map< + string, + { taskId: string; title: string | null; status: TaskRecord["status"] | null; updatedAt: number; pullRequest: WorkspacePullRequestSummary | null } + >(); + for (const row of taskRows) { + if (!row.branch) { + continue; } - const row = await c.db.select().from(tasks).where(eq(tasks.taskId, taskId)).get(); - return row ? taskSummaryFromRow(c, row) : null; - }, -}; + taskMetaByBranch.set(row.branch, { + taskId: row.taskId, + title: row.title ?? null, + status: row.status, + updatedAt: row.updatedAtMs, + pullRequest: parseJsonValue(row.pullRequestJson, null), + }); + } + + const branchMap = new Map(); + for (const branch of githubBranches) { + branchMap.set(branch.branchName, branch); + } + for (const branchName of taskMetaByBranch.keys()) { + if (!branchMap.has(branchName)) { + branchMap.set(branchName, { branchName, commitSha: "" }); + } + } + if (repository?.defaultBranch && !branchMap.has(repository.defaultBranch)) { + branchMap.set(repository.defaultBranch, { branchName: repository.defaultBranch, commitSha: "" }); + } + + const branches = sortOverviewBranches( + [...branchMap.values()].map((branch) => { + const taskMeta = taskMetaByBranch.get(branch.branchName); + const pr = taskMeta?.pullRequest ?? null; + return { + branchName: branch.branchName, + commitSha: branch.commitSha, + taskId: taskMeta?.taskId ?? null, + taskTitle: taskMeta?.title ?? null, + taskStatus: taskMeta?.status ?? null, + pullRequest: pr, + ciStatus: null, + updatedAt: Math.max(taskMeta?.updatedAt ?? 0, pr?.updatedAtMs ?? 0, now), + }; + }), + repository?.defaultBranch ?? null, + ); + + return { + organizationId: c.state.organizationId, + repoId, + remoteUrl, + baseRef: repository?.defaultBranch ?? null, + fetchedAt: now, + branches, + }; +} + +export async function getRepositoryMetadataFromOrg( + c: any, + repoId: string, +): Promise<{ defaultBranch: string | null; fullName: string | null; remoteUrl: string }> { + const repository = await resolveGitHubRepository(c, repoId); + const remoteUrl = await resolveRepositoryRemoteUrl(c, repoId); + return { + defaultBranch: repository?.defaultBranch ?? null, + fullName: repository?.fullName ?? null, + remoteUrl, + }; +} + +export async function findTaskForBranch(c: any, repoId: string, branchName: string): Promise<{ taskId: string | null }> { + const row = await c.db + .select({ taskId: taskSummaries.taskId }) + .from(taskSummaries) + .where(and(eq(taskSummaries.branch, branchName), eq(taskSummaries.repoId, repoId))) + .get(); + return { taskId: row?.taskId ?? null }; +} diff --git a/foundry/packages/backend/src/actors/organization/actions/tasks.ts b/foundry/packages/backend/src/actors/organization/actions/tasks.ts index 7d54bcf..be2cd62 100644 --- a/foundry/packages/backend/src/actors/organization/actions/tasks.ts +++ b/foundry/packages/backend/src/actors/organization/actions/tasks.ts @@ -21,14 +21,21 @@ import type { TaskWorkspaceUpdateDraftInput, } from "@sandbox-agent/foundry-shared"; import { getActorRuntimeContext } from "../../context.js"; -import { getOrCreateAuditLog, getOrCreateRepository, getTask as getTaskHandle, selfOrganization } from "../../handles.js"; +import { getOrCreateAuditLog, getTask as getTaskHandle, selfOrganization } from "../../handles.js"; import { defaultSandboxProviderId } from "../../../sandbox-config.js"; import { expectQueueResponse } from "../../../services/queue.js"; import { logActorWarning, resolveErrorMessage } from "../../logging.js"; -import { repositoryWorkflowQueueName } from "../../repository/workflow.js"; import { taskWorkflowQueueName } from "../../task/workflow/index.js"; import { repos } from "../db/schema.js"; import { organizationWorkflowQueueName } from "../queues.js"; +import { + createTaskMutation, + getRepoOverviewFromOrg, + getRepositoryMetadataFromOrg, + findTaskForBranch, + listTaskSummariesForRepo, + listAllTaskSummaries, +} from "./task-mutations.js"; function assertOrganization(c: { state: { organizationId: string } }, organizationId: string): void { if (organizationId !== c.state.organizationId) { @@ -36,40 +43,17 @@ function assertOrganization(c: { state: { organizationId: string } }, organizati } } -async function requireRepositoryForTask(c: any, repoId: string) { +async function requireRepoExists(c: any, repoId: string): Promise { const repoRow = await c.db.select({ repoId: repos.repoId }).from(repos).where(eq(repos.repoId, repoId)).get(); if (!repoRow) { throw new Error(`Unknown repo: ${repoId}`); } - return await getOrCreateRepository(c, c.state.organizationId, repoId); } async function requireWorkspaceTask(c: any, repoId: string, taskId: string) { return getTaskHandle(c, c.state.organizationId, repoId, taskId); } -async function collectAllTaskSummaries(c: any): Promise { - const repoRows = await c.db.select({ repoId: repos.repoId, remoteUrl: repos.remoteUrl }).from(repos).orderBy(desc(repos.updatedAt)).all(); - - const all: TaskSummary[] = []; - for (const row of repoRows) { - try { - const repository = await getOrCreateRepository(c, c.state.organizationId, row.repoId); - const snapshot = await repository.listTaskSummaries({ includeArchived: true }); - all.push(...snapshot); - } catch (error) { - logActorWarning("organization", "failed collecting tasks for repo", { - organizationId: c.state.organizationId, - repoId: row.repoId, - error: resolveErrorMessage(error), - }); - } - } - - all.sort((a, b) => b.updatedAt - a.updatedAt); - return all; -} - interface GetTaskInput { organizationId: string; repoId: string; @@ -85,40 +69,52 @@ interface RepoOverviewInput { repoId: string; } -export async function createTaskMutation(c: any, input: CreateTaskInput): Promise { - assertOrganization(c, input.organizationId); - - const { config } = getActorRuntimeContext(); - const sandboxProviderId = input.sandboxProviderId ?? defaultSandboxProviderId(config); - await requireRepositoryForTask(c, input.repoId); - - const repository = await getOrCreateRepository(c, c.state.organizationId, input.repoId); - return expectQueueResponse( - await repository.send( - repositoryWorkflowQueueName("repository.command.createTask"), - { - task: input.task, - sandboxProviderId, - explicitTitle: input.explicitTitle ?? null, - explicitBranchName: input.explicitBranchName ?? null, - onBranch: input.onBranch ?? null, - }, - { - wait: true, - timeout: 10_000, - }, - ), - ); -} +export { createTaskMutation }; export const organizationTaskActions = { async createTask(c: any, input: CreateTaskInput): Promise { + assertOrganization(c, input.organizationId); + const { config } = getActorRuntimeContext(); + const sandboxProviderId = input.sandboxProviderId ?? defaultSandboxProviderId(config); + await requireRepoExists(c, input.repoId); + const self = selfOrganization(c); return expectQueueResponse( - await self.send(organizationWorkflowQueueName("organization.command.createTask"), input, { - wait: true, - timeout: 10_000, - }), + await self.send( + organizationWorkflowQueueName("organization.command.createTask"), + { + repoId: input.repoId, + task: input.task, + sandboxProviderId, + explicitTitle: input.explicitTitle ?? null, + explicitBranchName: input.explicitBranchName ?? null, + onBranch: input.onBranch ?? null, + }, + { + wait: true, + timeout: 10_000, + }, + ), + ); + }, + + async materializeTask(c: any, input: { organizationId: string; repoId: string; virtualTaskId: string }): Promise { + assertOrganization(c, input.organizationId); + const { config } = getActorRuntimeContext(); + const self = selfOrganization(c); + return expectQueueResponse( + await self.send( + organizationWorkflowQueueName("organization.command.materializeTask"), + { + repoId: input.repoId, + task: input.virtualTaskId, + sandboxProviderId: defaultSandboxProviderId(config), + explicitTitle: null, + explicitBranchName: null, + onBranch: null, + }, + { wait: true, timeout: 10_000 }, + ), ); }, @@ -275,23 +271,22 @@ export const organizationTaskActions = { async getRepoOverview(c: any, input: RepoOverviewInput): Promise { assertOrganization(c, input.organizationId); - const repository = await requireRepositoryForTask(c, input.repoId); - return await repository.getRepoOverview({}); + await requireRepoExists(c, input.repoId); + return await getRepoOverviewFromOrg(c, input.repoId); }, async listTasks(c: any, input: ListTasksInput): Promise { assertOrganization(c, input.organizationId); if (input.repoId) { - const repository = await requireRepositoryForTask(c, input.repoId); - return await repository.listTaskSummaries({ includeArchived: true }); + return await listTaskSummariesForRepo(c, input.repoId, true); } - return await collectAllTaskSummaries(c); + return await listAllTaskSummaries(c, true); }, async switchTask(c: any, input: { repoId: string; taskId: string }): Promise { - await requireRepositoryForTask(c, input.repoId); + await requireRepoExists(c, input.repoId); const h = getTaskHandle(c, c.state.organizationId, input.repoId, input.taskId); const record = await h.get(); const switched = await expectQueueResponse<{ switchTarget: string }>( @@ -309,41 +304,24 @@ export const organizationTaskActions = { async auditLog(c: any, input: HistoryQueryInput): Promise { assertOrganization(c, input.organizationId); - const limit = input.limit ?? 20; - const repoRows = await c.db.select({ repoId: repos.repoId }).from(repos).orderBy(desc(repos.updatedAt)).all(); - const allEvents: AuditLogEvent[] = []; - - for (const row of repoRows) { - try { - const auditLog = await getOrCreateAuditLog(c, c.state.organizationId, row.repoId); - const items = await auditLog.list({ - branch: input.branch, - taskId: input.taskId, - limit, - }); - allEvents.push(...items); - } catch (error) { - logActorWarning("organization", "audit log lookup failed for repo", { - organizationId: c.state.organizationId, - repoId: row.repoId, - error: resolveErrorMessage(error), - }); - } - } - - allEvents.sort((a, b) => b.createdAt - a.createdAt); - return allEvents.slice(0, limit); + const auditLog = await getOrCreateAuditLog(c, c.state.organizationId); + return await auditLog.list({ + repoId: input.repoId, + branch: input.branch, + taskId: input.taskId, + limit: input.limit ?? 20, + }); }, async getTask(c: any, input: GetTaskInput): Promise { assertOrganization(c, input.organizationId); - await requireRepositoryForTask(c, input.repoId); + await requireRepoExists(c, input.repoId); return await getTaskHandle(c, c.state.organizationId, input.repoId, input.taskId).get(); }, async attachTask(c: any, input: TaskProxyActionInput): Promise<{ target: string; sessionId: string | null }> { assertOrganization(c, input.organizationId); - await requireRepositoryForTask(c, input.repoId); + await requireRepoExists(c, input.repoId); const h = getTaskHandle(c, c.state.organizationId, input.repoId, input.taskId); return await expectQueueResponse<{ target: string; sessionId: string | null }>( await h.send(taskWorkflowQueueName("task.command.attach"), { reason: input.reason }, { wait: true, timeout: 10_000 }), @@ -352,36 +330,44 @@ export const organizationTaskActions = { async pushTask(c: any, input: TaskProxyActionInput): Promise { assertOrganization(c, input.organizationId); - await requireRepositoryForTask(c, input.repoId); + await requireRepoExists(c, input.repoId); const h = getTaskHandle(c, c.state.organizationId, input.repoId, input.taskId); await h.send(taskWorkflowQueueName("task.command.push"), { reason: input.reason }, { wait: false }); }, async syncTask(c: any, input: TaskProxyActionInput): Promise { assertOrganization(c, input.organizationId); - await requireRepositoryForTask(c, input.repoId); + await requireRepoExists(c, input.repoId); const h = getTaskHandle(c, c.state.organizationId, input.repoId, input.taskId); await h.send(taskWorkflowQueueName("task.command.sync"), { reason: input.reason }, { wait: false }); }, async mergeTask(c: any, input: TaskProxyActionInput): Promise { assertOrganization(c, input.organizationId); - await requireRepositoryForTask(c, input.repoId); + await requireRepoExists(c, input.repoId); const h = getTaskHandle(c, c.state.organizationId, input.repoId, input.taskId); await h.send(taskWorkflowQueueName("task.command.merge"), { reason: input.reason }, { wait: false }); }, async archiveTask(c: any, input: TaskProxyActionInput): Promise { assertOrganization(c, input.organizationId); - await requireRepositoryForTask(c, input.repoId); + await requireRepoExists(c, input.repoId); const h = getTaskHandle(c, c.state.organizationId, input.repoId, input.taskId); await h.send(taskWorkflowQueueName("task.command.archive"), { reason: input.reason }, { wait: false }); }, async killTask(c: any, input: TaskProxyActionInput): Promise { assertOrganization(c, input.organizationId); - await requireRepositoryForTask(c, input.repoId); + await requireRepoExists(c, input.repoId); const h = getTaskHandle(c, c.state.organizationId, input.repoId, input.taskId); await h.send(taskWorkflowQueueName("task.command.kill"), { reason: input.reason }, { wait: false }); }, + + async getRepositoryMetadata(c: any, input: { repoId: string }): Promise<{ defaultBranch: string | null; fullName: string | null; remoteUrl: string }> { + return await getRepositoryMetadataFromOrg(c, input.repoId); + }, + + async findTaskForBranch(c: any, input: { repoId: string; branchName: string }): Promise<{ taskId: string | null }> { + return await findTaskForBranch(c, input.repoId, input.branchName); + }, }; diff --git a/foundry/packages/backend/src/actors/organization/app-shell.ts b/foundry/packages/backend/src/actors/organization/app-shell.ts index 1522685..4a95681 100644 --- a/foundry/packages/backend/src/actors/organization/app-shell.ts +++ b/foundry/packages/backend/src/actors/organization/app-shell.ts @@ -147,9 +147,7 @@ async function sendOrganizationCommand( name: Parameters[0], body: unknown, ): Promise { - return expectQueueResponse( - await organization.send(organizationWorkflowQueueName(name), body, { wait: true, timeout: 60_000 }), - ); + return expectQueueResponse(await organization.send(organizationWorkflowQueueName(name), body, { wait: true, timeout: 60_000 })); } export async function getOrganizationState(organization: any) { @@ -1129,7 +1127,6 @@ export const organizationAppActions = { ); return { ok: true }; }, - }; export async function syncOrganizationShellFromGithubMutation( @@ -1188,6 +1185,7 @@ export async function syncOrganizationShellFromGithubMutation( githubAccountType: input.githubAccountType, displayName: input.displayName, slug, + defaultModel: existing?.defaultModel ?? DEFAULT_WORKSPACE_MODEL_ID, primaryDomain: existing?.primaryDomain ?? (input.kind === "personal" ? "personal" : `${slug}.github`), autoImportRepos: existing?.autoImportRepos ?? 1, repoImportStatus: existing?.repoImportStatus ?? "not_started", @@ -1261,6 +1259,28 @@ export async function syncOrganizationShellFromGithubMutation( }) .run(); + // Auto-trigger github-data sync when the org has a connected installation + // but hasn't synced yet. This handles the common case where a personal + // account or an org with an existing GitHub App installation signs in for + // the first time on a fresh DB — the installation webhook already fired + // before the org actor existed, so we kick off the sync here instead. + const needsInitialSync = installationStatus === "connected" && syncStatus === "pending"; + if (needsInitialSync) { + const githubData = await getOrCreateGithubData(c, organizationId); + await githubData.send( + githubDataWorkflowQueueName("githubData.command.syncRepos"), + { + connectedAccount: input.githubLogin, + installationStatus: "connected", + installationId: input.installationId, + githubLogin: input.githubLogin, + kind: input.kind, + label: "Initial repository sync...", + }, + { wait: false }, + ); + } + return { organizationId }; } diff --git a/foundry/packages/backend/src/actors/organization/db/drizzle/0001_add_auth_and_task_tables.sql b/foundry/packages/backend/src/actors/organization/db/drizzle/0001_add_auth_and_task_tables.sql new file mode 100644 index 0000000..74d63ef --- /dev/null +++ b/foundry/packages/backend/src/actors/organization/db/drizzle/0001_add_auth_and_task_tables.sql @@ -0,0 +1,50 @@ +CREATE TABLE `auth_session_index` ( + `session_id` text PRIMARY KEY NOT NULL, + `session_token` text NOT NULL, + `user_id` text NOT NULL, + `created_at` integer NOT NULL, + `updated_at` integer NOT NULL +); +--> statement-breakpoint +CREATE TABLE `auth_email_index` ( + `email` text PRIMARY KEY NOT NULL, + `user_id` text NOT NULL, + `updated_at` integer NOT NULL +); +--> statement-breakpoint +CREATE TABLE `auth_account_index` ( + `id` text PRIMARY KEY NOT NULL, + `provider_id` text NOT NULL, + `account_id` text NOT NULL, + `user_id` text NOT NULL, + `updated_at` integer NOT NULL +); +--> statement-breakpoint +CREATE TABLE `auth_verification` ( + `id` text PRIMARY KEY NOT NULL, + `identifier` text NOT NULL, + `value` text NOT NULL, + `expires_at` integer NOT NULL, + `created_at` integer NOT NULL, + `updated_at` integer NOT NULL +); +--> statement-breakpoint +CREATE TABLE `task_index` ( + `task_id` text PRIMARY KEY NOT NULL, + `repo_id` text NOT NULL, + `branch_name` text, + `created_at` integer NOT NULL, + `updated_at` integer NOT NULL +); +--> statement-breakpoint +CREATE TABLE `task_summaries` ( + `task_id` text PRIMARY KEY NOT NULL, + `repo_id` text NOT NULL, + `title` text NOT NULL, + `status` text NOT NULL, + `repo_name` text NOT NULL, + `updated_at_ms` integer NOT NULL, + `branch` text, + `pull_request_json` text, + `sessions_summary_json` text DEFAULT '[]' NOT NULL +); diff --git a/foundry/packages/backend/src/actors/organization/db/drizzle/meta/_journal.json b/foundry/packages/backend/src/actors/organization/db/drizzle/meta/_journal.json index e3668a1..41ea23b 100644 --- a/foundry/packages/backend/src/actors/organization/db/drizzle/meta/_journal.json +++ b/foundry/packages/backend/src/actors/organization/db/drizzle/meta/_journal.json @@ -8,6 +8,13 @@ "when": 1773376221152, "tag": "0000_melted_viper", "breakpoints": true + }, + { + "idx": 1, + "version": "6", + "when": 1773840000000, + "tag": "0001_add_auth_and_task_tables", + "breakpoints": true } ] } diff --git a/foundry/packages/backend/src/actors/organization/db/migrations.ts b/foundry/packages/backend/src/actors/organization/db/migrations.ts index 350ada0..a7e8abc 100644 --- a/foundry/packages/backend/src/actors/organization/db/migrations.ts +++ b/foundry/packages/backend/src/actors/organization/db/migrations.ts @@ -12,8 +12,8 @@ const journal = { }, { idx: 1, - when: 1773907201000, - tag: "0001_github_sync_progress", + when: 1773840000000, + tag: "0001_add_auth_and_task_tables", breakpoints: true, }, ], @@ -61,7 +61,7 @@ CREATE TABLE \`organization_members\` ( ); --> statement-breakpoint CREATE TABLE \`organization_profile\` ( - \`id\` integer PRIMARY KEY NOT NULL, + \`id\` text PRIMARY KEY NOT NULL, \`kind\` text NOT NULL, \`github_account_id\` text NOT NULL, \`github_login\` text NOT NULL, @@ -69,6 +69,7 @@ CREATE TABLE \`organization_profile\` ( \`display_name\` text NOT NULL, \`slug\` text NOT NULL, \`primary_domain\` text NOT NULL, + \`default_model\` text NOT NULL, \`auto_import_repos\` integer NOT NULL, \`repo_import_status\` text NOT NULL, \`github_connected_account\` text NOT NULL, @@ -79,6 +80,10 @@ CREATE TABLE \`organization_profile\` ( \`github_last_sync_at\` integer, \`github_last_webhook_at\` integer, \`github_last_webhook_event\` text, + \`github_sync_generation\` integer NOT NULL, + \`github_sync_phase\` text, + \`github_processed_repository_count\` integer NOT NULL, + \`github_total_repository_count\` integer NOT NULL, \`stripe_customer_id\` text, \`stripe_subscription_id\` text, \`stripe_price_id\` text, @@ -89,8 +94,7 @@ CREATE TABLE \`organization_profile\` ( \`billing_renewal_at\` text, \`billing_payment_method_label\` text NOT NULL, \`created_at\` integer NOT NULL, - \`updated_at\` integer NOT NULL, - CONSTRAINT \`organization_profile_singleton_id_check\` CHECK(\`id\` = 1) + \`updated_at\` integer NOT NULL ); --> statement-breakpoint CREATE TABLE \`repos\` ( @@ -111,13 +115,56 @@ CREATE TABLE \`stripe_lookup\` ( \`updated_at\` integer NOT NULL ); `, - m0001: `ALTER TABLE \`organization_profile\` ADD \`github_sync_generation\` integer NOT NULL DEFAULT 0; + m0001: `CREATE TABLE \`auth_session_index\` ( + \`session_id\` text PRIMARY KEY NOT NULL, + \`session_token\` text NOT NULL, + \`user_id\` text NOT NULL, + \`created_at\` integer NOT NULL, + \`updated_at\` integer NOT NULL +); --> statement-breakpoint -ALTER TABLE \`organization_profile\` ADD \`github_sync_phase\` text; +CREATE TABLE \`auth_email_index\` ( + \`email\` text PRIMARY KEY NOT NULL, + \`user_id\` text NOT NULL, + \`updated_at\` integer NOT NULL +); --> statement-breakpoint -ALTER TABLE \`organization_profile\` ADD \`github_processed_repository_count\` integer NOT NULL DEFAULT 0; +CREATE TABLE \`auth_account_index\` ( + \`id\` text PRIMARY KEY NOT NULL, + \`provider_id\` text NOT NULL, + \`account_id\` text NOT NULL, + \`user_id\` text NOT NULL, + \`updated_at\` integer NOT NULL +); --> statement-breakpoint -ALTER TABLE \`organization_profile\` ADD \`github_total_repository_count\` integer NOT NULL DEFAULT 0; +CREATE TABLE \`auth_verification\` ( + \`id\` text PRIMARY KEY NOT NULL, + \`identifier\` text NOT NULL, + \`value\` text NOT NULL, + \`expires_at\` integer NOT NULL, + \`created_at\` integer NOT NULL, + \`updated_at\` integer NOT NULL +); +--> statement-breakpoint +CREATE TABLE \`task_index\` ( + \`task_id\` text PRIMARY KEY NOT NULL, + \`repo_id\` text NOT NULL, + \`branch_name\` text, + \`created_at\` integer NOT NULL, + \`updated_at\` integer NOT NULL +); +--> statement-breakpoint +CREATE TABLE \`task_summaries\` ( + \`task_id\` text PRIMARY KEY NOT NULL, + \`repo_id\` text NOT NULL, + \`title\` text NOT NULL, + \`status\` text NOT NULL, + \`repo_name\` text NOT NULL, + \`updated_at_ms\` integer NOT NULL, + \`branch\` text, + \`pull_request_json\` text, + \`sessions_summary_json\` text DEFAULT '[]' NOT NULL +); `, } as const, }; diff --git a/foundry/packages/backend/src/actors/organization/db/schema.ts b/foundry/packages/backend/src/actors/organization/db/schema.ts index ddda4d7..2f19fe5 100644 --- a/foundry/packages/backend/src/actors/organization/db/schema.ts +++ b/foundry/packages/backend/src/actors/organization/db/schema.ts @@ -1,12 +1,12 @@ import { check, integer, sqliteTable, text } from "rivetkit/db/drizzle"; import { sql } from "drizzle-orm"; +import { DEFAULT_WORKSPACE_MODEL_ID } from "@sandbox-agent/foundry-shared"; // SQLite is per organization actor instance, so no organizationId column needed. /** - * Coordinator index of RepositoryActor instances. - * The organization actor is the coordinator for repositories. - * Rows are created/removed when repos are added/removed from the organization. + * Repository catalog. Rows are created/removed when repos are added/removed + * from the organization via GitHub sync. */ export const repos = sqliteTable("repos", { repoId: text("repo_id").notNull().primaryKey(), @@ -15,6 +15,44 @@ export const repos = sqliteTable("repos", { updatedAt: integer("updated_at").notNull(), }); +/** + * Coordinator index of TaskActor instances. + * The organization actor is the direct coordinator for tasks (not a per-repo + * actor) because the sidebar needs to query all tasks across all repos on + * every snapshot. With many repos, fanning out to N repo actors on the hot + * read path is too expensive — owning the index here keeps that a single + * local table scan. Each row maps a taskId to its repo and immutable branch + * name. Used for branch conflict checking (scoped by repoId) and + * task-by-branch lookups. + */ +export const taskIndex = sqliteTable("task_index", { + taskId: text("task_id").notNull().primaryKey(), + repoId: text("repo_id").notNull(), + branchName: text("branch_name"), + createdAt: integer("created_at").notNull(), + updatedAt: integer("updated_at").notNull(), +}); + +/** + * Organization-owned materialized task summary projection. + * Task actors push summary updates directly to the organization coordinator, + * which keeps this table local for fast list/lookups without fan-out. + * Same rationale as taskIndex: the sidebar repeatedly reads all tasks across + * all repos, so the org must own the materialized view to avoid O(repos) + * actor fan-out on the hot read path. + */ +export const taskSummaries = sqliteTable("task_summaries", { + taskId: text("task_id").notNull().primaryKey(), + repoId: text("repo_id").notNull(), + title: text("title").notNull(), + status: text("status").notNull(), + repoName: text("repo_name").notNull(), + updatedAtMs: integer("updated_at_ms").notNull(), + branch: text("branch"), + pullRequestJson: text("pull_request_json"), + sessionsSummaryJson: text("sessions_summary_json").notNull().default("[]"), +}); + export const organizationProfile = sqliteTable( "organization_profile", { @@ -25,6 +63,7 @@ export const organizationProfile = sqliteTable( githubAccountType: text("github_account_type").notNull(), displayName: text("display_name").notNull(), slug: text("slug").notNull(), + defaultModel: text("default_model").notNull().default(DEFAULT_WORKSPACE_MODEL_ID), primaryDomain: text("primary_domain").notNull(), autoImportRepos: integer("auto_import_repos").notNull(), repoImportStatus: text("repo_import_status").notNull(), diff --git a/foundry/packages/backend/src/actors/organization/queues.ts b/foundry/packages/backend/src/actors/organization/queues.ts index 986a234..4fbfe0a 100644 --- a/foundry/packages/backend/src/actors/organization/queues.ts +++ b/foundry/packages/backend/src/actors/organization/queues.ts @@ -1,5 +1,10 @@ export const ORGANIZATION_QUEUE_NAMES = [ "organization.command.createTask", + "organization.command.materializeTask", + "organization.command.registerTaskBranch", + "organization.command.applyTaskSummaryUpdate", + "organization.command.removeTaskSummary", + "organization.command.refreshTaskSummaryForBranch", "organization.command.snapshot.broadcast", "organization.command.syncGithubSession", "organization.command.better_auth.session_index.upsert", diff --git a/foundry/packages/backend/src/actors/organization/workflow.ts b/foundry/packages/backend/src/actors/organization/workflow.ts index 85084d9..5e10c99 100644 --- a/foundry/packages/backend/src/actors/organization/workflow.ts +++ b/foundry/packages/backend/src/actors/organization/workflow.ts @@ -1,15 +1,20 @@ // @ts-nocheck import { Loop } from "rivetkit/workflow"; import { logActorWarning, resolveErrorMessage } from "../logging.js"; -import type { CreateTaskInput } from "@sandbox-agent/foundry-shared"; import { applyGithubDataProjectionMutation, applyGithubRepositoryProjectionMutation, applyGithubSyncProgressMutation, - createTaskMutation, recordGithubWebhookReceiptMutation, refreshOrganizationSnapshotMutation, } from "./actions.js"; +import { + applyTaskSummaryUpdateMutation, + createTaskMutation, + refreshTaskSummaryForBranchMutation, + registerTaskBranchMutation, + removeTaskSummaryMutation, +} from "./actions/task-mutations.js"; import { betterAuthCreateVerificationMutation, betterAuthDeleteAccountIndexMutation, @@ -52,12 +57,62 @@ export async function runOrganizationWorkflow(ctx: any): Promise { const result = await loopCtx.step({ name: "organization-create-task", timeout: 5 * 60_000, - run: async () => createTaskMutation(loopCtx, msg.body as CreateTaskInput), + run: async () => createTaskMutation(loopCtx, msg.body), }); await msg.complete(result); return Loop.continue(undefined); } + if (msg.name === "organization.command.materializeTask") { + const result = await loopCtx.step({ + name: "organization-materialize-task", + timeout: 5 * 60_000, + run: async () => createTaskMutation(loopCtx, msg.body), + }); + await msg.complete(result); + return Loop.continue(undefined); + } + + if (msg.name === "organization.command.registerTaskBranch") { + const result = await loopCtx.step({ + name: "organization-register-task-branch", + timeout: 60_000, + run: async () => registerTaskBranchMutation(loopCtx, msg.body), + }); + await msg.complete(result); + return Loop.continue(undefined); + } + + if (msg.name === "organization.command.applyTaskSummaryUpdate") { + await loopCtx.step({ + name: "organization-apply-task-summary-update", + timeout: 30_000, + run: async () => applyTaskSummaryUpdateMutation(loopCtx, msg.body), + }); + await msg.complete({ ok: true }); + return Loop.continue(undefined); + } + + if (msg.name === "organization.command.removeTaskSummary") { + await loopCtx.step({ + name: "organization-remove-task-summary", + timeout: 30_000, + run: async () => removeTaskSummaryMutation(loopCtx, msg.body), + }); + await msg.complete({ ok: true }); + return Loop.continue(undefined); + } + + if (msg.name === "organization.command.refreshTaskSummaryForBranch") { + await loopCtx.step({ + name: "organization-refresh-task-summary-for-branch", + timeout: 60_000, + run: async () => refreshTaskSummaryForBranchMutation(loopCtx, msg.body), + }); + await msg.complete({ ok: true }); + return Loop.continue(undefined); + } + if (msg.name === "organization.command.snapshot.broadcast") { await loopCtx.step({ name: "organization-snapshot-broadcast", diff --git a/foundry/packages/backend/src/actors/repository/db/db.ts b/foundry/packages/backend/src/actors/repository/db/db.ts deleted file mode 100644 index 79bed8e..0000000 --- a/foundry/packages/backend/src/actors/repository/db/db.ts +++ /dev/null @@ -1,5 +0,0 @@ -import { db } from "rivetkit/db/drizzle"; -import * as schema from "./schema.js"; -import migrations from "./migrations.js"; - -export const repositoryDb = db({ schema, migrations }); diff --git a/foundry/packages/backend/src/actors/repository/db/drizzle.config.ts b/foundry/packages/backend/src/actors/repository/db/drizzle.config.ts deleted file mode 100644 index 8b9a1b9..0000000 --- a/foundry/packages/backend/src/actors/repository/db/drizzle.config.ts +++ /dev/null @@ -1,6 +0,0 @@ -import { defineConfig } from "rivetkit/db/drizzle"; - -export default defineConfig({ - out: "./src/actors/repository/db/drizzle", - schema: "./src/actors/repository/db/schema.ts", -}); diff --git a/foundry/packages/backend/src/actors/repository/db/drizzle/0000_useful_la_nuit.sql b/foundry/packages/backend/src/actors/repository/db/drizzle/0000_useful_la_nuit.sql deleted file mode 100644 index 14bc071..0000000 --- a/foundry/packages/backend/src/actors/repository/db/drizzle/0000_useful_la_nuit.sql +++ /dev/null @@ -1,12 +0,0 @@ -CREATE TABLE `repo_meta` ( - `id` integer PRIMARY KEY NOT NULL, - `remote_url` text NOT NULL, - `updated_at` integer NOT NULL -); ---> statement-breakpoint -CREATE TABLE `task_index` ( - `task_id` text PRIMARY KEY NOT NULL, - `branch_name` text, - `created_at` integer NOT NULL, - `updated_at` integer NOT NULL -); diff --git a/foundry/packages/backend/src/actors/repository/db/drizzle/meta/_journal.json b/foundry/packages/backend/src/actors/repository/db/drizzle/meta/_journal.json deleted file mode 100644 index deebd86..0000000 --- a/foundry/packages/backend/src/actors/repository/db/drizzle/meta/_journal.json +++ /dev/null @@ -1,13 +0,0 @@ -{ - "version": "7", - "dialect": "sqlite", - "entries": [ - { - "idx": 0, - "version": "6", - "when": 1773376221848, - "tag": "0000_useful_la_nuit", - "breakpoints": true - } - ] -} diff --git a/foundry/packages/backend/src/actors/repository/db/migrations.ts b/foundry/packages/backend/src/actors/repository/db/migrations.ts deleted file mode 100644 index f0a2543..0000000 --- a/foundry/packages/backend/src/actors/repository/db/migrations.ts +++ /dev/null @@ -1,45 +0,0 @@ -// This file is generated by src/actors/_scripts/generate-actor-migrations.ts. -// Source of truth is drizzle-kit output under ./drizzle (meta/_journal.json + *.sql). -// Do not hand-edit this file. - -const journal = { - entries: [ - { - idx: 0, - when: 1773376221848, - tag: "0000_useful_la_nuit", - breakpoints: true, - }, - ], -} as const; - -export default { - journal, - migrations: { - m0000: `CREATE TABLE \`repo_meta\` ( - \`id\` integer PRIMARY KEY NOT NULL, - \`remote_url\` text NOT NULL, - \`updated_at\` integer NOT NULL, - CONSTRAINT \`repo_meta_singleton_id_check\` CHECK(\`id\` = 1) -); ---> statement-breakpoint -CREATE TABLE \`task_index\` ( - \`task_id\` text PRIMARY KEY NOT NULL, - \`branch_name\` text, - \`created_at\` integer NOT NULL, - \`updated_at\` integer NOT NULL -); ---> statement-breakpoint -CREATE TABLE \`tasks\` ( - \`task_id\` text PRIMARY KEY NOT NULL, - \`title\` text NOT NULL, - \`status\` text NOT NULL, - \`repo_name\` text NOT NULL, - \`updated_at_ms\` integer NOT NULL, - \`branch\` text, - \`pull_request_json\` text, - \`sessions_summary_json\` text DEFAULT '[]' NOT NULL -); -`, - } as const, -}; diff --git a/foundry/packages/backend/src/actors/repository/db/schema.ts b/foundry/packages/backend/src/actors/repository/db/schema.ts deleted file mode 100644 index 4a07332..0000000 --- a/foundry/packages/backend/src/actors/repository/db/schema.ts +++ /dev/null @@ -1,43 +0,0 @@ -import { check, integer, sqliteTable, text } from "rivetkit/db/drizzle"; -import { sql } from "drizzle-orm"; - -// SQLite is per repository actor instance (organizationId+repoId). - -export const repoMeta = sqliteTable( - "repo_meta", - { - id: integer("id").primaryKey(), - remoteUrl: text("remote_url").notNull(), - updatedAt: integer("updated_at").notNull(), - }, - (table) => [check("repo_meta_singleton_id_check", sql`${table.id} = 1`)], -); - -/** - * Coordinator index of TaskActor instances. - * The repository actor is the coordinator for tasks. Each row maps a - * taskId to its immutable branch name. Used for branch conflict checking - * and task-by-branch lookups. Rows are inserted at task creation. - */ -export const taskIndex = sqliteTable("task_index", { - taskId: text("task_id").notNull().primaryKey(), - branchName: text("branch_name"), - createdAt: integer("created_at").notNull(), - updatedAt: integer("updated_at").notNull(), -}); - -/** - * Repository-owned materialized task summary projection. - * Task actors push summary updates to their direct repository coordinator, - * which keeps this table local for fast list/lookups without fan-out. - */ -export const tasks = sqliteTable("tasks", { - taskId: text("task_id").notNull().primaryKey(), - title: text("title").notNull(), - status: text("status").notNull(), - repoName: text("repo_name").notNull(), - updatedAtMs: integer("updated_at_ms").notNull(), - branch: text("branch"), - pullRequestJson: text("pull_request_json"), - sessionsSummaryJson: text("sessions_summary_json").notNull().default("[]"), -}); diff --git a/foundry/packages/backend/src/actors/repository/index.ts b/foundry/packages/backend/src/actors/repository/index.ts deleted file mode 100644 index 01feee0..0000000 --- a/foundry/packages/backend/src/actors/repository/index.ts +++ /dev/null @@ -1,26 +0,0 @@ -import { actor, queue } from "rivetkit"; -import { workflow } from "rivetkit/workflow"; -import { repositoryDb } from "./db/db.js"; -import { repositoryActions } from "./actions.js"; -import { REPOSITORY_QUEUE_NAMES, runRepositoryWorkflow } from "./workflow.js"; - -export interface RepositoryInput { - organizationId: string; - repoId: string; -} - -export const repository = actor({ - db: repositoryDb, - queues: Object.fromEntries(REPOSITORY_QUEUE_NAMES.map((name) => [name, queue()])), - options: { - name: "Repository", - icon: "folder", - actionTimeout: 5 * 60_000, - }, - createState: (_c, input: RepositoryInput) => ({ - organizationId: input.organizationId, - repoId: input.repoId, - }), - actions: repositoryActions, - run: workflow(runRepositoryWorkflow), -}); diff --git a/foundry/packages/backend/src/actors/repository/workflow.ts b/foundry/packages/backend/src/actors/repository/workflow.ts deleted file mode 100644 index 3bf517c..0000000 --- a/foundry/packages/backend/src/actors/repository/workflow.ts +++ /dev/null @@ -1,97 +0,0 @@ -// @ts-nocheck -import { Loop } from "rivetkit/workflow"; -import { logActorWarning, resolveErrorMessage } from "../logging.js"; -import { - applyTaskSummaryUpdateMutation, - createTaskMutation, - refreshTaskSummaryForBranchMutation, - registerTaskBranchMutation, - removeTaskSummaryMutation, -} from "./actions.js"; - -export const REPOSITORY_QUEUE_NAMES = [ - "repository.command.createTask", - "repository.command.registerTaskBranch", - "repository.command.applyTaskSummaryUpdate", - "repository.command.removeTaskSummary", - "repository.command.refreshTaskSummaryForBranch", -] as const; - -export type RepositoryQueueName = (typeof REPOSITORY_QUEUE_NAMES)[number]; - -export function repositoryWorkflowQueueName(name: RepositoryQueueName): RepositoryQueueName { - return name; -} - -export async function runRepositoryWorkflow(ctx: any): Promise { - await ctx.loop("repository-command-loop", async (loopCtx: any) => { - const msg = await loopCtx.queue.next("next-repository-command", { - names: [...REPOSITORY_QUEUE_NAMES], - completable: true, - }); - if (!msg) { - return Loop.continue(undefined); - } - - try { - if (msg.name === "repository.command.createTask") { - const result = await loopCtx.step({ - name: "repository-create-task", - timeout: 5 * 60_000, - run: async () => createTaskMutation(loopCtx, msg.body), - }); - await msg.complete(result); - return Loop.continue(undefined); - } - - if (msg.name === "repository.command.registerTaskBranch") { - const result = await loopCtx.step({ - name: "repository-register-task-branch", - timeout: 60_000, - run: async () => registerTaskBranchMutation(loopCtx, msg.body), - }); - await msg.complete(result); - return Loop.continue(undefined); - } - - if (msg.name === "repository.command.applyTaskSummaryUpdate") { - await loopCtx.step({ - name: "repository-apply-task-summary-update", - timeout: 30_000, - run: async () => applyTaskSummaryUpdateMutation(loopCtx, msg.body), - }); - await msg.complete({ ok: true }); - return Loop.continue(undefined); - } - - if (msg.name === "repository.command.removeTaskSummary") { - await loopCtx.step({ - name: "repository-remove-task-summary", - timeout: 30_000, - run: async () => removeTaskSummaryMutation(loopCtx, msg.body), - }); - await msg.complete({ ok: true }); - return Loop.continue(undefined); - } - - if (msg.name === "repository.command.refreshTaskSummaryForBranch") { - await loopCtx.step({ - name: "repository-refresh-task-summary-for-branch", - timeout: 60_000, - run: async () => refreshTaskSummaryForBranchMutation(loopCtx, msg.body), - }); - await msg.complete({ ok: true }); - return Loop.continue(undefined); - } - } catch (error) { - const message = resolveErrorMessage(error); - logActorWarning("repository", "repository workflow command failed", { - queueName: msg.name, - error: message, - }); - await msg.complete({ error: message }).catch(() => {}); - } - - return Loop.continue(undefined); - }); -} diff --git a/foundry/packages/backend/src/actors/task/db/migrations.ts b/foundry/packages/backend/src/actors/task/db/migrations.ts index 141786e..1e6ff76 100644 --- a/foundry/packages/backend/src/actors/task/db/migrations.ts +++ b/foundry/packages/backend/src/actors/task/db/migrations.ts @@ -23,6 +23,7 @@ export default { \`task\` text NOT NULL, \`sandbox_provider_id\` text NOT NULL, \`status\` text NOT NULL, + \`pull_request_json\` text, \`created_at\` integer NOT NULL, \`updated_at\` integer NOT NULL, CONSTRAINT "task_singleton_id_check" CHECK("task"."id" = 1) @@ -45,7 +46,6 @@ CREATE TABLE \`task_sandboxes\` ( \`sandbox_actor_id\` text, \`switch_target\` text NOT NULL, \`cwd\` text, - \`status_message\` text, \`created_at\` integer NOT NULL, \`updated_at\` integer NOT NULL ); diff --git a/foundry/packages/backend/src/actors/task/workflow/common.ts b/foundry/packages/backend/src/actors/task/workflow/common.ts index 88b96ec..d942e01 100644 --- a/foundry/packages/backend/src/actors/task/workflow/common.ts +++ b/foundry/packages/backend/src/actors/task/workflow/common.ts @@ -2,7 +2,7 @@ import { eq } from "drizzle-orm"; import type { TaskRecord, TaskStatus } from "@sandbox-agent/foundry-shared"; import { task as taskTable, taskRuntime, taskSandboxes } from "../db/schema.js"; -import { getOrCreateAuditLog, getOrCreateRepository } from "../../handles.js"; +import { getOrCreateAuditLog, getOrCreateOrganization } from "../../handles.js"; import { broadcastTaskUpdate } from "../workspace.js"; export const TASK_ROW_ID = 1; @@ -66,7 +66,7 @@ export async function setTaskState(ctx: any, status: TaskStatus): Promise export async function getCurrentRecord(ctx: any): Promise { const db = ctx.db; - const repository = await getOrCreateRepository(ctx, ctx.state.organizationId, ctx.state.repoId); + const organization = await getOrCreateOrganization(ctx, ctx.state.organizationId); const row = await db .select({ branchName: taskTable.branchName, @@ -88,7 +88,7 @@ export async function getCurrentRecord(ctx: any): Promise { throw new Error(`Task not found: ${ctx.state.taskId}`); } - const repositoryMetadata = await repository.getRepositoryMetadata({}); + const repositoryMetadata = await organization.getRepositoryMetadata({ repoId: ctx.state.repoId }); let pullRequest = null; if (row.pullRequestJson) { try { @@ -139,11 +139,12 @@ export async function getCurrentRecord(ctx: any): Promise { export async function appendAuditLog(ctx: any, kind: string, payload: Record): Promise { const row = await ctx.db.select({ branchName: taskTable.branchName }).from(taskTable).where(eq(taskTable.id, TASK_ROW_ID)).get(); - const auditLog = await getOrCreateAuditLog(ctx, ctx.state.organizationId, ctx.state.repoId); + const auditLog = await getOrCreateAuditLog(ctx, ctx.state.organizationId); await auditLog.send( "auditLog.command.append", { kind, + repoId: ctx.state.repoId, taskId: ctx.state.taskId, branchName: row?.branchName ?? null, payload, diff --git a/foundry/packages/backend/src/actors/task/workspace.ts b/foundry/packages/backend/src/actors/task/workspace.ts index 40bc22e..4f27c58 100644 --- a/foundry/packages/backend/src/actors/task/workspace.ts +++ b/foundry/packages/backend/src/actors/task/workspace.ts @@ -2,16 +2,21 @@ import { randomUUID } from "node:crypto"; import { basename, dirname } from "node:path"; import { asc, eq } from "drizzle-orm"; -import { DEFAULT_WORKSPACE_MODEL_GROUPS, DEFAULT_WORKSPACE_MODEL_ID, workspaceAgentForModel, workspaceSandboxAgentIdForModel } from "@sandbox-agent/foundry-shared"; +import { + DEFAULT_WORKSPACE_MODEL_GROUPS, + DEFAULT_WORKSPACE_MODEL_ID, + workspaceAgentForModel, + workspaceSandboxAgentIdForModel, +} from "@sandbox-agent/foundry-shared"; import { getActorRuntimeContext } from "../context.js"; -import { getOrCreateRepository, getOrCreateTaskSandbox, getOrCreateUser, getTaskSandbox, selfTask } from "../handles.js"; +import { getOrCreateOrganization, getOrCreateTaskSandbox, getOrCreateUser, getTaskSandbox, selfTask } from "../handles.js"; import { SANDBOX_REPO_CWD } from "../sandbox/index.js"; import { resolveSandboxProviderId } from "../../sandbox-config.js"; import { getBetterAuthService } from "../../services/better-auth.js"; import { expectQueueResponse } from "../../services/queue.js"; import { resolveOrganizationGithubAuth } from "../../services/github-auth.js"; import { githubRepoFullNameFromRemote } from "../../services/repo.js"; -import { repositoryWorkflowQueueName } from "../repository/workflow.js"; +import { organizationWorkflowQueueName } from "../organization/queues.js"; import { userWorkflowQueueName } from "../user/workflow.js"; import { task as taskTable, taskRuntime, taskSandboxes, taskWorkspaceSessions } from "./db/schema.js"; import { getCurrentRecord } from "./workflow/common.js"; @@ -66,8 +71,8 @@ function repoLabelFromRemote(remoteUrl: string): string { } async function getRepositoryMetadata(c: any): Promise<{ defaultBranch: string | null; fullName: string | null; remoteUrl: string }> { - const repository = await getOrCreateRepository(c, c.state.organizationId, c.state.repoId); - return await repository.getRepositoryMetadata({}); + const organization = await getOrCreateOrganization(c, c.state.organizationId); + return await organization.getRepositoryMetadata({ repoId: c.state.repoId }); } function parseDraftAttachments(value: string | null | undefined): Array { @@ -970,10 +975,10 @@ export async function getSessionDetail(c: any, sessionId: string, authSessionId? * - Broadcast full detail/session payloads down to direct task subscribers. */ export async function broadcastTaskUpdate(c: any, options?: { sessionId?: string }): Promise { - const repository = await getOrCreateRepository(c, c.state.organizationId, c.state.repoId); + const organization = await getOrCreateOrganization(c, c.state.organizationId); await expectQueueResponse<{ ok: true }>( - await repository.send( - repositoryWorkflowQueueName("repository.command.applyTaskSummaryUpdate"), + await organization.send( + organizationWorkflowQueueName("organization.command.applyTaskSummaryUpdate"), { taskSummary: await buildTaskSummary(c) }, { wait: true, timeout: 10_000 }, ), diff --git a/foundry/packages/backend/src/services/branch-name-prefixes.ts b/foundry/packages/backend/src/services/branch-name-prefixes.ts new file mode 100644 index 0000000..aaccaee --- /dev/null +++ b/foundry/packages/backend/src/services/branch-name-prefixes.ts @@ -0,0 +1,584 @@ +// Auto-generated list of branch name prefixes. +// Source: McMaster-Carr product catalog. +export const BRANCH_NAME_PREFIXES: readonly string[] = [ + "abrasive-blasters", + "ac-motors", + "access-doors", + "adjustable-handles", + "aerosol-paint", + "air-cleaners", + "air-cylinders", + "air-filters", + "air-hose", + "air-knives", + "air-nozzles", + "air-regulators", + "air-ride-wheels", + "air-slides", + "alligator-clips", + "alloy-steel", + "aluminum-honeycomb", + "angle-indicators", + "antiseize-lubricants", + "antislip-fluid", + "backlight-panel-kits", + "ball-bearings", + "ball-end-mills", + "ball-joint-linkages", + "ball-transfers", + "band-clamps", + "band-saw-blades", + "bar-clamps", + "bar-grating", + "barbed-hose-fittings", + "barbed-tube-fittings", + "basket-strainers", + "batch-cans", + "battery-chargers", + "battery-holders", + "bead-chain", + "beam-clamps", + "belt-conveyors", + "bench-scales", + "bench-vises", + "bin-boxes", + "bin-storage", + "binding-posts", + "blank-tags", + "blasting-cabinets", + "blind-rivets", + "bluetooth-padlocks", + "boring-lathe-tools", + "box-reducers", + "box-wrenches", + "braided-hose", + "brass-pipe-fittings", + "breather-vents", + "butt-splices", + "c-clamps", + "cable-cutters", + "cable-holders", + "cable-tie-mounts", + "cable-ties", + "cam-handles", + "cam-latches", + "cam-locks", + "cap-nuts", + "captive-panel-screws", + "carbide-burs", + "carbide-inserts", + "carbon-fiber", + "carbon-steel", + "cardstock-tags", + "carriage-bolts", + "cast-acrylic", + "cast-iron", + "cast-nylon", + "casting-compounds", + "ceiling-lights", + "ceramic-adhesives", + "chain-slings", + "check-valves", + "chemical-hose", + "chemistry-meters", + "chemistry-testing", + "chip-clearing-tools", + "chucking-reamers", + "cinching-straps", + "circuit-breakers", + "circular-saw-blades", + "circular-saws", + "clamping-hangers", + "clevis-pins", + "clevis-rod-ends", + "clip-on-nuts", + "coaxial-connectors", + "coaxial-cords", + "coiled-spring-pins", + "compact-connectors", + "computer-adapters", + "concrete-adhesives", + "concrete-repair", + "contour-transfers", + "conveyor-belt-lacing", + "conveyor-belting", + "conveyor-brushes", + "conveyor-rollers", + "coolant-hose", + "copper-tube-fittings", + "copper-tubing", + "cord-grips", + "cord-reels", + "cotter-pins", + "coupling-nuts", + "cpvc-pipe-fittings", + "cup-brushes", + "cutoff-wheels", + "cylinder-hones", + "cylinder-racks", + "cylinder-trucks", + "data-cable", + "data-connectors", + "dc-motors", + "dead-blow-hammers", + "delrin-acetal-resin", + "desiccant-air-dryers", + "desktop-cranes", + "dial-calipers", + "dial-indicators", + "die-springs", + "direct-heaters", + "disconnect-switches", + "dispensing-needles", + "dispensing-pumps", + "disposable-clothing", + "disposable-gloves", + "document-protectors", + "door-closers", + "door-handles", + "door-holders", + "dowel-pins", + "drafting-equipment", + "drain-cleaners", + "drainage-mats", + "draw-latches", + "drawer-cabinets", + "drawer-slides", + "drill-bit-sets", + "drill-bits", + "drill-bushings", + "drill-chucks", + "drill-presses", + "drilling-screws", + "drinking-fountains", + "drive-anchors", + "drive-rollers", + "drive-shafts", + "drum-faucets", + "drum-pumps", + "drum-top-vacuums", + "drum-trucks", + "dry-box-gloves", + "dry-erase-boards", + "dry-film-lubricants", + "duct-fans", + "duct-hose", + "duct-tape", + "dust-collectors", + "dustless-chalk", + "edge-trim", + "electric-actuators", + "electric-drills", + "electric-drum-pumps", + "electric-mixers", + "electrical-switches", + "electrical-tape", + "electronic-calipers", + "enclosure-heaters", + "enclosure-panels", + "ethernet-cords", + "exhaust-fans", + "exit-lights", + "expansion-joints", + "expansion-plugs", + "extension-cords", + "extension-springs", + "fabric-snaps", + "fan-blades", + "fep-tubing", + "fiberglass-grating", + "file-holders", + "filter-bag-housings", + "filter-bags", + "filter-cartridges", + "fire-fighting-hose", + "first-aid-supplies", + "fixture-clamps", + "flange-locknuts", + "flange-mount-seals", + "flap-sanding-discs", + "flap-sanding-wheels", + "flared-tube-fittings", + "flashing-lights", + "flat-washers", + "flexible-shafts", + "flexible-shank-burs", + "flexible-trays", + "float-valves", + "floor-locks", + "floor-marking-tape", + "floor-scales", + "floor-squeegees", + "flow-sights", + "flow-switches", + "flowmeter-totalizers", + "foot-switches", + "force-gauges", + "fume-exhausters", + "garbage-bags", + "garden-hose", + "gas-hose", + "gas-regulators", + "gas-springs", + "gauge-blocks", + "glass-sights", + "gold-wire", + "grab-latches", + "grease-fittings", + "grinding-bits", + "grinding-wheels", + "hand-brushes", + "hand-chain-hoists", + "hand-reamers", + "hand-trucks", + "hand-wheels", + "hand-winches", + "hanging-scales", + "hard-hats", + "hardened-shafts", + "hardness-testers", + "heat-exchangers", + "heat-guns", + "heat-lamps", + "heat-sealable-bags", + "heat-set-inserts", + "heat-shrink-tubing", + "heat-sinks", + "heated-scrapers", + "helical-inserts", + "hex-bit-sockets", + "hex-head-screws", + "hex-nuts", + "high-accuracy-rulers", + "high-amp-relays", + "high-vacuum-filters", + "high-vacuum-sights", + "hinge-adjusters", + "hoist-rings", + "hole-saws", + "hose-couplings", + "hose-reels", + "hot-melt-glue", + "hydraulic-cylinders", + "hydraulic-hose", + "hydraulic-jacks", + "iec-connectors", + "immersion-heaters", + "impression-foam", + "indicating-lights", + "inflatable-wedges", + "ink-markers", + "insertion-heaters", + "inspection-mirrors", + "instrument-carts", + "insulation-jacketing", + "jam-removers", + "jigsaw-blades", + "key-cabinets", + "key-locking-inserts", + "key-stock", + "keyed-drive-shafts", + "keyseat-end-mills", + "l-key-sets", + "l-keys", + "label-holders", + "latching-connectors", + "lathe-tools", + "lavatory-partitions", + "lead-screws", + "leveling-lasers", + "leveling-mounts", + "lid-supports", + "lift-off-hinges", + "lift-trucks", + "light-bulbs", + "limit-switches", + "linear-ball-bearings", + "liquid-level-gauges", + "lock-washers", + "lockout-devices", + "loop-clamps", + "loop-hangers", + "machine-brackets", + "machine-handles", + "machine-keys", + "magnetic-base-drills", + "magnetic-bumpers", + "masking-tape", + "masonry-drill-bits", + "medium-amp-relays", + "metal-cable-ties", + "metal-panels", + "metal-plates", + "metal-tags", + "metering-pumps", + "metric-o-rings", + "mil-spec-connectors", + "mobile-lift-tables", + "motor-controls", + "motor-starters", + "mountable-cable-ties", + "mounting-tape", + "neoprene-foam", + "nickel-titanium", + "nonmarring-hammers", + "nonslip-bumpers", + "nylon-rivets", + "nylon-tubing", + "o-rings", + "oil-level-indicators", + "oil-reservoirs", + "oil-skimmers", + "on-off-valves", + "open-end-wrenches", + "outlet-boxes", + "outlet-strips", + "packaging-tape", + "paint-brushes", + "paint-markers", + "paint-sprayers", + "pallet-racks", + "pallet-trucks", + "panel-air-filters", + "parts-baskets", + "pendant-switches", + "perforated-sheets", + "pest-control", + "petroleum-hose", + "piano-hinges", + "pipe-couplings", + "pipe-gaskets", + "pipe-markers", + "pipe-wrenches", + "plank-grating", + "plastic-clamps", + "plastic-mesh", + "plate-lifting-clamps", + "platinum-wire", + "plier-clamps", + "plug-gauges", + "portable-lights", + "power-cords", + "power-supplied", + "power-supplies", + "precision-knives", + "press-fit-nuts", + "press-in-nuts", + "protecting-tape", + "protective-coatings", + "protective-curtains", + "protective-panels", + "protective-wrap", + "proximity-switches", + "pull-handles", + "push-brooms", + "push-nuts", + "push-on-seals", + "pvc-pipe-fittings", + "pvc-tubing", + "quick-release-pins", + "ratchet-pullers", + "recycled-plastics", + "repair-adhesives", + "repair-clamps", + "reusable-cable-ties", + "ring-terminals", + "rivet-nuts", + "robot-base-mounts", + "robot-bases", + "rocker-switches", + "rod-wipers", + "roller-bearings", + "roller-chain", + "roller-conveyors", + "roof-exhaust-fans", + "roof-repair", + "rotary-broaches", + "rotary-hammers", + "rotary-shaft-seals", + "rotating-cranes", + "rotating-joints", + "router-bits", + "rtd-probes", + "rubber-edge-seals", + "rubber-tread-wheels", + "rubber-tubing", + "safety-cabinets", + "safety-glasses", + "safety-mirrors", + "sanding-belts", + "sanding-discs", + "sanding-guides", + "sanding-rolls", + "sanding-sheets", + "screw-extractors", + "screw-jacks", + "scrub-brushes", + "sealing-washers", + "security-lights", + "sensor-connectors", + "set-screws", + "setup-clamps", + "shaft-collars", + "shaft-couplings", + "shaft-repair-sleeves", + "shaft-supports", + "sharpening-stones", + "sheet-metal-cutters", + "shelf-cabinets", + "shim-stock", + "shim-tape", + "shipping-pails", + "shock-absorbers", + "shoulder-screws", + "shower-stations", + "silicone-foam", + "sleeve-bearings", + "slide-bolts", + "slitting-saws", + "slotted-spring-pins", + "sludge-samplers", + "small-parts-storage", + "snap-acting-switches", + "soap-dispensers", + "socket-head-screws", + "socket-organizers", + "socket-wrenches", + "soldering-irons", + "solid-rivets", + "solid-rod-ends", + "sound-insulation", + "space-heaters", + "spacing-beads", + "spanner-wrenches", + "specialty-pliers", + "specialty-vises", + "specialty-washers", + "speed-reducers", + "splicing-connectors", + "spray-bottles", + "spray-nozzles", + "spring-clamps", + "spring-plungers", + "spring-steel", + "square-drive-sockets", + "square-end-mills", + "square-nuts", + "squeeze-bottles", + "stack-lights", + "stainless-steel", + "stair-treads", + "static-control-mats", + "steel-carts", + "steel-pipe-fittings", + "steel-pipe-flanges", + "steel-stamps", + "steel-tubing", + "step-ladders", + "stepper-motors", + "storage-bags", + "storage-boxes", + "storage-chests", + "straight-ladders", + "strap-hinges", + "stretch-wrap", + "strip-doors", + "strip-springs", + "strobe-lights", + "structural-adhesives", + "strut-channel", + "strut-channel-nuts", + "strut-mount-clamps", + "suction-cup-lifters", + "suction-strainers", + "super-absorbent-foam", + "super-flexible-glass", + "surface-fillers", + "surface-mount-hinges", + "t-handle-keys", + "t-slotted-framing", + "tamper-seals", + "tank-level-measurers", + "tape-dispensers", + "tape-measures", + "taper-pins", + "tapping-screws", + "teflon-ptfe", + "terminal-blocks", + "test-indicators", + "test-leads", + "test-weights", + "tethered-knobs", + "thermal-insulation", + "thread-adapters", + "thread-sealant-tape", + "thread-sealants", + "threaded-inserts", + "threaded-standoffs", + "threaded-studs", + "thrust-ball-bearings", + "thrust-bearings", + "thumb-nuts", + "thumb-screws", + "tie-down-rings", + "time-clocks", + "timer-relays", + "timer-switches", + "toggle-clamps", + "toggle-switches", + "tool-holders", + "tool-sets", + "tool-steel", + "torque-wrenches", + "torsion-springs", + "tote-boxes", + "touch-bars", + "track-casters", + "track-rollers", + "track-wheels", + "traction-mats", + "trolley-systems", + "tube-brushes", + "tube-fittings", + "tubular-light-bulbs", + "turn-lock-connectors", + "twist-ties", + "u-bolts", + "u-joints", + "ul-class-fuses", + "unthreaded-spacers", + "usb-adapters", + "usb-cords", + "utility-knives", + "v-belts", + "vacuum-cups", + "vacuum-pumps", + "wall-louvers", + "wash-fountains", + "wash-guns", + "waste-containers", + "water-deionizers", + "water-filters", + "water-hose", + "water-removal-pumps", + "weather-stations", + "web-slings", + "weld-nuts", + "welding-clothing", + "welding-helmets", + "wet-dry-vacuums", + "wet-mops", + "wheel-brushes", + "wing-nuts", + "wire-cloth", + "wire-connectors", + "wire-cutting-pliers", + "wire-partitions", + "wire-rope", + "wire-rope-clamps", + "wire-wrap", + "wool-felt", + "work-platforms", + "workbench-legs", + "woven-wire-cloth", +] as const; diff --git a/foundry/packages/backend/test/keys.test.ts b/foundry/packages/backend/test/keys.test.ts index 843648b..c3b2a10 100644 --- a/foundry/packages/backend/test/keys.test.ts +++ b/foundry/packages/backend/test/keys.test.ts @@ -1,14 +1,13 @@ import { describe, expect, it } from "vitest"; -import { auditLogKey, githubDataKey, organizationKey, repositoryKey, taskKey, taskSandboxKey } from "../src/actors/keys.js"; +import { auditLogKey, githubDataKey, organizationKey, taskKey, taskSandboxKey } from "../src/actors/keys.js"; describe("actor keys", () => { it("prefixes every key with organization namespace", () => { const keys = [ organizationKey("default"), - repositoryKey("default", "repo"), taskKey("default", "repo", "task"), taskSandboxKey("default", "sbx"), - auditLogKey("default", "repo"), + auditLogKey("default"), githubDataKey("default"), ]; diff --git a/foundry/packages/client/src/keys.ts b/foundry/packages/client/src/keys.ts index 84dd00b..7242aae 100644 --- a/foundry/packages/client/src/keys.ts +++ b/foundry/packages/client/src/keys.ts @@ -4,18 +4,14 @@ export function organizationKey(organizationId: string): ActorKey { return ["org", organizationId]; } -export function repositoryKey(organizationId: string, repoId: string): ActorKey { - return ["org", organizationId, "repository", repoId]; -} - export function taskKey(organizationId: string, repoId: string, taskId: string): ActorKey { - return ["org", organizationId, "repository", repoId, "task", taskId]; + return ["org", organizationId, "task", repoId, taskId]; } export function taskSandboxKey(organizationId: string, sandboxId: string): ActorKey { return ["org", organizationId, "sandbox", sandboxId]; } -export function auditLogKey(organizationId: string, repoId: string): ActorKey { - return ["org", organizationId, "repository", repoId, "audit-log"]; +export function auditLogKey(organizationId: string): ActorKey { + return ["org", organizationId, "audit-log"]; } diff --git a/foundry/packages/client/src/mock/backend-client.ts b/foundry/packages/client/src/mock/backend-client.ts index f8ec139..fc6470c 100644 --- a/foundry/packages/client/src/mock/backend-client.ts +++ b/foundry/packages/client/src/mock/backend-client.ts @@ -308,6 +308,7 @@ export function createMockBackendClient(defaultOrganizationId = "default"): Back task: task.title, sandboxProviderId: "local", status: toTaskStatus(archived ? "archived" : "running", archived), + pullRequest: null, activeSandboxId: task.id, sandboxes: [ { @@ -453,6 +454,7 @@ export function createMockBackendClient(defaultOrganizationId = "default"): Back branchName: task.branch, title: task.title, status: task.status === "archived" ? "archived" : "running", + pullRequest: null, updatedAt: task.updatedAtMs, })); }, @@ -633,11 +635,7 @@ export function createMockBackendClient(defaultOrganizationId = "default"): Back return { endpoint: "mock://terminal-unavailable" }; }, - async getSandboxWorkspaceModelGroups( - _organizationId: string, - _sandboxProviderId: SandboxProviderId, - _sandboxId: string, - ): Promise { + async getSandboxWorkspaceModelGroups(_organizationId: string, _sandboxProviderId: SandboxProviderId, _sandboxId: string): Promise { return DEFAULT_WORKSPACE_MODEL_GROUPS; }, diff --git a/foundry/packages/client/test/keys.test.ts b/foundry/packages/client/test/keys.test.ts index eae67cf..6b93ec1 100644 --- a/foundry/packages/client/test/keys.test.ts +++ b/foundry/packages/client/test/keys.test.ts @@ -1,15 +1,9 @@ import { describe, expect, it } from "vitest"; -import { auditLogKey, organizationKey, repositoryKey, taskKey, taskSandboxKey } from "../src/keys.js"; +import { auditLogKey, organizationKey, taskKey, taskSandboxKey } from "../src/keys.js"; describe("actor keys", () => { it("prefixes every key with organization namespace", () => { - const keys = [ - organizationKey("default"), - repositoryKey("default", "repo"), - taskKey("default", "repo", "task"), - taskSandboxKey("default", "sbx"), - auditLogKey("default", "repo"), - ]; + const keys = [organizationKey("default"), taskKey("default", "repo", "task"), taskSandboxKey("default", "sbx"), auditLogKey("default")]; for (const key of keys) { expect(key[0]).toBe("org"); diff --git a/foundry/packages/client/test/subscription-manager.test.ts b/foundry/packages/client/test/subscription-manager.test.ts index 13b646c..c064606 100644 --- a/foundry/packages/client/test/subscription-manager.test.ts +++ b/foundry/packages/client/test/subscription-manager.test.ts @@ -164,6 +164,9 @@ describe("RemoteSubscriptionManager", () => { }, } satisfies OrganizationEvent); + // applyEvent chains onto an internal promise — flush the microtask queue + await flushAsyncWork(); + expect(manager.getSnapshot("organization", params)?.taskSummaries[0]?.title).toBe("Updated task"); expect(listenerA).toHaveBeenCalled(); expect(listenerB).toHaveBeenCalled(); diff --git a/foundry/packages/frontend/src/components/dev-panel.tsx b/foundry/packages/frontend/src/components/dev-panel.tsx index bf0d6e1..947331e 100644 --- a/foundry/packages/frontend/src/components/dev-panel.tsx +++ b/foundry/packages/frontend/src/components/dev-panel.tsx @@ -470,9 +470,7 @@ export const DevPanel = memo(function DevPanel({ organizationId, snapshot, organ /> Sync {liveGithub.syncStatus} - {liveGithub.lastSyncAt != null && ( - {timeAgo(liveGithub.lastSyncAt)} - )} + {liveGithub.lastSyncAt != null && {timeAgo(liveGithub.lastSyncAt)}}
- +
- {liveGithub.connectedAccount && ( -
@{liveGithub.connectedAccount}
- )} - {liveGithub.lastSyncLabel && ( -
last sync: {liveGithub.lastSyncLabel}
- )} + {liveGithub.connectedAccount &&
@{liveGithub.connectedAccount}
} + {liveGithub.lastSyncLabel &&
last sync: {liveGithub.lastSyncLabel}
} {liveGithub.syncPhase && (
- phase: {liveGithub.syncPhase.replace(/^syncing_/, "").replace(/_/g, " ")} ({liveGithub.processedRepositoryCount}/{liveGithub.totalRepositoryCount}) + phase: {liveGithub.syncPhase.replace(/^syncing_/, "").replace(/_/g, " ")} ({liveGithub.processedRepositoryCount}/ + {liveGithub.totalRepositoryCount})
)} diff --git a/foundry/packages/frontend/src/components/mock-layout.tsx b/foundry/packages/frontend/src/components/mock-layout.tsx index 6095d09..b1dadeb 100644 --- a/foundry/packages/frontend/src/components/mock-layout.tsx +++ b/foundry/packages/frontend/src/components/mock-layout.tsx @@ -80,7 +80,10 @@ function sanitizeActiveSessionId(task: Task, sessionId: string | null | undefine return openDiffs.length > 0 ? diffTabId(openDiffs[openDiffs.length - 1]!) : lastAgentSessionId; } -type GithubStatusView = Pick & { +type GithubStatusView = Pick< + FoundryOrganization["github"], + "connectedAccount" | "installationStatus" | "syncStatus" | "importedRepoCount" | "lastSyncLabel" +> & { syncPhase?: string | null; processedRepositoryCount?: number; totalRepositoryCount?: number; @@ -1912,7 +1915,7 @@ export function MockLayout({ organizationId, selectedTaskId, selectedSessionId }

Syncing with GitHub

{liveGithub.lastSyncLabel || `Importing repos from @${liveGithub.connectedAccount || "GitHub"}...`} - {liveGithub.totalRepositoryCount > 0 && ( + {(liveGithub.totalRepositoryCount ?? 0) > 0 && ( <> {" "} {liveGithub.syncPhase === "syncing_repositories" diff --git a/foundry/packages/frontend/src/components/organization-dashboard.tsx b/foundry/packages/frontend/src/components/organization-dashboard.tsx index c367542..672de82 100644 --- a/foundry/packages/frontend/src/components/organization-dashboard.tsx +++ b/foundry/packages/frontend/src/components/organization-dashboard.tsx @@ -530,8 +530,7 @@ export function OrganizationDashboard({ organizationId, selectedTaskId, selected if (!selectedForSession || !activeSandbox?.sandboxId) { throw new Error("No sandbox is available for this task"); } - const preferredAgent = - selectedSessionSummary?.agent === "Claude" ? "claude" : selectedSessionSummary?.agent === "Codex" ? "codex" : undefined; + const preferredAgent = selectedSessionSummary?.agent === "Claude" ? "claude" : selectedSessionSummary?.agent === "Codex" ? "codex" : undefined; return backendClient.createSandboxSession({ organizationId, sandboxProviderId: activeSandbox.sandboxProviderId, @@ -1114,7 +1113,7 @@ export function OrganizationDashboard({ organizationId, selectedTaskId, selected {selectedForSession ? ( {shouldUseTaskStateEmptyState ? taskStateSummary - : (isPendingProvision ? "The task is still provisioning." : "The session is being created.")} + : isPendingProvision + ? "The task is still provisioning." + : "The session is being created."} ) : null} @@ -1456,7 +1457,7 @@ export function OrganizationDashboard({ organizationId, selectedTaskId, selected gap: theme.sizing.scale300, })} > - + @@ -1501,7 +1502,7 @@ export function OrganizationDashboard({ organizationId, selectedTaskId, selected - {taskRuntimeStatus === "error" ? ( + {taskStatus === "error" ? (

{ + window.localStorage.removeItem(REMOTE_APP_SESSION_STORAGE_KEY); await backendClient.signOutApp(); }, async skipStarterRepo(): Promise { @@ -100,6 +101,14 @@ export function useMockAppSnapshot(): FoundryAppSnapshot { const app = useSubscription(subscriptionManager, "app", {}); if (app.status !== "loading") { firstSnapshotDelivered = true; + // Persist session sentinel so isAppSnapshotBootstrapping can show a loading + // screen instead of flashing /signin on the next page load / HMR reload. + const snapshot = app.data ?? EMPTY_APP_SNAPSHOT; + if (snapshot.auth.status === "signed_in") { + window.localStorage.setItem(REMOTE_APP_SESSION_STORAGE_KEY, "1"); + } else { + window.localStorage.removeItem(REMOTE_APP_SESSION_STORAGE_KEY); + } } return app.data ?? EMPTY_APP_SNAPSHOT; } diff --git a/foundry/packages/shared/src/contracts.ts b/foundry/packages/shared/src/contracts.ts index 07eb34c..d9202fe 100644 --- a/foundry/packages/shared/src/contracts.ts +++ b/foundry/packages/shared/src/contracts.ts @@ -173,6 +173,7 @@ export type StarSandboxAgentRepoResult = z.infer