From 41bf2776f1ab0d442ba1d9b9f7c3acf9d4301cd8 Mon Sep 17 00:00:00 2001 From: Harivansh Rathi Date: Sat, 14 Mar 2026 00:19:59 -0400 Subject: [PATCH] fix chat timeout --- .../src/core/gateway/durable-chat-run.ts | 257 ++++++++++++++++++ .../src/core/gateway/internal-types.ts | 10 +- .../coding-agent/src/core/gateway/runtime.ts | 113 +++++++- .../coding-agent/src/core/gateway/types.ts | 15 +- .../test/durable-chat-run.test.ts | 161 +++++++++++ .../test/gateway-session-titles.test.ts | 1 + .../coding-agent/test/gateway-steer.test.ts | 1 + 7 files changed, 551 insertions(+), 7 deletions(-) create mode 100644 packages/coding-agent/src/core/gateway/durable-chat-run.ts create mode 100644 packages/coding-agent/test/durable-chat-run.test.ts diff --git a/packages/coding-agent/src/core/gateway/durable-chat-run.ts b/packages/coding-agent/src/core/gateway/durable-chat-run.ts new file mode 100644 index 0000000..607ab92 --- /dev/null +++ b/packages/coding-agent/src/core/gateway/durable-chat-run.ts @@ -0,0 +1,257 @@ +import type { AgentMessage } from "@mariozechner/companion-agent-core"; +import type { AgentSessionEvent } from "../agent-session.js"; +import { extractMessageText } from "./helpers.js"; +import { messageContentToHistoryParts } from "./session-state.js"; +import type { GatewayTransientToolResult } from "./session-state.js"; +import type { GatewayMessageResult, GatewayMessageRequest } from "./types.js"; + +const FLUSH_INTERVAL_MS = 500; + +type PersistHistoryItem = { + role: "user" | "assistant" | "toolResult"; + text?: string; + partsJson: string; + timestamp: number; + idempotencyKey: string; +}; + +type ConvexRunStatus = "completed" | "failed" | "interrupted"; + +function normalizeErrorMessage(error: unknown): string { + if (error instanceof Error) { + return error.message; + } + return typeof error === "string" ? error : String(error); +} + +function readConvexSiteUrl(): string | null { + const raw = + process.env.CONVEX_SITE_URL ?? + process.env.NEXT_PUBLIC_CONVEX_SITE_URL ?? + process.env.CONVEX_URL ?? + process.env.NEXT_PUBLIC_CONVEX_URL; + return typeof raw === "string" && raw.trim().length > 0 ? raw.trim() : null; +} + +function readConvexSecret(): string | null { + const raw = process.env.CONVEX_SECRET; + return typeof raw === "string" && raw.trim().length > 0 ? raw.trim() : null; +} + +export class DurableChatRunReporter { + private readonly assistantMessageId: string; + private readonly convexSiteUrl: string; + private readonly convexSecret: string; + private latestAssistantMessage: AgentMessage | null = null; + private readonly knownToolResults = new Map< + string, + GatewayTransientToolResult + >(); + private flushTimer: ReturnType | null = null; + private flushChain: Promise = Promise.resolve(); + private flushFailure: Error | null = null; + + constructor( + private readonly durableRun: NonNullable< + GatewayMessageRequest["durableRun"] + >, + ) { + const convexSiteUrl = readConvexSiteUrl(); + const convexSecret = readConvexSecret(); + if (!convexSiteUrl || !convexSecret) { + throw new Error( + "Durable chat run reporting requires CONVEX_SITE_URL/CONVEX_URL and CONVEX_SECRET", + ); + } + this.convexSiteUrl = convexSiteUrl; + this.convexSecret = convexSecret; + this.assistantMessageId = `run:${durableRun.runId}:assistant`; + } + + handleSessionEvent( + event: AgentSessionEvent, + pendingToolResults: GatewayTransientToolResult[], + ): void { + for (const toolResult of pendingToolResults) { + this.knownToolResults.set(toolResult.toolCallId, toolResult); + } + + if (event.type === "message_start" && event.message.role === "assistant") { + this.latestAssistantMessage = event.message; + return; + } + + if (event.type === "message_update" && event.message.role === "assistant") { + this.latestAssistantMessage = event.message; + this.scheduleFlush(); + return; + } + + if (event.type === "message_end" && event.message.role === "assistant") { + this.latestAssistantMessage = event.message; + this.scheduleFlush(); + return; + } + + if ( + event.type === "tool_execution_end" || + event.type === "turn_end" || + (event.type === "message_end" && event.message.role === "toolResult") + ) { + this.scheduleFlush(); + } + } + + async finalize(result: GatewayMessageResult): Promise { + let status: ConvexRunStatus = result.ok + ? "completed" + : result.error?.includes("aborted") + ? "interrupted" + : "failed"; + let errorMessage = result.error; + + try { + await this.finalFlush(); + } catch (error) { + status = "failed"; + errorMessage = normalizeErrorMessage(error); + } + + const endpoint = + status === "completed" + ? "/api/chat/complete-run" + : status === "interrupted" + ? "/api/chat/interrupt-run" + : "/api/chat/fail-run"; + + await this.callConvexHttpAction(endpoint, { + runId: this.durableRun.runId, + ...(status === "failed" && errorMessage ? { error: errorMessage } : {}), + }); + } + + private scheduleFlush(): void { + if (this.flushTimer) return; + this.flushTimer = globalThis.setTimeout(() => { + this.flushTimer = null; + void this.flush().catch(() => undefined); + }, FLUSH_INTERVAL_MS); + } + + private async flush(): Promise { + this.throwIfFlushFailed(); + if (this.flushTimer) { + globalThis.clearTimeout(this.flushTimer); + this.flushTimer = null; + } + + const items = this.buildItems(); + if (items.length === 0) { + return; + } + + const flushPromise = this.flushChain.then(async () => { + this.throwIfFlushFailed(); + await this.callConvexHttpAction("/api/chat/run-messages", { + runId: this.durableRun.runId, + userId: this.durableRun.userId, + agentId: this.durableRun.agentId, + threadId: this.durableRun.threadId, + sessionKey: this.durableRun.sessionKey, + items, + }); + }); + this.flushChain = flushPromise.catch(() => undefined); + + try { + await flushPromise; + } catch (error) { + throw this.markFlushFailed(error); + } + } + + private async finalFlush(): Promise { + await this.flush(); + await this.flushChain; + this.throwIfFlushFailed(); + } + + private buildItems(): PersistHistoryItem[] { + const items: PersistHistoryItem[] = []; + + const assistantParts = + this.latestAssistantMessage?.role === "assistant" + ? messageContentToHistoryParts(this.latestAssistantMessage) + : []; + + for (const toolResult of this.knownToolResults.values()) { + assistantParts.push({ + type: "tool-invocation", + toolCallId: toolResult.toolCallId, + toolName: toolResult.toolName, + args: undefined, + state: toolResult.isError ? "error" : "result", + result: toolResult.result, + }); + } + + const firstToolResult = this.knownToolResults.values().next().value; + + if ( + this.latestAssistantMessage?.role === "assistant" || + assistantParts.length > 0 + ) { + items.push({ + role: "assistant", + text: + this.latestAssistantMessage?.role === "assistant" + ? extractMessageText(this.latestAssistantMessage) || undefined + : undefined, + partsJson: JSON.stringify(assistantParts), + timestamp: + this.latestAssistantMessage?.timestamp ?? + firstToolResult?.timestamp ?? + Date.now(), + idempotencyKey: this.assistantMessageId, + }); + } + + return items; + } + + private async callConvexHttpAction( + path: string, + body: Record, + ): Promise { + const response = await fetch(`${this.convexSiteUrl}${path}`, { + method: "POST", + headers: { + "Content-Type": "application/json", + Authorization: `Bearer ${this.convexSecret}`, + }, + body: JSON.stringify(body), + }); + if (!response.ok) { + const text = await response.text().catch(() => ""); + throw new Error( + `Convex HTTP action failed for ${path}: ${response.status} ${text}`, + ); + } + } + + private throwIfFlushFailed(): void { + if (this.flushFailure) { + throw this.flushFailure; + } + } + + private markFlushFailed(error: unknown): Error { + if (this.flushFailure) { + return this.flushFailure; + } + const normalizedError = + error instanceof Error ? error : new Error(normalizeErrorMessage(error)); + this.flushFailure = normalizedError; + return normalizedError; + } +} diff --git a/packages/coding-agent/src/core/gateway/internal-types.ts b/packages/coding-agent/src/core/gateway/internal-types.ts index 88274d4..2bbb086 100644 --- a/packages/coding-agent/src/core/gateway/internal-types.ts +++ b/packages/coding-agent/src/core/gateway/internal-types.ts @@ -1,5 +1,6 @@ import type { AgentMessage } from "@mariozechner/companion-agent-core"; import type { AgentSession } from "../agent-session.js"; +import type { DurableChatRunReporter } from "./durable-chat-run.js"; import type { GatewayMessageRequest, GatewayMessageResult, @@ -63,7 +64,13 @@ export type GatewayEvent = payload: { teamId: string; status: string; - members: Array<{ id: string; name: string; role?: string; status: string; message?: string }>; + members: Array<{ + id: string; + name: string; + role?: string; + status: string; + message?: string; + }>; }; } | { @@ -84,6 +91,7 @@ export interface ManagedGatewaySession { session: AgentSession; queue: GatewayQueuedMessage[]; processing: boolean; + activeDurableRun: DurableChatRunReporter | null; activeAssistantMessage: AgentMessage | null; pendingToolResults: GatewayTransientToolResult[]; createdAt: number; diff --git a/packages/coding-agent/src/core/gateway/runtime.ts b/packages/coding-agent/src/core/gateway/runtime.ts index 81ec27b..96291a8 100644 --- a/packages/coding-agent/src/core/gateway/runtime.ts +++ b/packages/coding-agent/src/core/gateway/runtime.ts @@ -10,6 +10,7 @@ import { URL } from "node:url"; import type { AgentMessage } from "@mariozechner/companion-agent-core"; import type { AgentSession, AgentSessionEvent } from "../agent-session.js"; import type { Settings } from "../settings-manager.js"; +import { DurableChatRunReporter } from "./durable-chat-run.js"; import { extractMessageText, getLastAssistantText } from "./helpers.js"; import { type GatewayEvent, @@ -108,6 +109,32 @@ function readString(value: unknown): string | undefined { return trimmed.length > 0 ? trimmed : undefined; } +function readDurableRun( + value: unknown, +): GatewayMessageRequest["durableRun"] | undefined { + if (!isRecord(value)) { + return undefined; + } + + const runId = readString(value.runId); + const userId = readString(value.userId); + const agentId = readString(value.agentId); + const threadId = readString(value.threadId); + const sessionKey = readString(value.sessionKey); + + if (!runId || !userId || !agentId || !threadId || !sessionKey) { + return undefined; + } + + return { + runId, + userId, + agentId, + threadId, + sessionKey, + }; +} + export function setActiveGatewayRuntime(runtime: GatewayRuntime | null): void { activeGatewayRuntime = runtime; } @@ -419,6 +446,7 @@ export class GatewayRuntime { session, queue: [], processing: false, + activeDurableRun: null, activeAssistantMessage: null, pendingToolResults: [], createdAt: Date.now(), @@ -462,18 +490,33 @@ export class GatewayRuntime { ); this.emitState(managedSession); + let result: GatewayMessageResult = { + ok: false, + response: "", + error: "Unknown error", + sessionKey: managedSession.sessionKey, + }; + let durableRunReporter: DurableChatRunReporter | null = null; + try { queued.onStart?.(); + if (queued.request.durableRun) { + durableRunReporter = new DurableChatRunReporter( + queued.request.durableRun, + ); + managedSession.activeDurableRun = durableRunReporter; + } await managedSession.session.prompt(queued.request.text, { images: queued.request.images, source: queued.request.source ?? "extension", }); const response = getLastAssistantText(managedSession.session); - queued.resolve({ + result = { ok: true, response, sessionKey: managedSession.sessionKey, - }); + }; + queued.resolve(result); } catch (error) { const message = error instanceof Error ? error.message : String(error); this.log( @@ -491,19 +534,30 @@ export class GatewayRuntime { error: message, }); } - queued.resolve({ + result = { ok: false, response: "", error: message, sessionKey: managedSession.sessionKey, - }); + }; + queued.resolve(result); } finally { queued.onFinish?.(); managedSession.processing = false; + managedSession.activeDurableRun = null; managedSession.activeAssistantMessage = null; managedSession.pendingToolResults = []; managedSession.lastActiveAt = Date.now(); this.emitState(managedSession); + if (durableRunReporter) { + await durableRunReporter.finalize(result).catch((error) => { + const message = + error instanceof Error ? error.message : String(error); + this.log( + `[chat-run] session=${managedSession.sessionKey} finalize error: ${message}`, + ); + }); + } if (managedSession.queue.length > 0) { void this.processNext(managedSession); } @@ -529,6 +583,13 @@ export class GatewayRuntime { managedSession: ManagedGatewaySession, event: AgentSessionEvent, ): void { + const forwardToDurableRun = () => { + managedSession.activeDurableRun?.handleSessionEvent( + event, + managedSession.pendingToolResults, + ); + }; + switch (event.type) { case "turn_start": managedSession.lastActiveAt = Date.now(); @@ -537,6 +598,7 @@ export class GatewayRuntime { type: "turn_start", sessionKey: managedSession.sessionKey, }); + forwardToDurableRun(); return; case "turn_end": managedSession.lastActiveAt = Date.now(); @@ -545,6 +607,7 @@ export class GatewayRuntime { type: "turn_end", sessionKey: managedSession.sessionKey, }); + forwardToDurableRun(); return; case "message_start": managedSession.lastActiveAt = Date.now(); @@ -556,6 +619,7 @@ export class GatewayRuntime { sessionKey: managedSession.sessionKey, role: event.message.role, }); + forwardToDurableRun(); return; case "message_update": managedSession.lastActiveAt = Date.now(); @@ -570,6 +634,7 @@ export class GatewayRuntime { delta: event.assistantMessageEvent.delta, contentIndex: event.assistantMessageEvent.contentIndex, }); + forwardToDurableRun(); return; case "thinking_delta": this.emit(managedSession, { @@ -578,8 +643,10 @@ export class GatewayRuntime { delta: event.assistantMessageEvent.delta, contentIndex: event.assistantMessageEvent.contentIndex, }); + forwardToDurableRun(); return; } + forwardToDurableRun(); return; case "message_end": managedSession.lastActiveAt = Date.now(); @@ -595,6 +662,7 @@ export class GatewayRuntime { text: extractMessageText(event.message), }); this.emitStructuredParts(managedSession, event.message); + forwardToDurableRun(); return; } if (event.message.role === "toolResult") { @@ -610,6 +678,7 @@ export class GatewayRuntime { ); } } + forwardToDurableRun(); return; case "tool_execution_start": managedSession.lastActiveAt = Date.now(); @@ -624,6 +693,7 @@ export class GatewayRuntime { toolName: event.toolName, args: event.args, }); + forwardToDurableRun(); return; case "tool_execution_update": managedSession.lastActiveAt = Date.now(); @@ -634,6 +704,7 @@ export class GatewayRuntime { toolName: event.toolName, partialResult: event.partialResult, }); + forwardToDurableRun(); return; case "tool_execution_end": managedSession.lastActiveAt = Date.now(); @@ -661,6 +732,7 @@ export class GatewayRuntime { result: event.result, isError: event.isError, }); + forwardToDurableRun(); return; } } @@ -1027,7 +1099,7 @@ export class GatewayRuntime { } const sessionMatch = path.match( - /^\/sessions\/([^/]+)(?:\/(events|messages|abort|reset|chat|history|model|reload|state|steer))?$/, + /^\/sessions\/([^/]+)(?:\/(enqueue|events|messages|abort|reset|chat|history|model|reload|state|steer))?$/, ); if (!sessionMatch) { this.writeJson(response, 404, { error: "Not found" }); @@ -1066,6 +1138,37 @@ export class GatewayRuntime { return; } + if (action === "enqueue" && method === "POST") { + const body = await this.readJsonBody(request); + const text = extractUserText(body); + if (!text) { + this.writeJson(response, 400, { error: "Missing user message text" }); + return; + } + const durableRun = readDurableRun(body.durableRun); + const queued = await this.queueManagedMessage({ + request: { + sessionKey, + text, + source: "extension", + metadata: isRecord(body.metadata) ? body.metadata : undefined, + durableRun, + }, + }); + if (!queued.accepted) { + this.writeJson(response, 409, queued.errorResult); + return; + } + void queued.completion.catch(() => undefined); + this.writeJson(response, 202, { + ok: true, + queued: true, + sessionKey, + ...(durableRun ? { runId: durableRun.runId } : {}), + }); + return; + } + if (action === "messages" && method === "POST") { const body = await this.readJsonBody(request); const text = typeof body.text === "string" ? body.text : ""; diff --git a/packages/coding-agent/src/core/gateway/types.ts b/packages/coding-agent/src/core/gateway/types.ts index 938bdd4..201c0d3 100644 --- a/packages/coding-agent/src/core/gateway/types.ts +++ b/packages/coding-agent/src/core/gateway/types.ts @@ -26,6 +26,13 @@ export interface GatewayMessageRequest { source?: "interactive" | "rpc" | "extension"; images?: ImageContent[]; metadata?: Record; + durableRun?: { + runId: string; + userId: string; + agentId: string; + threadId: string; + sessionKey: string; + }; } export interface GatewayMessageResult { @@ -82,7 +89,13 @@ export type HistoryPart = type: "teamActivity"; teamId: string; status: string; - members: Array<{ id: string; name: string; role?: string; status: string; message?: string }>; + members: Array<{ + id: string; + name: string; + role?: string; + status: string; + message?: string; + }>; } | { type: "media"; url: string; mimeType?: string } | { type: "error"; code: string; message: string }; diff --git a/packages/coding-agent/test/durable-chat-run.test.ts b/packages/coding-agent/test/durable-chat-run.test.ts new file mode 100644 index 0000000..b613016 --- /dev/null +++ b/packages/coding-agent/test/durable-chat-run.test.ts @@ -0,0 +1,161 @@ +import { afterEach, describe, expect, it, vi } from "vitest"; +import { DurableChatRunReporter } from "../src/core/gateway/durable-chat-run.js"; + +const originalConvexUrl = process.env.CONVEX_URL; +const originalConvexSecret = process.env.CONVEX_SECRET; + +function mockOkResponse() { + return { + ok: true, + status: 200, + text: vi.fn().mockResolvedValue(""), + } as unknown as Response; +} + +describe("DurableChatRunReporter", () => { + afterEach(() => { + vi.restoreAllMocks(); + if (originalConvexUrl === undefined) { + delete process.env.CONVEX_URL; + } else { + process.env.CONVEX_URL = originalConvexUrl; + } + if (originalConvexSecret === undefined) { + delete process.env.CONVEX_SECRET; + } else { + process.env.CONVEX_SECRET = originalConvexSecret; + } + }); + + it("upserts a single assistant message with tool results and completes the run", async () => { + process.env.CONVEX_URL = "https://convex.example"; + process.env.CONVEX_SECRET = "test-secret"; + + const fetchMock = vi.fn().mockResolvedValue(mockOkResponse()); + vi.stubGlobal("fetch", fetchMock); + + const reporter = new DurableChatRunReporter({ + runId: "run-1", + userId: "user-1", + agentId: "agent-1", + threadId: "thread-1", + sessionKey: "session-1", + }); + + const assistantMessage = { + role: "assistant", + timestamp: 111, + content: [{ type: "text", text: "hello from the sandbox" }], + }; + + reporter.handleSessionEvent( + { + type: "message_start", + message: assistantMessage, + } as never, + [], + ); + reporter.handleSessionEvent( + { + type: "message_update", + message: assistantMessage, + assistantMessageEvent: { + type: "text_delta", + delta: "hello from the sandbox", + contentIndex: 0, + }, + } as never, + [], + ); + reporter.handleSessionEvent( + { + type: "tool_execution_end", + toolCallId: "call-1", + toolName: "bash", + result: { stdout: "done" }, + isError: false, + } as never, + [ + { + toolCallId: "call-1", + toolName: "bash", + result: { stdout: "done" }, + isError: false, + timestamp: 222, + }, + ], + ); + + await reporter.finalize({ + ok: true, + response: "hello from the sandbox", + sessionKey: "session-1", + }); + + expect(fetchMock).toHaveBeenCalledTimes(2); + expect(fetchMock.mock.calls[0]?.[0]).toBe( + "https://convex.example/api/chat/run-messages", + ); + expect(fetchMock.mock.calls[1]?.[0]).toBe( + "https://convex.example/api/chat/complete-run", + ); + + const runMessagesBody = JSON.parse( + String(fetchMock.mock.calls[0]?.[1]?.body), + ) as { + items: Array<{ + role: string; + idempotencyKey: string; + partsJson: string; + }>; + }; + expect(runMessagesBody.items).toHaveLength(1); + expect(runMessagesBody.items[0]).toMatchObject({ + role: "assistant", + idempotencyKey: "run:run-1:assistant", + }); + expect(JSON.parse(runMessagesBody.items[0]?.partsJson ?? "[]")).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + type: "text", + text: "hello from the sandbox", + }), + expect.objectContaining({ + type: "tool-invocation", + toolCallId: "call-1", + toolName: "bash", + state: "result", + result: { stdout: "done" }, + }), + ]), + ); + }); + + it("marks aborted runs as interrupted", async () => { + process.env.CONVEX_URL = "https://convex.example"; + process.env.CONVEX_SECRET = "test-secret"; + + const fetchMock = vi.fn().mockResolvedValue(mockOkResponse()); + vi.stubGlobal("fetch", fetchMock); + + const reporter = new DurableChatRunReporter({ + runId: "run-2", + userId: "user-1", + agentId: "agent-1", + threadId: "thread-1", + sessionKey: "session-1", + }); + + await reporter.finalize({ + ok: false, + response: "", + error: "Session aborted", + sessionKey: "session-1", + }); + + expect(fetchMock).toHaveBeenCalledTimes(1); + expect(fetchMock.mock.calls[0]?.[0]).toBe( + "https://convex.example/api/chat/interrupt-run", + ); + }); +}); diff --git a/packages/coding-agent/test/gateway-session-titles.test.ts b/packages/coding-agent/test/gateway-session-titles.test.ts index 9846f19..2573e2f 100644 --- a/packages/coding-agent/test/gateway-session-titles.test.ts +++ b/packages/coding-agent/test/gateway-session-titles.test.ts @@ -49,6 +49,7 @@ function addManagedSession( session, queue: [], processing: false, + activeDurableRun: null, activeAssistantMessage: null, pendingToolResults: [], createdAt: Date.now(), diff --git a/packages/coding-agent/test/gateway-steer.test.ts b/packages/coding-agent/test/gateway-steer.test.ts index 19d2d29..0a63ca4 100644 --- a/packages/coding-agent/test/gateway-steer.test.ts +++ b/packages/coding-agent/test/gateway-steer.test.ts @@ -49,6 +49,7 @@ function addManagedSession( session: session as never, queue: [], processing, + activeDurableRun: null, activeAssistantMessage: null, pendingToolResults: [], createdAt: Date.now(),