From 9a26604001072df11804705111b32ed8a5800099 Mon Sep 17 00:00:00 2001 From: Franklin Date: Fri, 6 Feb 2026 17:17:00 -0500 Subject: [PATCH] wip: pi working with variatns --- docs/conversion.mdx | 7 +- docs/openapi.json | 4 - .../sandbox-agent/src/opencode_compat.rs | 9 - server/packages/sandbox-agent/src/router.rs | 1690 ++++------------- .../tests/agent-flows/pi_rpc_integration.rs | 384 ++-- .../tests/http/agent_endpoints.rs | 73 + 6 files changed, 530 insertions(+), 1637 deletions(-) diff --git a/docs/conversion.mdx b/docs/conversion.mdx index 72daabb..8fcd38c 100644 --- a/docs/conversion.mdx +++ b/docs/conversion.mdx @@ -84,9 +84,6 @@ Message normalization notes - If Pi message_update events omit messageId, we synthesize a stable message id and emit a synthetic item.started before the first delta so streaming text stays grouped. - Pi auto_compaction_start/auto_compaction_end and auto_retry_start/auto_retry_end events are mapped to status items (label `pi.*`). - Pi extension_ui_request/extension_error events are mapped to status items. -<<<<<<< Updated upstream - Pi RPC from pi-coding-agent does not include sessionId in events; each daemon session owns a dedicated Pi RPC process, so events are routed by runtime ownership (parallel sessions supported). -======= -- Pi runtime mode is capability-selected: default is per-session process isolation, while shared multiplexing is used only for allowlisted Pi capabilities. -- In shared mode, pi-coding-agent events without sessionId are routed using the current-session mapping. ->>>>>>> Stashed changes +- PI `variant` maps directly to PI RPC `set_thinking_level.level` before prompts are sent. +- PI remains source of truth for thinking-level constraints: unsupported levels (including non-reasoning models and model-specific limits such as `xhigh`) are PI-native clamped or rejected. diff --git a/docs/openapi.json b/docs/openapi.json index 57059f7..bbe1c2e 100644 --- a/docs/openapi.json +++ b/docs/openapi.json @@ -10,11 +10,7 @@ "license": { "name": "Apache-2.0" }, -<<<<<<< Updated upstream "version": "0.1.7" -======= - "version": "0.1.6" ->>>>>>> Stashed changes }, "servers": [ { diff --git a/server/packages/sandbox-agent/src/opencode_compat.rs b/server/packages/sandbox-agent/src/opencode_compat.rs index 276577f..1393f94 100644 --- a/server/packages/sandbox-agent/src/opencode_compat.rs +++ b/server/packages/sandbox-agent/src/opencode_compat.rs @@ -2628,12 +2628,8 @@ pub fn build_opencode_router(state: Arc) -> Router { responses((status = 200)), tag = "opencode" )] -<<<<<<< Updated upstream async fn oc_agent_list(State(state): State>) -> impl IntoResponse { let name = state.inner.branding.product_name(); -======= -async fn oc_agent_list(State(_state): State>) -> impl IntoResponse { ->>>>>>> Stashed changes let agent = json!({ "name": name, "description": format!("{name} compatibility layer"), @@ -4278,7 +4274,6 @@ async fn oc_file_list() -> impl IntoResponse { tag = "opencode" )] async fn oc_file_content(Query(query): Query) -> impl IntoResponse { - let _directory = query.directory.as_deref(); if query.path.is_none() { return bad_request("path is required").into_response(); } @@ -4309,7 +4304,6 @@ async fn oc_file_status() -> impl IntoResponse { tag = "opencode" )] async fn oc_find_text(Query(query): Query) -> impl IntoResponse { - let _directory = query.directory.as_deref(); if query.pattern.is_none() { return bad_request("pattern is required").into_response(); } @@ -4323,7 +4317,6 @@ async fn oc_find_text(Query(query): Query) -> impl IntoResponse { tag = "opencode" )] async fn oc_find_files(Query(query): Query) -> impl IntoResponse { - let _directory = query.directory.as_deref(); if query.query.is_none() { return bad_request("query is required").into_response(); } @@ -4337,7 +4330,6 @@ async fn oc_find_files(Query(query): Query) -> impl IntoResponse tag = "opencode" )] async fn oc_find_symbols(Query(query): Query) -> impl IntoResponse { - let _directory = query.directory.as_deref(); if query.query.is_none() { return bad_request("query is required").into_response(); } @@ -4454,7 +4446,6 @@ async fn oc_tool_ids() -> impl IntoResponse { tag = "opencode" )] async fn oc_tool_list(Query(query): Query) -> impl IntoResponse { - let _directory = query.directory.as_deref(); if query.provider.is_none() || query.model.is_none() { return bad_request("provider and model are required").into_response(); } diff --git a/server/packages/sandbox-agent/src/router.rs b/server/packages/sandbox-agent/src/router.rs index f1d46c9..d0908b8 100644 --- a/server/packages/sandbox-agent/src/router.rs +++ b/server/packages/sandbox-agent/src/router.rs @@ -22,19 +22,11 @@ use reqwest::Client; use sandbox_agent_error::{AgentError, ErrorType, ProblemDetails, SandboxError}; use sandbox_agent_universal_agent_schema::{ codex as codex_schema, convert_amp, convert_claude, convert_codex, convert_opencode, -<<<<<<< Updated upstream convert_pi, pi as pi_schema, turn_completed_event, AgentUnparsedData, ContentPart, ErrorData, EventConversion, EventSource, FileAction, ItemDeltaData, ItemEventData, ItemKind, ItemRole, ItemStatus, PermissionEventData, PermissionStatus, QuestionEventData, QuestionStatus, ReasoningVisibility, SessionEndReason, SessionEndedData, SessionStartedData, StderrOutput, TerminatedBy, UniversalEvent, UniversalEventData, UniversalEventType, UniversalItem, -======= - convert_pi, pi as pi_schema, AgentUnparsedData, ContentPart, ErrorData, EventConversion, - EventSource, FileAction, ItemDeltaData, ItemEventData, ItemKind, ItemRole, ItemStatus, - PermissionEventData, PermissionStatus, QuestionEventData, QuestionStatus, ReasoningVisibility, - SessionEndReason, SessionEndedData, SessionStartedData, StderrOutput, TerminatedBy, - UniversalEvent, UniversalEventData, UniversalEventType, UniversalItem, ->>>>>>> Stashed changes }; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -376,12 +368,7 @@ struct SessionState { model: Option, variant: Option, native_session_id: Option, -<<<<<<< Updated upstream pi_runtime: Option>, -======= - pi_runtime_mode: Option, - pi_state: Option, ->>>>>>> Stashed changes ended: bool, ended_exit_code: Option, ended_message: Option, @@ -440,12 +427,7 @@ impl SessionState { model: request.model.clone(), variant: request.variant.clone(), native_session_id: None, -<<<<<<< Updated upstream pi_runtime: None, -======= - pi_runtime_mode: None, - pi_state: None, ->>>>>>> Stashed changes ended: false, ended_exit_code: None, ended_message: None, @@ -981,327 +963,9 @@ impl CodexServer { } } -<<<<<<< Updated upstream /// Long-lived Pi RPC process dedicated to exactly one daemon session. struct PiSessionRuntime { /// Sender for writing to the process stdin. -======= -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -enum PiRpcDialect { - Mono, - CodingAgent, -} - -fn detect_pi_dialect(path: &PathBuf) -> PiRpcDialect { - let mut file = match std::fs::File::open(path) { - Ok(file) => file, - Err(_) => return PiRpcDialect::Mono, - }; - let mut buffer = [0u8; 256]; - let read = match file.read(&mut buffer) { - Ok(read) => read, - Err(_) => return PiRpcDialect::Mono, - }; - if read == 0 { - return PiRpcDialect::Mono; - } - let sample = &buffer[..read]; - if sample.starts_with(b"#!") { - return PiRpcDialect::CodingAgent; - } - if sample.iter().any(|byte| *byte == 0) { - return PiRpcDialect::Mono; - } - let is_text = sample.iter().all(|byte| byte.is_ascii()); - if is_text { - return PiRpcDialect::CodingAgent; - } - PiRpcDialect::Mono -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -enum PiRuntimeMode { - Shared, - PerSession, -} - -impl PiRuntimeMode { - fn as_str(self) -> &'static str { - match self { - PiRuntimeMode::Shared => "shared", - PiRuntimeMode::PerSession => "per_session", - } - } -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] -struct ParsedPiVersion { - major: u64, - minor: u64, - patch: u64, -} - -impl ParsedPiVersion { - fn parse(raw: &str) -> Option { - let trimmed = raw.trim(); - let start = trimmed.find(|ch: char| ch.is_ascii_digit())?; - let version_like = &trimmed[start..]; - let end = version_like - .find(|ch: char| !(ch.is_ascii_digit() || ch == '.')) - .unwrap_or(version_like.len()); - let core = &version_like[..end]; - if core.is_empty() { - return None; - } - let mut parts = core.split('.'); - let major = parts.next()?.parse::().ok()?; - let minor = parts.next().unwrap_or("0").parse::().ok()?; - let patch = parts.next().unwrap_or("0").parse::().ok()?; - Some(Self { - major, - minor, - patch, - }) - } -} - -#[derive(Debug, Clone, Copy)] -enum PiVersionConstraint { - Range { - min: ParsedPiVersion, - max_exclusive: Option, - }, -} - -impl PiVersionConstraint { - fn matches(self, version: ParsedPiVersion) -> bool { - match self { - PiVersionConstraint::Range { min, max_exclusive } => { - if version < min { - return false; - } - if let Some(max_exclusive) = max_exclusive { - version < max_exclusive - } else { - true - } - } - } - } -} - -#[derive(Debug, Clone, Copy)] -struct PiCapabilityAllowlistEntry { - dialect: PiRpcDialect, - version: PiVersionConstraint, - events_include_session_id: bool, - supports_multi_session_rpc: bool, -} - -const PI_SHARED_ALLOWLIST: &[PiCapabilityAllowlistEntry] = &[PiCapabilityAllowlistEntry { - dialect: PiRpcDialect::Mono, - version: PiVersionConstraint::Range { - min: ParsedPiVersion { - major: 0, - minor: 0, - patch: 0, - }, - max_exclusive: None, - }, - events_include_session_id: true, - supports_multi_session_rpc: true, -}]; - -#[derive(Debug, Clone)] -struct PiCapabilityKey { - dialect: PiRpcDialect, - version_raw: Option, - parsed_version: Option, - events_include_session_id: bool, - supports_multi_session_rpc: bool, -} - -impl PiCapabilityKey { - fn from_detected(dialect: PiRpcDialect, version_raw: Option) -> Self { - let parsed_version = version_raw.as_deref().and_then(ParsedPiVersion::parse); - Self { - dialect, - version_raw, - parsed_version, - events_include_session_id: matches!(dialect, PiRpcDialect::Mono), - supports_multi_session_rpc: matches!(dialect, PiRpcDialect::Mono), - } - } -} - -#[derive(Debug, Clone)] -struct PiRuntimeSelection { - mode: PiRuntimeMode, - capability: PiCapabilityKey, - reason: String, -} - -fn supports_shared_multiplexing(capability: &PiCapabilityKey) -> bool { - let Some(version) = capability.parsed_version else { - return false; - }; - PI_SHARED_ALLOWLIST.iter().any(|entry| { - entry.dialect == capability.dialect - && entry.version.matches(version) - && (!entry.events_include_session_id || capability.events_include_session_id) - && (!entry.supports_multi_session_rpc || capability.supports_multi_session_rpc) - }) -} - -fn pi_runtime_mode_override() -> Option { - let value = std::env::var("SANDBOX_AGENT_PI_FORCE_RUNTIME_MODE").ok()?; - match value.trim().to_ascii_lowercase().as_str() { - "shared" => Some(PiRuntimeMode::Shared), - "per-session" | "per_session" | "persession" => Some(PiRuntimeMode::PerSession), - _ => None, - } -} - -#[derive(Debug)] -struct QueuedPiPrompt { - prompt: String, -} - -#[derive(Clone)] -struct SessionPiRuntime { - stdin_sender: mpsc::UnboundedSender, - pending_requests: Arc>>>, - next_id: Arc, - converter: Arc>, - child: Arc>>, -} - -impl std::fmt::Debug for SessionPiRuntime { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let pending = self - .pending_requests - .lock() - .map(|pending| pending.len()) - .unwrap_or(0); - f.debug_struct("SessionPiRuntime") - .field("pending_requests", &pending) - .field("next_id", &self.next_id.load(Ordering::SeqCst)) - .finish() - } -} - -impl SessionPiRuntime { - fn next_request_id(&self) -> i64 { - self.next_id.fetch_add(1, Ordering::SeqCst) - } - - fn send_request( - &self, - id: i64, - request: &Value, - ) -> Result, SandboxError> { - let line = serde_json::to_string(request).map_err(|err| SandboxError::StreamError { - message: err.to_string(), - })?; - let (tx, rx) = oneshot::channel(); - { - let mut pending = self.pending_requests.lock().unwrap(); - pending.insert(id, tx); - } - if self.stdin_sender.send(line).is_err() { - let mut pending = self.pending_requests.lock().unwrap(); - pending.remove(&id); - return Err(SandboxError::StreamError { - message: "pi session stdin unavailable".to_string(), - }); - } - Ok(rx) - } - - fn complete_request(&self, id: i64, result: Value) { - let tx = { - let mut pending = self.pending_requests.lock().unwrap(); - pending.remove(&id) - }; - if let Some(tx) = tx { - let _ = tx.send(result); - } - } - - fn clear_pending_requests(&self) { - let mut pending = self.pending_requests.lock().unwrap(); - pending.clear(); - } - - fn kill_child(&self) { - if let Ok(mut guard) = self.child.lock() { - if let Some(child) = guard.as_mut() { - let _ = child.kill(); - } - *guard = None; - } - } -} - -struct SessionPiState { - runtime: SessionPiRuntime, - dialect: PiRpcDialect, - output_loop_running: bool, - turn_queue: VecDeque, - turn_active: bool, -} - -impl std::fmt::Debug for SessionPiState { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("SessionPiState") - .field("dialect", &self.dialect) - .field("output_loop_running", &self.output_loop_running) - .field("turn_queue_len", &self.turn_queue.len()) - .field("turn_active", &self.turn_active) - .finish() - } -} - -impl SessionPiState { - fn new(runtime: SessionPiRuntime, dialect: PiRpcDialect) -> Self { - Self { - runtime, - dialect, - output_loop_running: true, - turn_queue: VecDeque::new(), - turn_active: false, - } - } - - fn enqueue_prompt(&mut self, prompt: String) { - self.turn_queue.push_back(QueuedPiPrompt { prompt }); - } - - fn begin_next_turn_if_idle(&mut self) -> Option { - if self.turn_active { - return None; - } - let next = self.turn_queue.pop_front()?; - self.turn_active = true; - Some(next) - } - - fn mark_turn_complete(&mut self) { - self.turn_active = false; - } - - fn clear_for_terminate(&mut self) { - self.turn_queue.clear(); - self.turn_active = false; - self.output_loop_running = false; - self.runtime.clear_pending_requests(); - } -} - -/// Shared Pi RPC process that multiplexes sessions via newline-delimited JSON. -struct PiServer { - /// Sender for writing to the process stdin ->>>>>>> Stashed changes stdin_sender: mpsc::UnboundedSender, /// Pending RPC requests awaiting responses, keyed by request ID. pending_requests: std::sync::Mutex>>, @@ -1829,98 +1493,6 @@ impl AgentServerManager { )) } -<<<<<<< Updated upstream -======= - async fn spawn_pi_server( - self: &Arc, - ) -> Result< - ( - Arc, - mpsc::UnboundedReceiver, - Arc>>, - ), - SandboxError, - > { - let manager = self.agent_manager.clone(); - let log_dir = self.log_base_dir.clone(); - let (stdin_tx, stdin_rx) = mpsc::unbounded_channel::(); - let (stdout_tx, stdout_rx) = mpsc::unbounded_channel::(); - - let child = tokio::task::spawn_blocking( - move || -> Result<(std::process::Child, PiRpcDialect), SandboxError> { - let path = manager - .resolve_binary(AgentId::Pi) - .map_err(|err| map_spawn_error(AgentId::Pi, err))?; - let dialect = detect_pi_dialect(&path); - let mut command = std::process::Command::new(path); - let stderr = AgentServerLogs::new(log_dir, AgentId::Pi.as_str()).open()?; - command - .arg("--mode") - .arg("rpc") - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .stderr(stderr); - - let mut child = command.spawn().map_err(|err| SandboxError::StreamError { - message: err.to_string(), - })?; - - let stdin = child - .stdin - .take() - .ok_or_else(|| SandboxError::StreamError { - message: "pi stdin unavailable".to_string(), - })?; - let stdout = child - .stdout - .take() - .ok_or_else(|| SandboxError::StreamError { - message: "pi stdout unavailable".to_string(), - })?; - - let stdin_rx_mut = std::sync::Mutex::new(stdin_rx); - std::thread::spawn(move || { - let mut stdin = stdin; - let mut rx = stdin_rx_mut.lock().unwrap(); - while let Some(line) = rx.blocking_recv() { - if writeln!(stdin, "{line}").is_err() { - break; - } - if stdin.flush().is_err() { - break; - } - } - }); - - std::thread::spawn(move || { - let reader = BufReader::new(stdout); - for line in reader.lines() { - let Ok(line) = line else { break }; - if stdout_tx.send(line).is_err() { - break; - } - } - }); - - Ok((child, dialect)) - }, - ) - .await - .map_err(|err| SandboxError::StreamError { - message: err.to_string(), - })??; - - let (child, dialect) = child; - let server = Arc::new(PiServer::new(stdin_tx, dialect)); - - Ok(( - server, - stdout_rx, - Arc::new(std::sync::Mutex::new(Some(child))), - )) - } - ->>>>>>> Stashed changes fn spawn_monitor_task( self: &Arc, agent: AgentId, @@ -2209,51 +1781,22 @@ impl SessionManager { model: session.model.clone(), variant: session.variant.clone(), native_session_id: None, - pi_runtime_mode: None, }; let thread_id = self.create_codex_thread(&session_id, &snapshot).await?; session.native_session_id = Some(thread_id); } if agent_id == AgentId::Pi { -<<<<<<< Updated upstream let pi = self .create_pi_session(&session_id, session.model.as_deref()) .await?; session.native_session_id = Some(pi.native_session_id); - session.pi_runtime = Some(pi.runtime); -======= - let selection = self.select_pi_runtime_mode().await?; - tracing::debug!( - dialect = ?selection.capability.dialect, - version = ?selection.capability.version_raw, - runtime_mode = selection.mode.as_str(), - reason = selection.reason.as_str(), - "selected Pi runtime mode" - ); - session.pi_runtime_mode = Some(selection.mode); - match selection.mode { - PiRuntimeMode::Shared => { - let snapshot = SessionSnapshot { - session_id: session_id.clone(), - agent: agent_id, - agent_mode: session.agent_mode.clone(), - permission_mode: session.permission_mode.clone(), - model: session.model.clone(), - variant: session.variant.clone(), - native_session_id: None, - pi_runtime_mode: Some(PiRuntimeMode::Shared), - }; - let native_id = self.create_pi_session(&session_id, &snapshot).await?; - session.native_session_id = Some(native_id); - } - PiRuntimeMode::PerSession => { - let (native_id, pi_state) = - self.create_pi_session_per_session(&session_id).await?; - session.native_session_id = Some(native_id); - session.pi_state = Some(pi_state); + session.pi_runtime = Some(pi.runtime.clone()); + if let Some(variant) = session.variant.as_deref() { + if let Err(err) = self.set_pi_thinking_level(&pi.runtime, variant).await { + pi.runtime.shutdown(); + return Err(err); } } ->>>>>>> Stashed changes } if agent_id == AgentId::Mock { session.native_session_id = Some(format!("mock-{session_id}")); @@ -2291,23 +1834,12 @@ impl SessionManager { } let native_session_id = session.native_session_id.clone(); -<<<<<<< Updated upstream let telemetry_agent_mode = session.agent_mode.clone(); let telemetry_permission_mode = session.permission_mode.clone(); let mut sessions = self.sessions.lock().await; sessions.push(session); drop(sessions); if agent_id == AgentId::Opencode || agent_id == AgentId::Codex { -======= - let pi_runtime_mode = session.pi_runtime_mode; - let mut sessions = self.sessions.lock().await; - sessions.push(session); - drop(sessions); - let should_register_pi_shared = - agent_id == AgentId::Pi && pi_runtime_mode == Some(PiRuntimeMode::Shared); - if agent_id == AgentId::Opencode || agent_id == AgentId::Codex || should_register_pi_shared - { ->>>>>>> Stashed changes self.server_manager .register_session(agent_id, &session_id, native_session_id.as_deref()) .await; @@ -2438,18 +1970,7 @@ impl SessionManager { return Ok(()); } if session_snapshot.agent == AgentId::Pi { - match session_snapshot - .pi_runtime_mode - .unwrap_or(PiRuntimeMode::Shared) - { - PiRuntimeMode::Shared => { - self.send_pi_prompt(&session_snapshot, &message).await?; - } - PiRuntimeMode::PerSession => { - self.enqueue_pi_prompt(&session_snapshot.session_id, message.clone()) - .await?; - } - } + self.send_pi_prompt(&session_snapshot, &message).await?; if !agent_supports_item_started(session_snapshot.agent) { let _ = self .emit_synthetic_assistant_start(&session_snapshot.session_id) @@ -2565,34 +2086,12 @@ impl SessionManager { session.record_conversions(vec![ended]); let agent = session.agent; let native_session_id = session.native_session_id.clone(); -<<<<<<< Updated upstream let pi_runtime = session.pi_runtime.take(); drop(sessions); if let Some(runtime) = pi_runtime { runtime.shutdown(); } if agent == AgentId::Opencode || agent == AgentId::Codex { -======= - let pi_runtime_mode = session.pi_runtime_mode; - let per_session_runtime = - if agent == AgentId::Pi && pi_runtime_mode == Some(PiRuntimeMode::PerSession) { - if let Some(pi_state) = session.pi_state.as_mut() { - pi_state.clear_for_terminate(); - Some(pi_state.runtime.clone()) - } else { - None - } - } else { - None - }; - drop(sessions); - if let Some(runtime) = per_session_runtime { - runtime.kill_child(); - } - let should_unregister_shared_pi = - agent == AgentId::Pi && pi_runtime_mode == Some(PiRuntimeMode::Shared); - if agent == AgentId::Opencode || agent == AgentId::Codex || should_unregister_shared_pi { ->>>>>>> Stashed changes self.server_manager .unregister_session(agent, &session_id, native_session_id.as_deref()) .await; @@ -3471,11 +2970,6 @@ impl SessionManager { reason.clone(), terminated_by.clone(), ); - if let Some(pi_state) = session.pi_state.as_mut() { - pi_state.turn_active = false; - pi_state.turn_queue.clear(); - pi_state.runtime.clear_pending_requests(); - } let (error_message, error_exit_code, error_stderr) = if reason == SessionEndReason::Error { (Some(message.to_string()), exit_code, stderr) @@ -3930,7 +3424,6 @@ impl SessionManager { Ok(()) } -<<<<<<< Updated upstream fn apply_pi_model_args(command: &mut std::process::Command, model: Option<&str>) { let Some(model) = model else { return; @@ -3942,608 +3435,6 @@ impl SessionManager { .arg("--model") .arg(model_id); return; -======= - async fn select_pi_runtime_mode(&self) -> Result { - let manager = self.agent_manager.clone(); - let (dialect, version_raw) = tokio::task::spawn_blocking( - move || -> Result<(PiRpcDialect, Option), ManagerError> { - let path = manager.resolve_binary(AgentId::Pi)?; - let dialect = detect_pi_dialect(&path); - let version_raw = manager.version(AgentId::Pi).ok().flatten(); - Ok((dialect, version_raw)) - }, - ) - .await - .map_err(|err| SandboxError::StreamError { - message: err.to_string(), - })? - .map_err(|err| map_spawn_error(AgentId::Pi, err))?; - - let capability = PiCapabilityKey::from_detected(dialect, version_raw); - - let mode_override_raw = std::env::var("SANDBOX_AGENT_PI_FORCE_RUNTIME_MODE").ok(); - if let Some(mode) = pi_runtime_mode_override() { - return Ok(PiRuntimeSelection { - mode, - capability, - reason: "forced by SANDBOX_AGENT_PI_FORCE_RUNTIME_MODE".to_string(), - }); - } - if let Some(value) = mode_override_raw { - tracing::warn!( - override_value = value.as_str(), - "ignoring invalid SANDBOX_AGENT_PI_FORCE_RUNTIME_MODE value" - ); - } - - if capability.parsed_version.is_none() { - return Ok(PiRuntimeSelection { - mode: PiRuntimeMode::PerSession, - capability, - reason: "unknown or unparseable version defaults to per-session".to_string(), - }); - } - - if supports_shared_multiplexing(&capability) { - return Ok(PiRuntimeSelection { - mode: PiRuntimeMode::Shared, - capability, - reason: "capability allowlist matched".to_string(), - }); - } - - Ok(PiRuntimeSelection { - mode: PiRuntimeMode::PerSession, - capability, - reason: "capability not allowlisted for shared multiplexing".to_string(), - }) - } - - async fn spawn_pi_session_runtime( - &self, - ) -> Result< - ( - SessionPiRuntime, - mpsc::UnboundedReceiver, - PiRpcDialect, - ), - SandboxError, - > { - let manager = self.agent_manager.clone(); - let log_dir = self.server_manager.log_base_dir.clone(); - let (stdin_tx, stdin_rx) = mpsc::unbounded_channel::(); - let (stdout_tx, stdout_rx) = mpsc::unbounded_channel::(); - - let (child, dialect) = tokio::task::spawn_blocking( - move || -> Result<(std::process::Child, PiRpcDialect), SandboxError> { - let path = manager - .resolve_binary(AgentId::Pi) - .map_err(|err| map_spawn_error(AgentId::Pi, err))?; - let dialect = detect_pi_dialect(&path); - let mut command = std::process::Command::new(path); - let stderr = AgentServerLogs::new(log_dir, AgentId::Pi.as_str()).open()?; - command - .arg("--mode") - .arg("rpc") - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .stderr(stderr); - - let mut child = command.spawn().map_err(|err| SandboxError::StreamError { - message: err.to_string(), - })?; - - let stdin = child - .stdin - .take() - .ok_or_else(|| SandboxError::StreamError { - message: "pi stdin unavailable".to_string(), - })?; - let stdout = child - .stdout - .take() - .ok_or_else(|| SandboxError::StreamError { - message: "pi stdout unavailable".to_string(), - })?; - - let stdin_rx_mut = std::sync::Mutex::new(stdin_rx); - std::thread::spawn(move || { - let mut stdin = stdin; - let mut rx = stdin_rx_mut.lock().unwrap(); - while let Some(line) = rx.blocking_recv() { - if writeln!(stdin, "{line}").is_err() { - break; - } - if stdin.flush().is_err() { - break; - } - } - }); - - std::thread::spawn(move || { - let reader = BufReader::new(stdout); - for line in reader.lines() { - let Ok(line) = line else { break }; - if stdout_tx.send(line).is_err() { - break; - } - } - }); - - Ok((child, dialect)) - }, - ) - .await - .map_err(|err| SandboxError::StreamError { - message: err.to_string(), - })??; - - let runtime = SessionPiRuntime { - stdin_sender: stdin_tx, - pending_requests: Arc::new(std::sync::Mutex::new(HashMap::new())), - next_id: Arc::new(AtomicI64::new(1)), - converter: Arc::new(std::sync::Mutex::new( - convert_pi::PiEventConverter::default(), - )), - child: Arc::new(std::sync::Mutex::new(Some(child))), - }; - - Ok((runtime, stdout_rx, dialect)) - } - - fn spawn_pi_session_monitor( - self: &Arc, - session_id: String, - child: Arc>>, - ) { - let manager = Arc::clone(self); - tokio::spawn(async move { - loop { - let status = { - let mut guard = match child.lock() { - Ok(guard) => guard, - Err(_) => return, - }; - match guard.as_mut() { - Some(child) => match child.try_wait() { - Ok(status) => status, - Err(_) => None, - }, - None => return, - } - }; - - if let Some(status) = status { - manager - .handle_pi_session_process_exit(session_id.clone(), status) - .await; - break; - } - - sleep(Duration::from_millis(500)).await; - } - }); - } - - async fn handle_pi_session_process_exit( - &self, - session_id: String, - status: std::process::ExitStatus, - ) { - let should_emit_error = { - let mut sessions = self.sessions.lock().await; - let Some(session) = Self::session_mut(&mut sessions, &session_id) else { - return; - }; - if session.agent != AgentId::Pi - || session.pi_runtime_mode != Some(PiRuntimeMode::PerSession) - { - return; - } - if let Some(pi_state) = session.pi_state.as_mut() { - pi_state.output_loop_running = false; - pi_state.runtime.clear_pending_requests(); - pi_state.turn_queue.clear(); - pi_state.turn_active = false; - } - !session.ended - }; - - if !should_emit_error { - return; - } - - let message = format!("pi session process exited with status {:?}", status); - self.record_error( - &session_id, - message.clone(), - Some("pi_process_exit".to_string()), - None, - ) - .await; - let logs = self.read_agent_stderr(AgentId::Pi); - self.mark_session_ended( - &session_id, - status.code(), - &message, - SessionEndReason::Error, - TerminatedBy::Daemon, - logs, - ) - .await; - } - - async fn create_pi_session_per_session( - self: &Arc, - session_id: &str, - ) -> Result<(String, SessionPiState), SandboxError> { - let (runtime, stdout_rx, dialect) = self.spawn_pi_session_runtime().await?; - let session_for_output = session_id.to_string(); - let runtime_for_output = runtime.clone(); - let self_for_output = Arc::clone(self); - tokio::spawn(async move { - self_for_output - .handle_pi_session_output(session_for_output, runtime_for_output, stdout_rx) - .await; - }); - self.spawn_pi_session_monitor(session_id.to_string(), runtime.child.clone()); - - let request_id = runtime.next_request_id(); - let request = match dialect { - PiRpcDialect::Mono => json!({ - "type": "command", - "id": request_id, - "command": "new_session", - "params": { "sessionName": session_id } - }), - PiRpcDialect::CodingAgent => json!({ - "type": "new_session", - "id": request_id - }), - }; - - let native_session_id_result = async { - let rx = runtime.send_request(request_id, &request)?; - let result = tokio::time::timeout(Duration::from_secs(30), rx).await; - let response = match result { - Ok(Ok(response)) => response, - Ok(Err(_)) => { - return Err(SandboxError::StreamError { - message: "pi new_session request cancelled".to_string(), - }) - } - Err(_) => { - return Err(SandboxError::StreamError { - message: "pi new_session request timed out".to_string(), - }) - } - }; - if response - .get("success") - .and_then(Value::as_bool) - .is_some_and(|success| !success) - { - return Err(SandboxError::StreamError { - message: format!("pi new_session failed: {response}"), - }); - } - if response - .get("data") - .and_then(|value| value.get("cancelled")) - .and_then(Value::as_bool) - .is_some_and(|cancelled| cancelled) - { - return Err(SandboxError::StreamError { - message: "pi new_session request cancelled".to_string(), - }); - } - - if dialect == PiRpcDialect::CodingAgent { - let state_id = runtime.next_request_id(); - let request = json!({ - "type": "get_state", - "id": state_id - }); - let rx = runtime.send_request(state_id, &request)?; - let result = tokio::time::timeout(Duration::from_secs(30), rx).await; - let response = match result { - Ok(Ok(response)) => response, - Ok(Err(_)) => { - return Err(SandboxError::StreamError { - message: "pi get_state request cancelled".to_string(), - }) - } - Err(_) => { - return Err(SandboxError::StreamError { - message: "pi get_state request timed out".to_string(), - }) - } - }; - if response - .get("success") - .and_then(Value::as_bool) - .is_some_and(|success| !success) - { - return Err(SandboxError::StreamError { - message: format!("pi get_state failed: {response}"), - }); - } - let session_value = response.get("data").unwrap_or(&response); - return session_value - .get("sessionId") - .or_else(|| session_value.get("session_id")) - .and_then(Value::as_str) - .map(|value| value.to_string()) - .ok_or_else(|| SandboxError::StreamError { - message: "pi get_state response missing session id".to_string(), - }); - } - - let session_value = response.get("data").unwrap_or(&response); - session_value - .get("sessionId") - .or_else(|| session_value.get("session_id")) - .and_then(Value::as_str) - .map(|value| value.to_string()) - .ok_or_else(|| SandboxError::StreamError { - message: "pi new_session response missing session id".to_string(), - }) - } - .await; - - if native_session_id_result.is_err() { - runtime.clear_pending_requests(); - runtime.kill_child(); - } - - native_session_id_result.map(|native_session_id| { - let state = SessionPiState::new(runtime, dialect); - (native_session_id, state) - }) - } - - async fn session_native_session_id(&self, session_id: &str) -> Option { - let sessions = self.sessions.lock().await; - Self::session_ref(&sessions, session_id) - .and_then(|session| session.native_session_id.clone()) - } - - async fn handle_pi_session_output( - self: Arc, - session_id: String, - runtime: SessionPiRuntime, - mut stdout_rx: mpsc::UnboundedReceiver, - ) { - while let Some(line) = stdout_rx.recv().await { - let trimmed = line.trim(); - if trimmed.is_empty() { - continue; - } - - let value: Value = match serde_json::from_str(trimmed) { - Ok(v) => v, - Err(err) => { - self.record_pi_unparsed( - Some(session_id.clone()), - &err.to_string(), - Value::String(trimmed.to_string()), - ) - .await; - self.mark_pi_turn_terminal(&session_id).await; - continue; - } - }; - - let message_type = value.get("type").and_then(Value::as_str).unwrap_or(""); - if message_type == "response" { - let id = value - .get("id") - .and_then(Value::as_i64) - .or_else(|| value.get("id").and_then(Value::as_str)?.parse::().ok()); - if let Some(id) = id { - runtime.complete_request(id, value.clone()); - } - continue; - } - - let _event: pi_schema::RpcEvent = match serde_json::from_value(value.clone()) { - Ok(event) => event, - Err(err) => { - self.record_pi_unparsed( - Some(session_id.clone()), - &err.to_string(), - value.clone(), - ) - .await; - self.mark_pi_turn_terminal(&session_id).await; - continue; - } - }; - - let conversions = { - let mut converter = runtime.converter.lock().unwrap(); - converter.event_value_to_universal(&value) - }; - - let mut conversions = match conversions { - Ok(conversions) => conversions, - Err(err) => { - self.record_pi_unparsed(Some(session_id.clone()), &err, value.clone()) - .await; - self.mark_pi_turn_terminal(&session_id).await; - continue; - } - }; - - let native_session_id = match extract_pi_session_id(&value) { - Some(native) => Some(native), - None => self.session_native_session_id(&session_id).await, - }; - for conversion in &mut conversions { - if conversion.native_session_id.is_none() { - conversion.native_session_id = native_session_id.clone(); - } - conversion.raw = Some(value.clone()); - } - - let turn_terminal = pi_turn_terminal_detected(&conversions); - let _ = self.record_conversions(&session_id, conversions).await; - if turn_terminal { - self.mark_pi_turn_terminal(&session_id).await; - } - } - - let mut sessions = self.sessions.lock().await; - if let Some(session) = Self::session_mut(&mut sessions, &session_id) { - if let Some(pi_state) = session.pi_state.as_mut() { - pi_state.output_loop_running = false; - } - } - } - - async fn dispatch_pi_prompt( - &self, - runtime: SessionPiRuntime, - dialect: PiRpcDialect, - native_session_id: Option, - prompt: String, - ) -> Result<(), SandboxError> { - let native_session_id = native_session_id.ok_or_else(|| SandboxError::InvalidRequest { - message: "missing Pi session id".to_string(), - })?; - - let id = runtime.next_request_id(); - let request = match dialect { - PiRpcDialect::Mono => json!({ - "type": "command", - "id": id, - "command": "prompt", - "params": { - "sessionId": native_session_id, - "message": { - "role": "user", - "content": [{ "type": "text", "text": prompt }] - } - } - }), - PiRpcDialect::CodingAgent => json!({ - "type": "prompt", - "id": id, - "message": prompt - }), - }; - - runtime.send_request(id, &request)?; - Ok(()) - } - - async fn enqueue_pi_prompt( - self: &Arc, - session_id: &str, - prompt: String, - ) -> Result<(), SandboxError> { - let dispatch = { - let mut sessions = self.sessions.lock().await; - let session = Self::session_mut(&mut sessions, session_id).ok_or_else(|| { - SandboxError::SessionNotFound { - session_id: session_id.to_string(), - } - })?; - let pi_state = - session - .pi_state - .as_mut() - .ok_or_else(|| SandboxError::InvalidRequest { - message: "pi per-session runtime unavailable".to_string(), - })?; - pi_state.enqueue_prompt(prompt); - pi_state.begin_next_turn_if_idle().map(|queued| { - ( - pi_state.runtime.clone(), - pi_state.dialect, - session.native_session_id.clone(), - queued.prompt, - ) - }) - }; - - if let Some((runtime, dialect, native_session_id, prompt)) = dispatch { - if let Err(err) = self - .dispatch_pi_prompt(runtime, dialect, native_session_id, prompt) - .await - { - let mut sessions = self.sessions.lock().await; - if let Some(session) = Self::session_mut(&mut sessions, session_id) { - if let Some(pi_state) = session.pi_state.as_mut() { - pi_state.mark_turn_complete(); - } - } - return Err(err); - } - Ok(()) - } else { - Ok(()) - } - } - - async fn mark_pi_turn_terminal(&self, session_id: &str) { - let next_dispatch = { - let mut sessions = self.sessions.lock().await; - let Some(session) = Self::session_mut(&mut sessions, session_id) else { - return; - }; - let Some(pi_state) = session.pi_state.as_mut() else { - return; - }; - if !pi_state.turn_active { - return; - } - pi_state.mark_turn_complete(); - pi_state.begin_next_turn_if_idle().map(|queued| { - ( - pi_state.runtime.clone(), - pi_state.dialect, - session.native_session_id.clone(), - queued.prompt, - ) - }) - }; - - if let Some((runtime, dialect, native_session_id, prompt)) = next_dispatch { - if let Err(err) = self - .dispatch_pi_prompt(runtime, dialect, native_session_id, prompt) - .await - { - self.record_error( - session_id, - err.to_string(), - Some("pi_prompt_dispatch".to_string()), - None, - ) - .await; - let mut sessions = self.sessions.lock().await; - if let Some(session) = Self::session_mut(&mut sessions, session_id) { - if let Some(pi_state) = session.pi_state.as_mut() { - pi_state.mark_turn_complete(); - } - } - } - } - } - - /// Ensures a shared Pi RPC process is running. - async fn ensure_pi_server(self: &Arc) -> Result, SandboxError> { - let (server, receiver) = self.server_manager.ensure_pi_server().await?; - - if let Some(stdout_rx) = receiver { - let server_for_task = server.clone(); - let self_for_task = Arc::clone(self); - tokio::spawn(async move { - self_for_task - .handle_pi_server_output(server_for_task, stdout_rx) - .await; - }); ->>>>>>> Stashed changes } command.arg("--model").arg(model); } @@ -4709,7 +3600,6 @@ impl SessionManager { let value: Value = match serde_json::from_str(trimmed) { Ok(v) => v, Err(err) => { -<<<<<<< Updated upstream if self.is_active_pi_runtime(&session_id, &runtime).await { self.record_pi_unparsed( &session_id, @@ -4718,14 +3608,6 @@ impl SessionManager { ) .await; } -======= - self.record_pi_unparsed( - None, - &err.to_string(), - Value::String(trimmed.to_string()), - ) - .await; ->>>>>>> Stashed changes continue; } }; @@ -4742,65 +3624,22 @@ impl SessionManager { continue; } -<<<<<<< Updated upstream if !self.is_active_pi_runtime(&session_id, &runtime).await { -======= - let native_session_id = extract_pi_session_id(&value).or_else(|| { - if server.dialect() == PiRpcDialect::CodingAgent { - server.current_native_session_id() - } else { - None - } - }); - let session_id = native_session_id - .as_ref() - .and_then(|id| server.session_for_native(id)) - .or_else(|| { - if server.dialect() == PiRpcDialect::CodingAgent { - server.current_session_id() - } else { - None - } - }); - let Some(session_id) = session_id else { - self.record_pi_unparsed(None, "pi event missing session id", value.clone()) - .await; ->>>>>>> Stashed changes continue; } - let _event: pi_schema::RpcEvent = match serde_json::from_value(value.clone()) { + let event: pi_schema::RpcEvent = match serde_json::from_value(value.clone()) { Ok(event) => event, Err(err) => { -<<<<<<< Updated upstream self.record_pi_unparsed(&session_id, &err.to_string(), value.clone()) .await; -======= - self.record_pi_unparsed( - Some(session_id.clone()), - &err.to_string(), - value.clone(), - ) - .await; ->>>>>>> Stashed changes continue; } }; let conversions = { -<<<<<<< Updated upstream let mut converter = runtime.converter.lock().unwrap(); converter.event_to_universal(&event) -======= - let mut converters = server.converters.lock().unwrap(); - let key = native_session_id - .clone() - .unwrap_or_else(|| session_id.clone()); - let converter = converters - .entry(key) - .or_insert_with(convert_pi::PiEventConverter::default); - converter.event_value_to_universal(&value) ->>>>>>> Stashed changes }; let mut conversions = match conversions { @@ -4898,7 +3737,6 @@ impl SessionManager { message: "pi new_session request cancelled".to_string(), }) } -<<<<<<< Updated upstream Err(_) => { return Err(SandboxError::StreamError { message: "pi new_session request timed out".to_string(), @@ -4913,67 +3751,6 @@ impl SessionManager { return Err(SandboxError::StreamError { message: format!("pi new_session failed: {new_response}"), }); -======= - - let native_session_id = if server.dialect() == PiRpcDialect::CodingAgent { - let state_id = server.next_request_id(); - let request = json!({ - "type": "get_state", - "id": state_id - }); - let rx = server.send_request(state_id, &request).ok_or_else(|| { - SandboxError::StreamError { - message: "failed to send pi get_state request".to_string(), - } - })?; - let result = tokio::time::timeout(Duration::from_secs(30), rx).await; - let response = match result { - Ok(Ok(response)) => response, - Ok(Err(_)) => { - return Err(SandboxError::StreamError { - message: "pi get_state request cancelled".to_string(), - }) - } - Err(_) => { - return Err(SandboxError::StreamError { - message: "pi get_state request timed out".to_string(), - }) - } - }; - if response - .get("success") - .and_then(Value::as_bool) - .is_some_and(|success| !success) - { - return Err(SandboxError::StreamError { - message: format!("pi get_state failed: {response}"), - }); - } - let session_value = response.get("data").unwrap_or(&response); - session_value - .get("sessionId") - .or_else(|| session_value.get("session_id")) - .and_then(Value::as_str) - .ok_or_else(|| SandboxError::StreamError { - message: "pi get_state response missing session id".to_string(), - })? - .to_string() - } else { - let session_value = response.get("data").unwrap_or(&response); - session_value - .get("sessionId") - .or_else(|| session_value.get("session_id")) - .and_then(Value::as_str) - .ok_or_else(|| SandboxError::StreamError { - message: "pi new_session response missing session id".to_string(), - })? - .to_string() - }; - - server.register_session(native_session_id.clone(), session_id.to_string()); - - Ok(native_session_id) ->>>>>>> Stashed changes } if new_response .get("data") @@ -5071,6 +3848,10 @@ impl SessionManager { })? }; + if let Some(level) = session.variant.as_deref() { + self.set_pi_thinking_level(&runtime, level).await?; + } + let id = runtime.next_request_id(); let request = json!({ "type": "prompt", @@ -5087,6 +3868,49 @@ impl SessionManager { Ok(()) } + async fn set_pi_thinking_level( + &self, + runtime: &Arc, + level: &str, + ) -> Result<(), SandboxError> { + let id = runtime.next_request_id(); + let request = json!({ + "type": "set_thinking_level", + "id": id, + "level": level + }); + let response_rx = + runtime + .send_request(id, &request) + .ok_or_else(|| SandboxError::StreamError { + message: "failed to send pi set_thinking_level request".to_string(), + })?; + let response = tokio::time::timeout(Duration::from_secs(30), response_rx) + .await + .map_err(|_| SandboxError::StreamError { + message: "pi set_thinking_level request timed out".to_string(), + })? + .map_err(|_| SandboxError::StreamError { + message: "pi set_thinking_level request cancelled".to_string(), + })?; + if response + .get("success") + .and_then(Value::as_bool) + .is_some_and(|success| !success) + { + let detail = response + .get("error") + .cloned() + .or_else(|| response.get("data").and_then(|data| data.get("error")).cloned()) + .unwrap_or_else(|| response.clone()); + return Err(SandboxError::InvalidRequest { + message: format!("pi set_thinking_level failed for '{level}': {detail}"), + }); + } + + Ok(()) + } + async fn record_pi_unparsed(&self, session_id: &str, error: &str, raw: Value) { let _ = self .record_conversions(session_id, vec![agent_unparsed("pi", error, raw)]) @@ -6464,12 +5288,8 @@ fn agent_capabilities_for(agent: AgentId) -> AgentCapabilities { mcp_tools: false, streaming_deltas: true, item_started: true, -<<<<<<< Updated upstream - variants: false, + variants: true, shared_process: false, // one dedicated rpc process per session -======= - shared_process: true, // runtime-selected; shared only for allowlisted capabilities ->>>>>>> Stashed changes }, AgentId::Mock => AgentCapabilities { plan_mode: true, @@ -6616,9 +5436,19 @@ fn codex_variants() -> Vec { .collect() } +fn pi_variants() -> Vec { + vec!["off", "minimal", "low", "medium", "high", "xhigh"] + .into_iter() + .map(|value| value.to_string()) + .collect() +} + fn parse_pi_models_output(output: &str) -> AgentModelsResponse { let mut models = Vec::new(); let mut seen = HashSet::new(); + let mut provider_col: Option = None; + let mut model_col: Option = None; + let mut thinking_col: Option = None; for line in output.lines() { let trimmed = line.trim(); @@ -6626,23 +5456,60 @@ fn parse_pi_models_output(output: &str) -> AgentModelsResponse { continue; } - let mut parts = trimmed.split_whitespace(); - let Some(provider) = parts.next() else { + let parts = trimmed.split_whitespace().collect::>(); + if parts.len() < 2 { + continue; + } + if parts.iter().all(|part| { + part.chars() + .all(|ch| matches!(ch, '-' | '=' | '+' | '|' | ':')) + }) { + continue; + } + + let lower = parts + .iter() + .map(|part| part.to_ascii_lowercase()) + .collect::>(); + if let (Some(provider_idx), Some(model_idx)) = ( + lower.iter().position(|part| part == "provider"), + lower.iter().position(|part| part == "model"), + ) { + provider_col = Some(provider_idx); + model_col = Some(model_idx); + thinking_col = lower.iter().position(|part| part == "thinking"); + continue; + } + + let provider_idx = provider_col.unwrap_or(0); + let model_idx = model_col.unwrap_or(1); + let Some(provider) = parts.get(provider_idx).copied() else { continue; }; - let Some(model) = parts.next() else { + let Some(model) = parts.get(model_idx).copied() else { continue; }; - if provider.eq_ignore_ascii_case("provider") && model.eq_ignore_ascii_case("model") { - continue; - } if provider.chars().all(|ch| ch == '-' || ch == '=') && model.chars().all(|ch| ch == '-' || ch == '=') { continue; } + let thinking_value = thinking_col + .and_then(|idx| parts.get(idx).copied()) + .or_else(|| { + parts.iter().rev().copied().find(|value| { + value.eq_ignore_ascii_case("yes") || value.eq_ignore_ascii_case("no") + }) + }); + let supports_thinking = thinking_value.is_some_and(|value| value.eq_ignore_ascii_case("yes")); + let (variants, default_variant) = if supports_thinking { + (Some(pi_variants()), Some("medium".to_string())) + } else { + (Some(vec!["off".to_string()]), Some("off".to_string())) + }; + let id = format!("{provider}/{model}"); if !seen.insert(id.clone()) { continue; @@ -6651,8 +5518,8 @@ fn parse_pi_models_output(output: &str) -> AgentModelsResponse { models.push(AgentModelInfo { id, name: None, - variants: None, - default_variant: None, + variants, + default_variant, }); } @@ -7616,8 +6483,8 @@ fn parse_agent_line(agent: AgentId, line: &str, session_id: &str) -> Vec vec![agent_unparsed("amp", &err.to_string(), value)], }, - AgentId::Pi => match serde_json::from_value::(value.clone()) { - Ok(_) => convert_pi::event_value_to_universal(&value) + AgentId::Pi => match serde_json::from_value(value.clone()) { + Ok(event) => convert_pi::event_to_universal(&event) .unwrap_or_else(|err| vec![agent_unparsed("pi", &err, value)]), Err(err) => vec![agent_unparsed("pi", &err.to_string(), value)], }, @@ -7674,75 +6541,95 @@ fn extract_nested_string(value: &Value, path: &[&str]) -> Option { } #[cfg(test)] -<<<<<<< Updated upstream mod pi_model_parser_tests { use super::*; #[test] - fn parse_pi_models_output_parses_rows_from_table() { + fn parse_pi_models_output_parses_variants_from_thinking_column() { let output = r#" -provider model aliases -openai gpt-4.1 gpt-4.1-latest -anthropic claude-sonnet-4-5-20250929 sonnet +provider model aliases thinking +openai gpt-4.1 gpt-4.1-latest yes +anthropic claude-sonnet-4-5-20250929 sonnet no "#; let parsed = parse_pi_models_output(output); - let ids = parsed + let anthropic = parsed .models .iter() - .map(|model| model.id.clone()) - .collect::>(); + .find(|model| model.id == "anthropic/claude-sonnet-4-5-20250929") + .expect("anthropic model"); + assert_eq!(anthropic.variants.as_deref(), Some(&["off".to_string()][..])); + assert_eq!(anthropic.default_variant.as_deref(), Some("off")); - assert_eq!( - ids, - vec!["anthropic/claude-sonnet-4-5-20250929", "openai/gpt-4.1"] - ); + let openai = parsed + .models + .iter() + .find(|model| model.id == "openai/gpt-4.1") + .expect("openai model"); + assert_eq!(openai.variants.as_ref(), Some(&pi_variants())); + assert_eq!(openai.default_variant.as_deref(), Some("medium")); assert_eq!(parsed.default_model, None); } #[test] fn parse_pi_models_output_skips_blank_header_separator_and_malformed_rows() { let output = r#" -provider model aliases --------- ----- ------- +provider model aliases thinking +-------- ----- ------- -------- openai malformed-row -groq llama-3.3-70b-versatile alias +groq llama-3.3-70b-versatile alias no "#; let parsed = parse_pi_models_output(output); - let ids = parsed + let models = parsed .models .iter() - .map(|model| model.id.as_str()) + .map(|model| (model.id.as_str(), model.default_variant.as_deref())) .collect::>(); - assert_eq!(ids, vec!["groq/llama-3.3-70b-versatile"]); + assert_eq!( + models, + vec![("groq/llama-3.3-70b-versatile", Some("off"))] + ); } #[test] fn parse_pi_models_output_handles_model_ids_with_slashes() { - let output = "openrouter qwen/qwen3-32b"; + let output = "openrouter qwen/qwen3-32b alias yes"; let parsed = parse_pi_models_output(output); - let ids = parsed + let models = parsed .models .iter() - .map(|model| model.id.as_str()) + .map(|model| { + ( + model.id.as_str(), + model.variants.clone().unwrap_or_default(), + model.default_variant.as_deref(), + ) + }) .collect::>(); - assert_eq!(ids, vec!["openrouter/qwen/qwen3-32b"]); + assert_eq!( + models, + vec![( + "openrouter/qwen/qwen3-32b", + pi_variants(), + Some("medium") + )] + ); } #[test] fn parse_pi_models_output_deduplicates_and_sorts_stably() { let output = r#" -zeta z-model -alpha a-model -zeta z-model -beta b-model -alpha a-model +zeta z-model yes +alpha a-model no +zeta z-model no +beta b-model yes +alpha a-model yes "#; let parsed = parse_pi_models_output(output); @@ -7755,6 +6642,32 @@ alpha a-model assert_eq!(ids, vec!["alpha/a-model", "beta/b-model", "zeta/z-model"]); assert_eq!(parsed.default_model, None); } + + #[test] + fn parse_pi_models_output_defaults_to_off_when_thinking_column_missing() { + let output = r#" +provider model aliases +openai gpt-4.1 gpt-4.1-latest +"#; + + let parsed = parse_pi_models_output(output); + assert_eq!(parsed.models.len(), 1); + assert_eq!( + parsed.models[0].variants.as_deref(), + Some(&["off".to_string()][..]) + ); + assert_eq!(parsed.models[0].default_variant.as_deref(), Some("off")); + } +} + +#[cfg(test)] +mod agent_capabilities_tests { + use super::*; + + #[test] + fn pi_capabilities_enable_variants() { + assert!(agent_capabilities_for(AgentId::Pi).variants); + } } #[cfg(test)] @@ -7820,9 +6733,14 @@ mod pi_runtime_tests { assert_eq!(args, vec!["--model", "gpt-5.2-codex"]); } - async fn setup_pi_session( + async fn setup_pi_session_with_stdin( session_id: &str, - ) -> (Arc, Arc, TempDir) { + ) -> ( + Arc, + Arc, + mpsc::UnboundedReceiver, + TempDir, + ) { let temp_dir = TempDir::new().expect("temp dir"); let agent_manager = Arc::new(AgentManager::new(temp_dir.path()).expect("agent manager")); let http_client = Client::builder().no_proxy().build().expect("http client"); @@ -7842,7 +6760,7 @@ mod pi_runtime_tests { .server_manager .set_owner(Arc::downgrade(&session_manager)); - let (stdin_tx, _stdin_rx) = mpsc::unbounded_channel::(); + let (stdin_tx, stdin_rx) = mpsc::unbounded_channel::(); let runtime = Arc::new(PiSessionRuntime::new( stdin_tx, Arc::new(std::sync::Mutex::new(None)), @@ -7858,6 +6776,14 @@ mod pi_runtime_tests { session.pi_runtime = Some(runtime.clone()); session_manager.sessions.lock().await.push(session); + (session_manager, runtime, stdin_rx, temp_dir) + } + + async fn setup_pi_session( + session_id: &str, + ) -> (Arc, Arc, TempDir) { + let (session_manager, runtime, _stdin_rx, temp_dir) = + setup_pi_session_with_stdin(session_id).await; (session_manager, runtime, temp_dir) } @@ -7889,6 +6815,128 @@ mod pi_runtime_tests { assert_eq!(result1.get("id").and_then(Value::as_i64), Some(id1)); } + #[tokio::test] + async fn pi_set_thinking_level_sends_rpc_and_waits_for_success() { + let (session_manager, runtime, mut stdin_rx, _temp_dir) = + setup_pi_session_with_stdin("pi-thinking-success").await; + let runtime_for_task = runtime.clone(); + let manager_for_task = session_manager.clone(); + let task = tokio::spawn(async move { + manager_for_task + .set_pi_thinking_level(&runtime_for_task, "high") + .await + }); + + let line = stdin_rx.recv().await.expect("set_thinking_level request"); + let request: Value = serde_json::from_str(&line).expect("json request"); + assert_eq!( + request.get("type").and_then(Value::as_str), + Some("set_thinking_level") + ); + assert_eq!(request.get("level").and_then(Value::as_str), Some("high")); + let request_id = request + .get("id") + .and_then(Value::as_i64) + .expect("request id"); + runtime.complete_request( + request_id, + json!({ + "type": "response", + "id": request_id, + "success": true + }), + ); + + task.await.expect("join").expect("set_thinking_level ok"); + } + + #[tokio::test] + async fn pi_set_thinking_level_maps_explicit_rpc_error_to_invalid_request() { + let (session_manager, runtime, mut stdin_rx, _temp_dir) = + setup_pi_session_with_stdin("pi-thinking-error").await; + let runtime_for_task = runtime.clone(); + let manager_for_task = session_manager.clone(); + let task = tokio::spawn(async move { + manager_for_task + .set_pi_thinking_level(&runtime_for_task, "invalid-level") + .await + }); + + let line = stdin_rx.recv().await.expect("set_thinking_level request"); + let request: Value = serde_json::from_str(&line).expect("json request"); + let request_id = request + .get("id") + .and_then(Value::as_i64) + .expect("request id"); + runtime.complete_request( + request_id, + json!({ + "type": "response", + "id": request_id, + "success": false, + "error": "unsupported level" + }), + ); + + let err = task + .await + .expect("join") + .expect_err("set_thinking_level should fail"); + match err { + SandboxError::InvalidRequest { message } => { + assert!(message.contains("unsupported level"), "{message}"); + } + other => panic!("expected InvalidRequest, got {other:?}"), + } + } + + #[tokio::test] + async fn send_pi_prompt_reapplies_variant_before_prompt() { + let (session_manager, runtime, mut stdin_rx, _temp_dir) = + setup_pi_session_with_stdin("pi-prompt-variant").await; + let snapshot = SessionSnapshot { + session_id: "pi-prompt-variant".to_string(), + agent: AgentId::Pi, + agent_mode: "build".to_string(), + permission_mode: "default".to_string(), + model: None, + variant: Some("high".to_string()), + native_session_id: Some("native-pi-prompt-variant".to_string()), + }; + let manager_for_task = session_manager.clone(); + let task = + tokio::spawn(async move { manager_for_task.send_pi_prompt(&snapshot, "Hello").await }); + + let set_line = stdin_rx.recv().await.expect("set_thinking_level request"); + let set_request: Value = serde_json::from_str(&set_line).expect("json request"); + assert_eq!( + set_request.get("type").and_then(Value::as_str), + Some("set_thinking_level") + ); + let set_id = set_request + .get("id") + .and_then(Value::as_i64) + .expect("set id"); + runtime.complete_request( + set_id, + json!({ + "type": "response", + "id": set_id, + "success": true + }), + ); + + let prompt_line = stdin_rx.recv().await.expect("prompt request"); + let prompt_request: Value = serde_json::from_str(&prompt_line).expect("json request"); + assert_eq!(prompt_request.get("type").and_then(Value::as_str), Some("prompt")); + assert_eq!( + prompt_request.get("message").and_then(Value::as_str), + Some("Hello") + ); + + task.await.expect("join").expect("send_pi_prompt ok"); + } + #[tokio::test] async fn pi_runtime_output_non_json_emits_agent_unparsed() { let (session_manager, runtime, _temp_dir) = setup_pi_session("pi-unparsed").await; @@ -7979,79 +7027,6 @@ mod pi_runtime_tests { }) .unwrap_or_default(); assert_eq!(text, "Hello"); -======= -mod pi_runtime_tests { - use super::*; - - fn dummy_pi_state() -> SessionPiState { - let (stdin_sender, _stdin_rx) = mpsc::unbounded_channel::(); - let runtime = SessionPiRuntime { - stdin_sender, - pending_requests: Arc::new(std::sync::Mutex::new(HashMap::new())), - next_id: Arc::new(AtomicI64::new(1)), - converter: Arc::new(std::sync::Mutex::new( - convert_pi::PiEventConverter::default(), - )), - child: Arc::new(std::sync::Mutex::new(None)), - }; - SessionPiState::new(runtime, PiRpcDialect::Mono) - } - - #[test] - fn pi_capability_allowlist_shared_for_known_mono_version() { - let capability = - PiCapabilityKey::from_detected(PiRpcDialect::Mono, Some("pi 1.2.3".to_string())); - assert!(supports_shared_multiplexing(&capability)); - } - - #[test] - fn pi_capability_allowlist_unknown_version_not_shared() { - let capability = PiCapabilityKey::from_detected(PiRpcDialect::Mono, None); - assert!(!supports_shared_multiplexing(&capability)); - } - - #[test] - fn pi_capability_allowlist_coding_agent_not_shared() { - let capability = - PiCapabilityKey::from_detected(PiRpcDialect::CodingAgent, Some("1.2.3".to_string())); - assert!(!supports_shared_multiplexing(&capability)); - } - - #[test] - fn pi_turn_queue_is_fifo_and_gated() { - let mut state = dummy_pi_state(); - state.enqueue_prompt("first".to_string()); - state.enqueue_prompt("second".to_string()); - state.enqueue_prompt("third".to_string()); - - let first = state.begin_next_turn_if_idle().expect("first prompt"); - assert_eq!(first.prompt, "first"); - assert!(state.begin_next_turn_if_idle().is_none()); - - state.mark_turn_complete(); - let second = state.begin_next_turn_if_idle().expect("second prompt"); - assert_eq!(second.prompt, "second"); - - state.mark_turn_complete(); - let third = state.begin_next_turn_if_idle().expect("third prompt"); - assert_eq!(third.prompt, "third"); - } - - #[test] - fn pi_turn_queue_clear_for_terminate_resets_state() { - let mut state = dummy_pi_state(); - state.enqueue_prompt("queued".to_string()); - let _ = state.begin_next_turn_if_idle(); - let (tx, _rx) = oneshot::channel::(); - state.runtime.pending_requests.lock().unwrap().insert(1, tx); - - state.clear_for_terminate(); - - assert!(state.turn_queue.is_empty()); - assert!(!state.turn_active); - assert!(!state.output_loop_running); - assert!(state.runtime.pending_requests.lock().unwrap().is_empty()); ->>>>>>> Stashed changes } } @@ -9304,23 +8279,6 @@ fn is_turn_terminal(event: &UniversalEvent, _agent: AgentId) -> bool { } } -fn pi_turn_terminal_detected(conversions: &[EventConversion]) -> bool { - conversions - .iter() - .any(|conversion| match conversion.event_type { - UniversalEventType::Error - | UniversalEventType::AgentUnparsed - | UniversalEventType::SessionEnded => true, - UniversalEventType::ItemCompleted => { - let UniversalEventData::Item(ItemEventData { item }) = &conversion.data else { - return false; - }; - item.kind == ItemKind::Message && matches!(item.role, Some(ItemRole::Assistant)) - } - _ => false, - }) -} - fn status_label(item: &UniversalItem) -> Option<&str> { if item.kind != ItemKind::Status { return None; @@ -9349,7 +8307,6 @@ struct SessionSnapshot { model: Option, variant: Option, native_session_id: Option, - pi_runtime_mode: Option, } impl From<&SessionState> for SessionSnapshot { @@ -9362,7 +8319,6 @@ impl From<&SessionState> for SessionSnapshot { model: session.model.clone(), variant: session.variant.clone(), native_session_id: session.native_session_id.clone(), - pi_runtime_mode: session.pi_runtime_mode, } } } diff --git a/server/packages/sandbox-agent/tests/agent-flows/pi_rpc_integration.rs b/server/packages/sandbox-agent/tests/agent-flows/pi_rpc_integration.rs index e6cb772..993836c 100644 --- a/server/packages/sandbox-agent/tests/agent-flows/pi_rpc_integration.rs +++ b/server/packages/sandbox-agent/tests/agent-flows/pi_rpc_integration.rs @@ -1,32 +1,6 @@ // Pi RPC integration tests (gated via SANDBOX_TEST_PI + PATH). include!("../common/http.rs"); -<<<<<<< Updated upstream -======= -struct EnvVarGuard { - key: String, - previous: Option, -} - -impl Drop for EnvVarGuard { - fn drop(&mut self) { - match &self.previous { - Some(value) => std::env::set_var(&self.key, value), - None => std::env::remove_var(&self.key), - } - } -} - -fn set_env_var(key: &str, value: &str) -> EnvVarGuard { - let previous = std::env::var(key).ok(); - std::env::set_var(key, value); - EnvVarGuard { - key: key.to_string(), - previous, - } -} - ->>>>>>> Stashed changes fn pi_test_config() -> Option { let configs = match test_agents_from_env() { Ok(configs) => configs, @@ -39,96 +13,9 @@ fn pi_test_config() -> Option { .into_iter() .find(|config| config.agent == AgentId::Pi) } -<<<<<<< Updated upstream -======= - -async fn create_pi_session_checked(app: &Router, session_id: &str) -> Value { - let (status, payload) = send_json( - app, - Method::POST, - &format!("/v1/sessions/{session_id}"), - Some(json!({ - "agent": "pi", - "permissionMode": test_permission_mode(AgentId::Pi), - })), - ) - .await; - assert_eq!(status, StatusCode::OK, "create pi session {session_id}"); - payload -} - -async fn poll_events_until_assistant_count( - app: &Router, - session_id: &str, - expected_assistant_messages: usize, - timeout: Duration, -) -> Vec { - let start = Instant::now(); - let mut offset = 0u64; - let mut events = Vec::new(); - - while start.elapsed() < timeout { - let path = format!("/v1/sessions/{session_id}/events?offset={offset}&limit=200"); - let (status, payload) = send_json(app, Method::GET, &path, None).await; - assert_eq!(status, StatusCode::OK, "poll events"); - let new_events = payload - .get("events") - .and_then(Value::as_array) - .cloned() - .unwrap_or_default(); - - if !new_events.is_empty() { - if let Some(last) = new_events - .last() - .and_then(|event| event.get("sequence")) - .and_then(Value::as_u64) - { - offset = last; - } - events.extend(new_events); - } - - if events.iter().any(is_unparsed_event) { - break; - } - - let assistant_count = events - .iter() - .filter(|event| is_assistant_message(event)) - .count(); - if assistant_count >= expected_assistant_messages { - break; - } - - if events.iter().any(is_error_event) { - break; - } - - tokio::time::sleep(Duration::from_millis(800)).await; - } - - events -} - -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn pi_rpc_session_and_stream() { - let Some(config) = pi_test_config() else { - return; - }; ->>>>>>> Stashed changes async fn create_pi_session_with_native(app: &Router, session_id: &str) -> String { - let (status, payload) = send_json( - app, - Method::POST, - &format!("/v1/sessions/{session_id}"), - Some(json!({ - "agent": "pi", - "permissionMode": test_permission_mode(AgentId::Pi), - })), - ) - .await; - assert_eq!(status, StatusCode::OK, "create pi session"); + let payload = create_pi_session(app, session_id, None, None).await; let native_session_id = payload .get("native_session_id") .and_then(Value::as_str) @@ -141,6 +28,53 @@ async fn create_pi_session_with_native(app: &Router, session_id: &str) -> String native_session_id } +async fn create_pi_session( + app: &Router, + session_id: &str, + model: Option<&str>, + variant: Option<&str>, +) -> Value { + let mut body = Map::new(); + body.insert("agent".to_string(), json!("pi")); + body.insert( + "permissionMode".to_string(), + json!(test_permission_mode(AgentId::Pi)), + ); + if let Some(model) = model { + body.insert("model".to_string(), json!(model)); + } + if let Some(variant) = variant { + body.insert("variant".to_string(), json!(variant)); + } + let (status, payload) = send_json( + app, + Method::POST, + &format!("/v1/sessions/{session_id}"), + Some(Value::Object(body)), + ) + .await; + assert_eq!(status, StatusCode::OK, "create pi session"); + payload +} + +async fn fetch_pi_models(app: &Router) -> Vec { + let (status, payload) = send_json(app, Method::GET, "/v1/agents/pi/models", None).await; + assert_eq!(status, StatusCode::OK, "pi models endpoint"); + payload + .get("models") + .and_then(Value::as_array) + .cloned() + .unwrap_or_default() +} + +fn model_variant_ids(model: &Value) -> Vec<&str> { + model + .get("variants") + .and_then(Value::as_array) + .map(|values| values.iter().filter_map(Value::as_str).collect::>()) + .unwrap_or_default() +} + fn assert_strictly_increasing_sequences(events: &[Value], label: &str) { let mut last_sequence = 0u64; for event in events { @@ -156,7 +90,6 @@ fn assert_strictly_increasing_sequences(events: &[Value], label: &str) { } } -<<<<<<< Updated upstream fn assert_all_events_for_session(events: &[Value], session_id: &str) { for event in events { let event_session_id = event @@ -197,11 +130,6 @@ fn assert_item_started_ids_unique(events: &[Value], label: &str) { #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn pi_rpc_session_and_stream() { -======= -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn pi_rpc_multi_session_create_per_session_mode() { - let _mode_guard = set_env_var("SANDBOX_AGENT_PI_FORCE_RUNTIME_MODE", "per-session"); ->>>>>>> Stashed changes let Some(config) = pi_test_config() else { return; }; @@ -210,7 +138,6 @@ async fn pi_rpc_multi_session_create_per_session_mode() { let _guard = apply_credentials(&config.credentials); install_agent(&app.app, config.agent).await; -<<<<<<< Updated upstream let session_id = "pi-rpc-session"; let _native_session_id = create_pi_session_with_native(&app.app, session_id).await; @@ -228,34 +155,90 @@ async fn pi_rpc_multi_session_create_per_session_mode() { } #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn pi_parallel_sessions_turns() { -======= - let first = create_pi_session_checked(&app.app, "pi-multi-a").await; - let second = create_pi_session_checked(&app.app, "pi-multi-b").await; - - let first_native = first - .get("native_session_id") - .and_then(Value::as_str) - .unwrap_or(""); - let second_native = second - .get("native_session_id") - .and_then(Value::as_str) - .unwrap_or(""); - assert!(!first_native.is_empty(), "first native session id missing"); - assert!( - !second_native.is_empty(), - "second native session id missing" - ); - assert_ne!( - first_native, second_native, - "per-session mode should allocate independent native session ids" - ); -} - -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn pi_rpc_per_session_queue_and_termination_isolation() { - let _mode_guard = set_env_var("SANDBOX_AGENT_PI_FORCE_RUNTIME_MODE", "per-session"); ->>>>>>> Stashed changes +async fn pi_variant_high_applies_for_thinking_model() { + let Some(config) = pi_test_config() else { + return; + }; + + let app = TestApp::new(); + let _guard = apply_credentials(&config.credentials); + install_agent(&app.app, config.agent).await; + + let models = fetch_pi_models(&app.app).await; + let thinking_model = models.iter().find_map(|model| { + let model_id = model.get("id").and_then(Value::as_str)?; + let variants = model_variant_ids(model); + if variants.contains(&"high") { + Some(model_id.to_string()) + } else { + None + } + }); + let Some(model_id) = thinking_model else { + eprintln!("Skipping PI variant thinking-model test: no model advertises high"); + return; + }; + + let session_id = "pi-variant-thinking-high"; + create_pi_session(&app.app, session_id, Some(&model_id), Some("high")).await; + + let events = read_turn_stream_events(&app.app, session_id, Duration::from_secs(120)).await; + assert!(!events.is_empty(), "no events from pi thinking-variant stream"); + assert!( + !events.iter().any(is_unparsed_event), + "agent.unparsed event encountered for thinking-variant session" + ); + assert!( + should_stop(&events), + "thinking-variant turn stream did not reach a terminal event" + ); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn pi_variant_high_on_non_thinking_model_uses_pi_native_clamping() { + let Some(config) = pi_test_config() else { + return; + }; + + let app = TestApp::new(); + let _guard = apply_credentials(&config.credentials); + install_agent(&app.app, config.agent).await; + + let models = fetch_pi_models(&app.app).await; + let non_thinking_model = models.iter().find_map(|model| { + let model_id = model.get("id").and_then(Value::as_str)?; + let variants = model_variant_ids(model); + if variants == vec!["off"] { + Some(model_id.to_string()) + } else { + None + } + }); + let Some(model_id) = non_thinking_model else { + eprintln!("Skipping PI non-thinking variant test: no off-only model reported"); + return; + }; + + let session_id = "pi-variant-nonthinking-high"; + create_pi_session(&app.app, session_id, Some(&model_id), Some("high")).await; + + let events = read_turn_stream_events(&app.app, session_id, Duration::from_secs(120)).await; + assert!( + !events.is_empty(), + "no events from pi non-thinking variant stream" + ); + assert!( + !events.iter().any(is_unparsed_event), + "agent.unparsed event encountered for non-thinking variant session" + ); + assert!( + should_stop(&events), + "non-thinking variant turn stream did not reach a terminal event" + ); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn pi_parallel_sessions_turns() { let Some(config) = pi_test_config() else { return; }; @@ -264,7 +247,6 @@ async fn pi_rpc_per_session_queue_and_termination_isolation() { let _guard = apply_credentials(&config.credentials); install_agent(&app.app, config.agent).await; -<<<<<<< Updated upstream let session_a = "pi-parallel-a"; let session_b = "pi-parallel-b"; create_pi_session_with_native(&app.app, session_a).await; @@ -423,105 +405,3 @@ async fn pi_runtime_restart_scope() { ); assert_all_events_for_session(&events_b, session_b); } -======= - create_pi_session_checked(&app.app, "pi-queue-a").await; - create_pi_session_checked(&app.app, "pi-queue-b").await; - - let status = send_status( - &app.app, - Method::POST, - "/v1/sessions/pi-queue-a/messages", - Some(json!({ "message": "Reply with exactly FIRST." })), - ) - .await; - assert_eq!(status, StatusCode::NO_CONTENT, "send first prompt"); - - let status = send_status( - &app.app, - Method::POST, - "/v1/sessions/pi-queue-a/messages", - Some(json!({ "message": "Reply with exactly SECOND." })), - ) - .await; - assert_eq!(status, StatusCode::NO_CONTENT, "enqueue second prompt"); - - let status = send_status( - &app.app, - Method::POST, - "/v1/sessions/pi-queue-b/messages", - Some(json!({ "message": PROMPT })), - ) - .await; - assert_eq!( - status, - StatusCode::NO_CONTENT, - "send prompt to sibling session" - ); - - let events_a = - poll_events_until_assistant_count(&app.app, "pi-queue-a", 2, Duration::from_secs(240)) - .await; - let events_b = - poll_events_until_assistant_count(&app.app, "pi-queue-b", 1, Duration::from_secs(180)) - .await; - - assert!( - !events_a.iter().any(is_unparsed_event), - "session a emitted agent.unparsed" - ); - assert!( - !events_b.iter().any(is_unparsed_event), - "session b emitted agent.unparsed" - ); - let assistant_count_a = events_a - .iter() - .filter(|event| is_assistant_message(event)) - .count(); - let assistant_count_b = events_b - .iter() - .filter(|event| is_assistant_message(event)) - .count(); - assert!( - assistant_count_a >= 2, - "expected at least two assistant completions for queued session, got {assistant_count_a}" - ); - assert!( - assistant_count_b >= 1, - "expected assistant completion for sibling session, got {assistant_count_b}" - ); - - let status = send_status( - &app.app, - Method::POST, - "/v1/sessions/pi-queue-a/terminate", - Some(json!({})), - ) - .await; - assert_eq!(status, StatusCode::NO_CONTENT, "terminate first session"); - - let status = send_status( - &app.app, - Method::POST, - "/v1/sessions/pi-queue-b/messages", - Some(json!({ "message": PROMPT })), - ) - .await; - assert_eq!( - status, - StatusCode::NO_CONTENT, - "sibling session should continue after termination" - ); - - let events_b_after = - poll_events_until_assistant_count(&app.app, "pi-queue-b", 2, Duration::from_secs(180)) - .await; - let assistant_count_b_after = events_b_after - .iter() - .filter(|event| is_assistant_message(event)) - .count(); - assert!( - assistant_count_b_after >= 2, - "expected additional assistant completion for sibling session after termination" - ); -} ->>>>>>> Stashed changes diff --git a/server/packages/sandbox-agent/tests/http/agent_endpoints.rs b/server/packages/sandbox-agent/tests/http/agent_endpoints.rs index b0fa269..7c7623d 100644 --- a/server/packages/sandbox-agent/tests/http/agent_endpoints.rs +++ b/server/packages/sandbox-agent/tests/http/agent_endpoints.rs @@ -186,3 +186,76 @@ async fn agent_endpoints_snapshots() { }); } } + +fn pi_test_config() -> Option { + let configs = match test_agents_from_env() { + Ok(configs) => configs, + Err(err) => { + eprintln!("Skipping PI endpoint variant test: {err}"); + return None; + } + }; + configs + .into_iter() + .find(|config| config.agent == AgentId::Pi) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn pi_capabilities_and_models_expose_variants() { + let Some(config) = pi_test_config() else { + return; + }; + + let app = TestApp::new(); + let _guard = apply_credentials(&config.credentials); + install_agent(&app.app, AgentId::Pi).await; + + let capabilities = fetch_capabilities(&app.app).await; + let pi_caps = capabilities.get("pi").expect("pi capabilities"); + assert!(pi_caps.variants, "pi capabilities should enable variants"); + + let (status, payload) = send_json(&app.app, Method::GET, "/v1/agents/pi/models", None).await; + assert_eq!(status, StatusCode::OK, "pi models endpoint"); + let models = payload + .get("models") + .and_then(Value::as_array) + .cloned() + .unwrap_or_default(); + assert!(!models.is_empty(), "pi models should not be empty"); + + let full_levels = vec!["off", "minimal", "low", "medium", "high", "xhigh"]; + for model in models { + let model_id = model + .get("id") + .and_then(Value::as_str) + .unwrap_or(""); + let variants = model + .get("variants") + .and_then(Value::as_array) + .expect("pi model variants"); + let default_variant = model + .get("defaultVariant") + .and_then(Value::as_str) + .expect("pi model defaultVariant"); + let variant_ids = variants + .iter() + .filter_map(Value::as_str) + .collect::>(); + assert!(!variant_ids.is_empty(), "pi model {model_id} has no variants"); + if variant_ids == vec!["off"] { + assert_eq!( + default_variant, "off", + "pi model {model_id} expected default off for non-thinking model" + ); + } else { + assert_eq!( + variant_ids, full_levels, + "pi model {model_id} expected full thinking levels" + ); + assert_eq!( + default_variant, "medium", + "pi model {model_id} expected medium default for thinking model" + ); + } + } +}