mirror of
https://github.com/getcompanion-ai/co-mono.git
synced 2026-04-16 00:03:00 +00:00
Previously, errors in turn_end events (e.g., from OpenRouter Auto Router) were not captured in agent.state.error, making failed requests appear as successful completions. Fixes #6
306 lines
8.6 KiB
TypeScript
306 lines
8.6 KiB
TypeScript
import type { ImageContent, Message, QueuedMessage, TextContent } from "@mariozechner/pi-ai";
|
|
import { getModel } from "@mariozechner/pi-ai";
|
|
import type { AgentTransport } from "./transports/types.js";
|
|
import type { AgentEvent, AgentState, AppMessage, Attachment, ThinkingLevel } from "./types.js";
|
|
|
|
/**
|
|
* Default message transformer: Keep only LLM-compatible messages, strip app-specific fields.
|
|
* Converts attachments to proper content blocks (images → ImageContent, documents → TextContent).
|
|
*/
|
|
function defaultMessageTransformer(messages: AppMessage[]): Message[] {
|
|
return messages
|
|
.filter((m) => {
|
|
// Only keep standard LLM message roles
|
|
return m.role === "user" || m.role === "assistant" || m.role === "toolResult";
|
|
})
|
|
.map((m) => {
|
|
if (m.role === "user") {
|
|
const { attachments, ...rest } = m as any;
|
|
|
|
// If no attachments, return as-is
|
|
if (!attachments || attachments.length === 0) {
|
|
return rest as Message;
|
|
}
|
|
|
|
// Convert attachments to content blocks
|
|
const content = Array.isArray(rest.content) ? [...rest.content] : [{ type: "text", text: rest.content }];
|
|
|
|
for (const attachment of attachments as Attachment[]) {
|
|
// Add image blocks for image attachments
|
|
if (attachment.type === "image") {
|
|
content.push({
|
|
type: "image",
|
|
data: attachment.content,
|
|
mimeType: attachment.mimeType,
|
|
} as ImageContent);
|
|
}
|
|
// Add text blocks for documents with extracted text
|
|
else if (attachment.type === "document" && attachment.extractedText) {
|
|
content.push({
|
|
type: "text",
|
|
text: `\n\n[Document: ${attachment.fileName}]\n${attachment.extractedText}`,
|
|
isDocument: true,
|
|
} as TextContent);
|
|
}
|
|
}
|
|
|
|
return { ...rest, content } as Message;
|
|
}
|
|
return m as Message;
|
|
});
|
|
}
|
|
|
|
export interface AgentOptions {
|
|
initialState?: Partial<AgentState>;
|
|
transport: AgentTransport;
|
|
// Transform app messages to LLM-compatible messages before sending to transport
|
|
messageTransformer?: (messages: AppMessage[]) => Message[] | Promise<Message[]>;
|
|
}
|
|
|
|
export class Agent {
|
|
private _state: AgentState = {
|
|
systemPrompt: "",
|
|
model: getModel("google", "gemini-2.5-flash-lite-preview-06-17"),
|
|
thinkingLevel: "off",
|
|
tools: [],
|
|
messages: [],
|
|
isStreaming: false,
|
|
streamMessage: null,
|
|
pendingToolCalls: new Set<string>(),
|
|
error: undefined,
|
|
};
|
|
private listeners = new Set<(e: AgentEvent) => void>();
|
|
private abortController?: AbortController;
|
|
private transport: AgentTransport;
|
|
private messageTransformer: (messages: AppMessage[]) => Message[] | Promise<Message[]>;
|
|
private messageQueue: Array<QueuedMessage<AppMessage>> = [];
|
|
|
|
constructor(opts: AgentOptions) {
|
|
this._state = { ...this._state, ...opts.initialState };
|
|
this.transport = opts.transport;
|
|
this.messageTransformer = opts.messageTransformer || defaultMessageTransformer;
|
|
}
|
|
|
|
get state(): AgentState {
|
|
return this._state;
|
|
}
|
|
|
|
subscribe(fn: (e: AgentEvent) => void): () => void {
|
|
this.listeners.add(fn);
|
|
return () => this.listeners.delete(fn);
|
|
}
|
|
|
|
// State mutators - update internal state without emitting events
|
|
setSystemPrompt(v: string) {
|
|
this._state.systemPrompt = v;
|
|
}
|
|
|
|
setModel(m: typeof this._state.model) {
|
|
this._state.model = m;
|
|
}
|
|
|
|
setThinkingLevel(l: ThinkingLevel) {
|
|
this._state.thinkingLevel = l;
|
|
}
|
|
|
|
setTools(t: typeof this._state.tools) {
|
|
this._state.tools = t;
|
|
}
|
|
|
|
replaceMessages(ms: AppMessage[]) {
|
|
this._state.messages = ms.slice();
|
|
}
|
|
|
|
appendMessage(m: AppMessage) {
|
|
this._state.messages = [...this._state.messages, m];
|
|
}
|
|
|
|
async queueMessage(m: AppMessage) {
|
|
// Transform message and queue it for injection at next turn
|
|
const transformed = await this.messageTransformer([m]);
|
|
this.messageQueue.push({
|
|
original: m,
|
|
llm: transformed[0], // undefined if filtered out
|
|
});
|
|
}
|
|
|
|
clearMessages() {
|
|
this._state.messages = [];
|
|
}
|
|
|
|
abort() {
|
|
this.abortController?.abort();
|
|
}
|
|
|
|
async prompt(input: string, attachments?: Attachment[]) {
|
|
const model = this._state.model;
|
|
if (!model) {
|
|
throw new Error("No model configured");
|
|
}
|
|
|
|
// Build user message with attachments
|
|
const content: Array<TextContent | ImageContent> = [{ type: "text", text: input }];
|
|
if (attachments?.length) {
|
|
for (const a of attachments) {
|
|
if (a.type === "image") {
|
|
content.push({ type: "image", data: a.content, mimeType: a.mimeType });
|
|
} else if (a.type === "document" && a.extractedText) {
|
|
content.push({
|
|
type: "text",
|
|
text: `\n\n[Document: ${a.fileName}]\n${a.extractedText}`,
|
|
isDocument: true,
|
|
} as TextContent);
|
|
}
|
|
}
|
|
}
|
|
|
|
const userMessage: AppMessage = {
|
|
role: "user",
|
|
content,
|
|
attachments: attachments?.length ? attachments : undefined,
|
|
timestamp: Date.now(),
|
|
};
|
|
|
|
this.abortController = new AbortController();
|
|
this._state.isStreaming = true;
|
|
this._state.streamMessage = null;
|
|
this._state.error = undefined;
|
|
|
|
const reasoning =
|
|
this._state.thinkingLevel === "off"
|
|
? undefined
|
|
: this._state.thinkingLevel === "minimal"
|
|
? "low"
|
|
: this._state.thinkingLevel;
|
|
|
|
const cfg = {
|
|
systemPrompt: this._state.systemPrompt,
|
|
tools: this._state.tools,
|
|
model,
|
|
reasoning,
|
|
getQueuedMessages: async <T>() => {
|
|
// Return queued messages (they'll be added to state via message_end event)
|
|
const queued = this.messageQueue.slice();
|
|
this.messageQueue = [];
|
|
return queued as QueuedMessage<T>[];
|
|
},
|
|
};
|
|
|
|
// Track all messages generated in this prompt
|
|
const generatedMessages: AppMessage[] = [];
|
|
|
|
try {
|
|
let partial: Message | null = null;
|
|
|
|
// Transform app messages to LLM-compatible messages (initial set)
|
|
const llmMessages = await this.messageTransformer(this._state.messages);
|
|
|
|
for await (const ev of this.transport.run(
|
|
llmMessages,
|
|
userMessage as Message,
|
|
cfg,
|
|
this.abortController.signal,
|
|
)) {
|
|
// Pass through all events directly
|
|
this.emit(ev as AgentEvent);
|
|
|
|
// Update internal state as needed
|
|
switch (ev.type) {
|
|
case "message_start": {
|
|
// Track streaming message
|
|
partial = ev.message;
|
|
this._state.streamMessage = ev.message;
|
|
break;
|
|
}
|
|
case "message_update": {
|
|
// Update streaming message
|
|
partial = ev.message;
|
|
this._state.streamMessage = ev.message;
|
|
break;
|
|
}
|
|
case "message_end": {
|
|
// Add completed message to state
|
|
partial = null;
|
|
this._state.streamMessage = null;
|
|
this.appendMessage(ev.message as AppMessage);
|
|
generatedMessages.push(ev.message as AppMessage);
|
|
break;
|
|
}
|
|
case "tool_execution_start": {
|
|
const s = new Set(this._state.pendingToolCalls);
|
|
s.add(ev.toolCallId);
|
|
this._state.pendingToolCalls = s;
|
|
break;
|
|
}
|
|
case "tool_execution_end": {
|
|
const s = new Set(this._state.pendingToolCalls);
|
|
s.delete(ev.toolCallId);
|
|
this._state.pendingToolCalls = s;
|
|
break;
|
|
}
|
|
case "turn_end": {
|
|
// Capture error from turn_end event
|
|
if (ev.message.role === "assistant" && ev.message.errorMessage) {
|
|
this._state.error = ev.message.errorMessage;
|
|
}
|
|
break;
|
|
}
|
|
case "agent_end": {
|
|
this._state.streamMessage = null;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
// Handle any remaining partial message
|
|
if (partial && partial.role === "assistant" && partial.content.length > 0) {
|
|
const onlyEmpty = !partial.content.some(
|
|
(c) =>
|
|
(c.type === "thinking" && c.thinking.trim().length > 0) ||
|
|
(c.type === "text" && c.text.trim().length > 0) ||
|
|
(c.type === "toolCall" && c.name.trim().length > 0),
|
|
);
|
|
if (!onlyEmpty) {
|
|
this.appendMessage(partial as AppMessage);
|
|
generatedMessages.push(partial as AppMessage);
|
|
} else {
|
|
if (this.abortController?.signal.aborted) {
|
|
throw new Error("Request was aborted");
|
|
}
|
|
}
|
|
}
|
|
} catch (err: any) {
|
|
const msg: Message = {
|
|
role: "assistant",
|
|
content: [{ type: "text", text: "" }],
|
|
api: model.api,
|
|
provider: model.provider,
|
|
model: model.id,
|
|
usage: {
|
|
input: 0,
|
|
output: 0,
|
|
cacheRead: 0,
|
|
cacheWrite: 0,
|
|
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
|
|
},
|
|
stopReason: this.abortController?.signal.aborted ? "aborted" : "error",
|
|
errorMessage: err?.message || String(err),
|
|
timestamp: Date.now(),
|
|
};
|
|
this.appendMessage(msg as AppMessage);
|
|
generatedMessages.push(msg as AppMessage);
|
|
this._state.error = err?.message || String(err);
|
|
} finally {
|
|
this._state.isStreaming = false;
|
|
this._state.streamMessage = null;
|
|
this._state.pendingToolCalls = new Set<string>();
|
|
this.abortController = undefined;
|
|
}
|
|
}
|
|
|
|
private emit(e: AgentEvent) {
|
|
for (const listener of this.listeners) {
|
|
listener(e);
|
|
}
|
|
}
|
|
}
|