single chat sot

This commit is contained in:
Harivansh Rathi 2026-03-08 22:12:03 -07:00
parent aa70afbd7e
commit e4ee3e64f4
6 changed files with 443 additions and 84 deletions

View file

@ -7,7 +7,6 @@ import {
import { rm } from "node:fs/promises";
import { join } from "node:path";
import { URL } from "node:url";
import type { AgentMessage } from "@mariozechner/pi-agent-core";
import type { AgentSession, AgentSessionEvent } from "../agent-session.js";
import { extractMessageText, getLastAssistantText } from "./helpers.js";
import {
@ -24,6 +23,7 @@ import type {
GatewayMessageResult,
GatewayRuntimeOptions,
GatewaySessionFactory,
GatewaySessionState,
GatewaySessionSnapshot,
HistoryMessage,
HistoryPart,
@ -36,6 +36,10 @@ import {
extractUserText,
finishVercelStream,
} from "./vercel-ai-stream.js";
import {
buildGatewaySessionStateMessages,
messageContentToHistoryParts,
} from "./session-state.js";
export {
createGatewaySessionManager,
sanitizeSessionKey,
@ -47,6 +51,7 @@ export type {
GatewayMessageResult,
GatewayRuntimeOptions,
GatewaySessionFactory,
GatewaySessionState,
GatewaySessionSnapshot,
HistoryMessage,
HistoryPart,
@ -246,6 +251,8 @@ export class GatewayRuntime {
this.rejectQueuedMessages(managedSession, "Session reset");
await managedSession.session.newSession();
managedSession.processing = false;
managedSession.activeAssistantMessage = null;
managedSession.pendingToolResults = [];
managedSession.lastActiveAt = Date.now();
this.emitState(managedSession);
return;
@ -284,6 +291,8 @@ export class GatewayRuntime {
session,
queue: [],
processing: false,
activeAssistantMessage: null,
pendingToolResults: [],
createdAt: Date.now(),
lastActiveAt: Date.now(),
listeners: new Set(),
@ -359,6 +368,8 @@ export class GatewayRuntime {
} finally {
queued.onFinish?.();
managedSession.processing = false;
managedSession.activeAssistantMessage = null;
managedSession.pendingToolResults = [];
managedSession.lastActiveAt = Date.now();
this.emitState(managedSession);
if (managedSession.queue.length > 0) {
@ -396,18 +407,24 @@ export class GatewayRuntime {
): void {
switch (event.type) {
case "turn_start":
managedSession.lastActiveAt = Date.now();
this.emit(managedSession, {
type: "turn_start",
sessionKey: managedSession.sessionKey,
});
return;
case "turn_end":
managedSession.lastActiveAt = Date.now();
this.emit(managedSession, {
type: "turn_end",
sessionKey: managedSession.sessionKey,
});
return;
case "message_start":
managedSession.lastActiveAt = Date.now();
if (event.message.role === "assistant") {
managedSession.activeAssistantMessage = event.message;
}
this.emit(managedSession, {
type: "message_start",
sessionKey: managedSession.sessionKey,
@ -415,6 +432,10 @@ export class GatewayRuntime {
});
return;
case "message_update":
managedSession.lastActiveAt = Date.now();
if (event.message.role === "assistant") {
managedSession.activeAssistantMessage = event.message;
}
switch (event.assistantMessageEvent.type) {
case "text_delta":
this.emit(managedSession, {
@ -435,15 +456,32 @@ export class GatewayRuntime {
}
return;
case "message_end":
managedSession.lastActiveAt = Date.now();
if (event.message.role === "assistant") {
managedSession.activeAssistantMessage = null;
this.emit(managedSession, {
type: "message_complete",
sessionKey: managedSession.sessionKey,
text: extractMessageText(event.message),
});
return;
}
if (event.message.role === "toolResult") {
const toolCallId =
typeof (event.message as { toolCallId?: unknown }).toolCallId ===
"string"
? ((event.message as { toolCallId: string }).toolCallId ?? "")
: "";
if (toolCallId) {
managedSession.pendingToolResults =
managedSession.pendingToolResults.filter(
(entry) => entry.toolCallId !== toolCallId,
);
}
}
return;
case "tool_execution_start":
managedSession.lastActiveAt = Date.now();
this.emit(managedSession, {
type: "tool_start",
sessionKey: managedSession.sessionKey,
@ -453,6 +491,7 @@ export class GatewayRuntime {
});
return;
case "tool_execution_update":
managedSession.lastActiveAt = Date.now();
this.emit(managedSession, {
type: "tool_update",
sessionKey: managedSession.sessionKey,
@ -462,6 +501,19 @@ export class GatewayRuntime {
});
return;
case "tool_execution_end":
managedSession.lastActiveAt = Date.now();
managedSession.pendingToolResults = [
...managedSession.pendingToolResults.filter(
(entry) => entry.toolCallId !== event.toolCallId,
),
{
toolCallId: event.toolCallId,
toolName: event.toolName,
result: event.result,
isError: event.isError,
timestamp: Date.now(),
},
];
this.emit(managedSession, {
type: "tool_complete",
sessionKey: managedSession.sessionKey,
@ -491,6 +543,20 @@ export class GatewayRuntime {
});
}
private createSessionState(
managedSession: ManagedGatewaySession,
): GatewaySessionState {
return {
session: this.createSnapshot(managedSession),
messages: buildGatewaySessionStateMessages({
sessionKey: managedSession.sessionKey,
rawMessages: managedSession.session.messages,
activeAssistantMessage: managedSession.activeAssistantMessage,
pendingToolResults: managedSession.pendingToolResults,
}),
};
}
private createSnapshot(
managedSession: ManagedGatewaySession,
): GatewaySessionSnapshot {
@ -731,7 +797,7 @@ export class GatewayRuntime {
}
const sessionMatch = path.match(
/^\/sessions\/([^/]+)(?:\/(events|messages|abort|reset|chat|history|model|reload))?$/,
/^\/sessions\/([^/]+)(?:\/(events|messages|abort|reset|chat|history|model|reload|state))?$/,
);
if (!sessionMatch) {
this.writeJson(response, 404, { error: "Not found" });
@ -809,6 +875,12 @@ export class GatewayRuntime {
return;
}
if (action === "state" && method === "GET") {
const session = await this.ensureSession(sessionKey);
this.writeJson(response, 200, this.createSessionState(session));
return;
}
if (action === "model" && method === "POST") {
const body = await this.readJsonBody(request);
const provider = typeof body.provider === "string" ? body.provider : "";
@ -1080,7 +1152,7 @@ export class GatewayRuntime {
messages.push({
id: `${msg.timestamp}-${msg.role}-${index}`,
role: msg.role,
parts: this.messageContentToParts(msg),
parts: messageContentToHistoryParts(msg),
timestamp: msg.timestamp,
});
}
@ -1173,87 +1245,6 @@ export class GatewayRuntime {
managed.session.settingsManager.reload();
}
private messageContentToParts(msg: AgentMessage): HistoryPart[] {
if (msg.role === "user") {
const content = msg.content;
if (typeof content === "string") {
return [{ type: "text", text: content }];
}
if (Array.isArray(content)) {
return content
.filter(
(c): c is { type: "text"; text: string } =>
typeof c === "object" && c !== null && c.type === "text",
)
.map((c) => ({ type: "text" as const, text: c.text }));
}
return [];
}
if (msg.role === "assistant") {
const content = msg.content;
if (!Array.isArray(content)) return [];
const parts: HistoryPart[] = [];
for (const c of content) {
if (typeof c !== "object" || c === null) continue;
if (c.type === "text") {
parts.push({
type: "text",
text: (c as { type: "text"; text: string }).text,
});
} else if (c.type === "thinking") {
parts.push({
type: "reasoning",
text: (c as { type: "thinking"; thinking: string }).thinking,
});
} else if (c.type === "toolCall") {
const tc = c as {
type: "toolCall";
id: string;
name: string;
arguments: unknown;
};
parts.push({
type: "tool-invocation",
toolCallId: tc.id,
toolName: tc.name,
args: tc.arguments,
state: "call",
});
}
}
return parts;
}
if (msg.role === "toolResult") {
const tr = msg as {
role: "toolResult";
toolCallId: string;
toolName: string;
content: unknown;
isError: boolean;
};
const textParts = Array.isArray(tr.content)
? (tr.content as { type: string; text?: string }[])
.filter((c) => c.type === "text" && typeof c.text === "string")
.map((c) => c.text as string)
.join("")
: "";
return [
{
type: "tool-invocation",
toolCallId: tr.toolCallId,
toolName: tr.toolName,
args: undefined,
state: tr.isError ? "error" : "result",
result: textParts,
},
];
}
return [];
}
getGatewaySessionDir(sessionKey: string): string {
return join(this.sessionDirRoot, sanitizeSessionKey(sessionKey));
}