WP3: Add AgentSession event subscription with session persistence

This commit is contained in:
Mario Zechner 2025-12-09 00:05:54 +01:00
parent 29d96ab25a
commit eba196f4ac
2 changed files with 93 additions and 4 deletions

View file

@ -340,9 +340,10 @@ private unsubscribeAll(): void {
**Verification:**
1. `npm run check` passes
- [ ] Add `subscribe()` method to AgentSession
- [ ] Add `unsubscribeAll()` private method
- [ ] Verify with `npm run check`
- [x] Add `subscribe()` method to AgentSession
- [x] Add `unsubscribeAll()` method
- [x] Add `resubscribe()` method
- [x] Verify with `npm run check`
---

View file

@ -13,12 +13,15 @@
* Modes use this class and add their own I/O layer on top.
*/
import type { Agent, AgentState, AppMessage, ThinkingLevel } from "@mariozechner/pi-agent-core";
import type { Agent, AgentEvent, AgentState, AppMessage, ThinkingLevel } from "@mariozechner/pi-agent-core";
import type { Model } from "@mariozechner/pi-ai";
import type { SessionManager } from "../session-manager.js";
import type { SettingsManager } from "../settings-manager.js";
import type { FileSlashCommand } from "../slash-commands.js";
/** Listener function for agent events */
export type AgentEventListener = (event: AgentEvent) => void;
// ============================================================================
// Types
// ============================================================================
@ -45,6 +48,10 @@ export class AgentSession {
private _scopedModels: Array<{ model: Model<any>; thinkingLevel: ThinkingLevel }>;
private _fileCommands: FileSlashCommand[];
// Event subscription state
private _unsubscribeAgent?: () => void;
private _eventListeners: AgentEventListener[] = [];
constructor(config: AgentSessionConfig) {
this.agent = config.agent;
this.sessionManager = config.sessionManager;
@ -53,6 +60,87 @@ export class AgentSession {
this._fileCommands = config.fileCommands ?? [];
}
// =========================================================================
// Event Subscription
// =========================================================================
/**
* Subscribe to agent events.
* Session persistence is handled internally (saves messages on message_end).
* Multiple listeners can be added. Returns unsubscribe function for this listener.
*/
subscribe(listener: AgentEventListener): () => void {
this._eventListeners.push(listener);
// Set up agent subscription if not already done
if (!this._unsubscribeAgent) {
this._unsubscribeAgent = this.agent.subscribe(async (event) => {
// Notify all listeners
for (const l of this._eventListeners) {
l(event);
}
// Handle session persistence
if (event.type === "message_end") {
this.sessionManager.saveMessage(event.message);
// Initialize session after first user+assistant exchange
if (this.sessionManager.shouldInitializeSession(this.agent.state.messages)) {
this.sessionManager.startSession(this.agent.state);
}
// Check auto-compaction after assistant messages
// (will be implemented in WP7)
// if (event.message.role === "assistant") {
// await this.checkAutoCompaction();
// }
}
});
}
// Return unsubscribe function for this specific listener
return () => {
const index = this._eventListeners.indexOf(listener);
if (index !== -1) {
this._eventListeners.splice(index, 1);
}
};
}
/**
* Unsubscribe from agent entirely and clear all listeners.
* Used during reset/cleanup operations.
*/
unsubscribeAll(): void {
if (this._unsubscribeAgent) {
this._unsubscribeAgent();
this._unsubscribeAgent = undefined;
}
this._eventListeners = [];
}
/**
* Re-subscribe to agent after unsubscribeAll.
* Call this after operations that require temporary unsubscription.
*/
resubscribe(): void {
if (this._unsubscribeAgent) return; // Already subscribed
this._unsubscribeAgent = this.agent.subscribe(async (event) => {
for (const l of this._eventListeners) {
l(event);
}
if (event.type === "message_end") {
this.sessionManager.saveMessage(event.message);
if (this.sessionManager.shouldInitializeSession(this.agent.state.messages)) {
this.sessionManager.startSession(this.agent.state);
}
}
});
}
// =========================================================================
// Read-only State Access
// =========================================================================