Fix markdown streaming duplication by splitting newlines first

- Added string-width library for proper terminal column width calculation
- Fixed wrapLine() to split by newlines before wrapping (like Text component)
- Fixed Loader interval leak by stopping before container removal
- Changed loader message from 'Loading...' to 'Working...'
This commit is contained in:
Mario Zechner 2025-11-11 19:27:58 +01:00
parent 985f955ea0
commit c5083bb7cb
16 changed files with 429 additions and 372 deletions

View file

@ -87,33 +87,32 @@ export class Agent {
subscribe(fn: (e: AgentEvent) => void): () => void {
this.listeners.add(fn);
fn({ type: "state-update", state: this._state });
return () => this.listeners.delete(fn);
}
// State mutators
// State mutators - update internal state without emitting events
setSystemPrompt(v: string) {
this.patch({ systemPrompt: v });
this._state.systemPrompt = v;
}
setModel(m: typeof this._state.model) {
this.patch({ model: m });
this._state.model = m;
}
setThinkingLevel(l: ThinkingLevel) {
this.patch({ thinkingLevel: l });
this._state.thinkingLevel = l;
}
setTools(t: typeof this._state.tools) {
this.patch({ tools: t });
this._state.tools = t;
}
replaceMessages(ms: AppMessage[]) {
this.patch({ messages: ms.slice() });
this._state.messages = ms.slice();
}
appendMessage(m: AppMessage) {
this.patch({ messages: [...this._state.messages, m] });
this._state.messages = [...this._state.messages, m];
}
async queueMessage(m: AppMessage) {
@ -126,7 +125,7 @@ export class Agent {
}
clearMessages() {
this.patch({ messages: [] });
this._state.messages = [];
}
abort() {
@ -163,8 +162,12 @@ export class Agent {
};
this.abortController = new AbortController();
this.patch({ isStreaming: true, streamMessage: null, error: undefined });
this.emit({ type: "started" });
this._state.isStreaming = true;
this._state.streamMessage = null;
this._state.error = undefined;
// Emit agent_start
this.emit({ type: "agent_start" });
const reasoning =
this._state.thinkingLevel === "off"
@ -186,6 +189,9 @@ export class Agent {
},
};
// Track all messages generated in this prompt
const generatedMessages: AppMessage[] = [];
try {
let partial: Message | null = null;
@ -198,38 +204,51 @@ export class Agent {
cfg,
this.abortController.signal,
)) {
// Pass through all events directly
this.emit(ev as AgentEvent);
// Update internal state as needed
switch (ev.type) {
case "message_start":
case "message_update": {
case "message_start": {
// Track streaming message
partial = ev.message;
this.patch({ streamMessage: ev.message });
this._state.streamMessage = ev.message;
break;
}
case "message_update": {
// Update streaming message
partial = ev.message;
this._state.streamMessage = ev.message;
break;
}
case "message_end": {
// Add completed message to state
partial = null;
this._state.streamMessage = null;
this.appendMessage(ev.message as AppMessage);
this.patch({ streamMessage: null });
generatedMessages.push(ev.message as AppMessage);
break;
}
case "tool_execution_start": {
const s = new Set(this._state.pendingToolCalls);
s.add(ev.toolCallId);
this.patch({ pendingToolCalls: s });
this._state.pendingToolCalls = s;
break;
}
case "tool_execution_end": {
const s = new Set(this._state.pendingToolCalls);
s.delete(ev.toolCallId);
this.patch({ pendingToolCalls: s });
this._state.pendingToolCalls = s;
break;
}
case "agent_end": {
this.patch({ streamMessage: null });
this._state.streamMessage = null;
break;
}
}
}
// Handle any remaining partial message
if (partial && partial.role === "assistant" && partial.content.length > 0) {
const onlyEmpty = !partial.content.some(
(c) =>
@ -239,6 +258,7 @@ export class Agent {
);
if (!onlyEmpty) {
this.appendMessage(partial as AppMessage);
generatedMessages.push(partial as AppMessage);
} else {
if (this.abortController?.signal.aborted) {
throw new Error("Request was aborted");
@ -264,17 +284,17 @@ export class Agent {
timestamp: Date.now(),
};
this.appendMessage(msg as AppMessage);
this.patch({ error: err?.message || String(err) });
generatedMessages.push(msg as AppMessage);
this._state.error = err?.message || String(err);
} finally {
this.patch({ isStreaming: false, streamMessage: null, pendingToolCalls: new Set<string>() });
this._state.isStreaming = false;
this._state.streamMessage = null;
this._state.pendingToolCalls = new Set<string>();
this.abortController = undefined;
this.emit({ type: "completed" });
}
}
private patch(p: Partial<AgentState>): void {
this._state = { ...this._state, ...p };
this.emit({ type: "state-update", state: this._state });
// Emit agent_end with all generated messages
this.emit({ type: "agent_end", messages: generatedMessages });
}
}
private emit(e: AgentEvent) {

View file

@ -1,4 +1,11 @@
import type { AgentTool, AssistantMessage, Message, Model, UserMessage } from "@mariozechner/pi-ai";
import type {
AgentTool,
AssistantMessage,
AssistantMessageEvent,
Message,
Model,
UserMessage,
} from "@mariozechner/pi-ai";
/**
* Attachment type definition.
@ -71,5 +78,20 @@ export interface AgentState {
/**
* Events emitted by the Agent for UI updates.
* These events provide fine-grained lifecycle information for messages, turns, and tool executions.
*/
export type AgentEvent = { type: "state-update"; state: AgentState } | { type: "started" } | { type: "completed" };
export type AgentEvent =
// Agent lifecycle
| { type: "agent_start" }
| { type: "agent_end"; messages: AppMessage[] }
// Turn lifecycle - a turn is one assistant response + any tool calls/results
| { type: "turn_start" }
| { type: "turn_end"; message: AppMessage; toolResults: AppMessage[] }
// Message lifecycle - emitted for user, assistant, and toolResult messages
| { type: "message_start"; message: AppMessage }
// Only emitted for assistant messages during streaming
| { type: "message_update"; message: AppMessage; assistantMessageEvent: AssistantMessageEvent }
| { type: "message_end"; message: AppMessage }
// Tool execution lifecycle
| { type: "tool_execution_start"; toolCallId: string; toolName: string; args: any }
| { type: "tool_execution_end"; toolCallId: string; toolName: string; result: any; isError: boolean };