diff --git a/packages/coding-agent/src/core/gateway/durable-chat-run.ts b/packages/coding-agent/src/core/gateway/durable-chat-run.ts index 607ab92..a16a349 100644 --- a/packages/coding-agent/src/core/gateway/durable-chat-run.ts +++ b/packages/coding-agent/src/core/gateway/durable-chat-run.ts @@ -24,24 +24,31 @@ function normalizeErrorMessage(error: unknown): string { return typeof error === "string" ? error : String(error); } -function readConvexSiteUrl(): string | null { - const raw = - process.env.CONVEX_SITE_URL ?? - process.env.NEXT_PUBLIC_CONVEX_SITE_URL ?? - process.env.CONVEX_URL ?? - process.env.NEXT_PUBLIC_CONVEX_URL; - return typeof raw === "string" && raw.trim().length > 0 ? raw.trim() : null; -} +type DurableChatRunEventBody = + | { + items: PersistHistoryItem[]; + final?: { + status: ConvexRunStatus; + error?: string; + }; + } + | { + items?: PersistHistoryItem[]; + final: { + status: ConvexRunStatus; + error?: string; + }; + }; -function readConvexSecret(): string | null { - const raw = process.env.CONVEX_SECRET; - return typeof raw === "string" && raw.trim().length > 0 ? raw.trim() : null; +function buildAuthHeaders(token: string): Record { + return { + "Content-Type": "application/json", + Authorization: `Bearer ${token}`, + }; } export class DurableChatRunReporter { private readonly assistantMessageId: string; - private readonly convexSiteUrl: string; - private readonly convexSecret: string; private latestAssistantMessage: AgentMessage | null = null; private readonly knownToolResults = new Map< string, @@ -56,16 +63,15 @@ export class DurableChatRunReporter { GatewayMessageRequest["durableRun"] >, ) { - const convexSiteUrl = readConvexSiteUrl(); - const convexSecret = readConvexSecret(); - if (!convexSiteUrl || !convexSecret) { + if ( + durableRun.callbackUrl.trim().length === 0 || + durableRun.callbackToken.trim().length === 0 + ) { throw new Error( - "Durable chat run reporting requires CONVEX_SITE_URL/CONVEX_URL and CONVEX_SECRET", + "Durable chat run reporting requires callbackUrl and callbackToken", ); } - this.convexSiteUrl = convexSiteUrl; - this.convexSecret = convexSecret; - this.assistantMessageId = `run:${durableRun.runId}:assistant`; + this.assistantMessageId = `run:${this.durableRun.runId}:assistant`; } handleSessionEvent( @@ -116,17 +122,11 @@ export class DurableChatRunReporter { status = "failed"; errorMessage = normalizeErrorMessage(error); } - - const endpoint = - status === "completed" - ? "/api/chat/complete-run" - : status === "interrupted" - ? "/api/chat/interrupt-run" - : "/api/chat/fail-run"; - - await this.callConvexHttpAction(endpoint, { - runId: this.durableRun.runId, - ...(status === "failed" && errorMessage ? { error: errorMessage } : {}), + await this.postEvent({ + final: { + status, + ...(status === "failed" && errorMessage ? { error: errorMessage } : {}), + }, }); } @@ -152,12 +152,7 @@ export class DurableChatRunReporter { const flushPromise = this.flushChain.then(async () => { this.throwIfFlushFailed(); - await this.callConvexHttpAction("/api/chat/run-messages", { - runId: this.durableRun.runId, - userId: this.durableRun.userId, - agentId: this.durableRun.agentId, - threadId: this.durableRun.threadId, - sessionKey: this.durableRun.sessionKey, + await this.postEvent({ items, }); }); @@ -177,8 +172,6 @@ export class DurableChatRunReporter { } private buildItems(): PersistHistoryItem[] { - const items: PersistHistoryItem[] = []; - const assistantParts = this.latestAssistantMessage?.role === "assistant" ? messageContentToHistoryParts(this.latestAssistantMessage) @@ -201,40 +194,36 @@ export class DurableChatRunReporter { this.latestAssistantMessage?.role === "assistant" || assistantParts.length > 0 ) { - items.push({ - role: "assistant", - text: - this.latestAssistantMessage?.role === "assistant" - ? extractMessageText(this.latestAssistantMessage) || undefined - : undefined, - partsJson: JSON.stringify(assistantParts), - timestamp: - this.latestAssistantMessage?.timestamp ?? - firstToolResult?.timestamp ?? - Date.now(), - idempotencyKey: this.assistantMessageId, - }); + return [ + { + role: "assistant", + text: + this.latestAssistantMessage?.role === "assistant" + ? extractMessageText(this.latestAssistantMessage) || undefined + : undefined, + partsJson: JSON.stringify(assistantParts), + timestamp: + this.latestAssistantMessage?.timestamp ?? + firstToolResult?.timestamp ?? + Date.now(), + idempotencyKey: this.assistantMessageId, + }, + ]; } - return items; + return []; } - private async callConvexHttpAction( - path: string, - body: Record, - ): Promise { - const response = await fetch(`${this.convexSiteUrl}${path}`, { + private async postEvent(body: DurableChatRunEventBody): Promise { + const response = await fetch(this.durableRun.callbackUrl, { method: "POST", - headers: { - "Content-Type": "application/json", - Authorization: `Bearer ${this.convexSecret}`, - }, + headers: buildAuthHeaders(this.durableRun.callbackToken), body: JSON.stringify(body), }); if (!response.ok) { const text = await response.text().catch(() => ""); throw new Error( - `Convex HTTP action failed for ${path}: ${response.status} ${text}`, + `Chat run relay failed: ${response.status} ${text}`.trim(), ); } } diff --git a/packages/coding-agent/src/core/gateway/runtime.ts b/packages/coding-agent/src/core/gateway/runtime.ts index f2026f9..586979a 100644 --- a/packages/coding-agent/src/core/gateway/runtime.ts +++ b/packages/coding-agent/src/core/gateway/runtime.ts @@ -117,21 +117,17 @@ function readDurableRun( } const runId = readString(value.runId); - const userId = readString(value.userId); - const agentId = readString(value.agentId); - const threadId = readString(value.threadId); - const sessionKey = readString(value.sessionKey); + const callbackUrl = readString(value.callbackUrl); + const callbackToken = readString(value.callbackToken); - if (!runId || !userId || !agentId || !threadId || !sessionKey) { + if (!runId || !callbackUrl || !callbackToken) { return undefined; } return { runId, - userId, - agentId, - threadId, - sessionKey, + callbackUrl, + callbackToken, }; } @@ -516,7 +512,6 @@ export class GatewayRuntime { response, sessionKey: managedSession.sessionKey, }; - queued.resolve(result); } catch (error) { const message = error instanceof Error ? error.message : String(error); this.log( @@ -540,24 +535,37 @@ export class GatewayRuntime { error: message, sessionKey: managedSession.sessionKey, }; - queued.resolve(result); } finally { queued.onFinish?.(); + if (durableRunReporter) { + try { + await durableRunReporter.finalize(result); + } catch (error) { + const message = + error instanceof Error ? error.message : String(error); + this.log( + `[chat-run] session=${managedSession.sessionKey} finalize error: ${message}`, + ); + this.emit(managedSession, { + type: "error", + sessionKey: managedSession.sessionKey, + error: message, + }); + result = { + ok: false, + response: result.response, + error: message, + sessionKey: managedSession.sessionKey, + }; + } + } + queued.resolve(result); managedSession.processing = false; managedSession.activeDurableRun = null; managedSession.activeAssistantMessage = null; managedSession.pendingToolResults = []; managedSession.lastActiveAt = Date.now(); this.emitState(managedSession); - if (durableRunReporter) { - await durableRunReporter.finalize(result).catch((error) => { - const message = - error instanceof Error ? error.message : String(error); - this.log( - `[chat-run] session=${managedSession.sessionKey} finalize error: ${message}`, - ); - }); - } if (managedSession.queue.length > 0) { void this.processNext(managedSession); } diff --git a/packages/coding-agent/src/core/gateway/types.ts b/packages/coding-agent/src/core/gateway/types.ts index 23113ea..2311a43 100644 --- a/packages/coding-agent/src/core/gateway/types.ts +++ b/packages/coding-agent/src/core/gateway/types.ts @@ -28,10 +28,8 @@ export interface GatewayMessageRequest { metadata?: Record; durableRun?: { runId: string; - userId: string; - agentId: string; - threadId: string; - sessionKey: string; + callbackUrl: string; + callbackToken: string; }; } diff --git a/packages/coding-agent/test/durable-chat-run.test.ts b/packages/coding-agent/test/durable-chat-run.test.ts index b613016..6a774da 100644 --- a/packages/coding-agent/test/durable-chat-run.test.ts +++ b/packages/coding-agent/test/durable-chat-run.test.ts @@ -1,9 +1,6 @@ import { afterEach, describe, expect, it, vi } from "vitest"; import { DurableChatRunReporter } from "../src/core/gateway/durable-chat-run.js"; -const originalConvexUrl = process.env.CONVEX_URL; -const originalConvexSecret = process.env.CONVEX_SECRET; - function mockOkResponse() { return { ok: true, @@ -15,31 +12,16 @@ function mockOkResponse() { describe("DurableChatRunReporter", () => { afterEach(() => { vi.restoreAllMocks(); - if (originalConvexUrl === undefined) { - delete process.env.CONVEX_URL; - } else { - process.env.CONVEX_URL = originalConvexUrl; - } - if (originalConvexSecret === undefined) { - delete process.env.CONVEX_SECRET; - } else { - process.env.CONVEX_SECRET = originalConvexSecret; - } }); - it("upserts a single assistant message with tool results and completes the run", async () => { - process.env.CONVEX_URL = "https://convex.example"; - process.env.CONVEX_SECRET = "test-secret"; - + it("posts assistant state to the relay and completes the run", async () => { const fetchMock = vi.fn().mockResolvedValue(mockOkResponse()); vi.stubGlobal("fetch", fetchMock); const reporter = new DurableChatRunReporter({ runId: "run-1", - userId: "user-1", - agentId: "agent-1", - threadId: "thread-1", - sessionKey: "session-1", + callbackUrl: "https://web.example/api/chat/runs/run-1/events", + callbackToken: "callback-token", }); const assistantMessage = { @@ -94,24 +76,28 @@ describe("DurableChatRunReporter", () => { expect(fetchMock).toHaveBeenCalledTimes(2); expect(fetchMock.mock.calls[0]?.[0]).toBe( - "https://convex.example/api/chat/run-messages", + "https://web.example/api/chat/runs/run-1/events", ); expect(fetchMock.mock.calls[1]?.[0]).toBe( - "https://convex.example/api/chat/complete-run", + "https://web.example/api/chat/runs/run-1/events", ); - const runMessagesBody = JSON.parse( - String(fetchMock.mock.calls[0]?.[1]?.body), - ) as { + expect(fetchMock.mock.calls[0]?.[1]?.headers).toMatchObject({ + Authorization: "Bearer callback-token", + "Content-Type": "application/json", + }); + + const runMessagesCall = fetchMock.mock.calls.find((call) => + String(call[1]?.body).includes('"items"'), + ); + const runMessagesBody = JSON.parse(String(runMessagesCall?.[1]?.body)) as { items: Array<{ - role: string; idempotencyKey: string; partsJson: string; }>; }; expect(runMessagesBody.items).toHaveLength(1); expect(runMessagesBody.items[0]).toMatchObject({ - role: "assistant", idempotencyKey: "run:run-1:assistant", }); expect(JSON.parse(runMessagesBody.items[0]?.partsJson ?? "[]")).toEqual( @@ -129,21 +115,24 @@ describe("DurableChatRunReporter", () => { }), ]), ); + + expect( + JSON.parse(String(fetchMock.mock.calls[1]?.[1]?.body)), + ).toMatchObject({ + final: { + status: "completed", + }, + }); }); it("marks aborted runs as interrupted", async () => { - process.env.CONVEX_URL = "https://convex.example"; - process.env.CONVEX_SECRET = "test-secret"; - const fetchMock = vi.fn().mockResolvedValue(mockOkResponse()); vi.stubGlobal("fetch", fetchMock); const reporter = new DurableChatRunReporter({ runId: "run-2", - userId: "user-1", - agentId: "agent-1", - threadId: "thread-1", - sessionKey: "session-1", + callbackUrl: "https://web.example/api/chat/runs/run-2/events", + callbackToken: "callback-token", }); await reporter.finalize({ @@ -155,7 +144,14 @@ describe("DurableChatRunReporter", () => { expect(fetchMock).toHaveBeenCalledTimes(1); expect(fetchMock.mock.calls[0]?.[0]).toBe( - "https://convex.example/api/chat/interrupt-run", + "https://web.example/api/chat/runs/run-2/events", ); + expect( + JSON.parse(String(fetchMock.mock.calls[0]?.[1]?.body)), + ).toMatchObject({ + final: { + status: "interrupted", + }, + }); }); });