Stabilize SDK mode integration test

This commit is contained in:
Nathan Flurry 2026-03-10 22:37:27 -07:00
parent 24e99ac5e7
commit ec8b6afea9
274 changed files with 5412 additions and 7893 deletions

View file

@ -2,24 +2,9 @@
import { randomUUID } from "node:crypto";
import { and, desc, eq, isNotNull, ne } from "drizzle-orm";
import { Loop } from "rivetkit/workflow";
import type {
AgentType,
HandoffRecord,
HandoffSummary,
ProviderId,
RepoOverview,
RepoStackAction,
RepoStackActionResult
} from "@openhandoff/shared";
import type { AgentType, HandoffRecord, HandoffSummary, ProviderId, RepoOverview, RepoStackAction, RepoStackActionResult } from "@openhandoff/shared";
import { getActorRuntimeContext } from "../context.js";
import {
getHandoff,
getOrCreateHandoff,
getOrCreateHistory,
getOrCreateProjectBranchSync,
getOrCreateProjectPrSync,
selfProject
} from "../handles.js";
import { getHandoff, getOrCreateHandoff, getOrCreateHistory, getOrCreateProjectBranchSync, getOrCreateProjectPrSync, selfProject } from "../handles.js";
import { isActorNotFoundError, logActorWarning, resolveErrorMessage } from "../logging.js";
import { openhandoffRepoClonePath } from "../../services/openhandoff-paths.js";
import { expectQueueResponse } from "../../services/queue.js";
@ -163,11 +148,7 @@ async function ensureHandoffIndexHydrated(c: any): Promise<void> {
return;
}
const existing = await c.db
.select({ handoffId: handoffIndex.handoffId })
.from(handoffIndex)
.limit(1)
.get();
const existing = await c.db.select({ handoffId: handoffIndex.handoffId }).from(handoffIndex).limit(1).get();
if (existing) {
c.state.handoffIndexHydrated = true;
@ -204,7 +185,7 @@ async function ensureHandoffIndexHydrated(c: any): Promise<void> {
handoffId: row.handoffId,
branchName: row.branchName,
createdAt: row.createdAt,
updatedAt: row.createdAt
updatedAt: row.createdAt,
})
.onConflictDoNothing()
.run();
@ -214,14 +195,14 @@ async function ensureHandoffIndexHydrated(c: any): Promise<void> {
logActorWarning("project", "skipped missing handoffs while hydrating index", {
workspaceId: c.state.workspaceId,
repoId: c.state.repoId,
skippedMissingHandoffActors
skippedMissingHandoffActors,
});
}
} catch (error) {
logActorWarning("project", "handoff index hydration from history failed", {
workspaceId: c.state.workspaceId,
repoId: c.state.repoId,
error: resolveErrorMessage(error)
error: resolveErrorMessage(error),
});
}
@ -283,7 +264,7 @@ async function enrichHandoffRecord(c: any, record: HandoffRecord): Promise<Hando
diffStat: branches.diffStat,
hasUnpushed: branches.hasUnpushed,
conflictsWithMain: branches.conflictsWithMain,
parentBranch: branches.parentBranch
parentBranch: branches.parentBranch,
})
.from(branches)
.where(eq(branches.branchName, branchName))
@ -298,7 +279,7 @@ async function enrichHandoffRecord(c: any, record: HandoffRecord): Promise<Hando
prAuthor: prCache.prAuthor,
ciStatus: prCache.ciStatus,
reviewStatus: prCache.reviewStatus,
reviewer: prCache.reviewer
reviewer: prCache.reviewer,
})
.from(prCache)
.where(eq(prCache.branchName, branchName))
@ -315,7 +296,7 @@ async function enrichHandoffRecord(c: any, record: HandoffRecord): Promise<Hando
prAuthor: pr?.prAuthor ?? null,
ciStatus: pr?.ciStatus ?? null,
reviewStatus: pr?.reviewStatus ?? null,
reviewer: pr?.reviewer ?? null
reviewer: pr?.reviewer ?? null,
};
}
@ -328,14 +309,14 @@ async function ensureProjectMutation(c: any, cmd: EnsureProjectCommand): Promise
.values({
id: 1,
remoteUrl: cmd.remoteUrl,
updatedAt: Date.now()
updatedAt: Date.now(),
})
.onConflictDoUpdate({
target: repoMeta.id,
set: {
remoteUrl: cmd.remoteUrl,
updatedAt: Date.now()
}
updatedAt: Date.now(),
},
})
.run();
@ -357,11 +338,7 @@ async function createHandoffMutation(c: any, cmd: CreateHandoffCommand): Promise
if (onBranch) {
await forceProjectSync(c, localPath);
const branchRow = await c.db
.select({ branchName: branches.branchName })
.from(branches)
.where(eq(branches.branchName, onBranch))
.get();
const branchRow = await c.db.select({ branchName: branches.branchName }).from(branches).where(eq(branches.branchName, onBranch)).get();
if (!branchRow) {
throw new Error(`Branch not found in repo snapshot: ${onBranch}`);
}
@ -369,7 +346,7 @@ async function createHandoffMutation(c: any, cmd: CreateHandoffCommand): Promise
await registerHandoffBranchMutation(c, {
handoffId,
branchName: onBranch,
requireExistingRemote: true
requireExistingRemote: true,
});
}
@ -387,11 +364,15 @@ async function createHandoffMutation(c: any, cmd: CreateHandoffCommand): Promise
providerId: cmd.providerId,
agentType: cmd.agentType,
explicitTitle: onBranch ? null : cmd.explicitTitle,
explicitBranchName: onBranch ? null : cmd.explicitBranchName
explicitBranchName: onBranch ? null : cmd.explicitBranchName,
});
} catch (error) {
if (onBranch) {
await c.db.delete(handoffIndex).where(eq(handoffIndex.handoffId, handoffId)).run().catch(() => {});
await c.db
.delete(handoffIndex)
.where(eq(handoffIndex.handoffId, handoffId))
.run()
.catch(() => {});
}
throw error;
}
@ -404,7 +385,7 @@ async function createHandoffMutation(c: any, cmd: CreateHandoffCommand): Promise
handoffId,
branchName: initialBranchName,
createdAt: now,
updatedAt: now
updatedAt: now,
})
.onConflictDoNothing()
.run();
@ -418,17 +399,14 @@ async function createHandoffMutation(c: any, cmd: CreateHandoffCommand): Promise
handoffId,
payload: {
repoId: c.state.repoId,
providerId: cmd.providerId
}
providerId: cmd.providerId,
},
});
return created;
}
async function registerHandoffBranchMutation(
c: any,
cmd: RegisterHandoffBranchCommand,
): Promise<{ branchName: string; headSha: string }> {
async function registerHandoffBranchMutation(c: any, cmd: RegisterHandoffBranchCommand): Promise<{ branchName: string; headSha: string }> {
const localPath = await ensureProjectReady(c);
const branchName = cmd.branchName.trim();
@ -458,7 +436,7 @@ async function registerHandoffBranchMutation(
workspaceId: c.state.workspaceId,
repoId: c.state.repoId,
handoffId: existingOwner.handoffId,
branchName
branchName,
});
} else {
throw error;
@ -508,7 +486,7 @@ async function registerHandoffBranchMutation(
workspaceId: c.state.workspaceId,
repoId: c.state.repoId,
branchName,
error: resolveErrorMessage(error)
error: resolveErrorMessage(error),
});
}
stackRows = await driver.stack.listStack(localPath).catch(() => []);
@ -530,7 +508,7 @@ async function registerHandoffBranchMutation(
trackedInStack: trackedInStack ? 1 : 0,
firstSeenAt: now,
lastSeenAt: now,
updatedAt: now
updatedAt: now,
})
.onConflictDoUpdate({
target: branches.branchName,
@ -539,8 +517,8 @@ async function registerHandoffBranchMutation(
parentBranch,
trackedInStack: trackedInStack ? 1 : 0,
lastSeenAt: now,
updatedAt: now
}
updatedAt: now,
},
})
.run();
@ -550,14 +528,14 @@ async function registerHandoffBranchMutation(
handoffId: cmd.handoffId,
branchName,
createdAt: now,
updatedAt: now
updatedAt: now,
})
.onConflictDoUpdate({
target: handoffIndex.handoffId,
set: {
branchName,
updatedAt: now
}
updatedAt: now,
},
})
.run();
@ -579,7 +557,7 @@ async function runRepoStackActionMutation(c: any, cmd: RunRepoStackActionCommand
action,
executed: false,
message: "git-spice is not available for this repo",
at
at,
};
}
@ -593,11 +571,7 @@ async function runRepoStackActionMutation(c: any, cmd: RunRepoStackActionCommand
await forceProjectSync(c, localPath);
if (branchName) {
const row = await c.db
.select({ branchName: branches.branchName })
.from(branches)
.where(eq(branches.branchName, branchName))
.get();
const row = await c.db.select({ branchName: branches.branchName }).from(branches).where(eq(branches.branchName, branchName)).get();
if (!row) {
throw new Error(`Branch not found in repo snapshot: ${branchName}`);
}
@ -610,11 +584,7 @@ async function runRepoStackActionMutation(c: any, cmd: RunRepoStackActionCommand
if (parentBranch === branchName) {
throw new Error("parentBranch must be different from branchName");
}
const parentRow = await c.db
.select({ branchName: branches.branchName })
.from(branches)
.where(eq(branches.branchName, parentBranch))
.get();
const parentRow = await c.db.select({ branchName: branches.branchName }).from(branches).where(eq(branches.branchName, parentBranch)).get();
if (!parentRow) {
throw new Error(`Parent branch not found in repo snapshot: ${parentBranch}`);
}
@ -646,15 +616,15 @@ async function runRepoStackActionMutation(c: any, cmd: RunRepoStackActionCommand
payload: {
action,
branchName: branchName ?? null,
parentBranch: parentBranch ?? null
}
parentBranch: parentBranch ?? null,
},
});
} catch (error) {
logActorWarning("project", "failed appending repo stack history event", {
workspaceId: c.state.workspaceId,
repoId: c.state.repoId,
action,
error: resolveErrorMessage(error)
error: resolveErrorMessage(error),
});
}
@ -662,7 +632,7 @@ async function runRepoStackActionMutation(c: any, cmd: RunRepoStackActionCommand
action,
executed: true,
message: `stack action executed: ${action}`,
at
at,
};
}
@ -684,7 +654,7 @@ async function applyPrSyncResultMutation(c: any, body: PrSyncResult): Promise<vo
reviewStatus: item.reviewStatus ?? null,
reviewer: item.reviewer ?? null,
fetchedAt: body.at,
updatedAt: body.at
updatedAt: body.at,
})
.onConflictDoUpdate({
target: prCache.branchName,
@ -699,8 +669,8 @@ async function applyPrSyncResultMutation(c: any, body: PrSyncResult): Promise<vo
reviewStatus: item.reviewStatus ?? null,
reviewer: item.reviewer ?? null,
fetchedAt: body.at,
updatedAt: body.at
}
updatedAt: body.at,
},
})
.run();
}
@ -710,11 +680,7 @@ async function applyPrSyncResultMutation(c: any, body: PrSyncResult): Promise<vo
continue;
}
const row = await c.db
.select({ handoffId: handoffIndex.handoffId })
.from(handoffIndex)
.where(eq(handoffIndex.branchName, item.headRefName))
.get();
const row = await c.db.select({ handoffId: handoffIndex.handoffId }).from(handoffIndex).where(eq(handoffIndex.branchName, item.headRefName)).get();
if (!row) {
continue;
}
@ -730,7 +696,7 @@ async function applyPrSyncResultMutation(c: any, body: PrSyncResult): Promise<vo
repoId: c.state.repoId,
handoffId: row.handoffId,
branchName: item.headRefName,
prState: item.state
prState: item.state,
});
continue;
}
@ -740,7 +706,7 @@ async function applyPrSyncResultMutation(c: any, body: PrSyncResult): Promise<vo
handoffId: row.handoffId,
branchName: item.headRefName,
prState: item.state,
error: resolveErrorMessage(error)
error: resolveErrorMessage(error),
});
}
}
@ -752,7 +718,7 @@ async function applyBranchSyncResultMutation(c: any, body: BranchSyncResult): Pr
for (const item of body.items) {
const existing = await c.db
.select({
firstSeenAt: branches.firstSeenAt
firstSeenAt: branches.firstSeenAt,
})
.from(branches)
.where(eq(branches.branchName, item.branchName))
@ -770,7 +736,7 @@ async function applyBranchSyncResultMutation(c: any, body: BranchSyncResult): Pr
conflictsWithMain: item.conflictsWithMain ? 1 : 0,
firstSeenAt: existing?.firstSeenAt ?? body.at,
lastSeenAt: body.at,
updatedAt: body.at
updatedAt: body.at,
})
.onConflictDoUpdate({
target: branches.branchName,
@ -783,16 +749,13 @@ async function applyBranchSyncResultMutation(c: any, body: BranchSyncResult): Pr
conflictsWithMain: item.conflictsWithMain ? 1 : 0,
firstSeenAt: existing?.firstSeenAt ?? body.at,
lastSeenAt: body.at,
updatedAt: body.at
}
updatedAt: body.at,
},
})
.run();
}
const existingRows = await c.db
.select({ branchName: branches.branchName })
.from(branches)
.all();
const existingRows = await c.db.select({ branchName: branches.branchName }).from(branches).all();
for (const row of existingRows) {
if (incoming.has(row.branchName)) {
@ -822,62 +785,60 @@ export async function runProjectWorkflow(ctx: any): Promise<void> {
return Loop.continue(undefined);
}
if (msg.name === "project.command.hydrateHandoffIndex") {
await loopCtx.step("project-hydrate-handoff-index", async () =>
hydrateHandoffIndexMutation(loopCtx, msg.body as HydrateHandoffIndexCommand),
);
await msg.complete({ ok: true });
return Loop.continue(undefined);
}
if (msg.name === "project.command.hydrateHandoffIndex") {
await loopCtx.step("project-hydrate-handoff-index", async () => hydrateHandoffIndexMutation(loopCtx, msg.body as HydrateHandoffIndexCommand));
await msg.complete({ ok: true });
return Loop.continue(undefined);
}
if (msg.name === "project.command.createHandoff") {
const result = await loopCtx.step({
name: "project-create-handoff",
timeout: 12 * 60_000,
run: async () => createHandoffMutation(loopCtx, msg.body as CreateHandoffCommand),
});
await msg.complete(result);
return Loop.continue(undefined);
}
if (msg.name === "project.command.createHandoff") {
const result = await loopCtx.step({
name: "project-create-handoff",
timeout: 12 * 60_000,
run: async () => createHandoffMutation(loopCtx, msg.body as CreateHandoffCommand),
});
await msg.complete(result);
return Loop.continue(undefined);
}
if (msg.name === "project.command.registerHandoffBranch") {
const result = await loopCtx.step({
name: "project-register-handoff-branch",
timeout: 5 * 60_000,
run: async () => registerHandoffBranchMutation(loopCtx, msg.body as RegisterHandoffBranchCommand),
});
await msg.complete(result);
return Loop.continue(undefined);
}
if (msg.name === "project.command.registerHandoffBranch") {
const result = await loopCtx.step({
name: "project-register-handoff-branch",
timeout: 5 * 60_000,
run: async () => registerHandoffBranchMutation(loopCtx, msg.body as RegisterHandoffBranchCommand),
});
await msg.complete(result);
return Loop.continue(undefined);
}
if (msg.name === "project.command.runRepoStackAction") {
const result = await loopCtx.step({
name: "project-run-repo-stack-action",
timeout: 12 * 60_000,
run: async () => runRepoStackActionMutation(loopCtx, msg.body as RunRepoStackActionCommand),
});
await msg.complete(result);
return Loop.continue(undefined);
}
if (msg.name === "project.command.runRepoStackAction") {
const result = await loopCtx.step({
name: "project-run-repo-stack-action",
timeout: 12 * 60_000,
run: async () => runRepoStackActionMutation(loopCtx, msg.body as RunRepoStackActionCommand),
});
await msg.complete(result);
return Loop.continue(undefined);
}
if (msg.name === "project.command.applyPrSyncResult") {
await loopCtx.step({
name: "project-apply-pr-sync-result",
timeout: 60_000,
run: async () => applyPrSyncResultMutation(loopCtx, msg.body as PrSyncResult),
});
await msg.complete({ ok: true });
return Loop.continue(undefined);
}
if (msg.name === "project.command.applyPrSyncResult") {
await loopCtx.step({
name: "project-apply-pr-sync-result",
timeout: 60_000,
run: async () => applyPrSyncResultMutation(loopCtx, msg.body as PrSyncResult),
});
await msg.complete({ ok: true });
return Loop.continue(undefined);
}
if (msg.name === "project.command.applyBranchSyncResult") {
await loopCtx.step({
name: "project-apply-branch-sync-result",
timeout: 60_000,
run: async () => applyBranchSyncResultMutation(loopCtx, msg.body as BranchSyncResult),
});
await msg.complete({ ok: true });
}
if (msg.name === "project.command.applyBranchSyncResult") {
await loopCtx.step({
name: "project-apply-branch-sync-result",
timeout: 60_000,
run: async () => applyBranchSyncResultMutation(loopCtx, msg.body as BranchSyncResult),
});
await msg.complete({ ok: true });
}
return Loop.continue(undefined);
});
@ -907,15 +868,9 @@ export const projectActions = {
async listReservedBranches(c: any, _cmd?: ListReservedBranchesCommand): Promise<string[]> {
await ensureHandoffIndexHydratedForRead(c);
const rows = await c.db
.select({ branchName: handoffIndex.branchName })
.from(handoffIndex)
.where(isNotNull(handoffIndex.branchName))
.all();
const rows = await c.db.select({ branchName: handoffIndex.branchName }).from(handoffIndex).where(isNotNull(handoffIndex.branchName)).all();
return rows
.map((row) => row.branchName)
.filter((name): name is string => typeof name === "string" && name.trim().length > 0);
return rows.map((row) => row.branchName).filter((name): name is string => typeof name === "string" && name.trim().length > 0);
},
async registerHandoffBranch(c: any, cmd: RegisterHandoffBranchCommand): Promise<{ branchName: string; headSha: string }> {
@ -942,11 +897,7 @@ export const projectActions = {
await ensureHandoffIndexHydratedForRead(c);
const handoffRows = await c.db
.select({ handoffId: handoffIndex.handoffId })
.from(handoffIndex)
.orderBy(desc(handoffIndex.updatedAt))
.all();
const handoffRows = await c.db.select({ handoffId: handoffIndex.handoffId }).from(handoffIndex).orderBy(desc(handoffIndex.updatedAt)).all();
for (const row of handoffRows) {
try {
@ -964,7 +915,7 @@ export const projectActions = {
branchName: record.branchName,
title: record.title,
status: record.status,
updatedAt: record.updatedAt
updatedAt: record.updatedAt,
});
} catch (error) {
if (isStaleHandoffReferenceError(error)) {
@ -972,7 +923,7 @@ export const projectActions = {
logActorWarning("project", "pruned stale handoff index row during summary listing", {
workspaceId: c.state.workspaceId,
repoId: c.state.repoId,
handoffId: row.handoffId
handoffId: row.handoffId,
});
continue;
}
@ -980,7 +931,7 @@ export const projectActions = {
workspaceId: c.state.workspaceId,
repoId: c.state.repoId,
handoffId: row.handoffId,
error: resolveErrorMessage(error)
error: resolveErrorMessage(error),
});
}
}
@ -992,11 +943,7 @@ export const projectActions = {
async getHandoffEnriched(c: any, cmd: GetHandoffEnrichedCommand): Promise<HandoffRecord> {
await ensureHandoffIndexHydratedForRead(c);
const row = await c.db
.select({ handoffId: handoffIndex.handoffId })
.from(handoffIndex)
.where(eq(handoffIndex.handoffId, cmd.handoffId))
.get();
const row = await c.db.select({ handoffId: handoffIndex.handoffId }).from(handoffIndex).where(eq(handoffIndex.handoffId, cmd.handoffId)).get();
if (!row) {
throw new Error(`Unknown handoff in repo ${c.state.repoId}: ${cmd.handoffId}`);
}
@ -1035,7 +982,7 @@ export const projectActions = {
conflictsWithMain: branches.conflictsWithMain,
firstSeenAt: branches.firstSeenAt,
lastSeenAt: branches.lastSeenAt,
updatedAt: branches.updatedAt
updatedAt: branches.updatedAt,
})
.from(branches)
.all();
@ -1044,15 +991,12 @@ export const projectActions = {
.select({
handoffId: handoffIndex.handoffId,
branchName: handoffIndex.branchName,
updatedAt: handoffIndex.updatedAt
updatedAt: handoffIndex.updatedAt,
})
.from(handoffIndex)
.all();
const handoffMetaByBranch = new Map<
string,
{ handoffId: string; title: string | null; status: HandoffRecord["status"] | null; updatedAt: number }
>();
const handoffMetaByBranch = new Map<string, { handoffId: string; title: string | null; status: HandoffRecord["status"] | null; updatedAt: number }>();
for (const row of handoffRows) {
if (!row.branchName) {
@ -1065,7 +1009,7 @@ export const projectActions = {
handoffId: row.handoffId,
title: record.title ?? null,
status: record.status,
updatedAt: record.updatedAt
updatedAt: record.updatedAt,
});
} catch (error) {
if (isStaleHandoffReferenceError(error)) {
@ -1074,7 +1018,7 @@ export const projectActions = {
workspaceId: c.state.workspaceId,
repoId: c.state.repoId,
handoffId: row.handoffId,
branchName: row.branchName
branchName: row.branchName,
});
continue;
}
@ -1083,7 +1027,7 @@ export const projectActions = {
repoId: c.state.repoId,
handoffId: row.handoffId,
branchName: row.branchName,
error: resolveErrorMessage(error)
error: resolveErrorMessage(error),
});
}
}
@ -1096,7 +1040,7 @@ export const projectActions = {
prUrl: prCache.prUrl,
ciStatus: prCache.ciStatus,
reviewStatus: prCache.reviewStatus,
reviewer: prCache.reviewer
reviewer: prCache.reviewer,
})
.from(prCache)
.all();
@ -1106,8 +1050,8 @@ export const projectActions = {
branchRowsRaw.map((row) => ({
branchName: row.branchName,
parentBranch: row.parentBranch ?? null,
updatedAt: row.updatedAt
}))
updatedAt: row.updatedAt,
})),
);
const detailByBranch = new Map(branchRowsRaw.map((row) => [row.branchName, row]));
@ -1135,7 +1079,7 @@ export const projectActions = {
reviewer: pr?.reviewer ?? null,
firstSeenAt: row.firstSeenAt ?? null,
lastSeenAt: row.lastSeenAt ?? null,
updatedAt: Math.max(row.updatedAt, handoffMeta?.updatedAt ?? 0)
updatedAt: Math.max(row.updatedAt, handoffMeta?.updatedAt ?? 0),
};
});
@ -1146,14 +1090,11 @@ export const projectActions = {
baseRef,
stackAvailable,
fetchedAt: now,
branches: branchRows
branches: branchRows,
};
},
async getPullRequestForBranch(
c: any,
cmd: GetPullRequestForBranchCommand,
): Promise<{ number: number; status: "draft" | "ready" } | null> {
async getPullRequestForBranch(c: any, cmd: GetPullRequestForBranchCommand): Promise<{ number: number; status: "draft" | "ready" } | null> {
const branchName = cmd.branchName?.trim();
if (!branchName) {
return null;
@ -1202,5 +1143,5 @@ export const projectActions = {
wait: true,
timeout: 5 * 60_000,
});
}
},
};