From bef2e84d0ca2e0efecc0b68fdf6cc59dc19b81dc Mon Sep 17 00:00:00 2001 From: Franklin Date: Fri, 6 Feb 2026 16:54:53 -0500 Subject: [PATCH] wip: pi working --- docs/conversion.mdx | 5 + docs/inspector.mdx | 1 + docs/openapi.json | 4 + docs/pi-support-plan.md | 10 + .../sandbox-agent/src/opencode_compat.rs | 9 + server/packages/sandbox-agent/src/router.rs | 1334 ++++++++++++++++- .../tests/agent-flows/pi_rpc_integration.rs | 240 +++ .../universal-agent-schema/src/agents/pi.rs | 140 +- .../tests/pi_conversion.rs | 43 + 9 files changed, 1747 insertions(+), 39 deletions(-) diff --git a/docs/conversion.mdx b/docs/conversion.mdx index 0c7b05c..72daabb 100644 --- a/docs/conversion.mdx +++ b/docs/conversion.mdx @@ -84,4 +84,9 @@ Message normalization notes - If Pi message_update events omit messageId, we synthesize a stable message id and emit a synthetic item.started before the first delta so streaming text stays grouped. - Pi auto_compaction_start/auto_compaction_end and auto_retry_start/auto_retry_end events are mapped to status items (label `pi.*`). - Pi extension_ui_request/extension_error events are mapped to status items. +<<<<<<< Updated upstream - Pi RPC from pi-coding-agent does not include sessionId in events; each daemon session owns a dedicated Pi RPC process, so events are routed by runtime ownership (parallel sessions supported). +======= +- Pi runtime mode is capability-selected: default is per-session process isolation, while shared multiplexing is used only for allowlisted Pi capabilities. +- In shared mode, pi-coding-agent events without sessionId are routed using the current-session mapping. +>>>>>>> Stashed changes diff --git a/docs/inspector.mdx b/docs/inspector.mdx index 8e80c22..4c0aad8 100644 --- a/docs/inspector.mdx +++ b/docs/inspector.mdx @@ -35,6 +35,7 @@ console.log(url); - **Send messages**: Post messages to a session directly from the UI - **Agent selection**: Switch between agents and modes - **Request log**: View raw HTTP requests and responses for debugging +- **Pi concurrent sessions**: Pi sessions run concurrently by default via per-session runtime processes ## When to Use diff --git a/docs/openapi.json b/docs/openapi.json index bbe1c2e..57059f7 100644 --- a/docs/openapi.json +++ b/docs/openapi.json @@ -10,7 +10,11 @@ "license": { "name": "Apache-2.0" }, +<<<<<<< Updated upstream "version": "0.1.7" +======= + "version": "0.1.6" +>>>>>>> Stashed changes }, "servers": [ { diff --git a/docs/pi-support-plan.md b/docs/pi-support-plan.md index 43f404e..5e207a5 100644 --- a/docs/pi-support-plan.md +++ b/docs/pi-support-plan.md @@ -1,5 +1,15 @@ # Pi Agent Support Plan (pi-mono) +## Implementation Status Update + +- Runtime selection now supports two internal modes: + - `PerSession` (default for unknown/non-allowlisted Pi capabilities) + - `Shared` (allowlist-only compatibility path) +- Pi sessions now use per-session process isolation by default, enabling true concurrent Pi sessions in Inspector and API clients. +- Shared Pi server code remains available and is used only when capability checks allow multiplexing. +- Session termination for per-session Pi mode hard-kills the underlying Pi process and clears queued prompts/pending waiters. +- In-session concurrent sends are serialized with an unbounded daemon-side FIFO queue per session. + ## Investigation Summary ### Pi CLI modes and RPC protocol diff --git a/server/packages/sandbox-agent/src/opencode_compat.rs b/server/packages/sandbox-agent/src/opencode_compat.rs index 1393f94..276577f 100644 --- a/server/packages/sandbox-agent/src/opencode_compat.rs +++ b/server/packages/sandbox-agent/src/opencode_compat.rs @@ -2628,8 +2628,12 @@ pub fn build_opencode_router(state: Arc) -> Router { responses((status = 200)), tag = "opencode" )] +<<<<<<< Updated upstream async fn oc_agent_list(State(state): State>) -> impl IntoResponse { let name = state.inner.branding.product_name(); +======= +async fn oc_agent_list(State(_state): State>) -> impl IntoResponse { +>>>>>>> Stashed changes let agent = json!({ "name": name, "description": format!("{name} compatibility layer"), @@ -4274,6 +4278,7 @@ async fn oc_file_list() -> impl IntoResponse { tag = "opencode" )] async fn oc_file_content(Query(query): Query) -> impl IntoResponse { + let _directory = query.directory.as_deref(); if query.path.is_none() { return bad_request("path is required").into_response(); } @@ -4304,6 +4309,7 @@ async fn oc_file_status() -> impl IntoResponse { tag = "opencode" )] async fn oc_find_text(Query(query): Query) -> impl IntoResponse { + let _directory = query.directory.as_deref(); if query.pattern.is_none() { return bad_request("pattern is required").into_response(); } @@ -4317,6 +4323,7 @@ async fn oc_find_text(Query(query): Query) -> impl IntoResponse { tag = "opencode" )] async fn oc_find_files(Query(query): Query) -> impl IntoResponse { + let _directory = query.directory.as_deref(); if query.query.is_none() { return bad_request("query is required").into_response(); } @@ -4330,6 +4337,7 @@ async fn oc_find_files(Query(query): Query) -> impl IntoResponse tag = "opencode" )] async fn oc_find_symbols(Query(query): Query) -> impl IntoResponse { + let _directory = query.directory.as_deref(); if query.query.is_none() { return bad_request("query is required").into_response(); } @@ -4446,6 +4454,7 @@ async fn oc_tool_ids() -> impl IntoResponse { tag = "opencode" )] async fn oc_tool_list(Query(query): Query) -> impl IntoResponse { + let _directory = query.directory.as_deref(); if query.provider.is_none() || query.model.is_none() { return bad_request("provider and model are required").into_response(); } diff --git a/server/packages/sandbox-agent/src/router.rs b/server/packages/sandbox-agent/src/router.rs index 8ab2187..f1d46c9 100644 --- a/server/packages/sandbox-agent/src/router.rs +++ b/server/packages/sandbox-agent/src/router.rs @@ -22,11 +22,19 @@ use reqwest::Client; use sandbox_agent_error::{AgentError, ErrorType, ProblemDetails, SandboxError}; use sandbox_agent_universal_agent_schema::{ codex as codex_schema, convert_amp, convert_claude, convert_codex, convert_opencode, +<<<<<<< Updated upstream convert_pi, pi as pi_schema, turn_completed_event, AgentUnparsedData, ContentPart, ErrorData, EventConversion, EventSource, FileAction, ItemDeltaData, ItemEventData, ItemKind, ItemRole, ItemStatus, PermissionEventData, PermissionStatus, QuestionEventData, QuestionStatus, ReasoningVisibility, SessionEndReason, SessionEndedData, SessionStartedData, StderrOutput, TerminatedBy, UniversalEvent, UniversalEventData, UniversalEventType, UniversalItem, +======= + convert_pi, pi as pi_schema, AgentUnparsedData, ContentPart, ErrorData, EventConversion, + EventSource, FileAction, ItemDeltaData, ItemEventData, ItemKind, ItemRole, ItemStatus, + PermissionEventData, PermissionStatus, QuestionEventData, QuestionStatus, ReasoningVisibility, + SessionEndReason, SessionEndedData, SessionStartedData, StderrOutput, TerminatedBy, + UniversalEvent, UniversalEventData, UniversalEventType, UniversalItem, +>>>>>>> Stashed changes }; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -368,7 +376,12 @@ struct SessionState { model: Option, variant: Option, native_session_id: Option, +<<<<<<< Updated upstream pi_runtime: Option>, +======= + pi_runtime_mode: Option, + pi_state: Option, +>>>>>>> Stashed changes ended: bool, ended_exit_code: Option, ended_message: Option, @@ -427,7 +440,12 @@ impl SessionState { model: request.model.clone(), variant: request.variant.clone(), native_session_id: None, +<<<<<<< Updated upstream pi_runtime: None, +======= + pi_runtime_mode: None, + pi_state: None, +>>>>>>> Stashed changes ended: false, ended_exit_code: None, ended_message: None, @@ -963,9 +981,327 @@ impl CodexServer { } } +<<<<<<< Updated upstream /// Long-lived Pi RPC process dedicated to exactly one daemon session. struct PiSessionRuntime { /// Sender for writing to the process stdin. +======= +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum PiRpcDialect { + Mono, + CodingAgent, +} + +fn detect_pi_dialect(path: &PathBuf) -> PiRpcDialect { + let mut file = match std::fs::File::open(path) { + Ok(file) => file, + Err(_) => return PiRpcDialect::Mono, + }; + let mut buffer = [0u8; 256]; + let read = match file.read(&mut buffer) { + Ok(read) => read, + Err(_) => return PiRpcDialect::Mono, + }; + if read == 0 { + return PiRpcDialect::Mono; + } + let sample = &buffer[..read]; + if sample.starts_with(b"#!") { + return PiRpcDialect::CodingAgent; + } + if sample.iter().any(|byte| *byte == 0) { + return PiRpcDialect::Mono; + } + let is_text = sample.iter().all(|byte| byte.is_ascii()); + if is_text { + return PiRpcDialect::CodingAgent; + } + PiRpcDialect::Mono +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum PiRuntimeMode { + Shared, + PerSession, +} + +impl PiRuntimeMode { + fn as_str(self) -> &'static str { + match self { + PiRuntimeMode::Shared => "shared", + PiRuntimeMode::PerSession => "per_session", + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +struct ParsedPiVersion { + major: u64, + minor: u64, + patch: u64, +} + +impl ParsedPiVersion { + fn parse(raw: &str) -> Option { + let trimmed = raw.trim(); + let start = trimmed.find(|ch: char| ch.is_ascii_digit())?; + let version_like = &trimmed[start..]; + let end = version_like + .find(|ch: char| !(ch.is_ascii_digit() || ch == '.')) + .unwrap_or(version_like.len()); + let core = &version_like[..end]; + if core.is_empty() { + return None; + } + let mut parts = core.split('.'); + let major = parts.next()?.parse::().ok()?; + let minor = parts.next().unwrap_or("0").parse::().ok()?; + let patch = parts.next().unwrap_or("0").parse::().ok()?; + Some(Self { + major, + minor, + patch, + }) + } +} + +#[derive(Debug, Clone, Copy)] +enum PiVersionConstraint { + Range { + min: ParsedPiVersion, + max_exclusive: Option, + }, +} + +impl PiVersionConstraint { + fn matches(self, version: ParsedPiVersion) -> bool { + match self { + PiVersionConstraint::Range { min, max_exclusive } => { + if version < min { + return false; + } + if let Some(max_exclusive) = max_exclusive { + version < max_exclusive + } else { + true + } + } + } + } +} + +#[derive(Debug, Clone, Copy)] +struct PiCapabilityAllowlistEntry { + dialect: PiRpcDialect, + version: PiVersionConstraint, + events_include_session_id: bool, + supports_multi_session_rpc: bool, +} + +const PI_SHARED_ALLOWLIST: &[PiCapabilityAllowlistEntry] = &[PiCapabilityAllowlistEntry { + dialect: PiRpcDialect::Mono, + version: PiVersionConstraint::Range { + min: ParsedPiVersion { + major: 0, + minor: 0, + patch: 0, + }, + max_exclusive: None, + }, + events_include_session_id: true, + supports_multi_session_rpc: true, +}]; + +#[derive(Debug, Clone)] +struct PiCapabilityKey { + dialect: PiRpcDialect, + version_raw: Option, + parsed_version: Option, + events_include_session_id: bool, + supports_multi_session_rpc: bool, +} + +impl PiCapabilityKey { + fn from_detected(dialect: PiRpcDialect, version_raw: Option) -> Self { + let parsed_version = version_raw.as_deref().and_then(ParsedPiVersion::parse); + Self { + dialect, + version_raw, + parsed_version, + events_include_session_id: matches!(dialect, PiRpcDialect::Mono), + supports_multi_session_rpc: matches!(dialect, PiRpcDialect::Mono), + } + } +} + +#[derive(Debug, Clone)] +struct PiRuntimeSelection { + mode: PiRuntimeMode, + capability: PiCapabilityKey, + reason: String, +} + +fn supports_shared_multiplexing(capability: &PiCapabilityKey) -> bool { + let Some(version) = capability.parsed_version else { + return false; + }; + PI_SHARED_ALLOWLIST.iter().any(|entry| { + entry.dialect == capability.dialect + && entry.version.matches(version) + && (!entry.events_include_session_id || capability.events_include_session_id) + && (!entry.supports_multi_session_rpc || capability.supports_multi_session_rpc) + }) +} + +fn pi_runtime_mode_override() -> Option { + let value = std::env::var("SANDBOX_AGENT_PI_FORCE_RUNTIME_MODE").ok()?; + match value.trim().to_ascii_lowercase().as_str() { + "shared" => Some(PiRuntimeMode::Shared), + "per-session" | "per_session" | "persession" => Some(PiRuntimeMode::PerSession), + _ => None, + } +} + +#[derive(Debug)] +struct QueuedPiPrompt { + prompt: String, +} + +#[derive(Clone)] +struct SessionPiRuntime { + stdin_sender: mpsc::UnboundedSender, + pending_requests: Arc>>>, + next_id: Arc, + converter: Arc>, + child: Arc>>, +} + +impl std::fmt::Debug for SessionPiRuntime { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let pending = self + .pending_requests + .lock() + .map(|pending| pending.len()) + .unwrap_or(0); + f.debug_struct("SessionPiRuntime") + .field("pending_requests", &pending) + .field("next_id", &self.next_id.load(Ordering::SeqCst)) + .finish() + } +} + +impl SessionPiRuntime { + fn next_request_id(&self) -> i64 { + self.next_id.fetch_add(1, Ordering::SeqCst) + } + + fn send_request( + &self, + id: i64, + request: &Value, + ) -> Result, SandboxError> { + let line = serde_json::to_string(request).map_err(|err| SandboxError::StreamError { + message: err.to_string(), + })?; + let (tx, rx) = oneshot::channel(); + { + let mut pending = self.pending_requests.lock().unwrap(); + pending.insert(id, tx); + } + if self.stdin_sender.send(line).is_err() { + let mut pending = self.pending_requests.lock().unwrap(); + pending.remove(&id); + return Err(SandboxError::StreamError { + message: "pi session stdin unavailable".to_string(), + }); + } + Ok(rx) + } + + fn complete_request(&self, id: i64, result: Value) { + let tx = { + let mut pending = self.pending_requests.lock().unwrap(); + pending.remove(&id) + }; + if let Some(tx) = tx { + let _ = tx.send(result); + } + } + + fn clear_pending_requests(&self) { + let mut pending = self.pending_requests.lock().unwrap(); + pending.clear(); + } + + fn kill_child(&self) { + if let Ok(mut guard) = self.child.lock() { + if let Some(child) = guard.as_mut() { + let _ = child.kill(); + } + *guard = None; + } + } +} + +struct SessionPiState { + runtime: SessionPiRuntime, + dialect: PiRpcDialect, + output_loop_running: bool, + turn_queue: VecDeque, + turn_active: bool, +} + +impl std::fmt::Debug for SessionPiState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SessionPiState") + .field("dialect", &self.dialect) + .field("output_loop_running", &self.output_loop_running) + .field("turn_queue_len", &self.turn_queue.len()) + .field("turn_active", &self.turn_active) + .finish() + } +} + +impl SessionPiState { + fn new(runtime: SessionPiRuntime, dialect: PiRpcDialect) -> Self { + Self { + runtime, + dialect, + output_loop_running: true, + turn_queue: VecDeque::new(), + turn_active: false, + } + } + + fn enqueue_prompt(&mut self, prompt: String) { + self.turn_queue.push_back(QueuedPiPrompt { prompt }); + } + + fn begin_next_turn_if_idle(&mut self) -> Option { + if self.turn_active { + return None; + } + let next = self.turn_queue.pop_front()?; + self.turn_active = true; + Some(next) + } + + fn mark_turn_complete(&mut self) { + self.turn_active = false; + } + + fn clear_for_terminate(&mut self) { + self.turn_queue.clear(); + self.turn_active = false; + self.output_loop_running = false; + self.runtime.clear_pending_requests(); + } +} + +/// Shared Pi RPC process that multiplexes sessions via newline-delimited JSON. +struct PiServer { + /// Sender for writing to the process stdin +>>>>>>> Stashed changes stdin_sender: mpsc::UnboundedSender, /// Pending RPC requests awaiting responses, keyed by request ID. pending_requests: std::sync::Mutex>>, @@ -1493,6 +1829,98 @@ impl AgentServerManager { )) } +<<<<<<< Updated upstream +======= + async fn spawn_pi_server( + self: &Arc, + ) -> Result< + ( + Arc, + mpsc::UnboundedReceiver, + Arc>>, + ), + SandboxError, + > { + let manager = self.agent_manager.clone(); + let log_dir = self.log_base_dir.clone(); + let (stdin_tx, stdin_rx) = mpsc::unbounded_channel::(); + let (stdout_tx, stdout_rx) = mpsc::unbounded_channel::(); + + let child = tokio::task::spawn_blocking( + move || -> Result<(std::process::Child, PiRpcDialect), SandboxError> { + let path = manager + .resolve_binary(AgentId::Pi) + .map_err(|err| map_spawn_error(AgentId::Pi, err))?; + let dialect = detect_pi_dialect(&path); + let mut command = std::process::Command::new(path); + let stderr = AgentServerLogs::new(log_dir, AgentId::Pi.as_str()).open()?; + command + .arg("--mode") + .arg("rpc") + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(stderr); + + let mut child = command.spawn().map_err(|err| SandboxError::StreamError { + message: err.to_string(), + })?; + + let stdin = child + .stdin + .take() + .ok_or_else(|| SandboxError::StreamError { + message: "pi stdin unavailable".to_string(), + })?; + let stdout = child + .stdout + .take() + .ok_or_else(|| SandboxError::StreamError { + message: "pi stdout unavailable".to_string(), + })?; + + let stdin_rx_mut = std::sync::Mutex::new(stdin_rx); + std::thread::spawn(move || { + let mut stdin = stdin; + let mut rx = stdin_rx_mut.lock().unwrap(); + while let Some(line) = rx.blocking_recv() { + if writeln!(stdin, "{line}").is_err() { + break; + } + if stdin.flush().is_err() { + break; + } + } + }); + + std::thread::spawn(move || { + let reader = BufReader::new(stdout); + for line in reader.lines() { + let Ok(line) = line else { break }; + if stdout_tx.send(line).is_err() { + break; + } + } + }); + + Ok((child, dialect)) + }, + ) + .await + .map_err(|err| SandboxError::StreamError { + message: err.to_string(), + })??; + + let (child, dialect) = child; + let server = Arc::new(PiServer::new(stdin_tx, dialect)); + + Ok(( + server, + stdout_rx, + Arc::new(std::sync::Mutex::new(Some(child))), + )) + } + +>>>>>>> Stashed changes fn spawn_monitor_task( self: &Arc, agent: AgentId, @@ -1781,16 +2209,51 @@ impl SessionManager { model: session.model.clone(), variant: session.variant.clone(), native_session_id: None, + pi_runtime_mode: None, }; let thread_id = self.create_codex_thread(&session_id, &snapshot).await?; session.native_session_id = Some(thread_id); } if agent_id == AgentId::Pi { +<<<<<<< Updated upstream let pi = self .create_pi_session(&session_id, session.model.as_deref()) .await?; session.native_session_id = Some(pi.native_session_id); session.pi_runtime = Some(pi.runtime); +======= + let selection = self.select_pi_runtime_mode().await?; + tracing::debug!( + dialect = ?selection.capability.dialect, + version = ?selection.capability.version_raw, + runtime_mode = selection.mode.as_str(), + reason = selection.reason.as_str(), + "selected Pi runtime mode" + ); + session.pi_runtime_mode = Some(selection.mode); + match selection.mode { + PiRuntimeMode::Shared => { + let snapshot = SessionSnapshot { + session_id: session_id.clone(), + agent: agent_id, + agent_mode: session.agent_mode.clone(), + permission_mode: session.permission_mode.clone(), + model: session.model.clone(), + variant: session.variant.clone(), + native_session_id: None, + pi_runtime_mode: Some(PiRuntimeMode::Shared), + }; + let native_id = self.create_pi_session(&session_id, &snapshot).await?; + session.native_session_id = Some(native_id); + } + PiRuntimeMode::PerSession => { + let (native_id, pi_state) = + self.create_pi_session_per_session(&session_id).await?; + session.native_session_id = Some(native_id); + session.pi_state = Some(pi_state); + } + } +>>>>>>> Stashed changes } if agent_id == AgentId::Mock { session.native_session_id = Some(format!("mock-{session_id}")); @@ -1828,12 +2291,23 @@ impl SessionManager { } let native_session_id = session.native_session_id.clone(); +<<<<<<< Updated upstream let telemetry_agent_mode = session.agent_mode.clone(); let telemetry_permission_mode = session.permission_mode.clone(); let mut sessions = self.sessions.lock().await; sessions.push(session); drop(sessions); if agent_id == AgentId::Opencode || agent_id == AgentId::Codex { +======= + let pi_runtime_mode = session.pi_runtime_mode; + let mut sessions = self.sessions.lock().await; + sessions.push(session); + drop(sessions); + let should_register_pi_shared = + agent_id == AgentId::Pi && pi_runtime_mode == Some(PiRuntimeMode::Shared); + if agent_id == AgentId::Opencode || agent_id == AgentId::Codex || should_register_pi_shared + { +>>>>>>> Stashed changes self.server_manager .register_session(agent_id, &session_id, native_session_id.as_deref()) .await; @@ -1964,7 +2438,18 @@ impl SessionManager { return Ok(()); } if session_snapshot.agent == AgentId::Pi { - self.send_pi_prompt(&session_snapshot, &message).await?; + match session_snapshot + .pi_runtime_mode + .unwrap_or(PiRuntimeMode::Shared) + { + PiRuntimeMode::Shared => { + self.send_pi_prompt(&session_snapshot, &message).await?; + } + PiRuntimeMode::PerSession => { + self.enqueue_pi_prompt(&session_snapshot.session_id, message.clone()) + .await?; + } + } if !agent_supports_item_started(session_snapshot.agent) { let _ = self .emit_synthetic_assistant_start(&session_snapshot.session_id) @@ -2080,12 +2565,34 @@ impl SessionManager { session.record_conversions(vec![ended]); let agent = session.agent; let native_session_id = session.native_session_id.clone(); +<<<<<<< Updated upstream let pi_runtime = session.pi_runtime.take(); drop(sessions); if let Some(runtime) = pi_runtime { runtime.shutdown(); } if agent == AgentId::Opencode || agent == AgentId::Codex { +======= + let pi_runtime_mode = session.pi_runtime_mode; + let per_session_runtime = + if agent == AgentId::Pi && pi_runtime_mode == Some(PiRuntimeMode::PerSession) { + if let Some(pi_state) = session.pi_state.as_mut() { + pi_state.clear_for_terminate(); + Some(pi_state.runtime.clone()) + } else { + None + } + } else { + None + }; + drop(sessions); + if let Some(runtime) = per_session_runtime { + runtime.kill_child(); + } + let should_unregister_shared_pi = + agent == AgentId::Pi && pi_runtime_mode == Some(PiRuntimeMode::Shared); + if agent == AgentId::Opencode || agent == AgentId::Codex || should_unregister_shared_pi { +>>>>>>> Stashed changes self.server_manager .unregister_session(agent, &session_id, native_session_id.as_deref()) .await; @@ -2964,6 +3471,11 @@ impl SessionManager { reason.clone(), terminated_by.clone(), ); + if let Some(pi_state) = session.pi_state.as_mut() { + pi_state.turn_active = false; + pi_state.turn_queue.clear(); + pi_state.runtime.clear_pending_requests(); + } let (error_message, error_exit_code, error_stderr) = if reason == SessionEndReason::Error { (Some(message.to_string()), exit_code, stderr) @@ -3418,6 +3930,7 @@ impl SessionManager { Ok(()) } +<<<<<<< Updated upstream fn apply_pi_model_args(command: &mut std::process::Command, model: Option<&str>) { let Some(model) = model else { return; @@ -3429,6 +3942,608 @@ impl SessionManager { .arg("--model") .arg(model_id); return; +======= + async fn select_pi_runtime_mode(&self) -> Result { + let manager = self.agent_manager.clone(); + let (dialect, version_raw) = tokio::task::spawn_blocking( + move || -> Result<(PiRpcDialect, Option), ManagerError> { + let path = manager.resolve_binary(AgentId::Pi)?; + let dialect = detect_pi_dialect(&path); + let version_raw = manager.version(AgentId::Pi).ok().flatten(); + Ok((dialect, version_raw)) + }, + ) + .await + .map_err(|err| SandboxError::StreamError { + message: err.to_string(), + })? + .map_err(|err| map_spawn_error(AgentId::Pi, err))?; + + let capability = PiCapabilityKey::from_detected(dialect, version_raw); + + let mode_override_raw = std::env::var("SANDBOX_AGENT_PI_FORCE_RUNTIME_MODE").ok(); + if let Some(mode) = pi_runtime_mode_override() { + return Ok(PiRuntimeSelection { + mode, + capability, + reason: "forced by SANDBOX_AGENT_PI_FORCE_RUNTIME_MODE".to_string(), + }); + } + if let Some(value) = mode_override_raw { + tracing::warn!( + override_value = value.as_str(), + "ignoring invalid SANDBOX_AGENT_PI_FORCE_RUNTIME_MODE value" + ); + } + + if capability.parsed_version.is_none() { + return Ok(PiRuntimeSelection { + mode: PiRuntimeMode::PerSession, + capability, + reason: "unknown or unparseable version defaults to per-session".to_string(), + }); + } + + if supports_shared_multiplexing(&capability) { + return Ok(PiRuntimeSelection { + mode: PiRuntimeMode::Shared, + capability, + reason: "capability allowlist matched".to_string(), + }); + } + + Ok(PiRuntimeSelection { + mode: PiRuntimeMode::PerSession, + capability, + reason: "capability not allowlisted for shared multiplexing".to_string(), + }) + } + + async fn spawn_pi_session_runtime( + &self, + ) -> Result< + ( + SessionPiRuntime, + mpsc::UnboundedReceiver, + PiRpcDialect, + ), + SandboxError, + > { + let manager = self.agent_manager.clone(); + let log_dir = self.server_manager.log_base_dir.clone(); + let (stdin_tx, stdin_rx) = mpsc::unbounded_channel::(); + let (stdout_tx, stdout_rx) = mpsc::unbounded_channel::(); + + let (child, dialect) = tokio::task::spawn_blocking( + move || -> Result<(std::process::Child, PiRpcDialect), SandboxError> { + let path = manager + .resolve_binary(AgentId::Pi) + .map_err(|err| map_spawn_error(AgentId::Pi, err))?; + let dialect = detect_pi_dialect(&path); + let mut command = std::process::Command::new(path); + let stderr = AgentServerLogs::new(log_dir, AgentId::Pi.as_str()).open()?; + command + .arg("--mode") + .arg("rpc") + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(stderr); + + let mut child = command.spawn().map_err(|err| SandboxError::StreamError { + message: err.to_string(), + })?; + + let stdin = child + .stdin + .take() + .ok_or_else(|| SandboxError::StreamError { + message: "pi stdin unavailable".to_string(), + })?; + let stdout = child + .stdout + .take() + .ok_or_else(|| SandboxError::StreamError { + message: "pi stdout unavailable".to_string(), + })?; + + let stdin_rx_mut = std::sync::Mutex::new(stdin_rx); + std::thread::spawn(move || { + let mut stdin = stdin; + let mut rx = stdin_rx_mut.lock().unwrap(); + while let Some(line) = rx.blocking_recv() { + if writeln!(stdin, "{line}").is_err() { + break; + } + if stdin.flush().is_err() { + break; + } + } + }); + + std::thread::spawn(move || { + let reader = BufReader::new(stdout); + for line in reader.lines() { + let Ok(line) = line else { break }; + if stdout_tx.send(line).is_err() { + break; + } + } + }); + + Ok((child, dialect)) + }, + ) + .await + .map_err(|err| SandboxError::StreamError { + message: err.to_string(), + })??; + + let runtime = SessionPiRuntime { + stdin_sender: stdin_tx, + pending_requests: Arc::new(std::sync::Mutex::new(HashMap::new())), + next_id: Arc::new(AtomicI64::new(1)), + converter: Arc::new(std::sync::Mutex::new( + convert_pi::PiEventConverter::default(), + )), + child: Arc::new(std::sync::Mutex::new(Some(child))), + }; + + Ok((runtime, stdout_rx, dialect)) + } + + fn spawn_pi_session_monitor( + self: &Arc, + session_id: String, + child: Arc>>, + ) { + let manager = Arc::clone(self); + tokio::spawn(async move { + loop { + let status = { + let mut guard = match child.lock() { + Ok(guard) => guard, + Err(_) => return, + }; + match guard.as_mut() { + Some(child) => match child.try_wait() { + Ok(status) => status, + Err(_) => None, + }, + None => return, + } + }; + + if let Some(status) = status { + manager + .handle_pi_session_process_exit(session_id.clone(), status) + .await; + break; + } + + sleep(Duration::from_millis(500)).await; + } + }); + } + + async fn handle_pi_session_process_exit( + &self, + session_id: String, + status: std::process::ExitStatus, + ) { + let should_emit_error = { + let mut sessions = self.sessions.lock().await; + let Some(session) = Self::session_mut(&mut sessions, &session_id) else { + return; + }; + if session.agent != AgentId::Pi + || session.pi_runtime_mode != Some(PiRuntimeMode::PerSession) + { + return; + } + if let Some(pi_state) = session.pi_state.as_mut() { + pi_state.output_loop_running = false; + pi_state.runtime.clear_pending_requests(); + pi_state.turn_queue.clear(); + pi_state.turn_active = false; + } + !session.ended + }; + + if !should_emit_error { + return; + } + + let message = format!("pi session process exited with status {:?}", status); + self.record_error( + &session_id, + message.clone(), + Some("pi_process_exit".to_string()), + None, + ) + .await; + let logs = self.read_agent_stderr(AgentId::Pi); + self.mark_session_ended( + &session_id, + status.code(), + &message, + SessionEndReason::Error, + TerminatedBy::Daemon, + logs, + ) + .await; + } + + async fn create_pi_session_per_session( + self: &Arc, + session_id: &str, + ) -> Result<(String, SessionPiState), SandboxError> { + let (runtime, stdout_rx, dialect) = self.spawn_pi_session_runtime().await?; + let session_for_output = session_id.to_string(); + let runtime_for_output = runtime.clone(); + let self_for_output = Arc::clone(self); + tokio::spawn(async move { + self_for_output + .handle_pi_session_output(session_for_output, runtime_for_output, stdout_rx) + .await; + }); + self.spawn_pi_session_monitor(session_id.to_string(), runtime.child.clone()); + + let request_id = runtime.next_request_id(); + let request = match dialect { + PiRpcDialect::Mono => json!({ + "type": "command", + "id": request_id, + "command": "new_session", + "params": { "sessionName": session_id } + }), + PiRpcDialect::CodingAgent => json!({ + "type": "new_session", + "id": request_id + }), + }; + + let native_session_id_result = async { + let rx = runtime.send_request(request_id, &request)?; + let result = tokio::time::timeout(Duration::from_secs(30), rx).await; + let response = match result { + Ok(Ok(response)) => response, + Ok(Err(_)) => { + return Err(SandboxError::StreamError { + message: "pi new_session request cancelled".to_string(), + }) + } + Err(_) => { + return Err(SandboxError::StreamError { + message: "pi new_session request timed out".to_string(), + }) + } + }; + if response + .get("success") + .and_then(Value::as_bool) + .is_some_and(|success| !success) + { + return Err(SandboxError::StreamError { + message: format!("pi new_session failed: {response}"), + }); + } + if response + .get("data") + .and_then(|value| value.get("cancelled")) + .and_then(Value::as_bool) + .is_some_and(|cancelled| cancelled) + { + return Err(SandboxError::StreamError { + message: "pi new_session request cancelled".to_string(), + }); + } + + if dialect == PiRpcDialect::CodingAgent { + let state_id = runtime.next_request_id(); + let request = json!({ + "type": "get_state", + "id": state_id + }); + let rx = runtime.send_request(state_id, &request)?; + let result = tokio::time::timeout(Duration::from_secs(30), rx).await; + let response = match result { + Ok(Ok(response)) => response, + Ok(Err(_)) => { + return Err(SandboxError::StreamError { + message: "pi get_state request cancelled".to_string(), + }) + } + Err(_) => { + return Err(SandboxError::StreamError { + message: "pi get_state request timed out".to_string(), + }) + } + }; + if response + .get("success") + .and_then(Value::as_bool) + .is_some_and(|success| !success) + { + return Err(SandboxError::StreamError { + message: format!("pi get_state failed: {response}"), + }); + } + let session_value = response.get("data").unwrap_or(&response); + return session_value + .get("sessionId") + .or_else(|| session_value.get("session_id")) + .and_then(Value::as_str) + .map(|value| value.to_string()) + .ok_or_else(|| SandboxError::StreamError { + message: "pi get_state response missing session id".to_string(), + }); + } + + let session_value = response.get("data").unwrap_or(&response); + session_value + .get("sessionId") + .or_else(|| session_value.get("session_id")) + .and_then(Value::as_str) + .map(|value| value.to_string()) + .ok_or_else(|| SandboxError::StreamError { + message: "pi new_session response missing session id".to_string(), + }) + } + .await; + + if native_session_id_result.is_err() { + runtime.clear_pending_requests(); + runtime.kill_child(); + } + + native_session_id_result.map(|native_session_id| { + let state = SessionPiState::new(runtime, dialect); + (native_session_id, state) + }) + } + + async fn session_native_session_id(&self, session_id: &str) -> Option { + let sessions = self.sessions.lock().await; + Self::session_ref(&sessions, session_id) + .and_then(|session| session.native_session_id.clone()) + } + + async fn handle_pi_session_output( + self: Arc, + session_id: String, + runtime: SessionPiRuntime, + mut stdout_rx: mpsc::UnboundedReceiver, + ) { + while let Some(line) = stdout_rx.recv().await { + let trimmed = line.trim(); + if trimmed.is_empty() { + continue; + } + + let value: Value = match serde_json::from_str(trimmed) { + Ok(v) => v, + Err(err) => { + self.record_pi_unparsed( + Some(session_id.clone()), + &err.to_string(), + Value::String(trimmed.to_string()), + ) + .await; + self.mark_pi_turn_terminal(&session_id).await; + continue; + } + }; + + let message_type = value.get("type").and_then(Value::as_str).unwrap_or(""); + if message_type == "response" { + let id = value + .get("id") + .and_then(Value::as_i64) + .or_else(|| value.get("id").and_then(Value::as_str)?.parse::().ok()); + if let Some(id) = id { + runtime.complete_request(id, value.clone()); + } + continue; + } + + let _event: pi_schema::RpcEvent = match serde_json::from_value(value.clone()) { + Ok(event) => event, + Err(err) => { + self.record_pi_unparsed( + Some(session_id.clone()), + &err.to_string(), + value.clone(), + ) + .await; + self.mark_pi_turn_terminal(&session_id).await; + continue; + } + }; + + let conversions = { + let mut converter = runtime.converter.lock().unwrap(); + converter.event_value_to_universal(&value) + }; + + let mut conversions = match conversions { + Ok(conversions) => conversions, + Err(err) => { + self.record_pi_unparsed(Some(session_id.clone()), &err, value.clone()) + .await; + self.mark_pi_turn_terminal(&session_id).await; + continue; + } + }; + + let native_session_id = match extract_pi_session_id(&value) { + Some(native) => Some(native), + None => self.session_native_session_id(&session_id).await, + }; + for conversion in &mut conversions { + if conversion.native_session_id.is_none() { + conversion.native_session_id = native_session_id.clone(); + } + conversion.raw = Some(value.clone()); + } + + let turn_terminal = pi_turn_terminal_detected(&conversions); + let _ = self.record_conversions(&session_id, conversions).await; + if turn_terminal { + self.mark_pi_turn_terminal(&session_id).await; + } + } + + let mut sessions = self.sessions.lock().await; + if let Some(session) = Self::session_mut(&mut sessions, &session_id) { + if let Some(pi_state) = session.pi_state.as_mut() { + pi_state.output_loop_running = false; + } + } + } + + async fn dispatch_pi_prompt( + &self, + runtime: SessionPiRuntime, + dialect: PiRpcDialect, + native_session_id: Option, + prompt: String, + ) -> Result<(), SandboxError> { + let native_session_id = native_session_id.ok_or_else(|| SandboxError::InvalidRequest { + message: "missing Pi session id".to_string(), + })?; + + let id = runtime.next_request_id(); + let request = match dialect { + PiRpcDialect::Mono => json!({ + "type": "command", + "id": id, + "command": "prompt", + "params": { + "sessionId": native_session_id, + "message": { + "role": "user", + "content": [{ "type": "text", "text": prompt }] + } + } + }), + PiRpcDialect::CodingAgent => json!({ + "type": "prompt", + "id": id, + "message": prompt + }), + }; + + runtime.send_request(id, &request)?; + Ok(()) + } + + async fn enqueue_pi_prompt( + self: &Arc, + session_id: &str, + prompt: String, + ) -> Result<(), SandboxError> { + let dispatch = { + let mut sessions = self.sessions.lock().await; + let session = Self::session_mut(&mut sessions, session_id).ok_or_else(|| { + SandboxError::SessionNotFound { + session_id: session_id.to_string(), + } + })?; + let pi_state = + session + .pi_state + .as_mut() + .ok_or_else(|| SandboxError::InvalidRequest { + message: "pi per-session runtime unavailable".to_string(), + })?; + pi_state.enqueue_prompt(prompt); + pi_state.begin_next_turn_if_idle().map(|queued| { + ( + pi_state.runtime.clone(), + pi_state.dialect, + session.native_session_id.clone(), + queued.prompt, + ) + }) + }; + + if let Some((runtime, dialect, native_session_id, prompt)) = dispatch { + if let Err(err) = self + .dispatch_pi_prompt(runtime, dialect, native_session_id, prompt) + .await + { + let mut sessions = self.sessions.lock().await; + if let Some(session) = Self::session_mut(&mut sessions, session_id) { + if let Some(pi_state) = session.pi_state.as_mut() { + pi_state.mark_turn_complete(); + } + } + return Err(err); + } + Ok(()) + } else { + Ok(()) + } + } + + async fn mark_pi_turn_terminal(&self, session_id: &str) { + let next_dispatch = { + let mut sessions = self.sessions.lock().await; + let Some(session) = Self::session_mut(&mut sessions, session_id) else { + return; + }; + let Some(pi_state) = session.pi_state.as_mut() else { + return; + }; + if !pi_state.turn_active { + return; + } + pi_state.mark_turn_complete(); + pi_state.begin_next_turn_if_idle().map(|queued| { + ( + pi_state.runtime.clone(), + pi_state.dialect, + session.native_session_id.clone(), + queued.prompt, + ) + }) + }; + + if let Some((runtime, dialect, native_session_id, prompt)) = next_dispatch { + if let Err(err) = self + .dispatch_pi_prompt(runtime, dialect, native_session_id, prompt) + .await + { + self.record_error( + session_id, + err.to_string(), + Some("pi_prompt_dispatch".to_string()), + None, + ) + .await; + let mut sessions = self.sessions.lock().await; + if let Some(session) = Self::session_mut(&mut sessions, session_id) { + if let Some(pi_state) = session.pi_state.as_mut() { + pi_state.mark_turn_complete(); + } + } + } + } + } + + /// Ensures a shared Pi RPC process is running. + async fn ensure_pi_server(self: &Arc) -> Result, SandboxError> { + let (server, receiver) = self.server_manager.ensure_pi_server().await?; + + if let Some(stdout_rx) = receiver { + let server_for_task = server.clone(); + let self_for_task = Arc::clone(self); + tokio::spawn(async move { + self_for_task + .handle_pi_server_output(server_for_task, stdout_rx) + .await; + }); +>>>>>>> Stashed changes } command.arg("--model").arg(model); } @@ -3594,6 +4709,7 @@ impl SessionManager { let value: Value = match serde_json::from_str(trimmed) { Ok(v) => v, Err(err) => { +<<<<<<< Updated upstream if self.is_active_pi_runtime(&session_id, &runtime).await { self.record_pi_unparsed( &session_id, @@ -3602,6 +4718,14 @@ impl SessionManager { ) .await; } +======= + self.record_pi_unparsed( + None, + &err.to_string(), + Value::String(trimmed.to_string()), + ) + .await; +>>>>>>> Stashed changes continue; } }; @@ -3618,22 +4742,65 @@ impl SessionManager { continue; } +<<<<<<< Updated upstream if !self.is_active_pi_runtime(&session_id, &runtime).await { +======= + let native_session_id = extract_pi_session_id(&value).or_else(|| { + if server.dialect() == PiRpcDialect::CodingAgent { + server.current_native_session_id() + } else { + None + } + }); + let session_id = native_session_id + .as_ref() + .and_then(|id| server.session_for_native(id)) + .or_else(|| { + if server.dialect() == PiRpcDialect::CodingAgent { + server.current_session_id() + } else { + None + } + }); + let Some(session_id) = session_id else { + self.record_pi_unparsed(None, "pi event missing session id", value.clone()) + .await; +>>>>>>> Stashed changes continue; } - let event: pi_schema::RpcEvent = match serde_json::from_value(value.clone()) { + let _event: pi_schema::RpcEvent = match serde_json::from_value(value.clone()) { Ok(event) => event, Err(err) => { +<<<<<<< Updated upstream self.record_pi_unparsed(&session_id, &err.to_string(), value.clone()) .await; +======= + self.record_pi_unparsed( + Some(session_id.clone()), + &err.to_string(), + value.clone(), + ) + .await; +>>>>>>> Stashed changes continue; } }; let conversions = { +<<<<<<< Updated upstream let mut converter = runtime.converter.lock().unwrap(); converter.event_to_universal(&event) +======= + let mut converters = server.converters.lock().unwrap(); + let key = native_session_id + .clone() + .unwrap_or_else(|| session_id.clone()); + let converter = converters + .entry(key) + .or_insert_with(convert_pi::PiEventConverter::default); + converter.event_value_to_universal(&value) +>>>>>>> Stashed changes }; let mut conversions = match conversions { @@ -3731,6 +4898,7 @@ impl SessionManager { message: "pi new_session request cancelled".to_string(), }) } +<<<<<<< Updated upstream Err(_) => { return Err(SandboxError::StreamError { message: "pi new_session request timed out".to_string(), @@ -3745,6 +4913,67 @@ impl SessionManager { return Err(SandboxError::StreamError { message: format!("pi new_session failed: {new_response}"), }); +======= + + let native_session_id = if server.dialect() == PiRpcDialect::CodingAgent { + let state_id = server.next_request_id(); + let request = json!({ + "type": "get_state", + "id": state_id + }); + let rx = server.send_request(state_id, &request).ok_or_else(|| { + SandboxError::StreamError { + message: "failed to send pi get_state request".to_string(), + } + })?; + let result = tokio::time::timeout(Duration::from_secs(30), rx).await; + let response = match result { + Ok(Ok(response)) => response, + Ok(Err(_)) => { + return Err(SandboxError::StreamError { + message: "pi get_state request cancelled".to_string(), + }) + } + Err(_) => { + return Err(SandboxError::StreamError { + message: "pi get_state request timed out".to_string(), + }) + } + }; + if response + .get("success") + .and_then(Value::as_bool) + .is_some_and(|success| !success) + { + return Err(SandboxError::StreamError { + message: format!("pi get_state failed: {response}"), + }); + } + let session_value = response.get("data").unwrap_or(&response); + session_value + .get("sessionId") + .or_else(|| session_value.get("session_id")) + .and_then(Value::as_str) + .ok_or_else(|| SandboxError::StreamError { + message: "pi get_state response missing session id".to_string(), + })? + .to_string() + } else { + let session_value = response.get("data").unwrap_or(&response); + session_value + .get("sessionId") + .or_else(|| session_value.get("session_id")) + .and_then(Value::as_str) + .ok_or_else(|| SandboxError::StreamError { + message: "pi new_session response missing session id".to_string(), + })? + .to_string() + }; + + server.register_session(native_session_id.clone(), session_id.to_string()); + + Ok(native_session_id) +>>>>>>> Stashed changes } if new_response .get("data") @@ -5235,8 +6464,12 @@ fn agent_capabilities_for(agent: AgentId) -> AgentCapabilities { mcp_tools: false, streaming_deltas: true, item_started: true, +<<<<<<< Updated upstream variants: false, shared_process: false, // one dedicated rpc process per session +======= + shared_process: true, // runtime-selected; shared only for allowlisted capabilities +>>>>>>> Stashed changes }, AgentId::Mock => AgentCapabilities { plan_mode: true, @@ -6383,8 +7616,8 @@ fn parse_agent_line(agent: AgentId, line: &str, session_id: &str) -> Vec vec![agent_unparsed("amp", &err.to_string(), value)], }, - AgentId::Pi => match serde_json::from_value(value.clone()) { - Ok(event) => convert_pi::event_to_universal(&event) + AgentId::Pi => match serde_json::from_value::(value.clone()) { + Ok(_) => convert_pi::event_value_to_universal(&value) .unwrap_or_else(|err| vec![agent_unparsed("pi", &err, value)]), Err(err) => vec![agent_unparsed("pi", &err.to_string(), value)], }, @@ -6441,6 +7674,7 @@ fn extract_nested_string(value: &Value, path: &[&str]) -> Option { } #[cfg(test)] +<<<<<<< Updated upstream mod pi_model_parser_tests { use super::*; @@ -6745,6 +7979,79 @@ mod pi_runtime_tests { }) .unwrap_or_default(); assert_eq!(text, "Hello"); +======= +mod pi_runtime_tests { + use super::*; + + fn dummy_pi_state() -> SessionPiState { + let (stdin_sender, _stdin_rx) = mpsc::unbounded_channel::(); + let runtime = SessionPiRuntime { + stdin_sender, + pending_requests: Arc::new(std::sync::Mutex::new(HashMap::new())), + next_id: Arc::new(AtomicI64::new(1)), + converter: Arc::new(std::sync::Mutex::new( + convert_pi::PiEventConverter::default(), + )), + child: Arc::new(std::sync::Mutex::new(None)), + }; + SessionPiState::new(runtime, PiRpcDialect::Mono) + } + + #[test] + fn pi_capability_allowlist_shared_for_known_mono_version() { + let capability = + PiCapabilityKey::from_detected(PiRpcDialect::Mono, Some("pi 1.2.3".to_string())); + assert!(supports_shared_multiplexing(&capability)); + } + + #[test] + fn pi_capability_allowlist_unknown_version_not_shared() { + let capability = PiCapabilityKey::from_detected(PiRpcDialect::Mono, None); + assert!(!supports_shared_multiplexing(&capability)); + } + + #[test] + fn pi_capability_allowlist_coding_agent_not_shared() { + let capability = + PiCapabilityKey::from_detected(PiRpcDialect::CodingAgent, Some("1.2.3".to_string())); + assert!(!supports_shared_multiplexing(&capability)); + } + + #[test] + fn pi_turn_queue_is_fifo_and_gated() { + let mut state = dummy_pi_state(); + state.enqueue_prompt("first".to_string()); + state.enqueue_prompt("second".to_string()); + state.enqueue_prompt("third".to_string()); + + let first = state.begin_next_turn_if_idle().expect("first prompt"); + assert_eq!(first.prompt, "first"); + assert!(state.begin_next_turn_if_idle().is_none()); + + state.mark_turn_complete(); + let second = state.begin_next_turn_if_idle().expect("second prompt"); + assert_eq!(second.prompt, "second"); + + state.mark_turn_complete(); + let third = state.begin_next_turn_if_idle().expect("third prompt"); + assert_eq!(third.prompt, "third"); + } + + #[test] + fn pi_turn_queue_clear_for_terminate_resets_state() { + let mut state = dummy_pi_state(); + state.enqueue_prompt("queued".to_string()); + let _ = state.begin_next_turn_if_idle(); + let (tx, _rx) = oneshot::channel::(); + state.runtime.pending_requests.lock().unwrap().insert(1, tx); + + state.clear_for_terminate(); + + assert!(state.turn_queue.is_empty()); + assert!(!state.turn_active); + assert!(!state.output_loop_running); + assert!(state.runtime.pending_requests.lock().unwrap().is_empty()); +>>>>>>> Stashed changes } } @@ -7997,6 +9304,23 @@ fn is_turn_terminal(event: &UniversalEvent, _agent: AgentId) -> bool { } } +fn pi_turn_terminal_detected(conversions: &[EventConversion]) -> bool { + conversions + .iter() + .any(|conversion| match conversion.event_type { + UniversalEventType::Error + | UniversalEventType::AgentUnparsed + | UniversalEventType::SessionEnded => true, + UniversalEventType::ItemCompleted => { + let UniversalEventData::Item(ItemEventData { item }) = &conversion.data else { + return false; + }; + item.kind == ItemKind::Message && matches!(item.role, Some(ItemRole::Assistant)) + } + _ => false, + }) +} + fn status_label(item: &UniversalItem) -> Option<&str> { if item.kind != ItemKind::Status { return None; @@ -8025,6 +9349,7 @@ struct SessionSnapshot { model: Option, variant: Option, native_session_id: Option, + pi_runtime_mode: Option, } impl From<&SessionState> for SessionSnapshot { @@ -8037,6 +9362,7 @@ impl From<&SessionState> for SessionSnapshot { model: session.model.clone(), variant: session.variant.clone(), native_session_id: session.native_session_id.clone(), + pi_runtime_mode: session.pi_runtime_mode, } } } diff --git a/server/packages/sandbox-agent/tests/agent-flows/pi_rpc_integration.rs b/server/packages/sandbox-agent/tests/agent-flows/pi_rpc_integration.rs index 5ca66ea..e6cb772 100644 --- a/server/packages/sandbox-agent/tests/agent-flows/pi_rpc_integration.rs +++ b/server/packages/sandbox-agent/tests/agent-flows/pi_rpc_integration.rs @@ -1,6 +1,32 @@ // Pi RPC integration tests (gated via SANDBOX_TEST_PI + PATH). include!("../common/http.rs"); +<<<<<<< Updated upstream +======= +struct EnvVarGuard { + key: String, + previous: Option, +} + +impl Drop for EnvVarGuard { + fn drop(&mut self) { + match &self.previous { + Some(value) => std::env::set_var(&self.key, value), + None => std::env::remove_var(&self.key), + } + } +} + +fn set_env_var(key: &str, value: &str) -> EnvVarGuard { + let previous = std::env::var(key).ok(); + std::env::set_var(key, value); + EnvVarGuard { + key: key.to_string(), + previous, + } +} + +>>>>>>> Stashed changes fn pi_test_config() -> Option { let configs = match test_agents_from_env() { Ok(configs) => configs, @@ -13,6 +39,83 @@ fn pi_test_config() -> Option { .into_iter() .find(|config| config.agent == AgentId::Pi) } +<<<<<<< Updated upstream +======= + +async fn create_pi_session_checked(app: &Router, session_id: &str) -> Value { + let (status, payload) = send_json( + app, + Method::POST, + &format!("/v1/sessions/{session_id}"), + Some(json!({ + "agent": "pi", + "permissionMode": test_permission_mode(AgentId::Pi), + })), + ) + .await; + assert_eq!(status, StatusCode::OK, "create pi session {session_id}"); + payload +} + +async fn poll_events_until_assistant_count( + app: &Router, + session_id: &str, + expected_assistant_messages: usize, + timeout: Duration, +) -> Vec { + let start = Instant::now(); + let mut offset = 0u64; + let mut events = Vec::new(); + + while start.elapsed() < timeout { + let path = format!("/v1/sessions/{session_id}/events?offset={offset}&limit=200"); + let (status, payload) = send_json(app, Method::GET, &path, None).await; + assert_eq!(status, StatusCode::OK, "poll events"); + let new_events = payload + .get("events") + .and_then(Value::as_array) + .cloned() + .unwrap_or_default(); + + if !new_events.is_empty() { + if let Some(last) = new_events + .last() + .and_then(|event| event.get("sequence")) + .and_then(Value::as_u64) + { + offset = last; + } + events.extend(new_events); + } + + if events.iter().any(is_unparsed_event) { + break; + } + + let assistant_count = events + .iter() + .filter(|event| is_assistant_message(event)) + .count(); + if assistant_count >= expected_assistant_messages { + break; + } + + if events.iter().any(is_error_event) { + break; + } + + tokio::time::sleep(Duration::from_millis(800)).await; + } + + events +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn pi_rpc_session_and_stream() { + let Some(config) = pi_test_config() else { + return; + }; +>>>>>>> Stashed changes async fn create_pi_session_with_native(app: &Router, session_id: &str) -> String { let (status, payload) = send_json( @@ -53,6 +156,7 @@ fn assert_strictly_increasing_sequences(events: &[Value], label: &str) { } } +<<<<<<< Updated upstream fn assert_all_events_for_session(events: &[Value], session_id: &str) { for event in events { let event_session_id = event @@ -93,6 +197,11 @@ fn assert_item_started_ids_unique(events: &[Value], label: &str) { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn pi_rpc_session_and_stream() { +======= +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn pi_rpc_multi_session_create_per_session_mode() { + let _mode_guard = set_env_var("SANDBOX_AGENT_PI_FORCE_RUNTIME_MODE", "per-session"); +>>>>>>> Stashed changes let Some(config) = pi_test_config() else { return; }; @@ -101,6 +210,7 @@ async fn pi_rpc_session_and_stream() { let _guard = apply_credentials(&config.credentials); install_agent(&app.app, config.agent).await; +<<<<<<< Updated upstream let session_id = "pi-rpc-session"; let _native_session_id = create_pi_session_with_native(&app.app, session_id).await; @@ -119,6 +229,33 @@ async fn pi_rpc_session_and_stream() { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn pi_parallel_sessions_turns() { +======= + let first = create_pi_session_checked(&app.app, "pi-multi-a").await; + let second = create_pi_session_checked(&app.app, "pi-multi-b").await; + + let first_native = first + .get("native_session_id") + .and_then(Value::as_str) + .unwrap_or(""); + let second_native = second + .get("native_session_id") + .and_then(Value::as_str) + .unwrap_or(""); + assert!(!first_native.is_empty(), "first native session id missing"); + assert!( + !second_native.is_empty(), + "second native session id missing" + ); + assert_ne!( + first_native, second_native, + "per-session mode should allocate independent native session ids" + ); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn pi_rpc_per_session_queue_and_termination_isolation() { + let _mode_guard = set_env_var("SANDBOX_AGENT_PI_FORCE_RUNTIME_MODE", "per-session"); +>>>>>>> Stashed changes let Some(config) = pi_test_config() else { return; }; @@ -127,6 +264,7 @@ async fn pi_parallel_sessions_turns() { let _guard = apply_credentials(&config.credentials); install_agent(&app.app, config.agent).await; +<<<<<<< Updated upstream let session_a = "pi-parallel-a"; let session_b = "pi-parallel-b"; create_pi_session_with_native(&app.app, session_a).await; @@ -285,3 +423,105 @@ async fn pi_runtime_restart_scope() { ); assert_all_events_for_session(&events_b, session_b); } +======= + create_pi_session_checked(&app.app, "pi-queue-a").await; + create_pi_session_checked(&app.app, "pi-queue-b").await; + + let status = send_status( + &app.app, + Method::POST, + "/v1/sessions/pi-queue-a/messages", + Some(json!({ "message": "Reply with exactly FIRST." })), + ) + .await; + assert_eq!(status, StatusCode::NO_CONTENT, "send first prompt"); + + let status = send_status( + &app.app, + Method::POST, + "/v1/sessions/pi-queue-a/messages", + Some(json!({ "message": "Reply with exactly SECOND." })), + ) + .await; + assert_eq!(status, StatusCode::NO_CONTENT, "enqueue second prompt"); + + let status = send_status( + &app.app, + Method::POST, + "/v1/sessions/pi-queue-b/messages", + Some(json!({ "message": PROMPT })), + ) + .await; + assert_eq!( + status, + StatusCode::NO_CONTENT, + "send prompt to sibling session" + ); + + let events_a = + poll_events_until_assistant_count(&app.app, "pi-queue-a", 2, Duration::from_secs(240)) + .await; + let events_b = + poll_events_until_assistant_count(&app.app, "pi-queue-b", 1, Duration::from_secs(180)) + .await; + + assert!( + !events_a.iter().any(is_unparsed_event), + "session a emitted agent.unparsed" + ); + assert!( + !events_b.iter().any(is_unparsed_event), + "session b emitted agent.unparsed" + ); + let assistant_count_a = events_a + .iter() + .filter(|event| is_assistant_message(event)) + .count(); + let assistant_count_b = events_b + .iter() + .filter(|event| is_assistant_message(event)) + .count(); + assert!( + assistant_count_a >= 2, + "expected at least two assistant completions for queued session, got {assistant_count_a}" + ); + assert!( + assistant_count_b >= 1, + "expected assistant completion for sibling session, got {assistant_count_b}" + ); + + let status = send_status( + &app.app, + Method::POST, + "/v1/sessions/pi-queue-a/terminate", + Some(json!({})), + ) + .await; + assert_eq!(status, StatusCode::NO_CONTENT, "terminate first session"); + + let status = send_status( + &app.app, + Method::POST, + "/v1/sessions/pi-queue-b/messages", + Some(json!({ "message": PROMPT })), + ) + .await; + assert_eq!( + status, + StatusCode::NO_CONTENT, + "sibling session should continue after termination" + ); + + let events_b_after = + poll_events_until_assistant_count(&app.app, "pi-queue-b", 2, Duration::from_secs(180)) + .await; + let assistant_count_b_after = events_b_after + .iter() + .filter(|event| is_assistant_message(event)) + .count(); + assert!( + assistant_count_b_after >= 2, + "expected additional assistant completion for sibling session after termination" + ); +} +>>>>>>> Stashed changes diff --git a/server/packages/universal-agent-schema/src/agents/pi.rs b/server/packages/universal-agent-schema/src/agents/pi.rs index ef91071..a863c43 100644 --- a/server/packages/universal-agent-schema/src/agents/pi.rs +++ b/server/packages/universal-agent-schema/src/agents/pi.rs @@ -21,6 +21,46 @@ pub struct PiEventConverter { } impl PiEventConverter { + pub fn event_value_to_universal( + &mut self, + raw: &Value, + ) -> Result, String> { + let event_type = raw + .get("type") + .and_then(Value::as_str) + .ok_or_else(|| "missing event type".to_string())?; + let native_session_id = extract_session_id(raw); + + let conversions = match event_type { + "message_start" => self.message_start(raw), + "message_update" => self.message_update(raw), + "message_end" => self.message_end(raw), + "tool_execution_start" => self.tool_execution_start(raw), + "tool_execution_update" => self.tool_execution_update(raw), + "tool_execution_end" => self.tool_execution_end(raw), + "agent_start" + | "agent_end" + | "turn_start" + | "turn_end" + | "auto_compaction" + | "auto_compaction_start" + | "auto_compaction_end" + | "auto_retry" + | "auto_retry_start" + | "auto_retry_end" + | "hook_error" => Ok(vec![status_event(event_type, raw)]), + "extension_ui_request" | "extension_ui_response" | "extension_error" => { + Ok(vec![status_event(event_type, raw)]) + } + other => Err(format!("unsupported Pi event type: {other}")), + }?; + + Ok(conversions + .into_iter() + .map(|conversion| attach_metadata(conversion, &native_session_id, raw)) + .collect()) + } + fn next_synthetic_message_id(&mut self) -> String { self.message_counter += 1; format!("pi_msg_{}", self.message_counter) @@ -72,40 +112,7 @@ impl PiEventConverter { event: &schema::RpcEvent, ) -> Result, String> { let raw = serde_json::to_value(event).map_err(|err| err.to_string())?; - let event_type = raw - .get("type") - .and_then(Value::as_str) - .ok_or_else(|| "missing event type".to_string())?; - let native_session_id = extract_session_id(&raw); - - let conversions = match event_type { - "message_start" => self.message_start(&raw), - "message_update" => self.message_update(&raw), - "message_end" => self.message_end(&raw), - "tool_execution_start" => self.tool_execution_start(&raw), - "tool_execution_update" => self.tool_execution_update(&raw), - "tool_execution_end" => self.tool_execution_end(&raw), - "agent_start" - | "agent_end" - | "turn_start" - | "turn_end" - | "auto_compaction" - | "auto_compaction_start" - | "auto_compaction_end" - | "auto_retry" - | "auto_retry_start" - | "auto_retry_end" - | "hook_error" => Ok(vec![status_event(event_type, &raw)]), - "extension_ui_request" | "extension_ui_response" | "extension_error" => { - Ok(vec![status_event(event_type, &raw)]) - } - other => Err(format!("unsupported Pi event type: {other}")), - }?; - - Ok(conversions - .into_iter() - .map(|conversion| attach_metadata(conversion, &native_session_id, &raw)) - .collect()) + self.event_value_to_universal(&raw) } fn message_start(&mut self, raw: &Value) -> Result, String> { @@ -265,6 +272,8 @@ impl PiEventConverter { message: Option<&Value>, ) -> EventConversion { let mut content = message.and_then(parse_message_content).unwrap_or_default(); + let failed = message_is_failed(message); + let message_error_text = extract_message_error_text(message); if let Some(id) = message_id.clone() { if content.is_empty() { @@ -292,6 +301,12 @@ impl PiEventConverter { self.message_started.remove(&id); } + if failed && content.is_empty() { + if let Some(text) = message_error_text { + content.push(ContentPart::Text { text }); + } + } + let item = UniversalItem { item_id: String::new(), native_item_id: message_id, @@ -299,7 +314,11 @@ impl PiEventConverter { kind: ItemKind::Message, role: Some(ItemRole::Assistant), content, - status: ItemStatus::Completed, + status: if failed { + ItemStatus::Failed + } else { + ItemStatus::Completed + }, }; EventConversion::new( UniversalEventType::ItemCompleted, @@ -434,6 +453,10 @@ pub fn event_to_universal(event: &schema::RpcEvent) -> Result Result, String> { + PiEventConverter::default().event_value_to_universal(raw) +} + fn attach_metadata( conversion: EventConversion, native_session_id: &Option, @@ -584,6 +607,53 @@ fn parse_message_content(message: &Value) -> Option> { Some(parts) } +fn message_is_failed(message: Option<&Value>) -> bool { + message + .and_then(|value| { + value + .get("stopReason") + .or_else(|| value.get("stop_reason")) + .and_then(Value::as_str) + }) + .is_some_and(|reason| reason == "error" || reason == "aborted") +} + +fn extract_message_error_text(message: Option<&Value>) -> Option { + let value = message?; + + if let Some(text) = value + .get("errorMessage") + .or_else(|| value.get("error_message")) + .and_then(Value::as_str) + { + let trimmed = text.trim(); + if !trimmed.is_empty() { + return Some(trimmed.to_string()); + } + } + + let error = value.get("error")?; + if let Some(text) = error.as_str() { + let trimmed = text.trim(); + if !trimmed.is_empty() { + return Some(trimmed.to_string()); + } + } + if let Some(text) = error + .get("errorMessage") + .or_else(|| error.get("error_message")) + .or_else(|| error.get("message")) + .and_then(Value::as_str) + { + let trimmed = text.trim(); + if !trimmed.is_empty() { + return Some(trimmed.to_string()); + } + } + + None +} + fn content_part_from_value(value: &Value) -> Option { if let Some(text) = value.as_str() { return Some(ContentPart::Text { diff --git a/server/packages/universal-agent-schema/tests/pi_conversion.rs b/server/packages/universal-agent-schema/tests/pi_conversion.rs index a59d137..971fde8 100644 --- a/server/packages/universal-agent-schema/tests/pi_conversion.rs +++ b/server/packages/universal-agent-schema/tests/pi_conversion.rs @@ -262,3 +262,46 @@ fn pi_message_done_completes_without_message_end() { panic!("expected item event"); } } + +#[test] +fn pi_message_end_error_surfaces_failed_status_and_error_text() { + let mut converter = PiEventConverter::default(); + + let start_event = parse_event(json!({ + "type": "message_start", + "sessionId": "session-1", + "messageId": "msg-err", + "message": { + "role": "assistant", + "content": [] + } + })); + let _ = converter + .event_to_universal(&start_event) + .expect("start conversions"); + + let end_raw = json!({ + "type": "message_end", + "sessionId": "session-1", + "messageId": "msg-err", + "message": { + "role": "assistant", + "content": [], + "stopReason": "error", + "errorMessage": "Connection error." + } + }); + let end_events = converter + .event_value_to_universal(&end_raw) + .expect("end conversions"); + + assert_eq!(end_events[0].event_type, UniversalEventType::ItemCompleted); + if let UniversalEventData::Item(item) = &end_events[0].data { + assert_eq!(item.item.status, ItemStatus::Failed); + assert!( + matches!(item.item.content.first(), Some(ContentPart::Text { text }) if text == "Connection error.") + ); + } else { + panic!("expected item event"); + } +}