mirror of
https://github.com/harivansh-afk/sandbox-agent.git
synced 2026-04-20 09:01:56 +00:00
Finalize Foundry sync flow
This commit is contained in:
parent
5c70cbcd23
commit
1c852cc5f8
14 changed files with 768 additions and 187 deletions
|
|
@ -6,6 +6,12 @@ const journal = {
|
|||
tag: "0000_github_state",
|
||||
breakpoints: true,
|
||||
},
|
||||
{
|
||||
idx: 1,
|
||||
when: 1773340800000,
|
||||
tag: "0001_github_state_sync_progress",
|
||||
breakpoints: true,
|
||||
},
|
||||
],
|
||||
} as const;
|
||||
|
||||
|
|
@ -53,6 +59,12 @@ CREATE TABLE \`github_pull_requests\` (
|
|||
\`is_draft\` integer NOT NULL,
|
||||
\`updated_at\` integer NOT NULL
|
||||
);
|
||||
`,
|
||||
m0001: `ALTER TABLE \`github_meta\` ADD \`sync_phase\` text;
|
||||
ALTER TABLE \`github_meta\` ADD \`sync_run_started_at\` integer;
|
||||
ALTER TABLE \`github_meta\` ADD \`sync_repositories_total\` integer;
|
||||
ALTER TABLE \`github_meta\` ADD \`sync_repositories_completed\` integer;
|
||||
ALTER TABLE \`github_meta\` ADD \`sync_pull_request_repositories_completed\` integer;
|
||||
`,
|
||||
} as const,
|
||||
};
|
||||
|
|
|
|||
|
|
@ -8,6 +8,11 @@ export const githubMeta = sqliteTable("github_meta", {
|
|||
installationId: integer("installation_id"),
|
||||
lastSyncLabel: text("last_sync_label").notNull(),
|
||||
lastSyncAt: integer("last_sync_at"),
|
||||
syncPhase: text("sync_phase"),
|
||||
syncRunStartedAt: integer("sync_run_started_at"),
|
||||
syncRepositoriesTotal: integer("sync_repositories_total"),
|
||||
syncRepositoriesCompleted: integer("sync_repositories_completed"),
|
||||
syncPullRequestRepositoriesCompleted: integer("sync_pull_request_repositories_completed"),
|
||||
updatedAt: integer("updated_at").notNull(),
|
||||
});
|
||||
|
||||
|
|
|
|||
|
|
@ -1,15 +1,20 @@
|
|||
// @ts-nocheck
|
||||
import { eq } from "drizzle-orm";
|
||||
import { actor } from "rivetkit";
|
||||
import { randomUUID } from "node:crypto";
|
||||
import { eq, lt } from "drizzle-orm";
|
||||
import { actor, queue } from "rivetkit";
|
||||
import { Loop, workflow } from "rivetkit/workflow";
|
||||
import type { FoundryGithubInstallationStatus, FoundryGithubSyncStatus } from "@sandbox-agent/foundry-shared";
|
||||
import { repoIdFromRemote } from "../../services/repo.js";
|
||||
import { resolveWorkspaceGithubAuth } from "../../services/github-auth.js";
|
||||
import { getActorRuntimeContext } from "../context.js";
|
||||
import { getOrCreateOrganization, getOrCreateRepository, selfGithubState } from "../handles.js";
|
||||
import { APP_SHELL_ORGANIZATION_ID } from "../organization/app-shell.js";
|
||||
import { githubStateDb } from "./db/db.js";
|
||||
import { githubMembers, githubMeta, githubPullRequests, githubRepositories } from "./db/schema.js";
|
||||
|
||||
const META_ROW_ID = 1;
|
||||
const GITHUB_PR_BATCH_SIZE = 10;
|
||||
const GITHUB_QUEUE_NAMES = ["github.command.full_sync"] as const;
|
||||
|
||||
interface GithubStateInput {
|
||||
organizationId: string;
|
||||
|
|
@ -22,6 +27,11 @@ interface GithubStateMeta {
|
|||
installationId: number | null;
|
||||
lastSyncLabel: string;
|
||||
lastSyncAt: number | null;
|
||||
syncPhase: string | null;
|
||||
syncRunStartedAt: number | null;
|
||||
syncRepositoriesTotal: number | null;
|
||||
syncRepositoriesCompleted: number;
|
||||
syncPullRequestRepositoriesCompleted: number;
|
||||
}
|
||||
|
||||
interface SyncMemberSeed {
|
||||
|
|
@ -42,6 +52,12 @@ interface FullSyncInput {
|
|||
accessToken?: string | null;
|
||||
label?: string;
|
||||
fallbackMembers?: SyncMemberSeed[];
|
||||
force?: boolean;
|
||||
}
|
||||
|
||||
interface FullSyncCommand extends FullSyncInput {
|
||||
runId: string;
|
||||
runStartedAt: number;
|
||||
}
|
||||
|
||||
interface PullRequestWebhookInput {
|
||||
|
|
@ -67,6 +83,36 @@ interface PullRequestWebhookInput {
|
|||
};
|
||||
}
|
||||
|
||||
interface GitHubRepositorySnapshot {
|
||||
fullName: string;
|
||||
cloneUrl: string;
|
||||
private: boolean;
|
||||
}
|
||||
|
||||
interface GitHubPullRequestSnapshot {
|
||||
repoFullName: string;
|
||||
cloneUrl: string;
|
||||
number: number;
|
||||
title: string;
|
||||
body?: string | null;
|
||||
state: string;
|
||||
url: string;
|
||||
headRefName: string;
|
||||
baseRefName: string;
|
||||
authorLogin?: string | null;
|
||||
isDraft?: boolean;
|
||||
}
|
||||
|
||||
interface FullSyncSeed {
|
||||
repositories: GitHubRepositorySnapshot[];
|
||||
members: SyncMemberSeed[];
|
||||
pullRequestSource: "installation" | "user" | "none";
|
||||
}
|
||||
|
||||
function githubWorkflowQueueName(name: string): string {
|
||||
return name;
|
||||
}
|
||||
|
||||
function normalizePullRequestStatus(input: { state: string; isDraft?: boolean; merged?: boolean }): "draft" | "ready" | "closed" | "merged" {
|
||||
const rawState = input.state.trim().toUpperCase();
|
||||
if (input.merged || rawState === "MERGED") {
|
||||
|
|
@ -78,24 +124,13 @@ function normalizePullRequestStatus(input: { state: string; isDraft?: boolean; m
|
|||
return input.isDraft ? "draft" : "ready";
|
||||
}
|
||||
|
||||
interface FullSyncSnapshot {
|
||||
repositories: Array<{ fullName: string; cloneUrl: string; private: boolean }>;
|
||||
members: SyncMemberSeed[];
|
||||
loadPullRequests: () => Promise<
|
||||
Array<{
|
||||
repoFullName: string;
|
||||
cloneUrl: string;
|
||||
number: number;
|
||||
title: string;
|
||||
body?: string | null;
|
||||
state: string;
|
||||
url: string;
|
||||
headRefName: string;
|
||||
baseRefName: string;
|
||||
authorLogin?: string | null;
|
||||
isDraft?: boolean;
|
||||
}>
|
||||
>;
|
||||
function repoBelongsToAccount(fullName: string, accountLogin: string): boolean {
|
||||
const owner = fullName.split("/")[0]?.trim().toLowerCase() ?? "";
|
||||
return owner.length > 0 && owner === accountLogin.trim().toLowerCase();
|
||||
}
|
||||
|
||||
function batchLabel(completed: number, total: number): string {
|
||||
return `Syncing pull requests (${completed}/${total} repositories)...`;
|
||||
}
|
||||
|
||||
async function readMeta(c: any): Promise<GithubStateMeta> {
|
||||
|
|
@ -107,6 +142,11 @@ async function readMeta(c: any): Promise<GithubStateMeta> {
|
|||
installationId: row?.installationId ?? null,
|
||||
lastSyncLabel: row?.lastSyncLabel ?? "Waiting for first sync",
|
||||
lastSyncAt: row?.lastSyncAt ?? null,
|
||||
syncPhase: row?.syncPhase ?? null,
|
||||
syncRunStartedAt: row?.syncRunStartedAt ?? null,
|
||||
syncRepositoriesTotal: row?.syncRepositoriesTotal ?? null,
|
||||
syncRepositoriesCompleted: row?.syncRepositoriesCompleted ?? 0,
|
||||
syncPullRequestRepositoriesCompleted: row?.syncPullRequestRepositoriesCompleted ?? 0,
|
||||
};
|
||||
}
|
||||
|
||||
|
|
@ -126,6 +166,11 @@ async function writeMeta(c: any, patch: Partial<GithubStateMeta>): Promise<Githu
|
|||
installationId: next.installationId,
|
||||
lastSyncLabel: next.lastSyncLabel,
|
||||
lastSyncAt: next.lastSyncAt,
|
||||
syncPhase: next.syncPhase,
|
||||
syncRunStartedAt: next.syncRunStartedAt,
|
||||
syncRepositoriesTotal: next.syncRepositoriesTotal,
|
||||
syncRepositoriesCompleted: next.syncRepositoriesCompleted,
|
||||
syncPullRequestRepositoriesCompleted: next.syncPullRequestRepositoriesCompleted,
|
||||
updatedAt: Date.now(),
|
||||
})
|
||||
.onConflictDoUpdate({
|
||||
|
|
@ -137,6 +182,11 @@ async function writeMeta(c: any, patch: Partial<GithubStateMeta>): Promise<Githu
|
|||
installationId: next.installationId,
|
||||
lastSyncLabel: next.lastSyncLabel,
|
||||
lastSyncAt: next.lastSyncAt,
|
||||
syncPhase: next.syncPhase,
|
||||
syncRunStartedAt: next.syncRunStartedAt,
|
||||
syncRepositoriesTotal: next.syncRepositoriesTotal,
|
||||
syncRepositoriesCompleted: next.syncRepositoriesCompleted,
|
||||
syncPullRequestRepositoriesCompleted: next.syncPullRequestRepositoriesCompleted,
|
||||
updatedAt: Date.now(),
|
||||
},
|
||||
})
|
||||
|
|
@ -144,9 +194,17 @@ async function writeMeta(c: any, patch: Partial<GithubStateMeta>): Promise<Githu
|
|||
return next;
|
||||
}
|
||||
|
||||
async function replaceRepositories(c: any, repositories: Array<{ fullName: string; cloneUrl: string; private: boolean }>): Promise<void> {
|
||||
await c.db.delete(githubRepositories).run();
|
||||
const now = Date.now();
|
||||
async function notifyAppUpdated(c: any): Promise<void> {
|
||||
const app = await getOrCreateOrganization(c, APP_SHELL_ORGANIZATION_ID);
|
||||
await app.notifyAppUpdated({});
|
||||
}
|
||||
|
||||
async function notifyOrganizationUpdated(c: any): Promise<void> {
|
||||
const organization = await getOrCreateOrganization(c, c.state.organizationId);
|
||||
await Promise.all([organization.notifyWorkbenchUpdated({}), notifyAppUpdated(c)]);
|
||||
}
|
||||
|
||||
async function upsertRepositories(c: any, repositories: GitHubRepositorySnapshot[], updatedAt: number): Promise<void> {
|
||||
for (const repository of repositories) {
|
||||
await c.db
|
||||
.insert(githubRepositories)
|
||||
|
|
@ -155,15 +213,22 @@ async function replaceRepositories(c: any, repositories: Array<{ fullName: strin
|
|||
fullName: repository.fullName,
|
||||
cloneUrl: repository.cloneUrl,
|
||||
private: repository.private ? 1 : 0,
|
||||
updatedAt: now,
|
||||
updatedAt,
|
||||
})
|
||||
.onConflictDoUpdate({
|
||||
target: githubRepositories.repoId,
|
||||
set: {
|
||||
fullName: repository.fullName,
|
||||
cloneUrl: repository.cloneUrl,
|
||||
private: repository.private ? 1 : 0,
|
||||
updatedAt,
|
||||
},
|
||||
})
|
||||
.run();
|
||||
}
|
||||
}
|
||||
|
||||
async function replaceMembers(c: any, members: SyncMemberSeed[]): Promise<void> {
|
||||
await c.db.delete(githubMembers).run();
|
||||
const now = Date.now();
|
||||
async function upsertMembers(c: any, members: SyncMemberSeed[], updatedAt: number): Promise<void> {
|
||||
for (const member of members) {
|
||||
await c.db
|
||||
.insert(githubMembers)
|
||||
|
|
@ -174,30 +239,24 @@ async function replaceMembers(c: any, members: SyncMemberSeed[]): Promise<void>
|
|||
email: member.email ?? null,
|
||||
role: member.role ?? null,
|
||||
state: member.state ?? "active",
|
||||
updatedAt: now,
|
||||
updatedAt,
|
||||
})
|
||||
.onConflictDoUpdate({
|
||||
target: githubMembers.memberId,
|
||||
set: {
|
||||
login: member.login,
|
||||
displayName: member.name || member.login,
|
||||
email: member.email ?? null,
|
||||
role: member.role ?? null,
|
||||
state: member.state ?? "active",
|
||||
updatedAt,
|
||||
},
|
||||
})
|
||||
.run();
|
||||
}
|
||||
}
|
||||
|
||||
async function replacePullRequests(
|
||||
c: any,
|
||||
pullRequests: Array<{
|
||||
repoFullName: string;
|
||||
cloneUrl: string;
|
||||
number: number;
|
||||
title: string;
|
||||
body?: string | null;
|
||||
state: string;
|
||||
url: string;
|
||||
headRefName: string;
|
||||
baseRefName: string;
|
||||
authorLogin?: string | null;
|
||||
isDraft?: boolean;
|
||||
}>,
|
||||
): Promise<void> {
|
||||
await c.db.delete(githubPullRequests).run();
|
||||
const now = Date.now();
|
||||
async function upsertPullRequests(c: any, pullRequests: GitHubPullRequestSnapshot[], updatedAt: number): Promise<void> {
|
||||
for (const pullRequest of pullRequests) {
|
||||
const repoId = repoIdFromRemote(pullRequest.cloneUrl);
|
||||
await c.db
|
||||
|
|
@ -215,12 +274,34 @@ async function replacePullRequests(
|
|||
baseRefName: pullRequest.baseRefName,
|
||||
authorLogin: pullRequest.authorLogin ?? null,
|
||||
isDraft: pullRequest.isDraft ? 1 : 0,
|
||||
updatedAt: now,
|
||||
updatedAt,
|
||||
})
|
||||
.onConflictDoUpdate({
|
||||
target: githubPullRequests.prId,
|
||||
set: {
|
||||
repoId,
|
||||
repoFullName: pullRequest.repoFullName,
|
||||
title: pullRequest.title,
|
||||
body: pullRequest.body ?? null,
|
||||
state: pullRequest.state,
|
||||
url: pullRequest.url,
|
||||
headRefName: pullRequest.headRefName,
|
||||
baseRefName: pullRequest.baseRefName,
|
||||
authorLogin: pullRequest.authorLogin ?? null,
|
||||
isDraft: pullRequest.isDraft ? 1 : 0,
|
||||
updatedAt,
|
||||
},
|
||||
})
|
||||
.run();
|
||||
}
|
||||
}
|
||||
|
||||
async function pruneStaleRows(c: any, runStartedAt: number): Promise<void> {
|
||||
await c.db.delete(githubRepositories).where(lt(githubRepositories.updatedAt, runStartedAt)).run();
|
||||
await c.db.delete(githubMembers).where(lt(githubMembers.updatedAt, runStartedAt)).run();
|
||||
await c.db.delete(githubPullRequests).where(lt(githubPullRequests.updatedAt, runStartedAt)).run();
|
||||
}
|
||||
|
||||
async function upsertPullRequest(c: any, input: PullRequestWebhookInput): Promise<void> {
|
||||
const repoId = repoIdFromRemote(input.repository.cloneUrl);
|
||||
const now = Date.now();
|
||||
|
|
@ -340,13 +421,206 @@ async function countRows(c: any) {
|
|||
};
|
||||
}
|
||||
|
||||
function repoBelongsToAccount(fullName: string, accountLogin: string): boolean {
|
||||
const owner = fullName.split("/")[0]?.trim().toLowerCase() ?? "";
|
||||
return owner.length > 0 && owner === accountLogin.trim().toLowerCase();
|
||||
async function resolveFullSyncSeed(c: any, input: FullSyncCommand): Promise<FullSyncSeed> {
|
||||
const { appShell } = getActorRuntimeContext();
|
||||
|
||||
const syncFromUserToken = async (): Promise<FullSyncSeed> => {
|
||||
const rawRepositories = input.accessToken ? await appShell.github.listUserRepositories(input.accessToken) : [];
|
||||
const repositories =
|
||||
input.kind === "organization" ? rawRepositories.filter((repository) => repoBelongsToAccount(repository.fullName, input.githubLogin)) : rawRepositories;
|
||||
const members =
|
||||
input.accessToken && input.kind === "organization"
|
||||
? await appShell.github.listOrganizationMembers(input.accessToken, input.githubLogin)
|
||||
: (input.fallbackMembers ?? []).map((member) => ({
|
||||
id: member.id,
|
||||
login: member.login,
|
||||
name: member.name,
|
||||
email: member.email ?? null,
|
||||
role: member.role ?? null,
|
||||
state: member.state ?? "active",
|
||||
}));
|
||||
return {
|
||||
repositories,
|
||||
members,
|
||||
pullRequestSource: input.accessToken ? "user" : "none",
|
||||
};
|
||||
};
|
||||
|
||||
if (input.installationId != null) {
|
||||
try {
|
||||
const repositories = await appShell.github.listInstallationRepositories(input.installationId);
|
||||
const members =
|
||||
input.kind === "organization"
|
||||
? await appShell.github.listInstallationMembers(input.installationId, input.githubLogin)
|
||||
: (input.fallbackMembers ?? []).map((member) => ({
|
||||
id: member.id,
|
||||
login: member.login,
|
||||
name: member.name,
|
||||
email: member.email ?? null,
|
||||
role: member.role ?? null,
|
||||
state: member.state ?? "active",
|
||||
}));
|
||||
return {
|
||||
repositories,
|
||||
members,
|
||||
pullRequestSource: "installation",
|
||||
};
|
||||
} catch (error) {
|
||||
if (!input.accessToken) {
|
||||
throw error;
|
||||
}
|
||||
return await syncFromUserToken();
|
||||
}
|
||||
}
|
||||
|
||||
return await syncFromUserToken();
|
||||
}
|
||||
|
||||
async function loadPullRequestsForBatch(c: any, input: FullSyncCommand, source: FullSyncSeed["pullRequestSource"], repositories: GitHubRepositorySnapshot[]) {
|
||||
const { appShell } = getActorRuntimeContext();
|
||||
if (repositories.length === 0) {
|
||||
return [];
|
||||
}
|
||||
if (source === "installation" && input.installationId != null) {
|
||||
try {
|
||||
return await appShell.github.listInstallationPullRequestsForRepositories(input.installationId, repositories);
|
||||
} catch (error) {
|
||||
if (!input.accessToken) {
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (source === "user" && input.accessToken) {
|
||||
return await appShell.github.listPullRequestsForUserRepositories(input.accessToken, repositories);
|
||||
}
|
||||
return [];
|
||||
}
|
||||
|
||||
async function runFullSyncWorkflow(loopCtx: any, msg: any): Promise<void> {
|
||||
const body = msg.body as FullSyncCommand;
|
||||
const stepPrefix = `github-sync-${body.runId}`;
|
||||
let completionSummary: Awaited<ReturnType<typeof readMeta>> & Awaited<ReturnType<typeof countRows>>;
|
||||
|
||||
try {
|
||||
const seed = await loopCtx.step({
|
||||
name: `${stepPrefix}-resolve-seed`,
|
||||
timeout: 5 * 60_000,
|
||||
run: async () => resolveFullSyncSeed(loopCtx, body),
|
||||
});
|
||||
|
||||
await loopCtx.step(`${stepPrefix}-write-repositories`, async () => {
|
||||
await upsertRepositories(loopCtx, seed.repositories, body.runStartedAt);
|
||||
const organization = await getOrCreateOrganization(loopCtx, loopCtx.state.organizationId);
|
||||
await organization.applyOrganizationRepositoryCatalog({
|
||||
repositories: seed.repositories,
|
||||
});
|
||||
await writeMeta(loopCtx, {
|
||||
connectedAccount: body.connectedAccount,
|
||||
installationStatus: body.installationStatus,
|
||||
installationId: body.installationId,
|
||||
syncStatus: "syncing",
|
||||
syncPhase: "repositories",
|
||||
syncRunStartedAt: body.runStartedAt,
|
||||
syncRepositoriesTotal: seed.repositories.length,
|
||||
syncRepositoriesCompleted: seed.repositories.length,
|
||||
syncPullRequestRepositoriesCompleted: 0,
|
||||
lastSyncLabel: seed.repositories.length > 0 ? batchLabel(0, seed.repositories.length) : "No repositories available",
|
||||
});
|
||||
await notifyAppUpdated(loopCtx);
|
||||
});
|
||||
|
||||
await loopCtx.step(`${stepPrefix}-write-members`, async () => {
|
||||
await upsertMembers(loopCtx, seed.members, body.runStartedAt);
|
||||
await writeMeta(loopCtx, {
|
||||
syncPhase: "pull_requests",
|
||||
});
|
||||
await notifyAppUpdated(loopCtx);
|
||||
});
|
||||
|
||||
for (let start = 0; start < seed.repositories.length; start += GITHUB_PR_BATCH_SIZE) {
|
||||
const batch = seed.repositories.slice(start, start + GITHUB_PR_BATCH_SIZE);
|
||||
const completed = Math.min(start + batch.length, seed.repositories.length);
|
||||
await loopCtx.step({
|
||||
name: `${stepPrefix}-pull-requests-${start}`,
|
||||
timeout: 5 * 60_000,
|
||||
run: async () => {
|
||||
const pullRequests = await loadPullRequestsForBatch(loopCtx, body, seed.pullRequestSource, batch);
|
||||
await upsertPullRequests(loopCtx, pullRequests, Date.now());
|
||||
await writeMeta(loopCtx, {
|
||||
syncPhase: "pull_requests",
|
||||
syncPullRequestRepositoriesCompleted: completed,
|
||||
lastSyncLabel: batchLabel(completed, seed.repositories.length),
|
||||
});
|
||||
await notifyAppUpdated(loopCtx);
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
await loopCtx.step(`${stepPrefix}-finalize`, async () => {
|
||||
await pruneStaleRows(loopCtx, body.runStartedAt);
|
||||
const lastSyncLabel = seed.repositories.length > 0 ? `Synced ${seed.repositories.length} repositories` : "No repositories available";
|
||||
await writeMeta(loopCtx, {
|
||||
connectedAccount: body.connectedAccount,
|
||||
installationStatus: body.installationStatus,
|
||||
installationId: body.installationId,
|
||||
syncStatus: "synced",
|
||||
lastSyncLabel,
|
||||
lastSyncAt: Date.now(),
|
||||
syncPhase: null,
|
||||
syncRunStartedAt: null,
|
||||
syncRepositoriesTotal: seed.repositories.length,
|
||||
syncRepositoriesCompleted: seed.repositories.length,
|
||||
syncPullRequestRepositoriesCompleted: seed.repositories.length,
|
||||
});
|
||||
completionSummary = {
|
||||
...(await readMeta(loopCtx)),
|
||||
...(await countRows(loopCtx)),
|
||||
};
|
||||
await notifyOrganizationUpdated(loopCtx);
|
||||
});
|
||||
|
||||
await msg.complete(completionSummary);
|
||||
} catch (error) {
|
||||
await loopCtx.step(`${stepPrefix}-failed`, async () => {
|
||||
const message = error instanceof Error ? error.message : "GitHub sync failed";
|
||||
const installationStatus = error instanceof Error && /403|404|401/.test(error.message) ? "reconnect_required" : body.installationStatus;
|
||||
await writeMeta(loopCtx, {
|
||||
connectedAccount: body.connectedAccount,
|
||||
installationStatus,
|
||||
installationId: body.installationId,
|
||||
syncStatus: "error",
|
||||
lastSyncLabel: message,
|
||||
syncPhase: null,
|
||||
syncRunStartedAt: null,
|
||||
});
|
||||
await notifyAppUpdated(loopCtx);
|
||||
});
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async function runGithubStateWorkflow(ctx: any): Promise<void> {
|
||||
await ctx.loop("github-command-loop", async (loopCtx: any) => {
|
||||
const msg = await loopCtx.queue.next("next-github-command", {
|
||||
names: [...GITHUB_QUEUE_NAMES],
|
||||
completable: true,
|
||||
});
|
||||
if (!msg) {
|
||||
return Loop.continue(undefined);
|
||||
}
|
||||
|
||||
if (msg.name === "github.command.full_sync") {
|
||||
await runFullSyncWorkflow(loopCtx, msg);
|
||||
return Loop.continue(undefined);
|
||||
}
|
||||
|
||||
return Loop.continue(undefined);
|
||||
});
|
||||
}
|
||||
|
||||
export const githubState = actor({
|
||||
db: githubStateDb,
|
||||
queues: Object.fromEntries(GITHUB_QUEUE_NAMES.map((name) => [name, queue()])),
|
||||
createState: (_c, input: GithubStateInput) => ({
|
||||
organizationId: input.organizationId,
|
||||
}),
|
||||
|
|
@ -423,111 +697,69 @@ export const githubState = actor({
|
|||
syncStatus: input.installationStatus === "connected" ? "pending" : "error",
|
||||
lastSyncLabel: input.label,
|
||||
lastSyncAt: null,
|
||||
syncPhase: null,
|
||||
syncRunStartedAt: null,
|
||||
syncRepositoriesTotal: null,
|
||||
syncRepositoriesCompleted: 0,
|
||||
syncPullRequestRepositoriesCompleted: 0,
|
||||
});
|
||||
const organization = await getOrCreateOrganization(c, c.state.organizationId);
|
||||
await organization.applyOrganizationRepositoryCatalog({
|
||||
repositories: [],
|
||||
});
|
||||
await notifyOrganizationUpdated(c);
|
||||
},
|
||||
|
||||
async fullSync(c, input: FullSyncInput) {
|
||||
const { appShell } = getActorRuntimeContext();
|
||||
const organization = await getOrCreateOrganization(c, c.state.organizationId);
|
||||
const current = await readMeta(c);
|
||||
const counts = await countRows(c);
|
||||
const currentSummary = {
|
||||
...current,
|
||||
...counts,
|
||||
};
|
||||
const matchesCurrentTarget =
|
||||
current.connectedAccount === input.connectedAccount &&
|
||||
current.installationStatus === input.installationStatus &&
|
||||
current.installationId === input.installationId;
|
||||
|
||||
if (!input.force && current.syncStatus === "syncing") {
|
||||
return currentSummary;
|
||||
}
|
||||
|
||||
if (!input.force && matchesCurrentTarget && current.syncStatus === "synced" && counts.repositoryCount > 0) {
|
||||
return currentSummary;
|
||||
}
|
||||
|
||||
const runId = randomUUID();
|
||||
const runStartedAt = Date.now();
|
||||
await writeMeta(c, {
|
||||
connectedAccount: input.connectedAccount,
|
||||
installationStatus: input.installationStatus,
|
||||
installationId: input.installationId,
|
||||
syncStatus: "syncing",
|
||||
lastSyncLabel: input.label ?? "Syncing GitHub data...",
|
||||
lastSyncLabel: input.label ?? "Queued GitHub sync...",
|
||||
syncPhase: "queued",
|
||||
syncRunStartedAt: runStartedAt,
|
||||
syncRepositoriesTotal: null,
|
||||
syncRepositoriesCompleted: 0,
|
||||
syncPullRequestRepositoriesCompleted: 0,
|
||||
});
|
||||
await notifyAppUpdated(c);
|
||||
|
||||
try {
|
||||
const syncFromUserToken = async (): Promise<FullSyncSnapshot> => {
|
||||
const rawRepositories = input.accessToken ? await appShell.github.listUserRepositories(input.accessToken) : [];
|
||||
const repositories =
|
||||
input.kind === "organization"
|
||||
? rawRepositories.filter((repository) => repoBelongsToAccount(repository.fullName, input.githubLogin))
|
||||
: rawRepositories;
|
||||
const members =
|
||||
input.accessToken && input.kind === "organization"
|
||||
? await appShell.github.listOrganizationMembers(input.accessToken, input.githubLogin)
|
||||
: (input.fallbackMembers ?? []).map((member) => ({
|
||||
id: member.id,
|
||||
login: member.login,
|
||||
name: member.name,
|
||||
email: member.email ?? null,
|
||||
role: member.role ?? null,
|
||||
state: member.state ?? "active",
|
||||
}));
|
||||
return {
|
||||
repositories,
|
||||
members,
|
||||
loadPullRequests: async () => (input.accessToken ? await appShell.github.listPullRequestsForUserRepositories(input.accessToken, repositories) : []),
|
||||
};
|
||||
};
|
||||
const self = selfGithubState(c);
|
||||
await self.send(
|
||||
githubWorkflowQueueName("github.command.full_sync"),
|
||||
{
|
||||
...input,
|
||||
runId,
|
||||
runStartedAt,
|
||||
} satisfies FullSyncCommand,
|
||||
{
|
||||
wait: false,
|
||||
},
|
||||
);
|
||||
|
||||
const { repositories, members, loadPullRequests } =
|
||||
input.installationId != null
|
||||
? await (async (): Promise<FullSyncSnapshot> => {
|
||||
try {
|
||||
const repositories = await appShell.github.listInstallationRepositories(input.installationId!);
|
||||
const members =
|
||||
input.kind === "organization"
|
||||
? await appShell.github.listInstallationMembers(input.installationId!, input.githubLogin)
|
||||
: (input.fallbackMembers ?? []).map((member) => ({
|
||||
id: member.id,
|
||||
login: member.login,
|
||||
name: member.name,
|
||||
email: member.email ?? null,
|
||||
role: member.role ?? null,
|
||||
state: member.state ?? "active",
|
||||
}));
|
||||
return {
|
||||
repositories,
|
||||
members,
|
||||
loadPullRequests: async () => await appShell.github.listInstallationPullRequests(input.installationId!),
|
||||
};
|
||||
} catch (error) {
|
||||
if (!input.accessToken) {
|
||||
throw error;
|
||||
}
|
||||
return await syncFromUserToken();
|
||||
}
|
||||
})()
|
||||
: await syncFromUserToken();
|
||||
|
||||
await replaceRepositories(c, repositories);
|
||||
await organization.applyOrganizationRepositoryCatalog({
|
||||
repositories,
|
||||
});
|
||||
await replaceMembers(c, members);
|
||||
const pullRequests = await loadPullRequests();
|
||||
await replacePullRequests(c, pullRequests);
|
||||
|
||||
const lastSyncLabel = repositories.length > 0 ? `Synced ${repositories.length} repositories` : "No repositories available";
|
||||
await writeMeta(c, {
|
||||
connectedAccount: input.connectedAccount,
|
||||
installationStatus: input.installationStatus,
|
||||
installationId: input.installationId,
|
||||
syncStatus: "synced",
|
||||
lastSyncLabel,
|
||||
lastSyncAt: Date.now(),
|
||||
});
|
||||
} catch (error) {
|
||||
const message = error instanceof Error ? error.message : "GitHub sync failed";
|
||||
const installationStatus = error instanceof Error && /403|404|401/.test(error.message) ? "reconnect_required" : input.installationStatus;
|
||||
await writeMeta(c, {
|
||||
connectedAccount: input.connectedAccount,
|
||||
installationStatus,
|
||||
installationId: input.installationId,
|
||||
syncStatus: "error",
|
||||
lastSyncLabel: message,
|
||||
});
|
||||
throw error;
|
||||
}
|
||||
|
||||
return await selfGithubState(c).getSummary();
|
||||
return await self.getSummary();
|
||||
},
|
||||
|
||||
async handlePullRequestWebhook(c, input: PullRequestWebhookInput): Promise<void> {
|
||||
|
|
@ -539,6 +771,8 @@ export const githubState = actor({
|
|||
syncStatus: "synced",
|
||||
lastSyncLabel: `Updated PR #${input.pullRequest.number}`,
|
||||
lastSyncAt: Date.now(),
|
||||
syncPhase: null,
|
||||
syncRunStartedAt: null,
|
||||
});
|
||||
|
||||
const repository = await getOrCreateRepository(c, c.state.organizationId, repoIdFromRemote(input.repository.cloneUrl), input.repository.cloneUrl);
|
||||
|
|
@ -546,6 +780,7 @@ export const githubState = actor({
|
|||
branchName: input.pullRequest.headRefName,
|
||||
state: input.pullRequest.state,
|
||||
});
|
||||
await notifyOrganizationUpdated(c);
|
||||
},
|
||||
|
||||
async createPullRequest(
|
||||
|
|
@ -608,7 +843,10 @@ export const githubState = actor({
|
|||
syncStatus: "synced",
|
||||
lastSyncLabel: `Linked existing PR #${existing.number}`,
|
||||
lastSyncAt: now,
|
||||
syncPhase: null,
|
||||
syncRunStartedAt: null,
|
||||
});
|
||||
await notifyOrganizationUpdated(c);
|
||||
|
||||
return created;
|
||||
}
|
||||
|
|
@ -633,7 +871,10 @@ export const githubState = actor({
|
|||
syncStatus: "synced",
|
||||
lastSyncLabel: `Created PR #${created.number}`,
|
||||
lastSyncAt: now,
|
||||
syncPhase: null,
|
||||
syncRunStartedAt: null,
|
||||
});
|
||||
await notifyOrganizationUpdated(c);
|
||||
|
||||
return created;
|
||||
},
|
||||
|
|
@ -646,4 +887,5 @@ export const githubState = actor({
|
|||
});
|
||||
},
|
||||
},
|
||||
run: workflow(runGithubStateWorkflow),
|
||||
});
|
||||
|
|
|
|||
|
|
@ -610,6 +610,11 @@ export const workspaceAppActions = {
|
|||
return await buildAppSnapshot(c, input.sessionId);
|
||||
},
|
||||
|
||||
async notifyAppUpdated(c: any): Promise<void> {
|
||||
assertAppWorkspace(c);
|
||||
c.broadcast("appUpdated", { at: Date.now() });
|
||||
},
|
||||
|
||||
async resolveAppGithubToken(
|
||||
c: any,
|
||||
input: { organizationId: string; requireRepoScope?: boolean },
|
||||
|
|
@ -785,6 +790,7 @@ export const workspaceAppActions = {
|
|||
installationId: organization.snapshot.kind === "personal" ? null : organization.githubInstallationId,
|
||||
accessToken: auth.accessToken,
|
||||
label: "Syncing GitHub data...",
|
||||
force: true,
|
||||
fallbackMembers:
|
||||
organization.snapshot.kind === "personal"
|
||||
? [
|
||||
|
|
@ -1052,8 +1058,8 @@ export const workspaceAppActions = {
|
|||
const { appShell } = getActorRuntimeContext();
|
||||
const { event, body } = appShell.github.verifyWebhookEvent(input.payload, input.signatureHeader, input.eventHeader);
|
||||
|
||||
const accountLogin = body.installation?.account?.login;
|
||||
const accountType = body.installation?.account?.type;
|
||||
const accountLogin = body.installation?.account?.login ?? body.repository?.owner?.login ?? body.organization?.login ?? null;
|
||||
const accountType = body.installation?.account?.type ?? body.repository?.owner?.type ?? (body.organization?.login ? "Organization" : null);
|
||||
if (!accountLogin) {
|
||||
console.log(`[github-webhook] Ignoring ${event}.${body.action ?? ""}: no installation account`);
|
||||
return { ok: true };
|
||||
|
|
@ -1080,6 +1086,7 @@ export const workspaceAppActions = {
|
|||
installationStatus: "connected",
|
||||
installationId: body.installation?.id ?? null,
|
||||
label: "Syncing GitHub data from installation webhook...",
|
||||
force: true,
|
||||
fallbackMembers: [],
|
||||
});
|
||||
} else if (body.action === "suspend") {
|
||||
|
|
@ -1097,6 +1104,7 @@ export const workspaceAppActions = {
|
|||
installationStatus: "connected",
|
||||
installationId: body.installation?.id ?? null,
|
||||
label: "Resyncing GitHub data after unsuspend...",
|
||||
force: true,
|
||||
fallbackMembers: [],
|
||||
});
|
||||
}
|
||||
|
|
@ -1114,6 +1122,7 @@ export const workspaceAppActions = {
|
|||
installationStatus: "connected",
|
||||
installationId: body.installation?.id ?? null,
|
||||
label: "Resyncing GitHub data after repository access change...",
|
||||
force: true,
|
||||
fallbackMembers: [],
|
||||
});
|
||||
return { ok: true };
|
||||
|
|
|
|||
|
|
@ -73,7 +73,7 @@ export async function startBackend(options: BackendStartOptions = {}): Promise<v
|
|||
const notifications = createNotificationService(backends);
|
||||
initActorRuntimeContext(config, providers, notifications, driver, createDefaultAppShellServices());
|
||||
|
||||
registry.startRunner();
|
||||
await registry.startRunner();
|
||||
const managerOrigin = `http://127.0.0.1:${resolveManagerPort()}`;
|
||||
const actorClient = createClient({
|
||||
endpoint: managerOrigin,
|
||||
|
|
|
|||
|
|
@ -408,6 +408,11 @@ export class GitHubAppClient {
|
|||
return await this.listPullRequestsForRepositories(repositories, accessToken);
|
||||
}
|
||||
|
||||
async listInstallationPullRequestsForRepositories(installationId: number, repositories: GitHubRepositoryRecord[]): Promise<GitHubPullRequestRecord[]> {
|
||||
const accessToken = await this.createInstallationAccessToken(installationId);
|
||||
return await this.listPullRequestsForRepositories(repositories, accessToken);
|
||||
}
|
||||
|
||||
async listUserPullRequests(accessToken: string): Promise<GitHubPullRequestRecord[]> {
|
||||
const repositories = await this.listUserRepositories(accessToken);
|
||||
return await this.listPullRequestsForRepositories(repositories, accessToken);
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue