mirror of
https://github.com/harivansh-afk/sandbox-agent.git
synced 2026-04-21 08:02:18 +00:00
wip
This commit is contained in:
parent
400f9a214e
commit
3263d4f5e1
18 changed files with 677 additions and 329 deletions
|
|
@ -156,7 +156,7 @@ export const projectBranchSync = actor({
|
|||
|
||||
async force(c): Promise<void> {
|
||||
const self = selfProjectBranchSync(c);
|
||||
await self.send(CONTROL.force, {}, { wait: true, timeout: 5 * 60_000 });
|
||||
await self.send(CONTROL.force, {}, { wait: true, timeout: 10_000 });
|
||||
},
|
||||
},
|
||||
run: workflow(async (ctx) => {
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@ import { resolveWorkspaceGithubAuth } from "../../services/github-auth.js";
|
|||
import { expectQueueResponse } from "../../services/queue.js";
|
||||
import { withRepoGitLock } from "../../services/repo-git-lock.js";
|
||||
import { branches, taskIndex, repoActionJobs, repoMeta } from "./db/schema.js";
|
||||
import { deriveFallbackTitle } from "../../services/create-flow.js";
|
||||
import { deriveFallbackTitle, resolveCreateFlowDecision } from "../../services/create-flow.js";
|
||||
import { normalizeBaseBranchName } from "../../integrations/git-spice/index.js";
|
||||
import { sortBranchesForOverview } from "./stack-model.js";
|
||||
|
||||
|
|
@ -416,37 +416,81 @@ async function hydrateTaskIndexMutation(c: any, _cmd?: HydrateTaskIndexCommand):
|
|||
}
|
||||
|
||||
async function createTaskMutation(c: any, cmd: CreateTaskCommand): Promise<TaskRecord> {
|
||||
const workspaceId = c.state.workspaceId;
|
||||
const repoId = c.state.repoId;
|
||||
const repoRemote = c.state.remoteUrl;
|
||||
const onBranch = cmd.onBranch?.trim() || null;
|
||||
const initialBranchName = onBranch;
|
||||
const initialTitle = onBranch ? deriveFallbackTitle(cmd.task, cmd.explicitTitle ?? undefined) : null;
|
||||
const taskId = randomUUID();
|
||||
let initialBranchName: string | null = null;
|
||||
let initialTitle: string | null = null;
|
||||
|
||||
if (onBranch) {
|
||||
initialBranchName = onBranch;
|
||||
initialTitle = deriveFallbackTitle(cmd.task, cmd.explicitTitle ?? undefined);
|
||||
|
||||
await registerTaskBranchMutation(c, {
|
||||
taskId,
|
||||
branchName: onBranch,
|
||||
requireExistingRemote: true,
|
||||
});
|
||||
} else {
|
||||
const localPath = await ensureProjectReady(c);
|
||||
const { driver } = getActorRuntimeContext();
|
||||
|
||||
// Read locally cached remote-tracking refs — no network fetch.
|
||||
// The branch sync actor keeps these reasonably fresh. If a rare naming
|
||||
// collision occurs with a very recently created remote branch, it will
|
||||
// be caught lazily on push/checkout.
|
||||
const remoteBranches = (await driver.git.listLocalRemoteRefs(localPath)).map((branch: any) => branch.branchName);
|
||||
|
||||
await ensureTaskIndexHydrated(c);
|
||||
const reservedBranchRows = await c.db.select({ branchName: taskIndex.branchName }).from(taskIndex).where(isNotNull(taskIndex.branchName)).all();
|
||||
const reservedBranches = reservedBranchRows
|
||||
.map((row: { branchName: string | null }) => row.branchName)
|
||||
.filter((branchName): branchName is string => typeof branchName === "string" && branchName.length > 0);
|
||||
|
||||
const resolved = resolveCreateFlowDecision({
|
||||
task: cmd.task,
|
||||
explicitTitle: cmd.explicitTitle ?? undefined,
|
||||
explicitBranchName: cmd.explicitBranchName ?? undefined,
|
||||
localBranches: remoteBranches,
|
||||
taskBranches: reservedBranches,
|
||||
});
|
||||
|
||||
initialBranchName = resolved.branchName;
|
||||
initialTitle = resolved.title;
|
||||
|
||||
const now = Date.now();
|
||||
await c.db
|
||||
.insert(taskIndex)
|
||||
.values({
|
||||
taskId,
|
||||
branchName: resolved.branchName,
|
||||
createdAt: now,
|
||||
updatedAt: now,
|
||||
})
|
||||
.onConflictDoNothing()
|
||||
.run();
|
||||
}
|
||||
|
||||
let task: Awaited<ReturnType<typeof getOrCreateTask>>;
|
||||
try {
|
||||
task = await getOrCreateTask(c, c.state.workspaceId, c.state.repoId, taskId, {
|
||||
workspaceId: c.state.workspaceId,
|
||||
repoId: c.state.repoId,
|
||||
task = await getOrCreateTask(c, workspaceId, repoId, taskId, {
|
||||
workspaceId,
|
||||
repoId,
|
||||
taskId,
|
||||
repoRemote: c.state.remoteUrl,
|
||||
repoRemote,
|
||||
branchName: initialBranchName,
|
||||
title: initialTitle,
|
||||
task: cmd.task,
|
||||
providerId: cmd.providerId,
|
||||
agentType: cmd.agentType,
|
||||
explicitTitle: onBranch ? null : cmd.explicitTitle,
|
||||
explicitBranchName: onBranch ? null : cmd.explicitBranchName,
|
||||
explicitTitle: null,
|
||||
explicitBranchName: null,
|
||||
initialPrompt: cmd.initialPrompt,
|
||||
});
|
||||
} catch (error) {
|
||||
if (onBranch) {
|
||||
if (initialBranchName) {
|
||||
await c.db
|
||||
.delete(taskIndex)
|
||||
.where(eq(taskIndex.taskId, taskId))
|
||||
|
|
@ -456,28 +500,14 @@ async function createTaskMutation(c: any, cmd: CreateTaskCommand): Promise<TaskR
|
|||
throw error;
|
||||
}
|
||||
|
||||
if (!onBranch) {
|
||||
const now = Date.now();
|
||||
await c.db
|
||||
.insert(taskIndex)
|
||||
.values({
|
||||
taskId,
|
||||
branchName: initialBranchName,
|
||||
createdAt: now,
|
||||
updatedAt: now,
|
||||
})
|
||||
.onConflictDoNothing()
|
||||
.run();
|
||||
}
|
||||
|
||||
const created = await task.initialize({ providerId: cmd.providerId });
|
||||
|
||||
const history = await getOrCreateHistory(c, c.state.workspaceId, c.state.repoId);
|
||||
const history = await getOrCreateHistory(c, workspaceId, repoId);
|
||||
await history.append({
|
||||
kind: "task.created",
|
||||
taskId,
|
||||
payload: {
|
||||
repoId: c.state.repoId,
|
||||
repoId,
|
||||
providerId: cmd.providerId,
|
||||
},
|
||||
});
|
||||
|
|
@ -919,7 +949,7 @@ export const projectActions = {
|
|||
return expectQueueResponse<EnsureProjectResult>(
|
||||
await self.send(projectWorkflowQueueName("project.command.ensure"), cmd, {
|
||||
wait: true,
|
||||
timeout: 5 * 60_000,
|
||||
timeout: 10_000,
|
||||
}),
|
||||
);
|
||||
},
|
||||
|
|
@ -929,7 +959,7 @@ export const projectActions = {
|
|||
return expectQueueResponse<TaskRecord>(
|
||||
await self.send(projectWorkflowQueueName("project.command.createTask"), cmd, {
|
||||
wait: true,
|
||||
timeout: 5 * 60_000,
|
||||
timeout: 10_000,
|
||||
}),
|
||||
);
|
||||
},
|
||||
|
|
@ -947,7 +977,7 @@ export const projectActions = {
|
|||
return expectQueueResponse<{ branchName: string; headSha: string }>(
|
||||
await self.send(projectWorkflowQueueName("project.command.registerTaskBranch"), cmd, {
|
||||
wait: true,
|
||||
timeout: 5 * 60_000,
|
||||
timeout: 10_000,
|
||||
}),
|
||||
);
|
||||
},
|
||||
|
|
@ -956,7 +986,7 @@ export const projectActions = {
|
|||
const self = selfProject(c);
|
||||
await self.send(projectWorkflowQueueName("project.command.hydrateTaskIndex"), cmd ?? {}, {
|
||||
wait: true,
|
||||
timeout: 60_000,
|
||||
timeout: 10_000,
|
||||
});
|
||||
},
|
||||
|
||||
|
|
@ -1225,7 +1255,7 @@ export const projectActions = {
|
|||
const self = selfProject(c);
|
||||
await self.send(projectWorkflowQueueName("project.command.applyBranchSyncResult"), body, {
|
||||
wait: true,
|
||||
timeout: 5 * 60_000,
|
||||
timeout: 10_000,
|
||||
});
|
||||
},
|
||||
};
|
||||
|
|
|
|||
|
|
@ -101,14 +101,15 @@ interface TaskWorkbenchSendMessageCommand {
|
|||
attachments: Array<any>;
|
||||
}
|
||||
|
||||
interface TaskWorkbenchSendMessageActionInput extends TaskWorkbenchSendMessageInput {
|
||||
waitForCompletion?: boolean;
|
||||
}
|
||||
|
||||
interface TaskWorkbenchCreateSessionCommand {
|
||||
model?: string;
|
||||
}
|
||||
|
||||
interface TaskWorkbenchCreateSessionAndSendCommand {
|
||||
model?: string;
|
||||
text: string;
|
||||
}
|
||||
|
||||
interface TaskWorkbenchSessionCommand {
|
||||
sessionId: string;
|
||||
}
|
||||
|
|
@ -143,7 +144,7 @@ export const task = actor({
|
|||
const self = selfTask(c);
|
||||
const result = await self.send(taskWorkflowQueueName("task.command.initialize"), cmd ?? {}, {
|
||||
wait: true,
|
||||
timeout: 5 * 60_000,
|
||||
timeout: 10_000,
|
||||
});
|
||||
return expectQueueResponse<TaskRecord>(result);
|
||||
},
|
||||
|
|
@ -160,7 +161,7 @@ export const task = actor({
|
|||
const self = selfTask(c);
|
||||
const result = await self.send(taskWorkflowQueueName("task.command.attach"), cmd ?? {}, {
|
||||
wait: true,
|
||||
timeout: 20_000,
|
||||
timeout: 10_000,
|
||||
});
|
||||
return expectQueueResponse<{ target: string; sessionId: string | null }>(result);
|
||||
},
|
||||
|
|
@ -172,7 +173,7 @@ export const task = actor({
|
|||
{},
|
||||
{
|
||||
wait: true,
|
||||
timeout: 20_000,
|
||||
timeout: 10_000,
|
||||
},
|
||||
);
|
||||
return expectQueueResponse<{ switchTarget: string }>(result);
|
||||
|
|
@ -236,7 +237,7 @@ export const task = actor({
|
|||
{},
|
||||
{
|
||||
wait: true,
|
||||
timeout: 20_000,
|
||||
timeout: 10_000,
|
||||
},
|
||||
);
|
||||
},
|
||||
|
|
@ -263,12 +264,25 @@ export const task = actor({
|
|||
{ ...(input?.model ? { model: input.model } : {}) } satisfies TaskWorkbenchCreateSessionCommand,
|
||||
{
|
||||
wait: true,
|
||||
timeout: 5 * 60_000,
|
||||
timeout: 10_000,
|
||||
},
|
||||
);
|
||||
return expectQueueResponse<{ tabId: string }>(result);
|
||||
},
|
||||
|
||||
/**
|
||||
* Fire-and-forget: creates a workbench session and sends the initial message.
|
||||
* Used by createWorkbenchTask so the caller doesn't block on session creation.
|
||||
*/
|
||||
async createWorkbenchSessionAndSend(c, input: { model?: string; text: string }): Promise<void> {
|
||||
const self = selfTask(c);
|
||||
await self.send(
|
||||
taskWorkflowQueueName("task.command.workbench.create_session_and_send"),
|
||||
{ model: input.model, text: input.text } satisfies TaskWorkbenchCreateSessionAndSendCommand,
|
||||
{ wait: false },
|
||||
);
|
||||
},
|
||||
|
||||
async renameWorkbenchSession(c, input: TaskWorkbenchRenameSessionInput): Promise<void> {
|
||||
const self = selfTask(c);
|
||||
await self.send(
|
||||
|
|
@ -276,7 +290,7 @@ export const task = actor({
|
|||
{ sessionId: input.tabId, title: input.title } satisfies TaskWorkbenchSessionTitleCommand,
|
||||
{
|
||||
wait: true,
|
||||
timeout: 20_000,
|
||||
timeout: 10_000,
|
||||
},
|
||||
);
|
||||
},
|
||||
|
|
@ -288,7 +302,7 @@ export const task = actor({
|
|||
{ sessionId: input.tabId, unread: input.unread } satisfies TaskWorkbenchSessionUnreadCommand,
|
||||
{
|
||||
wait: true,
|
||||
timeout: 20_000,
|
||||
timeout: 10_000,
|
||||
},
|
||||
);
|
||||
},
|
||||
|
|
@ -304,7 +318,7 @@ export const task = actor({
|
|||
} satisfies TaskWorkbenchUpdateDraftCommand,
|
||||
{
|
||||
wait: true,
|
||||
timeout: 20_000,
|
||||
timeout: 10_000,
|
||||
},
|
||||
);
|
||||
},
|
||||
|
|
@ -316,14 +330,14 @@ export const task = actor({
|
|||
{ sessionId: input.tabId, model: input.model } satisfies TaskWorkbenchChangeModelCommand,
|
||||
{
|
||||
wait: true,
|
||||
timeout: 20_000,
|
||||
timeout: 10_000,
|
||||
},
|
||||
);
|
||||
},
|
||||
|
||||
async sendWorkbenchMessage(c, input: TaskWorkbenchSendMessageActionInput): Promise<void> {
|
||||
async sendWorkbenchMessage(c, input: TaskWorkbenchSendMessageInput): Promise<void> {
|
||||
const self = selfTask(c);
|
||||
const result = await self.send(
|
||||
await self.send(
|
||||
taskWorkflowQueueName("task.command.workbench.send_message"),
|
||||
{
|
||||
sessionId: input.tabId,
|
||||
|
|
@ -331,13 +345,9 @@ export const task = actor({
|
|||
attachments: input.attachments,
|
||||
} satisfies TaskWorkbenchSendMessageCommand,
|
||||
{
|
||||
wait: input.waitForCompletion === true,
|
||||
...(input.waitForCompletion === true ? { timeout: 10 * 60_000 } : {}),
|
||||
wait: false,
|
||||
},
|
||||
);
|
||||
if (input.waitForCompletion === true) {
|
||||
expectQueueResponse(result);
|
||||
}
|
||||
},
|
||||
|
||||
async stopWorkbenchSession(c, input: TaskTabCommand): Promise<void> {
|
||||
|
|
|
|||
|
|
@ -307,22 +307,14 @@ async function requireReadySessionMeta(c: any, tabId: string): Promise<any> {
|
|||
return meta;
|
||||
}
|
||||
|
||||
async function ensureReadySessionMeta(c: any, tabId: string): Promise<any> {
|
||||
const meta = await readSessionMeta(c, tabId);
|
||||
export function requireSendableSessionMeta(meta: any, tabId: string): any {
|
||||
if (!meta) {
|
||||
throw new Error(`Unknown workbench tab: ${tabId}`);
|
||||
}
|
||||
|
||||
if (meta.status === "ready" && meta.sandboxSessionId) {
|
||||
return meta;
|
||||
if (meta.status !== "ready" || !meta.sandboxSessionId) {
|
||||
throw new Error(`Session is not ready (status: ${meta.status}). Wait for session provisioning to complete.`);
|
||||
}
|
||||
|
||||
if (meta.status === "error") {
|
||||
throw new Error(meta.errorMessage ?? "This workbench tab failed to prepare");
|
||||
}
|
||||
|
||||
await ensureWorkbenchSession(c, tabId);
|
||||
return await requireReadySessionMeta(c, tabId);
|
||||
return meta;
|
||||
}
|
||||
|
||||
function shellFragment(parts: string[]): string {
|
||||
|
|
@ -1204,7 +1196,7 @@ export async function changeWorkbenchModel(c: any, sessionId: string, model: str
|
|||
}
|
||||
|
||||
export async function sendWorkbenchMessage(c: any, sessionId: string, text: string, attachments: Array<any>): Promise<void> {
|
||||
const meta = await ensureReadySessionMeta(c, sessionId);
|
||||
const meta = requireSendableSessionMeta(await readSessionMeta(c, sessionId), sessionId);
|
||||
const record = await ensureWorkbenchSeeded(c);
|
||||
const runtime = await getTaskSandboxRuntime(c, record);
|
||||
await ensureSandboxRepo(c, runtime.sandbox, record);
|
||||
|
|
|
|||
|
|
@ -1,14 +1,7 @@
|
|||
import { Loop } from "rivetkit/workflow";
|
||||
import { logActorWarning, resolveErrorMessage } from "../../logging.js";
|
||||
import { getCurrentRecord } from "./common.js";
|
||||
import {
|
||||
initAssertNameActivity,
|
||||
initBootstrapDbActivity,
|
||||
initCompleteActivity,
|
||||
initEnqueueProvisionActivity,
|
||||
initEnsureNameActivity,
|
||||
initFailedActivity,
|
||||
} from "./init.js";
|
||||
import { initBootstrapDbActivity, initCompleteActivity, initEnqueueProvisionActivity, initFailedActivity } from "./init.js";
|
||||
import {
|
||||
handleArchiveActivity,
|
||||
handleAttachActivity,
|
||||
|
|
@ -67,12 +60,8 @@ const commandHandlers: Record<TaskQueueName, WorkflowHandler> = {
|
|||
await loopCtx.removed("init-failed", "step");
|
||||
await loopCtx.removed("init-failed-v2", "step");
|
||||
try {
|
||||
await loopCtx.step({
|
||||
name: "init-ensure-name",
|
||||
timeout: 5 * 60_000,
|
||||
run: async () => initEnsureNameActivity(loopCtx),
|
||||
});
|
||||
await loopCtx.step("init-assert-name", async () => initAssertNameActivity(loopCtx));
|
||||
await loopCtx.removed("init-ensure-name", "step");
|
||||
await loopCtx.removed("init-assert-name", "step");
|
||||
await loopCtx.removed("init-create-sandbox", "step");
|
||||
await loopCtx.removed("init-ensure-agent", "step");
|
||||
await loopCtx.removed("init-start-sandbox-instance", "step");
|
||||
|
|
@ -156,6 +145,26 @@ const commandHandlers: Record<TaskQueueName, WorkflowHandler> = {
|
|||
}
|
||||
},
|
||||
|
||||
"task.command.workbench.create_session_and_send": async (loopCtx, msg) => {
|
||||
try {
|
||||
const created = await loopCtx.step({
|
||||
name: "workbench-create-session-for-send",
|
||||
timeout: 5 * 60_000,
|
||||
run: async () => createWorkbenchSession(loopCtx, msg.body?.model),
|
||||
});
|
||||
await loopCtx.step({
|
||||
name: "workbench-send-initial-message",
|
||||
timeout: 5 * 60_000,
|
||||
run: async () => sendWorkbenchMessage(loopCtx, created.tabId, msg.body.text, []),
|
||||
});
|
||||
} catch (error) {
|
||||
logActorWarning("task.workflow", "create_session_and_send failed", {
|
||||
error: resolveErrorMessage(error),
|
||||
});
|
||||
}
|
||||
await msg.complete({ ok: true });
|
||||
},
|
||||
|
||||
"task.command.workbench.ensure_session": async (loopCtx, msg) => {
|
||||
await loopCtx.step({
|
||||
name: "workbench-ensure-session",
|
||||
|
|
|
|||
|
|
@ -1,10 +1,8 @@
|
|||
// @ts-nocheck
|
||||
import { eq } from "drizzle-orm";
|
||||
import { resolveCreateFlowDecision } from "../../../services/create-flow.js";
|
||||
import { resolveWorkspaceGithubAuth } from "../../../services/github-auth.js";
|
||||
import { getActorRuntimeContext } from "../../context.js";
|
||||
import { getOrCreateHistory, getOrCreateProject, selfTask } from "../../handles.js";
|
||||
import { logActorWarning, resolveErrorMessage } from "../../logging.js";
|
||||
import { getOrCreateHistory, selfTask } from "../../handles.js";
|
||||
import { resolveErrorMessage } from "../../logging.js";
|
||||
import { defaultSandboxProviderId } from "../../../sandbox-config.js";
|
||||
import { task as taskTable, taskRuntime } from "../db/schema.js";
|
||||
import { TASK_ROW_ID, appendHistory, collectErrorMessages, resolveErrorDetail, setTaskState } from "./common.js";
|
||||
|
|
@ -21,7 +19,6 @@ export async function initBootstrapDbActivity(loopCtx: any, body: any): Promise<
|
|||
const { config } = getActorRuntimeContext();
|
||||
const providerId = body?.providerId ?? loopCtx.state.providerId ?? defaultSandboxProviderId(config);
|
||||
const now = Date.now();
|
||||
const initialStatusMessage = loopCtx.state.branchName && loopCtx.state.title ? "provisioning" : "naming";
|
||||
|
||||
await ensureTaskRuntimeCacheColumns(loopCtx.db);
|
||||
|
||||
|
|
@ -60,7 +57,7 @@ export async function initBootstrapDbActivity(loopCtx: any, body: any): Promise<
|
|||
activeSessionId: null,
|
||||
activeSwitchTarget: null,
|
||||
activeCwd: null,
|
||||
statusMessage: initialStatusMessage,
|
||||
statusMessage: "provisioning",
|
||||
gitStateJson: null,
|
||||
gitStateUpdatedAt: null,
|
||||
provisionStage: "queued",
|
||||
|
|
@ -74,7 +71,7 @@ export async function initBootstrapDbActivity(loopCtx: any, body: any): Promise<
|
|||
activeSessionId: null,
|
||||
activeSwitchTarget: null,
|
||||
activeCwd: null,
|
||||
statusMessage: initialStatusMessage,
|
||||
statusMessage: "provisioning",
|
||||
provisionStage: "queued",
|
||||
provisionStageUpdatedAt: now,
|
||||
updatedAt: now,
|
||||
|
|
@ -111,102 +108,6 @@ export async function initEnqueueProvisionActivity(loopCtx: any, body: any): Pro
|
|||
}
|
||||
}
|
||||
|
||||
export async function initEnsureNameActivity(loopCtx: any): Promise<void> {
|
||||
await setTaskState(loopCtx, "init_ensure_name", "determining title and branch");
|
||||
const existing = await loopCtx.db
|
||||
.select({
|
||||
branchName: taskTable.branchName,
|
||||
title: taskTable.title,
|
||||
})
|
||||
.from(taskTable)
|
||||
.where(eq(taskTable.id, TASK_ROW_ID))
|
||||
.get();
|
||||
|
||||
if (existing?.branchName && existing?.title) {
|
||||
loopCtx.state.branchName = existing.branchName;
|
||||
loopCtx.state.title = existing.title;
|
||||
return;
|
||||
}
|
||||
|
||||
const { driver } = getActorRuntimeContext();
|
||||
const auth = await resolveWorkspaceGithubAuth(loopCtx, loopCtx.state.workspaceId);
|
||||
let repoLocalPath = loopCtx.state.repoLocalPath;
|
||||
if (!repoLocalPath) {
|
||||
const project = await getOrCreateProject(loopCtx, loopCtx.state.workspaceId, loopCtx.state.repoId, loopCtx.state.repoRemote);
|
||||
const result = await project.ensure({ remoteUrl: loopCtx.state.repoRemote });
|
||||
repoLocalPath = result.localPath;
|
||||
loopCtx.state.repoLocalPath = repoLocalPath;
|
||||
}
|
||||
|
||||
try {
|
||||
await driver.git.fetch(repoLocalPath, { githubToken: auth?.githubToken ?? null });
|
||||
} catch (error) {
|
||||
logActorWarning("task.init", "fetch before naming failed", {
|
||||
workspaceId: loopCtx.state.workspaceId,
|
||||
repoId: loopCtx.state.repoId,
|
||||
taskId: loopCtx.state.taskId,
|
||||
error: resolveErrorMessage(error),
|
||||
});
|
||||
}
|
||||
|
||||
const remoteBranches = (await driver.git.listRemoteBranches(repoLocalPath, { githubToken: auth?.githubToken ?? null })).map(
|
||||
(branch: any) => branch.branchName,
|
||||
);
|
||||
const project = await getOrCreateProject(loopCtx, loopCtx.state.workspaceId, loopCtx.state.repoId, loopCtx.state.repoRemote);
|
||||
const reservedBranches = await project.listReservedBranches({});
|
||||
const resolved = resolveCreateFlowDecision({
|
||||
task: loopCtx.state.task,
|
||||
explicitTitle: loopCtx.state.explicitTitle ?? undefined,
|
||||
explicitBranchName: loopCtx.state.explicitBranchName ?? undefined,
|
||||
localBranches: remoteBranches,
|
||||
taskBranches: reservedBranches,
|
||||
});
|
||||
|
||||
const now = Date.now();
|
||||
await loopCtx.db
|
||||
.update(taskTable)
|
||||
.set({
|
||||
branchName: resolved.branchName,
|
||||
title: resolved.title,
|
||||
updatedAt: now,
|
||||
})
|
||||
.where(eq(taskTable.id, TASK_ROW_ID))
|
||||
.run();
|
||||
|
||||
loopCtx.state.branchName = resolved.branchName;
|
||||
loopCtx.state.title = resolved.title;
|
||||
loopCtx.state.explicitTitle = null;
|
||||
loopCtx.state.explicitBranchName = null;
|
||||
|
||||
await loopCtx.db
|
||||
.update(taskRuntime)
|
||||
.set({
|
||||
statusMessage: "provisioning",
|
||||
provisionStage: "repo_prepared",
|
||||
provisionStageUpdatedAt: now,
|
||||
updatedAt: now,
|
||||
})
|
||||
.where(eq(taskRuntime.id, TASK_ROW_ID))
|
||||
.run();
|
||||
|
||||
await project.registerTaskBranch({
|
||||
taskId: loopCtx.state.taskId,
|
||||
branchName: resolved.branchName,
|
||||
});
|
||||
|
||||
await appendHistory(loopCtx, "task.named", {
|
||||
title: resolved.title,
|
||||
branchName: resolved.branchName,
|
||||
});
|
||||
}
|
||||
|
||||
export async function initAssertNameActivity(loopCtx: any): Promise<void> {
|
||||
await setTaskState(loopCtx, "init_assert_name", "validating naming");
|
||||
if (!loopCtx.state.branchName) {
|
||||
throw new Error("task branchName is not initialized");
|
||||
}
|
||||
}
|
||||
|
||||
export async function initCompleteActivity(loopCtx: any, body: any): Promise<void> {
|
||||
const now = Date.now();
|
||||
const { config } = getActorRuntimeContext();
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ export const TASK_QUEUE_NAMES = [
|
|||
"task.command.workbench.rename_task",
|
||||
"task.command.workbench.rename_branch",
|
||||
"task.command.workbench.create_session",
|
||||
"task.command.workbench.create_session_and_send",
|
||||
"task.command.workbench.ensure_session",
|
||||
"task.command.workbench.rename_session",
|
||||
"task.command.workbench.set_session_unread",
|
||||
|
|
|
|||
|
|
@ -1,5 +1,4 @@
|
|||
// @ts-nocheck
|
||||
import { setTimeout as delay } from "node:timers/promises";
|
||||
import { desc, eq } from "drizzle-orm";
|
||||
import { Loop } from "rivetkit/workflow";
|
||||
import type {
|
||||
|
|
@ -272,24 +271,6 @@ async function requireWorkbenchTask(c: any, taskId: string) {
|
|||
return getTask(c, c.state.workspaceId, repoId, taskId);
|
||||
}
|
||||
|
||||
async function waitForWorkbenchTaskReady(task: any, timeoutMs = 5 * 60_000): Promise<any> {
|
||||
const startedAt = Date.now();
|
||||
|
||||
for (;;) {
|
||||
const record = await task.get();
|
||||
if (record?.branchName && record?.title) {
|
||||
return record;
|
||||
}
|
||||
if (record?.status === "error") {
|
||||
throw new Error("task initialization failed before the workbench session was ready");
|
||||
}
|
||||
if (Date.now() - startedAt > timeoutMs) {
|
||||
throw new Error("timed out waiting for task initialization");
|
||||
}
|
||||
await delay(1_000);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads the workspace sidebar snapshot from the workspace actor's local SQLite
|
||||
* plus the org-scoped GitHub actor for open PRs. Task actors still push
|
||||
|
|
@ -562,7 +543,7 @@ export const workspaceActions = {
|
|||
return expectQueueResponse<RepoRecord>(
|
||||
await self.send(workspaceWorkflowQueueName("workspace.command.addRepo"), input, {
|
||||
wait: true,
|
||||
timeout: 60_000,
|
||||
timeout: 10_000,
|
||||
}),
|
||||
);
|
||||
},
|
||||
|
|
@ -595,7 +576,7 @@ export const workspaceActions = {
|
|||
return expectQueueResponse<TaskRecord>(
|
||||
await self.send(workspaceWorkflowQueueName("workspace.command.createTask"), input, {
|
||||
wait: true,
|
||||
timeout: 5 * 60_000,
|
||||
timeout: 10_000,
|
||||
}),
|
||||
);
|
||||
},
|
||||
|
|
@ -813,6 +794,7 @@ export const workspaceActions = {
|
|||
},
|
||||
|
||||
async createWorkbenchTask(c: any, input: TaskWorkbenchCreateTaskInput): Promise<{ taskId: string; tabId?: string }> {
|
||||
// Step 1: Create the task record (wait: true — local state mutations only).
|
||||
const created = await workspaceActions.createTask(c, {
|
||||
workspaceId: c.state.workspaceId,
|
||||
repoId: input.repoId,
|
||||
|
|
@ -821,26 +803,18 @@ export const workspaceActions = {
|
|||
...(input.onBranch ? { onBranch: input.onBranch } : input.branch ? { explicitBranchName: input.branch } : {}),
|
||||
...(input.model ? { agentType: agentTypeForModel(input.model) } : {}),
|
||||
});
|
||||
|
||||
// Step 2: Enqueue session creation + initial message (wait: false).
|
||||
// The task workflow creates the session record and sends the message in
|
||||
// the background. The client observes progress via push events on the
|
||||
// task interest topic.
|
||||
const task = await requireWorkbenchTask(c, created.taskId);
|
||||
await waitForWorkbenchTaskReady(task);
|
||||
const session = await task.createWorkbenchSession({
|
||||
taskId: created.taskId,
|
||||
...(input.model ? { model: input.model } : {}),
|
||||
});
|
||||
await task.sendWorkbenchMessage({
|
||||
taskId: created.taskId,
|
||||
tabId: session.tabId,
|
||||
await task.createWorkbenchSessionAndSend({
|
||||
model: input.model,
|
||||
text: input.task,
|
||||
attachments: [],
|
||||
waitForCompletion: true,
|
||||
});
|
||||
await task.getSessionDetail({
|
||||
sessionId: session.tabId,
|
||||
});
|
||||
return {
|
||||
taskId: created.taskId,
|
||||
tabId: session.tabId,
|
||||
};
|
||||
|
||||
return { taskId: created.taskId };
|
||||
},
|
||||
|
||||
async markWorkbenchUnread(c: any, input: TaskWorkbenchSelectInput): Promise<void> {
|
||||
|
|
@ -988,7 +962,7 @@ export const workspaceActions = {
|
|||
const self = selfWorkspace(c);
|
||||
await self.send(workspaceWorkflowQueueName("workspace.command.refreshProviderProfiles"), command ?? {}, {
|
||||
wait: true,
|
||||
timeout: 60_000,
|
||||
timeout: 10_000,
|
||||
});
|
||||
},
|
||||
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ import {
|
|||
ensureCloned,
|
||||
fetch,
|
||||
listRemoteBranches,
|
||||
listLocalRemoteRefs,
|
||||
remoteDefaultBaseRef,
|
||||
revParse,
|
||||
ensureRemoteBranch,
|
||||
|
|
@ -28,6 +29,8 @@ export interface GitDriver {
|
|||
ensureCloned(remoteUrl: string, targetPath: string, options?: { githubToken?: string | null }): Promise<void>;
|
||||
fetch(repoPath: string, options?: { githubToken?: string | null }): Promise<void>;
|
||||
listRemoteBranches(repoPath: string, options?: { githubToken?: string | null }): Promise<BranchSnapshot[]>;
|
||||
/** Read remote-tracking refs from the local clone without fetching. */
|
||||
listLocalRemoteRefs(repoPath: string): Promise<BranchSnapshot[]>;
|
||||
remoteDefaultBaseRef(repoPath: string): Promise<string>;
|
||||
revParse(repoPath: string, ref: string): Promise<string>;
|
||||
ensureRemoteBranch(repoPath: string, branchName: string, options?: { githubToken?: string | null }): Promise<void>;
|
||||
|
|
@ -81,6 +84,7 @@ export function createDefaultDriver(): BackendDriver {
|
|||
ensureCloned,
|
||||
fetch,
|
||||
listRemoteBranches,
|
||||
listLocalRemoteRefs,
|
||||
remoteDefaultBaseRef,
|
||||
revParse,
|
||||
ensureRemoteBranch,
|
||||
|
|
|
|||
|
|
@ -208,11 +208,25 @@ export async function remoteDefaultBaseRef(repoPath: string): Promise<string> {
|
|||
return "origin/main";
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch from origin, then read remote-tracking refs.
|
||||
* Use when you need guaranteed-fresh branch data and can tolerate network I/O.
|
||||
*/
|
||||
export async function listRemoteBranches(repoPath: string, options?: GitAuthOptions): Promise<BranchSnapshot[]> {
|
||||
await fetch(repoPath, options);
|
||||
return listLocalRemoteRefs(repoPath);
|
||||
}
|
||||
|
||||
/**
|
||||
* Read remote-tracking refs (`refs/remotes/origin/*`) from the local clone
|
||||
* without fetching. The data is only as fresh as the last fetch — use this
|
||||
* when the branch sync actor keeps refs current and you want to avoid
|
||||
* blocking on network I/O.
|
||||
*/
|
||||
export async function listLocalRemoteRefs(repoPath: string): Promise<BranchSnapshot[]> {
|
||||
const { stdout } = await execFileAsync("git", ["-C", repoPath, "for-each-ref", "--format=%(refname:short) %(objectname)", "refs/remotes/origin"], {
|
||||
maxBuffer: 1024 * 1024,
|
||||
env: gitEnv(options),
|
||||
env: gitEnv(),
|
||||
});
|
||||
|
||||
return stdout
|
||||
|
|
|
|||
|
|
@ -15,6 +15,7 @@ export function createTestGitDriver(overrides?: Partial<GitDriver>): GitDriver {
|
|||
ensureCloned: async () => {},
|
||||
fetch: async () => {},
|
||||
listRemoteBranches: async () => [],
|
||||
listLocalRemoteRefs: async () => [],
|
||||
remoteDefaultBaseRef: async () => "origin/main",
|
||||
revParse: async () => "abc1234567890",
|
||||
ensureRemoteBranch: async () => {},
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
import { describe, expect, it } from "vitest";
|
||||
import { shouldMarkSessionUnreadForStatus, shouldRecreateSessionForModelChange } from "../src/actors/task/workbench.js";
|
||||
import { requireSendableSessionMeta, shouldMarkSessionUnreadForStatus, shouldRecreateSessionForModelChange } from "../src/actors/task/workbench.js";
|
||||
|
||||
describe("workbench unread status transitions", () => {
|
||||
it("marks unread when a running session first becomes idle", () => {
|
||||
|
|
@ -57,3 +57,30 @@ describe("workbench model changes", () => {
|
|||
).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
describe("workbench send readiness", () => {
|
||||
it("rejects unknown tabs", () => {
|
||||
expect(() => requireSendableSessionMeta(null, "tab-1")).toThrow("Unknown workbench tab: tab-1");
|
||||
});
|
||||
|
||||
it("rejects pending sessions", () => {
|
||||
expect(() =>
|
||||
requireSendableSessionMeta(
|
||||
{
|
||||
status: "pending_session_create",
|
||||
sandboxSessionId: null,
|
||||
},
|
||||
"tab-2",
|
||||
),
|
||||
).toThrow("Session is not ready (status: pending_session_create). Wait for session provisioning to complete.");
|
||||
});
|
||||
|
||||
it("accepts ready sessions with a sandbox session id", () => {
|
||||
const meta = {
|
||||
status: "ready",
|
||||
sandboxSessionId: "session-1",
|
||||
};
|
||||
|
||||
expect(requireSendableSessionMeta(meta, "tab-3")).toBe(meta);
|
||||
});
|
||||
});
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue