co-mono/packages/agent/src/agent.ts
Mario Zechner 5a9d844f9a Simplify compaction: remove proactive abort, use Agent.continue() for retry
- Add agentLoopContinue() to pi-ai for resuming from existing context
- Add Agent.continue() method and transport.continue() interface
- Simplify AgentSession compaction to two cases: overflow (auto-retry) and threshold (no retry)
- Remove proactive mid-turn compaction abort
- Merge turn prefix summary into main summary
- Add isCompacting property to AgentSession and RPC state
- Block input during compaction in interactive mode
- Show compaction count on session resume
- Rename RPC.md to rpc.md for consistency

Related to #128
2025-12-09 21:43:49 +01:00

406 lines
11 KiB
TypeScript

import type { ImageContent, Message, QueuedMessage, ReasoningEffort, TextContent } from "@mariozechner/pi-ai";
import { getModel } from "@mariozechner/pi-ai";
import type { AgentTransport } from "./transports/types.js";
import type { AgentEvent, AgentState, AppMessage, Attachment, ThinkingLevel } from "./types.js";
/**
* Default message transformer: Keep only LLM-compatible messages, strip app-specific fields.
* Converts attachments to proper content blocks (images → ImageContent, documents → TextContent).
*/
function defaultMessageTransformer(messages: AppMessage[]): Message[] {
return messages
.filter((m) => {
// Only keep standard LLM message roles
return m.role === "user" || m.role === "assistant" || m.role === "toolResult";
})
.map((m) => {
if (m.role === "user") {
const { attachments, ...rest } = m as any;
// If no attachments, return as-is
if (!attachments || attachments.length === 0) {
return rest as Message;
}
// Convert attachments to content blocks
const content = Array.isArray(rest.content) ? [...rest.content] : [{ type: "text", text: rest.content }];
for (const attachment of attachments as Attachment[]) {
// Add image blocks for image attachments
if (attachment.type === "image") {
content.push({
type: "image",
data: attachment.content,
mimeType: attachment.mimeType,
} as ImageContent);
}
// Add text blocks for documents with extracted text
else if (attachment.type === "document" && attachment.extractedText) {
content.push({
type: "text",
text: `\n\n[Document: ${attachment.fileName}]\n${attachment.extractedText}`,
isDocument: true,
} as TextContent);
}
}
return { ...rest, content } as Message;
}
return m as Message;
});
}
export interface AgentOptions {
initialState?: Partial<AgentState>;
transport: AgentTransport;
// Transform app messages to LLM-compatible messages before sending to transport
messageTransformer?: (messages: AppMessage[]) => Message[] | Promise<Message[]>;
// Queue mode: "all" = send all queued messages at once, "one-at-a-time" = send one queued message per turn
queueMode?: "all" | "one-at-a-time";
}
export class Agent {
private _state: AgentState = {
systemPrompt: "",
model: getModel("google", "gemini-2.5-flash-lite-preview-06-17"),
thinkingLevel: "off",
tools: [],
messages: [],
isStreaming: false,
streamMessage: null,
pendingToolCalls: new Set<string>(),
error: undefined,
};
private listeners = new Set<(e: AgentEvent) => void>();
private abortController?: AbortController;
private transport: AgentTransport;
private messageTransformer: (messages: AppMessage[]) => Message[] | Promise<Message[]>;
private messageQueue: Array<QueuedMessage<AppMessage>> = [];
private queueMode: "all" | "one-at-a-time";
private runningPrompt?: Promise<void>;
private resolveRunningPrompt?: () => void;
constructor(opts: AgentOptions) {
this._state = { ...this._state, ...opts.initialState };
this.transport = opts.transport;
this.messageTransformer = opts.messageTransformer || defaultMessageTransformer;
this.queueMode = opts.queueMode || "one-at-a-time";
}
get state(): AgentState {
return this._state;
}
subscribe(fn: (e: AgentEvent) => void): () => void {
this.listeners.add(fn);
return () => this.listeners.delete(fn);
}
// State mutators - update internal state without emitting events
setSystemPrompt(v: string) {
this._state.systemPrompt = v;
}
setModel(m: typeof this._state.model) {
this._state.model = m;
}
setThinkingLevel(l: ThinkingLevel) {
this._state.thinkingLevel = l;
}
setQueueMode(mode: "all" | "one-at-a-time") {
this.queueMode = mode;
}
getQueueMode(): "all" | "one-at-a-time" {
return this.queueMode;
}
setTools(t: typeof this._state.tools) {
this._state.tools = t;
}
replaceMessages(ms: AppMessage[]) {
this._state.messages = ms.slice();
}
appendMessage(m: AppMessage) {
this._state.messages = [...this._state.messages, m];
}
async queueMessage(m: AppMessage) {
// Transform message and queue it for injection at next turn
const transformed = await this.messageTransformer([m]);
this.messageQueue.push({
original: m,
llm: transformed[0], // undefined if filtered out
});
}
clearMessageQueue() {
this.messageQueue = [];
}
clearMessages() {
this._state.messages = [];
}
abort() {
this.abortController?.abort();
}
/**
* Returns a promise that resolves when the current prompt completes.
* Returns immediately resolved promise if no prompt is running.
*/
waitForIdle(): Promise<void> {
return this.runningPrompt ?? Promise.resolve();
}
/**
* Clear all messages and state. Call abort() first if a prompt is in flight.
*/
reset() {
this._state.messages = [];
this._state.isStreaming = false;
this._state.streamMessage = null;
this._state.pendingToolCalls = new Set<string>();
this._state.error = undefined;
this.messageQueue = [];
}
async prompt(input: string, attachments?: Attachment[]) {
const model = this._state.model;
if (!model) {
throw new Error("No model configured");
}
// Build user message with attachments
const content: Array<TextContent | ImageContent> = [{ type: "text", text: input }];
if (attachments?.length) {
for (const a of attachments) {
if (a.type === "image") {
content.push({ type: "image", data: a.content, mimeType: a.mimeType });
} else if (a.type === "document" && a.extractedText) {
content.push({
type: "text",
text: `\n\n[Document: ${a.fileName}]\n${a.extractedText}`,
isDocument: true,
} as TextContent);
}
}
}
const userMessage: AppMessage = {
role: "user",
content,
attachments: attachments?.length ? attachments : undefined,
timestamp: Date.now(),
};
await this._runAgentLoop(userMessage);
}
/**
* Continue from the current context without adding a new user message.
* Used for retry after overflow recovery when context already has user message or tool results.
*/
async continue() {
const messages = this._state.messages;
if (messages.length === 0) {
throw new Error("No messages to continue from");
}
const lastMessage = messages[messages.length - 1];
if (lastMessage.role !== "user" && lastMessage.role !== "toolResult") {
throw new Error(`Cannot continue from message role: ${lastMessage.role}`);
}
await this._runAgentLoopContinue();
}
/**
* Internal: Run the agent loop with a new user message.
*/
private async _runAgentLoop(userMessage: AppMessage) {
const { llmMessages, cfg } = await this._prepareRun();
const events = this.transport.run(llmMessages, userMessage as Message, cfg, this.abortController!.signal);
await this._processEvents(events);
}
/**
* Internal: Continue the agent loop from current context.
*/
private async _runAgentLoopContinue() {
const { llmMessages, cfg } = await this._prepareRun();
const events = this.transport.continue(llmMessages, cfg, this.abortController!.signal);
await this._processEvents(events);
}
/**
* Prepare for running the agent loop.
*/
private async _prepareRun() {
const model = this._state.model;
if (!model) {
throw new Error("No model configured");
}
this.runningPrompt = new Promise<void>((resolve) => {
this.resolveRunningPrompt = resolve;
});
this.abortController = new AbortController();
this._state.isStreaming = true;
this._state.streamMessage = null;
this._state.error = undefined;
const reasoning: ReasoningEffort | undefined =
this._state.thinkingLevel === "off"
? undefined
: this._state.thinkingLevel === "minimal"
? "low"
: this._state.thinkingLevel;
const cfg = {
systemPrompt: this._state.systemPrompt,
tools: this._state.tools,
model,
reasoning,
getQueuedMessages: async <T>() => {
if (this.queueMode === "one-at-a-time") {
if (this.messageQueue.length > 0) {
const first = this.messageQueue[0];
this.messageQueue = this.messageQueue.slice(1);
return [first] as QueuedMessage<T>[];
}
return [];
} else {
const queued = this.messageQueue.slice();
this.messageQueue = [];
return queued as QueuedMessage<T>[];
}
},
};
const llmMessages = await this.messageTransformer(this._state.messages);
return { llmMessages, cfg, model };
}
/**
* Process events from the transport.
*/
private async _processEvents(events: AsyncIterable<AgentEvent>) {
const model = this._state.model!;
const generatedMessages: AppMessage[] = [];
let partial: AppMessage | null = null;
try {
for await (const ev of events) {
switch (ev.type) {
case "message_start": {
partial = ev.message as AppMessage;
this._state.streamMessage = ev.message as Message;
break;
}
case "message_update": {
partial = ev.message as AppMessage;
this._state.streamMessage = ev.message as Message;
break;
}
case "message_end": {
partial = null;
this._state.streamMessage = null;
this.appendMessage(ev.message as AppMessage);
generatedMessages.push(ev.message as AppMessage);
break;
}
case "tool_execution_start": {
const s = new Set(this._state.pendingToolCalls);
s.add(ev.toolCallId);
this._state.pendingToolCalls = s;
break;
}
case "tool_execution_end": {
const s = new Set(this._state.pendingToolCalls);
s.delete(ev.toolCallId);
this._state.pendingToolCalls = s;
break;
}
case "turn_end": {
if (ev.message.role === "assistant" && ev.message.errorMessage) {
this._state.error = ev.message.errorMessage;
}
break;
}
case "agent_end": {
this._state.streamMessage = null;
break;
}
}
this.emit(ev as AgentEvent);
}
// Handle any remaining partial message
if (partial && partial.role === "assistant" && partial.content.length > 0) {
const onlyEmpty = !partial.content.some(
(c) =>
(c.type === "thinking" && c.thinking.trim().length > 0) ||
(c.type === "text" && c.text.trim().length > 0) ||
(c.type === "toolCall" && c.name.trim().length > 0),
);
if (!onlyEmpty) {
this.appendMessage(partial as AppMessage);
generatedMessages.push(partial as AppMessage);
} else {
if (this.abortController?.signal.aborted) {
throw new Error("Request was aborted");
}
}
}
} catch (err: any) {
const msg: Message = {
role: "assistant",
content: [{ type: "text", text: "" }],
api: model.api,
provider: model.provider,
model: model.id,
usage: {
input: 0,
output: 0,
cacheRead: 0,
cacheWrite: 0,
totalTokens: 0,
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
},
stopReason: this.abortController?.signal.aborted ? "aborted" : "error",
errorMessage: err?.message || String(err),
timestamp: Date.now(),
};
this.appendMessage(msg as AppMessage);
generatedMessages.push(msg as AppMessage);
this._state.error = err?.message || String(err);
} finally {
this._state.isStreaming = false;
this._state.streamMessage = null;
this._state.pendingToolCalls = new Set<string>();
this.abortController = undefined;
this.resolveRunningPrompt?.();
this.runningPrompt = undefined;
this.resolveRunningPrompt = undefined;
}
}
private emit(e: AgentEvent) {
for (const listener of this.listeners) {
listener(e);
}
}
}