This commit is contained in:
Harivansh Rathi 2026-03-14 14:53:32 -04:00
parent c79ae1e621
commit cb30fa5fd1
4 changed files with 114 additions and 123 deletions

View file

@ -24,24 +24,31 @@ function normalizeErrorMessage(error: unknown): string {
return typeof error === "string" ? error : String(error);
}
function readConvexSiteUrl(): string | null {
const raw =
process.env.CONVEX_SITE_URL ??
process.env.NEXT_PUBLIC_CONVEX_SITE_URL ??
process.env.CONVEX_URL ??
process.env.NEXT_PUBLIC_CONVEX_URL;
return typeof raw === "string" && raw.trim().length > 0 ? raw.trim() : null;
}
type DurableChatRunEventBody =
| {
items: PersistHistoryItem[];
final?: {
status: ConvexRunStatus;
error?: string;
};
}
| {
items?: PersistHistoryItem[];
final: {
status: ConvexRunStatus;
error?: string;
};
};
function readConvexSecret(): string | null {
const raw = process.env.CONVEX_SECRET;
return typeof raw === "string" && raw.trim().length > 0 ? raw.trim() : null;
function buildAuthHeaders(token: string): Record<string, string> {
return {
"Content-Type": "application/json",
Authorization: `Bearer ${token}`,
};
}
export class DurableChatRunReporter {
private readonly assistantMessageId: string;
private readonly convexSiteUrl: string;
private readonly convexSecret: string;
private latestAssistantMessage: AgentMessage | null = null;
private readonly knownToolResults = new Map<
string,
@ -56,16 +63,15 @@ export class DurableChatRunReporter {
GatewayMessageRequest["durableRun"]
>,
) {
const convexSiteUrl = readConvexSiteUrl();
const convexSecret = readConvexSecret();
if (!convexSiteUrl || !convexSecret) {
if (
durableRun.callbackUrl.trim().length === 0 ||
durableRun.callbackToken.trim().length === 0
) {
throw new Error(
"Durable chat run reporting requires CONVEX_SITE_URL/CONVEX_URL and CONVEX_SECRET",
"Durable chat run reporting requires callbackUrl and callbackToken",
);
}
this.convexSiteUrl = convexSiteUrl;
this.convexSecret = convexSecret;
this.assistantMessageId = `run:${durableRun.runId}:assistant`;
this.assistantMessageId = `run:${this.durableRun.runId}:assistant`;
}
handleSessionEvent(
@ -116,17 +122,11 @@ export class DurableChatRunReporter {
status = "failed";
errorMessage = normalizeErrorMessage(error);
}
const endpoint =
status === "completed"
? "/api/chat/complete-run"
: status === "interrupted"
? "/api/chat/interrupt-run"
: "/api/chat/fail-run";
await this.callConvexHttpAction(endpoint, {
runId: this.durableRun.runId,
...(status === "failed" && errorMessage ? { error: errorMessage } : {}),
await this.postEvent({
final: {
status,
...(status === "failed" && errorMessage ? { error: errorMessage } : {}),
},
});
}
@ -152,12 +152,7 @@ export class DurableChatRunReporter {
const flushPromise = this.flushChain.then(async () => {
this.throwIfFlushFailed();
await this.callConvexHttpAction("/api/chat/run-messages", {
runId: this.durableRun.runId,
userId: this.durableRun.userId,
agentId: this.durableRun.agentId,
threadId: this.durableRun.threadId,
sessionKey: this.durableRun.sessionKey,
await this.postEvent({
items,
});
});
@ -177,8 +172,6 @@ export class DurableChatRunReporter {
}
private buildItems(): PersistHistoryItem[] {
const items: PersistHistoryItem[] = [];
const assistantParts =
this.latestAssistantMessage?.role === "assistant"
? messageContentToHistoryParts(this.latestAssistantMessage)
@ -201,40 +194,36 @@ export class DurableChatRunReporter {
this.latestAssistantMessage?.role === "assistant" ||
assistantParts.length > 0
) {
items.push({
role: "assistant",
text:
this.latestAssistantMessage?.role === "assistant"
? extractMessageText(this.latestAssistantMessage) || undefined
: undefined,
partsJson: JSON.stringify(assistantParts),
timestamp:
this.latestAssistantMessage?.timestamp ??
firstToolResult?.timestamp ??
Date.now(),
idempotencyKey: this.assistantMessageId,
});
return [
{
role: "assistant",
text:
this.latestAssistantMessage?.role === "assistant"
? extractMessageText(this.latestAssistantMessage) || undefined
: undefined,
partsJson: JSON.stringify(assistantParts),
timestamp:
this.latestAssistantMessage?.timestamp ??
firstToolResult?.timestamp ??
Date.now(),
idempotencyKey: this.assistantMessageId,
},
];
}
return items;
return [];
}
private async callConvexHttpAction(
path: string,
body: Record<string, unknown>,
): Promise<void> {
const response = await fetch(`${this.convexSiteUrl}${path}`, {
private async postEvent(body: DurableChatRunEventBody): Promise<void> {
const response = await fetch(this.durableRun.callbackUrl, {
method: "POST",
headers: {
"Content-Type": "application/json",
Authorization: `Bearer ${this.convexSecret}`,
},
headers: buildAuthHeaders(this.durableRun.callbackToken),
body: JSON.stringify(body),
});
if (!response.ok) {
const text = await response.text().catch(() => "");
throw new Error(
`Convex HTTP action failed for ${path}: ${response.status} ${text}`,
`Chat run relay failed: ${response.status} ${text}`.trim(),
);
}
}

View file

@ -117,21 +117,17 @@ function readDurableRun(
}
const runId = readString(value.runId);
const userId = readString(value.userId);
const agentId = readString(value.agentId);
const threadId = readString(value.threadId);
const sessionKey = readString(value.sessionKey);
const callbackUrl = readString(value.callbackUrl);
const callbackToken = readString(value.callbackToken);
if (!runId || !userId || !agentId || !threadId || !sessionKey) {
if (!runId || !callbackUrl || !callbackToken) {
return undefined;
}
return {
runId,
userId,
agentId,
threadId,
sessionKey,
callbackUrl,
callbackToken,
};
}
@ -516,7 +512,6 @@ export class GatewayRuntime {
response,
sessionKey: managedSession.sessionKey,
};
queued.resolve(result);
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
this.log(
@ -540,24 +535,37 @@ export class GatewayRuntime {
error: message,
sessionKey: managedSession.sessionKey,
};
queued.resolve(result);
} finally {
queued.onFinish?.();
if (durableRunReporter) {
try {
await durableRunReporter.finalize(result);
} catch (error) {
const message =
error instanceof Error ? error.message : String(error);
this.log(
`[chat-run] session=${managedSession.sessionKey} finalize error: ${message}`,
);
this.emit(managedSession, {
type: "error",
sessionKey: managedSession.sessionKey,
error: message,
});
result = {
ok: false,
response: result.response,
error: message,
sessionKey: managedSession.sessionKey,
};
}
}
queued.resolve(result);
managedSession.processing = false;
managedSession.activeDurableRun = null;
managedSession.activeAssistantMessage = null;
managedSession.pendingToolResults = [];
managedSession.lastActiveAt = Date.now();
this.emitState(managedSession);
if (durableRunReporter) {
await durableRunReporter.finalize(result).catch((error) => {
const message =
error instanceof Error ? error.message : String(error);
this.log(
`[chat-run] session=${managedSession.sessionKey} finalize error: ${message}`,
);
});
}
if (managedSession.queue.length > 0) {
void this.processNext(managedSession);
}

View file

@ -28,10 +28,8 @@ export interface GatewayMessageRequest {
metadata?: Record<string, unknown>;
durableRun?: {
runId: string;
userId: string;
agentId: string;
threadId: string;
sessionKey: string;
callbackUrl: string;
callbackToken: string;
};
}

View file

@ -1,9 +1,6 @@
import { afterEach, describe, expect, it, vi } from "vitest";
import { DurableChatRunReporter } from "../src/core/gateway/durable-chat-run.js";
const originalConvexUrl = process.env.CONVEX_URL;
const originalConvexSecret = process.env.CONVEX_SECRET;
function mockOkResponse() {
return {
ok: true,
@ -15,31 +12,16 @@ function mockOkResponse() {
describe("DurableChatRunReporter", () => {
afterEach(() => {
vi.restoreAllMocks();
if (originalConvexUrl === undefined) {
delete process.env.CONVEX_URL;
} else {
process.env.CONVEX_URL = originalConvexUrl;
}
if (originalConvexSecret === undefined) {
delete process.env.CONVEX_SECRET;
} else {
process.env.CONVEX_SECRET = originalConvexSecret;
}
});
it("upserts a single assistant message with tool results and completes the run", async () => {
process.env.CONVEX_URL = "https://convex.example";
process.env.CONVEX_SECRET = "test-secret";
it("posts assistant state to the relay and completes the run", async () => {
const fetchMock = vi.fn<typeof fetch>().mockResolvedValue(mockOkResponse());
vi.stubGlobal("fetch", fetchMock);
const reporter = new DurableChatRunReporter({
runId: "run-1",
userId: "user-1",
agentId: "agent-1",
threadId: "thread-1",
sessionKey: "session-1",
callbackUrl: "https://web.example/api/chat/runs/run-1/events",
callbackToken: "callback-token",
});
const assistantMessage = {
@ -94,24 +76,28 @@ describe("DurableChatRunReporter", () => {
expect(fetchMock).toHaveBeenCalledTimes(2);
expect(fetchMock.mock.calls[0]?.[0]).toBe(
"https://convex.example/api/chat/run-messages",
"https://web.example/api/chat/runs/run-1/events",
);
expect(fetchMock.mock.calls[1]?.[0]).toBe(
"https://convex.example/api/chat/complete-run",
"https://web.example/api/chat/runs/run-1/events",
);
const runMessagesBody = JSON.parse(
String(fetchMock.mock.calls[0]?.[1]?.body),
) as {
expect(fetchMock.mock.calls[0]?.[1]?.headers).toMatchObject({
Authorization: "Bearer callback-token",
"Content-Type": "application/json",
});
const runMessagesCall = fetchMock.mock.calls.find((call) =>
String(call[1]?.body).includes('"items"'),
);
const runMessagesBody = JSON.parse(String(runMessagesCall?.[1]?.body)) as {
items: Array<{
role: string;
idempotencyKey: string;
partsJson: string;
}>;
};
expect(runMessagesBody.items).toHaveLength(1);
expect(runMessagesBody.items[0]).toMatchObject({
role: "assistant",
idempotencyKey: "run:run-1:assistant",
});
expect(JSON.parse(runMessagesBody.items[0]?.partsJson ?? "[]")).toEqual(
@ -129,21 +115,24 @@ describe("DurableChatRunReporter", () => {
}),
]),
);
expect(
JSON.parse(String(fetchMock.mock.calls[1]?.[1]?.body)),
).toMatchObject({
final: {
status: "completed",
},
});
});
it("marks aborted runs as interrupted", async () => {
process.env.CONVEX_URL = "https://convex.example";
process.env.CONVEX_SECRET = "test-secret";
const fetchMock = vi.fn<typeof fetch>().mockResolvedValue(mockOkResponse());
vi.stubGlobal("fetch", fetchMock);
const reporter = new DurableChatRunReporter({
runId: "run-2",
userId: "user-1",
agentId: "agent-1",
threadId: "thread-1",
sessionKey: "session-1",
callbackUrl: "https://web.example/api/chat/runs/run-2/events",
callbackToken: "callback-token",
});
await reporter.finalize({
@ -155,7 +144,14 @@ describe("DurableChatRunReporter", () => {
expect(fetchMock).toHaveBeenCalledTimes(1);
expect(fetchMock.mock.calls[0]?.[0]).toBe(
"https://convex.example/api/chat/interrupt-run",
"https://web.example/api/chat/runs/run-2/events",
);
expect(
JSON.parse(String(fetchMock.mock.calls[0]?.[1]?.body)),
).toMatchObject({
final: {
status: "interrupted",
},
});
});
});