This commit is contained in:
Nathan Flurry 2026-03-14 23:47:43 -07:00 committed by GitHub
parent 99abb9d42e
commit 57a07f6a0a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 206 additions and 113 deletions

View file

@ -1,6 +1,7 @@
// @ts-nocheck
import { eq } from "drizzle-orm";
import { actor } from "rivetkit";
import { actor, queue } from "rivetkit";
import { workflow, Loop } from "rivetkit/workflow";
import type { FoundryOrganization } from "@sandbox-agent/foundry-shared";
import { getActorRuntimeContext } from "../context.js";
import { getOrCreateOrganization, getTask } from "../handles.js";
@ -536,8 +537,69 @@ async function runFullSync(c: any, input: FullSyncInput = {}) {
};
}
// Queue names this actor's workflow consumes (see runGithubDataWorkflow).
const GITHUB_DATA_QUEUE_NAMES = ["githubData.command.syncRepos"] as const;
/**
 * Durable workflow body for the GitHub data actor.
 *
 * Phase 1: a one-shot "github-data-initial-sync" step that runs the first
 * full sync automatically when the actor is newly created (meta.syncStatus
 * is still "pending").
 * Phase 2: an endless command loop that consumes explicit sync requests
 * (reload / re-import) from the actor's queue.
 *
 * NOTE(review): the step names and statement order here are replay-sensitive
 * workflow state — do not rename steps or reorder phases without a migration
 * plan for in-flight workflows.
 */
async function runGithubDataWorkflow(ctx: any): Promise<void> {
  // Initial sync: if this actor was just created and has never synced,
  // kick off the first full sync automatically.
  await ctx.step({
    name: "github-data-initial-sync",
    timeout: 5 * 60_000, // 5 minutes — a full catalog import can be slow
    run: async () => {
      const meta = await readMeta(ctx);
      if (meta.syncStatus !== "pending") {
        return; // Already synced or syncing — skip initial sync
      }
      try {
        await runFullSync(ctx, { label: "Importing repository catalog..." });
      } catch (error) {
        // Best-effort initial sync. Write the error to meta so the client
        // sees the failure and can trigger a manual retry.
        const currentMeta = await readMeta(ctx);
        const organization = await getOrCreateOrganization(ctx, ctx.state.organizationId);
        await organization.markOrganizationSyncFailed({
          message: error instanceof Error ? error.message : "GitHub import failed",
          installationStatus: currentMeta.installationStatus,
        });
      }
    },
  });

  // Command loop for explicit sync requests (reload, re-import, etc.)
  await ctx.loop("github-data-command-loop", async (loopCtx: any) => {
    // Block until the next command arrives; `completable: true` means the
    // message must be explicitly acknowledged via msg.complete().
    const msg = await loopCtx.queue.next("next-github-data-command", {
      names: [...GITHUB_DATA_QUEUE_NAMES],
      completable: true,
    });
    if (!msg) {
      return Loop.continue(undefined);
    }
    try {
      if (msg.name === "githubData.command.syncRepos") {
        await loopCtx.step({
          name: "github-data-sync-repos",
          timeout: 5 * 60_000,
          run: async () => {
            const body = msg.body as FullSyncInput;
            await runFullSync(loopCtx, body);
          },
        });
        await msg.complete({ ok: true });
        return Loop.continue(undefined);
      }
    } catch (error) {
      // Ack the message with the error so the sender is unblocked; failures
      // from complete() itself are swallowed (best-effort).
      const message = error instanceof Error ? error.message : String(error);
      await msg.complete({ error: message }).catch(() => {});
    }
    return Loop.continue(undefined);
  });
}
export const githubData = actor({
db: githubDataDb,
queues: Object.fromEntries(GITHUB_DATA_QUEUE_NAMES.map((name) => [name, queue()])),
options: {
name: "GitHub Data",
icon: "github",
@ -546,6 +608,7 @@ export const githubData = actor({
createState: (_c, input: GithubDataInput) => ({
organizationId: input.organizationId,
}),
run: workflow(runGithubDataWorkflow),
actions: {
async getSummary(c) {
const repositories = await c.db.select().from(githubRepositories).all();

View file

@ -61,11 +61,7 @@ interface RepoOverviewInput {
repoId: string;
}
const ORGANIZATION_QUEUE_NAMES = [
"organization.command.createTask",
"organization.command.syncGithubOrganizationRepos",
"organization.command.syncGithubSession",
] as const;
const ORGANIZATION_QUEUE_NAMES = ["organization.command.createTask", "organization.command.syncGithubSession"] as const;
const SANDBOX_AGENT_REPO = "rivet-dev/sandbox-agent";
type OrganizationQueueName = (typeof ORGANIZATION_QUEUE_NAMES)[number];
@ -384,19 +380,6 @@ export async function runOrganizationWorkflow(ctx: any): Promise<void> {
await msg.complete({ ok: true });
return Loop.continue(undefined);
}
if (msg.name === "organization.command.syncGithubOrganizationRepos") {
await loopCtx.step({
name: "organization-sync-github-organization-repos",
timeout: 60_000,
run: async () => {
const { syncGithubOrganizationRepos } = await import("./app-shell.js");
await syncGithubOrganizationRepos(loopCtx, msg.body as { sessionId: string; organizationId: string });
},
});
await msg.complete({ ok: true });
return Loop.continue(undefined);
}
} catch (error) {
const message = resolveErrorMessage(error);
logActorWarning("organization", "organization workflow command failed", {

View file

@ -596,49 +596,6 @@ async function syncGithubOrganizationsInternal(c: any, input: { sessionId: strin
});
}
export async function syncGithubOrganizationRepos(c: any, input: { sessionId: string; organizationId: string }): Promise<void> {
assertAppOrganization(c);
const session = await requireSignedInSession(c, input.sessionId);
requireEligibleOrganization(session, input.organizationId);
const organizationHandle = await getOrCreateOrganization(c, input.organizationId);
const organizationState = await getOrganizationState(organizationHandle);
const githubData = await getOrCreateGithubData(c, input.organizationId);
try {
await githubData.fullSync({
accessToken: session.githubAccessToken,
connectedAccount: organizationState.snapshot.github.connectedAccount,
installationId: organizationState.githubInstallationId,
installationStatus: organizationState.snapshot.github.installationStatus,
githubLogin: organizationState.githubLogin,
kind: organizationState.snapshot.kind,
label: "Importing repository catalog...",
});
// Broadcast updated app snapshot so connected clients see the new repos
c.broadcast("appUpdated", {
type: "appUpdated",
snapshot: await buildAppSnapshot(c, input.sessionId),
});
} catch (error) {
const installationStatus =
error instanceof GitHubAppError && (error.status === 403 || error.status === 404)
? "reconnect_required"
: organizationState.snapshot.github.installationStatus;
await organizationHandle.markOrganizationSyncFailed({
message: error instanceof Error ? error.message : "GitHub import failed",
installationStatus,
});
// Broadcast sync failure so the client updates status
c.broadcast("appUpdated", {
type: "appUpdated",
snapshot: await buildAppSnapshot(c, input.sessionId),
});
}
}
async function readOrganizationProfileRow(c: any) {
assertOrganizationShell(c);
return await c.db.select().from(organizationProfile).where(eq(organizationProfile.id, PROFILE_ROW_ID)).get();
@ -1113,26 +1070,11 @@ export const organizationAppActions = {
requireEligibleOrganization(session, input.organizationId);
await getBetterAuthService().setActiveOrganization(input.sessionId, input.organizationId);
const organizationHandle = await getOrCreateOrganization(c, input.organizationId);
const organizationState = await getOrganizationState(organizationHandle);
if (organizationState.snapshot.github.syncStatus !== "synced") {
if (organizationState.snapshot.github.syncStatus !== "syncing") {
await organizationHandle.markOrganizationSyncStarted({
label: "Importing repository catalog...",
});
// Ensure the GitHub data actor exists. If it's newly created, its own
// workflow will detect the pending sync status and run the initial
// full sync automatically — no orchestration needed here.
await getOrCreateGithubData(c, input.organizationId);
const self = selfOrganization(c);
await self.send(
"organization.command.syncGithubOrganizationRepos",
{ sessionId: input.sessionId, organizationId: input.organizationId },
{
wait: false,
},
);
}
return await buildAppSnapshot(c, input.sessionId);
}
return await buildAppSnapshot(c, input.sessionId);
},
@ -1157,24 +1099,20 @@ export const organizationAppActions = {
const session = await requireSignedInSession(c, input.sessionId);
requireEligibleOrganization(session, input.organizationId);
const organizationHandle = await getOrCreateOrganization(c, input.organizationId);
const organizationState = await getOrganizationState(organizationHandle);
if (organizationState.snapshot.github.syncStatus === "syncing") {
const githubData = await getOrCreateGithubData(c, input.organizationId);
const summary = await githubData.getSummary({});
if (summary.syncStatus === "syncing") {
return await buildAppSnapshot(c, input.sessionId);
}
// Mark sync started on the organization, then send directly to the
// GitHub data actor's own workflow queue.
const organizationHandle = await getOrCreateOrganization(c, input.organizationId);
await organizationHandle.markOrganizationSyncStarted({
label: "Importing repository catalog...",
});
const self = selfOrganization(c);
await self.send(
"organization.command.syncGithubOrganizationRepos",
{ sessionId: input.sessionId, organizationId: input.organizationId },
{
wait: false,
},
);
await githubData.send("githubData.command.syncRepos", { label: "Importing repository catalog..." }, { wait: false });
return await buildAppSnapshot(c, input.sessionId);
},

View file

@ -2,6 +2,11 @@ import { integer, sqliteTable, text } from "rivetkit/db/drizzle";
// SQLite is per organization actor instance, so no organizationId column needed.
/**
* Coordinator index of RepositoryActor instances.
* The organization actor is the coordinator for repositories.
* Rows are created/removed when repos are added/removed from the organization.
*/
export const repos = sqliteTable("repos", {
repoId: text("repo_id").notNull().primaryKey(),
remoteUrl: text("remote_url").notNull(),
@ -9,15 +14,21 @@ export const repos = sqliteTable("repos", {
updatedAt: integer("updated_at").notNull(),
});
/**
* Coordinator index of TaskActor instances.
 * Fast taskId → repoId lookup so the organization can route requests
* to the correct RepositoryActor without scanning all repos.
*/
export const taskLookup = sqliteTable("task_lookup", {
  // Primary key: unique task id being routed.
  taskId: text("task_id").notNull().primaryKey(),
  // Owning repository — the value resolved when routing a task request.
  repoId: text("repo_id").notNull(),
});
/**
* Materialized sidebar projection maintained by task actors.
* The source of truth still lives on each task actor; this table exists so
* organization reads can stay local and avoid fan-out across child actors.
 * Coordinator index of TaskActor instances' materialized sidebar projection.
* Task actors push summary updates to the organization actor via
* applyTaskSummaryUpdate(). Source of truth lives on each TaskActor;
* this table exists so organization reads stay local without fan-out.
*/
export const taskSummaries = sqliteTable("task_summaries", {
taskId: text("task_id").notNull().primaryKey(),
@ -87,6 +98,11 @@ export const invoices = sqliteTable("invoices", {
createdAt: integer("created_at").notNull(),
});
/**
 * Coordinator index of AuthUserActor instances — routes session token → userId.
* Better Auth adapter uses this to resolve which user actor to query
* before the user identity is known.
*/
export const authSessionIndex = sqliteTable("auth_session_index", {
sessionId: text("session_id").notNull().primaryKey(),
sessionToken: text("session_token").notNull(),
@ -95,12 +111,20 @@ export const authSessionIndex = sqliteTable("auth_session_index", {
updatedAt: integer("updated_at").notNull(),
});
/**
 * Coordinator index of AuthUserActor instances — routes email → userId.
* Better Auth adapter uses this to resolve which user actor to query.
*/
export const authEmailIndex = sqliteTable("auth_email_index", {
  // Primary key: the account email address being looked up.
  email: text("email").notNull().primaryKey(),
  // Target AuthUserActor id for this email.
  userId: text("user_id").notNull(),
  // Last write time — presumably epoch millis; TODO confirm against writer.
  updatedAt: integer("updated_at").notNull(),
});
/**
 * Coordinator index of AuthUserActor instances — routes OAuth account → userId.
* Better Auth adapter uses this to resolve which user actor to query.
*/
export const authAccountIndex = sqliteTable("auth_account_index", {
id: text("id").notNull().primaryKey(),
providerId: text("provider_id").notNull(),

View file

@ -8,6 +8,13 @@ export const repoMeta = sqliteTable("repo_meta", {
updatedAt: integer("updated_at").notNull(),
});
/**
* Coordinator index of TaskActor instances.
* The repository actor is the coordinator for tasks. Each row maps a
* taskId to its branch name. Used for branch conflict checking and
* task-by-branch lookups. Rows are inserted at task creation and
* updated on branch rename.
*/
export const taskIndex = sqliteTable("task_index", {
taskId: text("task_id").notNull().primaryKey(),
branchName: text("branch_name"),

View file

@ -37,6 +37,11 @@ export const taskRuntime = sqliteTable(
(table) => [check("task_runtime_singleton_id_check", sql`${table.id} = 1`)],
);
/**
* Coordinator index of SandboxInstanceActor instances.
* Tracks all sandbox instances provisioned for this task. Only one
* is active at a time (referenced by taskRuntime.activeSandboxId).
*/
export const taskSandboxes = sqliteTable("task_sandboxes", {
sandboxId: text("sandbox_id").notNull().primaryKey(),
sandboxProviderId: text("sandbox_provider_id").notNull(),
@ -48,6 +53,12 @@ export const taskSandboxes = sqliteTable("task_sandboxes", {
updatedAt: integer("updated_at").notNull(),
});
/**
* Coordinator index of workbench sessions within this task.
* The task actor is the coordinator for sessions. Each row holds session
* metadata, model, status, transcript, and draft state. Sessions are
 * sub-entities of the task — no separate session actor in the DB.
*/
export const taskWorkbenchSessions = sqliteTable("task_workbench_sessions", {
sessionId: text("session_id").notNull().primaryKey(),
sandboxSessionId: text("sandbox_session_id"),

View file

@ -386,11 +386,24 @@ async function getTaskSandboxRuntime(
};
}
async function ensureSandboxRepo(c: any, sandbox: any, record: any): Promise<void> {
/**
* Track whether the sandbox repo has been fully prepared (cloned + fetched + checked out)
* for the current actor lifecycle. Subsequent calls can skip the expensive `git fetch`
* when `skipFetch` is true (used by sendWorkbenchMessage to avoid blocking on every prompt).
*/
let sandboxRepoPrepared = false;
async function ensureSandboxRepo(c: any, sandbox: any, record: any, opts?: { skipFetchIfPrepared?: boolean }): Promise<void> {
if (!record.branchName) {
throw new Error("cannot prepare a sandbox repo before the task branch exists");
}
// If the repo was already prepared and the caller allows skipping fetch, just return.
// The clone, fetch, and checkout already happened on a prior call.
if (opts?.skipFetchIfPrepared && sandboxRepoPrepared) {
return;
}
const auth = await resolveOrganizationGithubAuth(c, c.state.organizationId);
const repository = await getOrCreateRepository(c, c.state.organizationId, c.state.repoId, c.state.repoRemote);
const metadata = await repository.getRepositoryMetadata({});
@ -426,6 +439,8 @@ async function ensureSandboxRepo(c: any, sandbox: any, record: any): Promise<voi
if ((result.exitCode ?? 0) !== 0) {
throw new Error(`sandbox repo preparation failed (${result.exitCode ?? 1}): ${[result.stdout, result.stderr].filter(Boolean).join("")}`);
}
sandboxRepoPrepared = true;
}
async function executeInSandbox(
@ -1191,7 +1206,9 @@ export async function sendWorkbenchMessage(c: any, sessionId: string, text: stri
const meta = requireSendableSessionMeta(await readSessionMeta(c, sessionId), sessionId);
const record = await ensureWorkbenchSeeded(c);
const runtime = await getTaskSandboxRuntime(c, record);
await ensureSandboxRepo(c, runtime.sandbox, record);
// Skip git fetch on subsequent messages — the repo was already prepared during session
// creation. This avoids a 5-30s network round-trip to GitHub on every prompt.
await ensureSandboxRepo(c, runtime.sandbox, record, { skipFetchIfPrepared: true });
const prompt = [text.trim(), ...attachments.map((attachment: any) => `@ ${attachment.filePath}:${attachment.lineNumber}\n${attachment.lineContent}`)].filter(
Boolean,
);