mirror of
https://github.com/getcompanion-ai/co-mono.git
synced 2026-04-15 23:01:30 +00:00
Fix bash execution interleaving with tool calls: defer message insertion while streaming
This commit is contained in:
parent
14d99b5f86
commit
2c014c1b95
1 changed files with 67 additions and 47 deletions
|
|
@ -107,6 +107,7 @@ export class AgentSession {
|
|||
|
||||
// Bash execution state
|
||||
private _bashAbortController: AbortController | null = null;
|
||||
private _pendingBashMessages: BashExecutionMessage[] = [];
|
||||
|
||||
constructor(config: AgentSessionConfig) {
|
||||
this.agent = config.agent;
|
||||
|
|
@ -120,6 +121,34 @@ export class AgentSession {
|
|||
// Event Subscription
|
||||
// =========================================================================
|
||||
|
||||
/** Internal handler for agent events - shared by subscribe and reconnect */
|
||||
private _handleAgentEvent = async (event: AgentEvent): Promise<void> => {
|
||||
// Notify all listeners
|
||||
for (const l of this._eventListeners) {
|
||||
l(event);
|
||||
}
|
||||
|
||||
// Handle session persistence
|
||||
if (event.type === "message_end") {
|
||||
this.sessionManager.saveMessage(event.message);
|
||||
|
||||
// Initialize session after first user+assistant exchange
|
||||
if (this.sessionManager.shouldInitializeSession(this.agent.state.messages)) {
|
||||
this.sessionManager.startSession(this.agent.state);
|
||||
}
|
||||
|
||||
// Check auto-compaction after assistant messages
|
||||
if (event.message.role === "assistant") {
|
||||
await this.checkAutoCompaction();
|
||||
}
|
||||
}
|
||||
|
||||
// Flush pending bash messages after agent turn completes
|
||||
if (event.type === "agent_end") {
|
||||
this._flushPendingBashMessages();
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Subscribe to agent events.
|
||||
* Session persistence is handled internally (saves messages on message_end).
|
||||
|
|
@ -130,27 +159,7 @@ export class AgentSession {
|
|||
|
||||
// Set up agent subscription if not already done
|
||||
if (!this._unsubscribeAgent) {
|
||||
this._unsubscribeAgent = this.agent.subscribe(async (event) => {
|
||||
// Notify all listeners
|
||||
for (const l of this._eventListeners) {
|
||||
l(event);
|
||||
}
|
||||
|
||||
// Handle session persistence
|
||||
if (event.type === "message_end") {
|
||||
this.sessionManager.saveMessage(event.message);
|
||||
|
||||
// Initialize session after first user+assistant exchange
|
||||
if (this.sessionManager.shouldInitializeSession(this.agent.state.messages)) {
|
||||
this.sessionManager.startSession(this.agent.state);
|
||||
}
|
||||
|
||||
// Check auto-compaction after assistant messages
|
||||
if (event.message.role === "assistant") {
|
||||
await this.checkAutoCompaction();
|
||||
}
|
||||
}
|
||||
});
|
||||
this._unsubscribeAgent = this.agent.subscribe(this._handleAgentEvent);
|
||||
}
|
||||
|
||||
// Return unsubscribe function for this specific listener
|
||||
|
|
@ -180,25 +189,7 @@ export class AgentSession {
|
|||
*/
|
||||
private _reconnectToAgent(): void {
|
||||
if (this._unsubscribeAgent) return; // Already connected
|
||||
|
||||
this._unsubscribeAgent = this.agent.subscribe(async (event) => {
|
||||
for (const l of this._eventListeners) {
|
||||
l(event);
|
||||
}
|
||||
|
||||
if (event.type === "message_end") {
|
||||
this.sessionManager.saveMessage(event.message);
|
||||
|
||||
if (this.sessionManager.shouldInitializeSession(this.agent.state.messages)) {
|
||||
this.sessionManager.startSession(this.agent.state);
|
||||
}
|
||||
|
||||
// Check auto-compaction after assistant messages
|
||||
if (event.message.role === "assistant") {
|
||||
await this.checkAutoCompaction();
|
||||
}
|
||||
}
|
||||
});
|
||||
this._unsubscribeAgent = this.agent.subscribe(this._handleAgentEvent);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -673,15 +664,21 @@ export class AgentSession {
|
|||
timestamp: Date.now(),
|
||||
};
|
||||
|
||||
// Add to agent state
|
||||
this.agent.appendMessage(bashMessage);
|
||||
// If agent is streaming, defer adding to avoid breaking tool_use/tool_result ordering
|
||||
if (this.isStreaming) {
|
||||
// Queue for later - will be flushed on agent_end
|
||||
this._pendingBashMessages.push(bashMessage);
|
||||
} else {
|
||||
// Add to agent state immediately
|
||||
this.agent.appendMessage(bashMessage);
|
||||
|
||||
// Save to session
|
||||
this.sessionManager.saveMessage(bashMessage);
|
||||
// Save to session
|
||||
this.sessionManager.saveMessage(bashMessage);
|
||||
|
||||
// Initialize session if needed
|
||||
if (this.sessionManager.shouldInitializeSession(this.agent.state.messages)) {
|
||||
this.sessionManager.startSession(this.agent.state);
|
||||
// Initialize session if needed
|
||||
if (this.sessionManager.shouldInitializeSession(this.agent.state.messages)) {
|
||||
this.sessionManager.startSession(this.agent.state);
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
|
|
@ -702,6 +699,29 @@ export class AgentSession {
|
|||
return this._bashAbortController !== null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Flush pending bash messages to agent state and session.
|
||||
* Called after agent turn completes to maintain proper message ordering.
|
||||
*/
|
||||
private _flushPendingBashMessages(): void {
|
||||
if (this._pendingBashMessages.length === 0) return;
|
||||
|
||||
for (const bashMessage of this._pendingBashMessages) {
|
||||
// Add to agent state
|
||||
this.agent.appendMessage(bashMessage);
|
||||
|
||||
// Save to session
|
||||
this.sessionManager.saveMessage(bashMessage);
|
||||
}
|
||||
|
||||
// Initialize session if needed
|
||||
if (this.sessionManager.shouldInitializeSession(this.agent.state.messages)) {
|
||||
this.sessionManager.startSession(this.agent.state);
|
||||
}
|
||||
|
||||
this._pendingBashMessages = [];
|
||||
}
|
||||
|
||||
// =========================================================================
|
||||
// Session Management
|
||||
// =========================================================================
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue