wip(foundry): continue actor refactor

This commit is contained in:
Nathan Flurry 2026-03-15 10:23:04 -07:00
parent e97c502d28
commit 6f85b59f31
7 changed files with 236 additions and 200 deletions

View file

@ -4,7 +4,7 @@ import { actor, queue } from "rivetkit";
import { workflow, Loop } from "rivetkit/workflow";
import type { FoundryOrganization } from "@sandbox-agent/foundry-shared";
import { getActorRuntimeContext } from "../context.js";
import { getOrCreateOrganization, getTask } from "../handles.js";
import { getOrCreateOrganization, getOrCreateRepository, getTask } from "../handles.js";
import { repoIdFromRemote } from "../../services/repo.js";
import { resolveOrganizationGithubAuth } from "../../services/github-auth.js";
import { githubDataDb } from "./db/db.js";
@ -259,12 +259,15 @@ async function replacePullRequests(c: any, pullRequests: GithubPullRequestRecord
}
async function refreshTaskSummaryForBranch(c: any, repoId: string, branchName: string) {
const organization = await getOrCreateOrganization(c, c.state.organizationId);
await organization.refreshTaskSummaryForGithubBranch({ repoId, branchName });
const repositoryRecord = await c.db.select().from(githubRepositories).where(eq(githubRepositories.repoId, repoId)).get();
if (!repositoryRecord) {
return;
}
const repository = await getOrCreateRepository(c, c.state.organizationId, repoId, repositoryRecord.cloneUrl);
await repository.refreshTaskSummaryForBranch({ branchName });
}
async function emitPullRequestChangeEvents(c: any, beforeRows: any[], afterRows: any[]) {
const organization = await getOrCreateOrganization(c, c.state.organizationId);
const beforeById = new Map(beforeRows.map((row) => [row.prId, row]));
const afterById = new Map(afterRows.map((row) => [row.prId, row]));
@ -283,9 +286,6 @@ async function emitPullRequestChangeEvents(c: any, beforeRows: any[], afterRows:
if (!changed) {
continue;
}
await organization.applyOpenPullRequestUpdate({
pullRequest: pullRequestSummaryFromRow(row),
});
await refreshTaskSummaryForBranch(c, row.repoId, row.headRefName);
}
@ -293,15 +293,17 @@ async function emitPullRequestChangeEvents(c: any, beforeRows: any[], afterRows:
if (afterById.has(prId)) {
continue;
}
await organization.removeOpenPullRequest({ prId });
await refreshTaskSummaryForBranch(c, row.repoId, row.headRefName);
}
}
async function autoArchiveTaskForClosedPullRequest(c: any, row: any) {
const organization = await getOrCreateOrganization(c, c.state.organizationId);
const match = await organization.findTaskForGithubBranch({
repoId: row.repoId,
const repositoryRecord = await c.db.select().from(githubRepositories).where(eq(githubRepositories.repoId, row.repoId)).get();
if (!repositoryRecord) {
return;
}
const repository = await getOrCreateRepository(c, c.state.organizationId, row.repoId, repositoryRecord.cloneUrl);
const match = await repository.findTaskForBranch({
branchName: row.headRefName,
});
if (!match?.taskId) {

View file

@ -31,7 +31,7 @@ import type {
OrganizationUseInput,
} from "@sandbox-agent/foundry-shared";
import { getActorRuntimeContext } from "../context.js";
import { getGithubData, getOrCreateAuditLog, getOrCreateGithubData, getTask as getTaskHandle, getOrCreateRepository, selfOrganization } from "../handles.js";
import { getOrCreateAuditLog, getOrCreateGithubData, getTask as getTaskHandle, getOrCreateRepository, selfOrganization } from "../handles.js";
import { logActorWarning, resolveErrorMessage } from "../logging.js";
import { defaultSandboxProviderId } from "../../sandbox-config.js";
import { repoIdFromRemote } from "../../services/repo.js";

View file

@ -179,7 +179,6 @@ function parseJsonValue<T>(value: string | null | undefined, fallback: T): T {
function taskSummaryRowFromSummary(taskSummary: WorkspaceTaskSummary) {
return {
taskId: taskSummary.id,
repoId: taskSummary.repoId,
title: taskSummary.title,
status: taskSummary.status,
repoName: taskSummary.repoName,
@ -190,20 +189,6 @@ function taskSummaryRowFromSummary(taskSummary: WorkspaceTaskSummary) {
};
}
function workspaceTaskSummaryFromRow(row: any): WorkspaceTaskSummary {
return {
id: row.taskId,
repoId: row.repoId,
title: row.title,
status: row.status,
repoName: row.repoName,
updatedAtMs: row.updatedAtMs,
branch: row.branch ?? null,
pullRequest: parseJsonValue(row.pullRequestJson, null),
sessionsSummary: parseJsonValue<WorkspaceSessionSummary[]>(row.sessionsSummaryJson, []),
};
}
async function resolveGitHubRepository(c: any) {
const githubData = getGithubData(c, c.state.organizationId);
return await githubData.getRepository({ repoId: c.state.repoId }).catch(() => null);
@ -418,7 +403,7 @@ async function listTaskSummaries(c: any, includeArchived = false): Promise<TaskS
async function listWorkspaceTaskSummaries(c: any): Promise<WorkspaceTaskSummary[]> {
const rows = await c.db.select().from(tasks).orderBy(desc(tasks.updatedAtMs)).all();
return rows.map(workspaceTaskSummaryFromRow);
return rows.map((row) => taskSummaryFromRow(c, row));
}
function sortOverviewBranches(
@ -612,12 +597,12 @@ export const repositoryActions = {
await notifyOrganizationSnapshotChanged(c);
},
async findTaskForGithubBranch(c: any, input: { branchName: string }): Promise<{ taskId: string | null }> {
async findTaskForBranch(c: any, input: { branchName: string }): Promise<{ taskId: string | null }> {
const row = await c.db.select({ taskId: tasks.taskId }).from(tasks).where(eq(tasks.branch, input.branchName)).get();
return { taskId: row?.taskId ?? null };
},
async refreshTaskSummaryForGithubBranch(c: any, input: { branchName: string }): Promise<void> {
async refreshTaskSummaryForBranch(c: any, input: { branchName: string }): Promise<void> {
const rows = await c.db.select({ taskId: tasks.taskId }).from(tasks).where(eq(tasks.branch, input.branchName)).all();
for (const row of rows) {

View file

@ -3,9 +3,10 @@ import { randomUUID } from "node:crypto";
import { basename, dirname } from "node:path";
import { asc, eq } from "drizzle-orm";
import { getActorRuntimeContext } from "../context.js";
import { getOrCreateRepository, getOrCreateTaskSandbox, getTaskSandbox, selfTask } from "../handles.js";
import { getOrCreateRepository, getOrCreateTaskSandbox, getOrCreateUser, getTaskSandbox, selfTask } from "../handles.js";
import { SANDBOX_REPO_CWD } from "../sandbox/index.js";
import { resolveSandboxProviderId } from "../../sandbox-config.js";
import { getBetterAuthService } from "../../services/better-auth.js";
import { resolveOrganizationGithubAuth } from "../../services/github-auth.js";
import { githubRepoFullNameFromRemote } from "../../services/repo.js";
import { task as taskTable, taskRuntime, taskSandboxes, taskWorkspaceSessions } from "./db/schema.js";
@ -20,9 +21,7 @@ function emptyGitState() {
};
}
function defaultModelForAgent(agentType: string | null | undefined) {
return agentType === "codex" ? "gpt-5.3-codex" : "claude-sonnet-4";
}
const FALLBACK_MODEL = "claude-sonnet-4";
function isCodexModel(model: string) {
return model.startsWith("gpt-") || model.startsWith("o");
@ -142,9 +141,6 @@ async function listSessionMetaRows(c: any, options?: { includeClosed?: boolean }
errorMessage: row.errorMessage ?? null,
transcript: parseTranscript(row.transcriptJson),
transcriptUpdatedAt: row.transcriptUpdatedAt ?? null,
draftAttachments: parseDraftAttachments(row.draftAttachmentsJson),
draftUpdatedAtMs: row.draftUpdatedAt ?? null,
unread: row.unread === 1,
created: row.created === 1,
closed: row.closed === 1,
}));
@ -177,22 +173,102 @@ async function readSessionMeta(c: any, sessionId: string): Promise<any | null> {
errorMessage: row.errorMessage ?? null,
transcript: parseTranscript(row.transcriptJson),
transcriptUpdatedAt: row.transcriptUpdatedAt ?? null,
draftAttachments: parseDraftAttachments(row.draftAttachmentsJson),
draftUpdatedAtMs: row.draftUpdatedAt ?? null,
unread: row.unread === 1,
created: row.created === 1,
closed: row.closed === 1,
};
}
async function getUserTaskState(c: any, authSessionId?: string | null): Promise<{ activeSessionId: string | null; bySessionId: Map<string, any> }> {
if (!authSessionId) {
return { activeSessionId: null, bySessionId: new Map() };
}
const authState = await getBetterAuthService().getAuthState(authSessionId);
const userId = authState?.user?.id;
if (typeof userId !== "string" || userId.length === 0) {
return { activeSessionId: null, bySessionId: new Map() };
}
const user = await getOrCreateUser(c, userId);
const state = await user.getTaskState({ taskId: c.state.taskId });
const bySessionId = new Map(
(state?.sessions ?? []).map((row: any) => [
row.sessionId,
{
unread: Boolean(row.unread),
draftText: row.draftText ?? "",
draftAttachments: parseDraftAttachments(row.draftAttachmentsJson),
draftUpdatedAtMs: row.draftUpdatedAt ?? null,
},
]),
);
return {
activeSessionId: state?.activeSessionId ?? null,
bySessionId,
};
}
async function upsertUserTaskState(c: any, authSessionId: string | null | undefined, sessionId: string, patch: Record<string, unknown>): Promise<void> {
if (!authSessionId) {
return;
}
const authState = await getBetterAuthService().getAuthState(authSessionId);
const userId = authState?.user?.id;
if (typeof userId !== "string" || userId.length === 0) {
return;
}
const user = await getOrCreateUser(c, userId);
await user.upsertTaskState({
taskId: c.state.taskId,
sessionId,
patch,
});
}
async function deleteUserTaskState(c: any, authSessionId: string | null | undefined, sessionId: string): Promise<void> {
if (!authSessionId) {
return;
}
const authState = await getBetterAuthService().getAuthState(authSessionId);
const userId = authState?.user?.id;
if (typeof userId !== "string" || userId.length === 0) {
return;
}
const user = await getOrCreateUser(c, userId);
await user.deleteTaskState({
taskId: c.state.taskId,
sessionId,
});
}
async function resolveDefaultModel(c: any, authSessionId?: string | null): Promise<string> {
if (!authSessionId) {
return FALLBACK_MODEL;
}
const authState = await getBetterAuthService().getAuthState(authSessionId);
const userId = authState?.user?.id;
if (typeof userId !== "string" || userId.length === 0) {
return FALLBACK_MODEL;
}
const user = await getOrCreateUser(c, userId);
const userState = await user.getAppAuthState({ sessionId: authSessionId });
return userState?.profile?.defaultModel ?? FALLBACK_MODEL;
}
async function ensureSessionMeta(
c: any,
params: {
sessionId: string;
sandboxSessionId?: string | null;
model?: string;
authSessionId?: string | null;
sessionName?: string;
unread?: boolean;
created?: boolean;
status?: "pending_provision" | "pending_session_create" | "ready" | "error";
errorMessage?: string | null;
@ -205,8 +281,7 @@ async function ensureSessionMeta(
const now = Date.now();
const sessionName = params.sessionName ?? (await nextSessionName(c));
const model = params.model ?? defaultModelForAgent(c.state.agentType);
const unread = params.unread ?? false;
const model = params.model ?? (await resolveDefaultModel(c, params.authSessionId));
await c.db
.insert(taskWorkspaceSessions)
@ -219,10 +294,6 @@ async function ensureSessionMeta(
errorMessage: params.errorMessage ?? null,
transcriptJson: "[]",
transcriptUpdatedAt: null,
unread: unread ? 1 : 0,
draftText: "",
draftAttachmentsJson: "[]",
draftUpdatedAt: null,
created: params.created === false ? 0 : 1,
closed: 0,
thinkingSinceMs: null,
@ -687,15 +758,17 @@ async function maybeScheduleWorkspaceRefreshes(c: any, record: any, sessions: Ar
}
}
function activeSessionStatus(record: any, sessionId: string) {
if (record.activeSessionId !== sessionId) {
return "idle";
function computeWorkspaceTaskStatus(record: any, sessions: Array<any>) {
if (record.status && String(record.status).startsWith("init_")) {
return record.status;
}
if (record.status === "running") {
if (record.status === "archived" || record.status === "killed") {
return record.status;
}
if (sessions.some((session) => session.closed !== true && session.thinkingSinceMs)) {
return "running";
}
if (record.status === "error") {
if (sessions.some((session) => session.closed !== true && session.status === "error")) {
return "error";
}
return "idle";
@ -715,31 +788,23 @@ async function readPullRequestSummary(c: any, branchName: string | null) {
}
export async function ensureWorkspaceSeeded(c: any): Promise<any> {
const record = await getCurrentRecord({ db: c.db, state: c.state });
if (record.activeSessionId) {
await ensureSessionMeta(c, {
sessionId: record.activeSessionId,
sandboxSessionId: record.activeSessionId,
model: defaultModelForAgent(record.agentType),
sessionName: "Session 1",
status: "ready",
});
}
return record;
return await getCurrentRecord({ db: c.db, state: c.state });
}
function buildSessionSummary(record: any, meta: any): any {
function buildSessionSummary(meta: any, userState?: any): any {
const derivedSandboxSessionId = meta.status === "ready" ? (meta.sandboxSessionId ?? null) : null;
const sessionStatus =
meta.status === "pending_provision" || meta.status === "pending_session_create"
? meta.status
: meta.status === "ready" && derivedSandboxSessionId
? activeSessionStatus(record, derivedSandboxSessionId)
: meta.thinkingSinceMs
? "running"
: meta.status === "error"
? "error"
: "ready";
: meta.status === "ready" && derivedSandboxSessionId
? "idle"
: "ready";
let thinkingSinceMs = meta.thinkingSinceMs ?? null;
let unread = Boolean(meta.unread);
let unread = Boolean(userState?.unread);
if (thinkingSinceMs && sessionStatus !== "running") {
thinkingSinceMs = null;
unread = true;
@ -760,8 +825,8 @@ function buildSessionSummary(record: any, meta: any): any {
};
}
function buildSessionDetailFromMeta(record: any, meta: any): any {
const summary = buildSessionSummary(record, meta);
function buildSessionDetailFromMeta(meta: any, userState?: any): any {
const summary = buildSessionSummary(meta, userState);
return {
sessionId: meta.sessionId,
sandboxSessionId: summary.sandboxSessionId ?? null,
@ -774,9 +839,9 @@ function buildSessionDetailFromMeta(record: any, meta: any): any {
created: summary.created,
errorMessage: summary.errorMessage,
draft: {
text: meta.draftText ?? "",
attachments: Array.isArray(meta.draftAttachments) ? meta.draftAttachments : [],
updatedAtMs: meta.draftUpdatedAtMs ?? null,
text: userState?.draftText ?? "",
attachments: Array.isArray(userState?.draftAttachments) ? userState.draftAttachments : [],
updatedAtMs: userState?.draftUpdatedAtMs ?? null,
},
transcript: meta.transcript ?? [],
};
@ -786,21 +851,23 @@ function buildSessionDetailFromMeta(record: any, meta: any): any {
* Builds a WorkspaceTaskSummary from local task actor state. Task actors push
* this to the parent organization actor so organization sidebar reads stay local.
*/
export async function buildTaskSummary(c: any): Promise<any> {
export async function buildTaskSummary(c: any, authSessionId?: string | null): Promise<any> {
const record = await ensureWorkspaceSeeded(c);
const sessions = await listSessionMetaRows(c);
await maybeScheduleWorkspaceRefreshes(c, record, sessions);
const userTaskState = await getUserTaskState(c, authSessionId);
const taskStatus = computeWorkspaceTaskStatus(record, sessions);
return {
id: c.state.taskId,
repoId: c.state.repoId,
title: record.title ?? "New Task",
status: record.status ?? "new",
status: taskStatus ?? "new",
repoName: repoLabelFromRemote(c.state.repoRemote),
updatedAtMs: record.updatedAt,
branch: record.branchName,
pullRequest: await readPullRequestSummary(c, record.branchName),
sessionsSummary: sessions.map((meta) => buildSessionSummary(record, meta)),
sessionsSummary: sessions.map((meta) => buildSessionSummary(meta, userTaskState.bySessionId.get(meta.sessionId))),
};
}
@ -808,20 +875,17 @@ export async function buildTaskSummary(c: any): Promise<any> {
* Builds a WorkspaceTaskDetail from local task actor state for direct task
* subscribers. This is a full replacement payload, not a patch.
*/
export async function buildTaskDetail(c: any): Promise<any> {
export async function buildTaskDetail(c: any, authSessionId?: string | null): Promise<any> {
const record = await ensureWorkspaceSeeded(c);
const gitState = await readCachedGitState(c);
const sessions = await listSessionMetaRows(c);
await maybeScheduleWorkspaceRefreshes(c, record, sessions);
const summary = await buildTaskSummary(c);
const summary = await buildTaskSummary(c, authSessionId);
return {
...summary,
task: record.task,
agentType: record.agentType === "claude" || record.agentType === "codex" ? record.agentType : null,
runtimeStatus: record.status,
statusMessage: record.statusMessage ?? null,
activeSessionId: record.activeSessionId ?? null,
runtimeStatus: summary.status,
diffStat: record.diffStat ?? null,
prUrl: record.prUrl ?? null,
reviewStatus: record.reviewStatus ?? null,
@ -841,44 +905,49 @@ export async function buildTaskDetail(c: any): Promise<any> {
/**
* Builds a WorkspaceSessionDetail for a specific session.
*/
export async function buildSessionDetail(c: any, sessionId: string): Promise<any> {
export async function buildSessionDetail(c: any, sessionId: string, authSessionId?: string | null): Promise<any> {
const record = await ensureWorkspaceSeeded(c);
const meta = await readSessionMeta(c, sessionId);
if (!meta || meta.closed) {
throw new Error(`Unknown workspace session: ${sessionId}`);
}
const userTaskState = await getUserTaskState(c, authSessionId);
const userSessionState = userTaskState.bySessionId.get(sessionId);
if (!meta.sandboxSessionId) {
return buildSessionDetailFromMeta(record, meta);
return buildSessionDetailFromMeta(meta, userSessionState);
}
try {
const transcript = await readSessionTranscript(c, record, meta.sandboxSessionId);
if (JSON.stringify(meta.transcript ?? []) !== JSON.stringify(transcript)) {
await writeSessionTranscript(c, meta.sessionId, transcript);
return buildSessionDetailFromMeta(record, {
...meta,
transcript,
transcriptUpdatedAt: Date.now(),
});
return buildSessionDetailFromMeta(
{
...meta,
transcript,
transcriptUpdatedAt: Date.now(),
},
userSessionState,
);
}
} catch {
// Session detail reads should degrade to cached transcript data if the live sandbox is unavailable.
}
return buildSessionDetailFromMeta(record, meta);
return buildSessionDetailFromMeta(meta, userSessionState);
}
export async function getTaskSummary(c: any): Promise<any> {
return await buildTaskSummary(c);
}
export async function getTaskDetail(c: any): Promise<any> {
return await buildTaskDetail(c);
export async function getTaskDetail(c: any, authSessionId?: string): Promise<any> {
return await buildTaskDetail(c, authSessionId);
}
export async function getSessionDetail(c: any, sessionId: string): Promise<any> {
return await buildSessionDetail(c, sessionId);
export async function getSessionDetail(c: any, sessionId: string, authSessionId?: string): Promise<any> {
return await buildSessionDetail(c, sessionId, authSessionId);
}
/**
@ -938,26 +1007,30 @@ export async function renameWorkspaceTask(c: any, value: string): Promise<void>
})
.where(eq(taskTable.id, 1))
.run();
c.state.title = nextTitle;
await broadcastTaskUpdate(c);
}
export async function createWorkspaceSession(c: any, model?: string): Promise<{ sessionId: string }> {
export async function createWorkspaceSession(c: any, model?: string, authSessionId?: string): Promise<{ sessionId: string }> {
const sessionId = `session-${randomUUID()}`;
const record = await ensureWorkspaceSeeded(c);
await ensureSessionMeta(c, {
sessionId,
model: model ?? defaultModelForAgent(record.agentType),
model: model ?? (await resolveDefaultModel(c, authSessionId)),
authSessionId,
sandboxSessionId: null,
status: pendingWorkspaceSessionStatus(record),
created: false,
});
await upsertUserTaskState(c, authSessionId, sessionId, {
activeSessionId: sessionId,
unread: false,
});
await broadcastTaskUpdate(c, { sessionId: sessionId });
await enqueueWorkspaceEnsureSession(c, sessionId);
return { sessionId };
}
export async function ensureWorkspaceSession(c: any, sessionId: string, model?: string): Promise<void> {
export async function ensureWorkspaceSession(c: any, sessionId: string, model?: string, authSessionId?: string): Promise<void> {
const meta = await readSessionMeta(c, sessionId);
if (!meta || meta.closed) {
return;
@ -981,10 +1054,11 @@ export async function ensureWorkspaceSession(c: any, sessionId: string, model?:
try {
const runtime = await getTaskSandboxRuntime(c, record);
await ensureSandboxRepo(c, runtime.sandbox, record);
const resolvedModel = model ?? meta.model ?? (await resolveDefaultModel(c, authSessionId));
await runtime.sandbox.createSession({
id: meta.sandboxSessionId ?? sessionId,
agent: agentTypeForModel(model ?? meta.model ?? defaultModelForAgent(record.agentType)),
model: model ?? meta.model ?? defaultModelForAgent(record.agentType),
agent: agentTypeForModel(resolvedModel),
model: resolvedModel,
sessionInit: {
cwd: runtime.cwd,
},
@ -1039,15 +1113,15 @@ export async function renameWorkspaceSession(c: any, sessionId: string, title: s
await broadcastTaskUpdate(c, { sessionId });
}
export async function setWorkspaceSessionUnread(c: any, sessionId: string, unread: boolean): Promise<void> {
await updateSessionMeta(c, sessionId, {
unread: unread ? 1 : 0,
export async function setWorkspaceSessionUnread(c: any, sessionId: string, unread: boolean, authSessionId?: string): Promise<void> {
await upsertUserTaskState(c, authSessionId, sessionId, {
unread,
});
await broadcastTaskUpdate(c, { sessionId });
}
export async function updateWorkspaceDraft(c: any, sessionId: string, text: string, attachments: Array<any>): Promise<void> {
await updateSessionMeta(c, sessionId, {
export async function updateWorkspaceDraft(c: any, sessionId: string, text: string, attachments: Array<any>, authSessionId?: string): Promise<void> {
await upsertUserTaskState(c, authSessionId, sessionId, {
draftText: text,
draftAttachmentsJson: JSON.stringify(attachments),
draftUpdatedAt: Date.now(),
@ -1108,7 +1182,7 @@ export async function changeWorkspaceModel(c: any, sessionId: string, model: str
await broadcastTaskUpdate(c, { sessionId });
}
export async function sendWorkspaceMessage(c: any, sessionId: string, text: string, attachments: Array<any>): Promise<void> {
export async function sendWorkspaceMessage(c: any, sessionId: string, text: string, attachments: Array<any>, authSessionId?: string): Promise<void> {
const meta = requireSendableSessionMeta(await readSessionMeta(c, sessionId), sessionId);
const record = await ensureWorkspaceSeeded(c);
const runtime = await getTaskSandboxRuntime(c, record);
@ -1123,23 +1197,17 @@ export async function sendWorkspaceMessage(c: any, sessionId: string, text: stri
}
await updateSessionMeta(c, sessionId, {
unread: 0,
created: 1,
thinkingSinceMs: Date.now(),
});
await upsertUserTaskState(c, authSessionId, sessionId, {
unread: false,
draftText: "",
draftAttachmentsJson: "[]",
draftUpdatedAt: Date.now(),
thinkingSinceMs: Date.now(),
activeSessionId: sessionId,
});
await c.db
.update(taskRuntime)
.set({
activeSessionId: meta.sandboxSessionId,
updatedAt: Date.now(),
})
.where(eq(taskRuntime.id, 1))
.run();
await syncWorkspaceSessionStatus(c, meta.sandboxSessionId, "running", Date.now());
try {
@ -1169,38 +1237,9 @@ export async function stopWorkspaceSession(c: any, sessionId: string): Promise<v
}
export async function syncWorkspaceSessionStatus(c: any, sessionId: string, status: "running" | "idle" | "error", at: number): Promise<void> {
const record = await ensureWorkspaceSeeded(c);
const meta = (await readSessionMetaBySandboxSessionId(c, sessionId)) ?? (await ensureSessionMeta(c, { sessionId: sessionId, sandboxSessionId: sessionId }));
let changed = false;
if (record.activeSessionId === sessionId || record.activeSessionId === meta.sandboxSessionId) {
const mappedStatus = status === "running" ? "running" : status === "error" ? "error" : "idle";
if (record.status !== mappedStatus) {
await c.db
.update(taskTable)
.set({
status: mappedStatus,
updatedAt: at,
})
.where(eq(taskTable.id, 1))
.run();
changed = true;
}
const statusMessage = `session:${status}`;
if (record.statusMessage !== statusMessage) {
await c.db
.update(taskRuntime)
.set({
statusMessage,
updatedAt: at,
})
.where(eq(taskRuntime.id, 1))
.run();
changed = true;
}
}
if (status === "running") {
if (!meta.thinkingSinceMs) {
await updateSessionMeta(c, sessionId, {
@ -1215,15 +1254,19 @@ export async function syncWorkspaceSessionStatus(c: any, sessionId: string, stat
});
changed = true;
}
if (!meta.unread && shouldMarkSessionUnreadForStatus(meta, status)) {
await updateSessionMeta(c, sessionId, {
unread: 1,
});
changed = true;
}
}
if (changed) {
const sessions = await listSessionMetaRows(c, { includeClosed: true });
const nextStatus = computeWorkspaceTaskStatus(await ensureWorkspaceSeeded(c), sessions);
await c.db
.update(taskTable)
.set({
status: nextStatus,
updatedAt: at,
})
.where(eq(taskTable.id, 1))
.run();
await enqueueWorkspaceRefresh(c, "task.command.workspace.refresh_session_transcript", {
sessionId,
});
@ -1234,8 +1277,7 @@ export async function syncWorkspaceSessionStatus(c: any, sessionId: string, stat
}
}
export async function closeWorkspaceSession(c: any, sessionId: string): Promise<void> {
const record = await ensureWorkspaceSeeded(c);
export async function closeWorkspaceSession(c: any, sessionId: string, authSessionId?: string): Promise<void> {
const sessions = await listSessionMetaRows(c);
if (sessions.filter((candidate) => candidate.closed !== true).length <= 1) {
return;
@ -1253,27 +1295,18 @@ export async function closeWorkspaceSession(c: any, sessionId: string): Promise<
closed: 1,
thinkingSinceMs: null,
});
if (record.activeSessionId === sessionId || record.activeSessionId === meta.sandboxSessionId) {
await c.db
.update(taskRuntime)
.set({
activeSessionId: null,
updatedAt: Date.now(),
})
.where(eq(taskRuntime.id, 1))
.run();
}
await deleteUserTaskState(c, authSessionId, sessionId);
await broadcastTaskUpdate(c);
}
export async function markWorkspaceUnread(c: any): Promise<void> {
export async function markWorkspaceUnread(c: any, authSessionId?: string): Promise<void> {
const sessions = await listSessionMetaRows(c);
const latest = sessions[sessions.length - 1];
if (!latest) {
return;
}
await updateSessionMeta(c, latest.sessionId, {
unread: 1,
await upsertUserTaskState(c, authSessionId, latest.sessionId, {
unread: true,
});
await broadcastTaskUpdate(c, { sessionId: latest.sessionId });
}

View file

@ -246,7 +246,6 @@ export function createMockBackendClient(defaultOrganizationId = "default"): Back
};
}),
taskSummaries,
openPullRequests: [],
};
};
@ -464,7 +463,7 @@ export function createMockBackendClient(defaultOrganizationId = "default"): Back
async getRepoOverview(_organizationId: string, _repoId: string): Promise<RepoOverview> {
notSupported("getRepoOverview");
},
async getTask(_organizationId: string, taskId: string): Promise<TaskRecord> {
async getTask(_organizationId: string, _repoId: string, taskId: string): Promise<TaskRecord> {
return buildTaskRecord(taskId);
},
@ -472,7 +471,7 @@ export function createMockBackendClient(defaultOrganizationId = "default"): Back
return [];
},
async switchTask(_organizationId: string, taskId: string): Promise<SwitchResult> {
async switchTask(_organizationId: string, _repoId: string, taskId: string): Promise<SwitchResult> {
return {
organizationId: defaultOrganizationId,
taskId,
@ -481,14 +480,14 @@ export function createMockBackendClient(defaultOrganizationId = "default"): Back
};
},
async attachTask(_organizationId: string, taskId: string): Promise<{ target: string; sessionId: string | null }> {
async attachTask(_organizationId: string, _repoId: string, taskId: string): Promise<{ target: string; sessionId: string | null }> {
return {
target: `mock://${taskId}`,
sessionId: requireTask(taskId).sessions[0]?.sessionId ?? null,
};
},
async runAction(_organizationId: string, _taskId: string): Promise<void> {
async runAction(_organizationId: string, _repoId: string, _taskId: string): Promise<void> {
notSupported("runAction");
},

View file

@ -250,19 +250,19 @@ interface WorkspaceActions {
onBranch?: string;
model?: ModelId;
}): Promise<{ taskId: string; sessionId?: string }>;
markTaskUnread(input: { taskId: string }): Promise<void>;
renameTask(input: { taskId: string; value: string }): Promise<void>;
archiveTask(input: { taskId: string }): Promise<void>;
publishPr(input: { taskId: string }): Promise<void>;
revertFile(input: { taskId: string; path: string }): Promise<void>;
updateDraft(input: { taskId: string; sessionId: string; text: string; attachments: LineAttachment[] }): Promise<void>;
sendMessage(input: { taskId: string; sessionId: string; text: string; attachments: LineAttachment[] }): Promise<void>;
stopAgent(input: { taskId: string; sessionId: string }): Promise<void>;
setSessionUnread(input: { taskId: string; sessionId: string; unread: boolean }): Promise<void>;
renameSession(input: { taskId: string; sessionId: string; title: string }): Promise<void>;
closeSession(input: { taskId: string; sessionId: string }): Promise<void>;
addSession(input: { taskId: string; model?: string }): Promise<{ sessionId: string }>;
changeModel(input: { taskId: string; sessionId: string; model: ModelId }): Promise<void>;
markTaskUnread(input: { repoId: string; taskId: string }): Promise<void>;
renameTask(input: { repoId: string; taskId: string; value: string }): Promise<void>;
archiveTask(input: { repoId: string; taskId: string }): Promise<void>;
publishPr(input: { repoId: string; taskId: string }): Promise<void>;
revertFile(input: { repoId: string; taskId: string; path: string }): Promise<void>;
updateDraft(input: { repoId: string; taskId: string; sessionId: string; text: string; attachments: LineAttachment[] }): Promise<void>;
sendMessage(input: { repoId: string; taskId: string; sessionId: string; text: string; attachments: LineAttachment[] }): Promise<void>;
stopAgent(input: { repoId: string; taskId: string; sessionId: string }): Promise<void>;
setSessionUnread(input: { repoId: string; taskId: string; sessionId: string; unread: boolean }): Promise<void>;
renameSession(input: { repoId: string; taskId: string; sessionId: string; title: string }): Promise<void>;
closeSession(input: { repoId: string; taskId: string; sessionId: string }): Promise<void>;
addSession(input: { repoId: string; taskId: string; model?: string }): Promise<{ sessionId: string }>;
changeModel(input: { repoId: string; taskId: string; sessionId: string; model: ModelId }): Promise<void>;
adminReloadGithubOrganization(): Promise<void>;
adminReloadGithubPullRequests(): Promise<void>;
adminReloadGithubRepository(repoId: string): Promise<void>;
@ -439,6 +439,7 @@ const TranscriptPanel = memo(function TranscriptPanel({
}
void taskWorkspaceClient.setSessionUnread({
repoId: task.repoId,
taskId: task.id,
sessionId: activeAgentSession.id,
unread: false,
@ -462,7 +463,7 @@ const TranscriptPanel = memo(function TranscriptPanel({
return;
}
void taskWorkspaceClient.renameTask({ taskId: task.id, value });
void taskWorkspaceClient.renameTask({ repoId: task.repoId, taskId: task.id, value });
setEditingField(null);
},
[editValue, task.id],
@ -473,6 +474,7 @@ const TranscriptPanel = memo(function TranscriptPanel({
const flushDraft = useCallback(
(text: string, nextAttachments: LineAttachment[], sessionId: string) => {
void taskWorkspaceClient.updateDraft({
repoId: task.repoId,
taskId: task.id,
sessionId,
text,
@ -534,6 +536,7 @@ const TranscriptPanel = memo(function TranscriptPanel({
onSetActiveSessionId(promptSession.id);
onSetLastAgentSessionId(promptSession.id);
void taskWorkspaceClient.sendMessage({
repoId: task.repoId,
taskId: task.id,
sessionId: promptSession.id,
text,
@ -547,6 +550,7 @@ const TranscriptPanel = memo(function TranscriptPanel({
}
void taskWorkspaceClient.stopAgent({
repoId: task.repoId,
taskId: task.id,
sessionId: promptSession.id,
});
@ -561,6 +565,7 @@ const TranscriptPanel = memo(function TranscriptPanel({
const session = task.sessions.find((candidate) => candidate.id === sessionId);
if (session?.unread) {
void taskWorkspaceClient.setSessionUnread({
repoId: task.repoId,
taskId: task.id,
sessionId,
unread: false,
@ -574,9 +579,9 @@ const TranscriptPanel = memo(function TranscriptPanel({
const setSessionUnread = useCallback(
(sessionId: string, unread: boolean) => {
void taskWorkspaceClient.setSessionUnread({ taskId: task.id, sessionId, unread });
void taskWorkspaceClient.setSessionUnread({ repoId: task.repoId, taskId: task.id, sessionId, unread });
},
[task.id],
[task.id, task.repoId],
);
const startRenamingSession = useCallback(
@ -609,6 +614,7 @@ const TranscriptPanel = memo(function TranscriptPanel({
}
void taskWorkspaceClient.renameSession({
repoId: task.repoId,
taskId: task.id,
sessionId: editingSessionId,
title: trimmedName,
@ -629,9 +635,9 @@ const TranscriptPanel = memo(function TranscriptPanel({
}
onSyncRouteSession(task.id, nextSessionId);
void taskWorkspaceClient.closeSession({ taskId: task.id, sessionId });
void taskWorkspaceClient.closeSession({ repoId: task.repoId, taskId: task.id, sessionId });
},
[activeSessionId, task.id, task.sessions, lastAgentSessionId, onSetActiveSessionId, onSetLastAgentSessionId, onSyncRouteSession],
[activeSessionId, task.id, task.repoId, task.sessions, lastAgentSessionId, onSetActiveSessionId, onSetLastAgentSessionId, onSyncRouteSession],
);
const closeDiffTab = useCallback(
@ -649,12 +655,12 @@ const TranscriptPanel = memo(function TranscriptPanel({
const addSession = useCallback(() => {
void (async () => {
const { sessionId } = await taskWorkspaceClient.addSession({ taskId: task.id });
const { sessionId } = await taskWorkspaceClient.addSession({ repoId: task.repoId, taskId: task.id });
onSetLastAgentSessionId(sessionId);
onSetActiveSessionId(sessionId);
onSyncRouteSession(task.id, sessionId);
})();
}, [task.id, onSetActiveSessionId, onSetLastAgentSessionId, onSyncRouteSession]);
}, [task.id, task.repoId, onSetActiveSessionId, onSetLastAgentSessionId, onSyncRouteSession]);
const changeModel = useCallback(
(model: ModelId) => {
@ -663,6 +669,7 @@ const TranscriptPanel = memo(function TranscriptPanel({
}
void taskWorkspaceClient.changeModel({
repoId: task.repoId,
taskId: task.id,
sessionId: promptSession.id,
model,
@ -1663,7 +1670,7 @@ export function MockLayout({ organizationId, selectedTaskId, selectedSessionId }
autoCreatingSessionForTaskRef.current.add(activeTask.id);
void (async () => {
try {
const { sessionId } = await taskWorkspaceClient.addSession({ taskId: activeTask.id });
const { sessionId } = await taskWorkspaceClient.addSession({ repoId: activeTask.repoId, taskId: activeTask.id });
syncRouteSession(activeTask.id, sessionId, true);
} catch (error) {
logger.error(
@ -1755,9 +1762,16 @@ export function MockLayout({ organizationId, selectedTaskId, selectedSessionId }
[materializeOpenPullRequest, navigate, openPullRequestsByTaskId, tasks, organizationId],
);
const markTaskUnread = useCallback((id: string) => {
void taskWorkspaceClient.markTaskUnread({ taskId: id });
}, []);
const markTaskUnread = useCallback(
(id: string) => {
const task = tasks.find((candidate) => candidate.id === id);
if (!task) {
return;
}
void taskWorkspaceClient.markTaskUnread({ repoId: task.repoId, taskId: id });
},
[tasks],
);
const renameTask = useCallback(
(id: string) => {
@ -1776,7 +1790,7 @@ export function MockLayout({ organizationId, selectedTaskId, selectedSessionId }
return;
}
void taskWorkspaceClient.renameTask({ taskId: id, value: trimmedTitle });
void taskWorkspaceClient.renameTask({ repoId: currentTask.repoId, taskId: id, value: trimmedTitle });
},
[tasks],
);
@ -1785,14 +1799,14 @@ export function MockLayout({ organizationId, selectedTaskId, selectedSessionId }
if (!activeTask) {
throw new Error("Cannot archive without an active task");
}
void taskWorkspaceClient.archiveTask({ taskId: activeTask.id });
void taskWorkspaceClient.archiveTask({ repoId: activeTask.repoId, taskId: activeTask.id });
}, [activeTask]);
const publishPr = useCallback(() => {
if (!activeTask) {
throw new Error("Cannot publish PR without an active task");
}
void taskWorkspaceClient.publishPr({ taskId: activeTask.id });
void taskWorkspaceClient.publishPr({ repoId: activeTask.repoId, taskId: activeTask.id });
}, [activeTask]);
const revertFile = useCallback(
@ -1813,6 +1827,7 @@ export function MockLayout({ organizationId, selectedTaskId, selectedSessionId }
}));
void taskWorkspaceClient.revertFile({
repoId: activeTask.repoId,
taskId: activeTask.id,
path,
});

View file

@ -244,6 +244,7 @@ export interface TaskWorkspaceRenameInput {
}
export interface TaskWorkspaceSendMessageInput {
repoId: string;
taskId: string;
sessionId: string;
text: string;
@ -252,6 +253,7 @@ export interface TaskWorkspaceSendMessageInput {
}
export interface TaskWorkspaceSessionInput {
repoId: string;
taskId: string;
sessionId: string;
authSessionId?: string;