diff --git a/docs/openapi.json b/docs/openapi.json
index 59f66fe..f10b073 100644
--- a/docs/openapi.json
+++ b/docs/openapi.json
@@ -665,7 +665,8 @@
         "commandExecution",
         "fileChanges",
         "mcpTools",
-        "streamingDeltas"
+        "streamingDeltas",
+        "sharedProcess"
       ],
       "properties": {
         "commandExecution": {
@@ -701,6 +702,10 @@
         "sessionLifecycle": {
          "type": "boolean"
        },
+        "sharedProcess": {
+          "type": "boolean",
+          "description": "Whether this agent uses a shared long-running server process (vs per-turn subprocess)"
+        },
         "streamingDeltas": {
           "type": "boolean"
         },
@@ -762,6 +767,14 @@
           "type": "string",
           "nullable": true
         },
+        "serverStatus": {
+          "allOf": [
+            {
+              "$ref": "#/components/schemas/ServerStatusInfo"
+            }
+          ],
+          "nullable": true
+        },
         "version": {
           "type": "string",
           "nullable": true
@@ -1388,6 +1401,46 @@
         "private"
       ]
     },
+    "ServerStatus": {
+      "type": "string",
+      "description": "Status of a shared server process for an agent",
+      "enum": [
+        "running",
+        "stopped",
+        "error"
+      ]
+    },
+    "ServerStatusInfo": {
+      "type": "object",
+      "required": [
+        "status",
+        "restartCount"
+      ],
+      "properties": {
+        "baseUrl": {
+          "type": "string",
+          "nullable": true
+        },
+        "lastError": {
+          "type": "string",
+          "nullable": true
+        },
+        "restartCount": {
+          "type": "integer",
+          "format": "int64",
+          "minimum": 0
+        },
+        "status": {
+          "$ref": "#/components/schemas/ServerStatus"
+        },
+        "uptimeMs": {
+          "type": "integer",
+          "format": "int64",
+          "nullable": true,
+          "minimum": 0
+        }
+      }
+    },
     "SessionEndReason": {
       "type": "string",
       "enum": [
diff --git a/sdks/typescript/src/generated/openapi.ts b/sdks/typescript/src/generated/openapi.ts
index bbdc5ea..910415c 100644
--- a/sdks/typescript/src/generated/openapi.ts
+++ b/sdks/typescript/src/generated/openapi.ts
@@ -65,6 +65,8 @@ export interface components {
       questions: boolean;
       reasoning: boolean;
       sessionLifecycle: boolean;
+      /** @description Whether this agent uses a shared long-running server process (vs per-turn subprocess) */
+      sharedProcess: boolean;
       streamingDeltas: boolean;
       textMessages: boolean;
       toolCalls: boolean;
@@ -82,6 +84,7 @@ export interface components {
       id: string;
       installed: boolean;
       path?: string | null;
+      serverStatus?: components["schemas"]["ServerStatusInfo"] | null;
       version?: string | null;
     };
     AgentInstallRequest: {
@@ -235,6 +238,20 @@ export interface components {
     QuestionStatus: "requested" | "answered" | "rejected";
     /** @enum {string} */
     ReasoningVisibility: "public" | "private";
+    /**
+     * @description Status of a shared server process for an agent
+     * @enum {string}
+     */
+    ServerStatus: "running" | "stopped" | "error";
+    ServerStatusInfo: {
+      baseUrl?: string | null;
+      lastError?: string | null;
+      /** Format: int64 */
+      restartCount: number;
+      status: components["schemas"]["ServerStatus"];
+      /** Format: int64 */
+      uptimeMs?: number | null;
+    };
     /** @enum {string} */
     SessionEndReason: "completed" | "error" | "terminated";
     SessionEndedData: {
diff --git a/server/packages/sandbox-agent/src/router.rs b/server/packages/sandbox-agent/src/router.rs
index 99c067a..c1ab3c7 100644
--- a/server/packages/sandbox-agent/src/router.rs
+++ b/server/packages/sandbox-agent/src/router.rs
@@ -2,10 +2,11 @@ use std::collections::{HashMap, HashSet, VecDeque};
 use std::convert::Infallible;
 use std::io::{BufRead, BufReader, Write};
 use std::net::TcpListener;
+use std::path::PathBuf;
 use std::process::Stdio;
 use std::sync::atomic::{AtomicI64, Ordering};
-use std::sync::Arc;
-use std::time::Duration;
+use std::sync::{Arc, Weak};
+use std::time::{Duration, Instant};
 
 use axum::extract::{Path, Query, State};
 use axum::http::{HeaderMap, HeaderValue, Request, StatusCode};
@@ -42,6 +43,7 @@ use sandbox_agent_agent_management::agents::{
 use sandbox_agent_agent_management::credentials::{
     extract_all_credentials, CredentialExtractionOptions, ExtractedCredentials,
 };
+use crate::agent_server_logs::AgentServerLogs;
 
 const MOCK_EVENT_DELAY_MS: u64 = 200;
@@ -56,6 +58,9 @@ impl AppState {
     pub fn new(auth: AuthConfig, agent_manager: AgentManager) -> Self {
         let agent_manager = Arc::new(agent_manager);
         let session_manager = Arc::new(SessionManager::new(agent_manager.clone()));
+        session_manager
+            .server_manager
+            .set_owner(Arc::downgrade(&session_manager));
         Self {
             auth,
             agent_manager,
@@ -80,8 +85,10 @@ impl AuthConfig {
 }
 
 pub fn build_router(state: AppState) -> Router {
-    let shared = Arc::new(state);
+    build_router_with_state(Arc::new(state)).0
+}
 
+pub fn build_router_with_state(shared: Arc<AppState>) -> (Router, Arc<AppState>) {
     let mut v1_router = Router::new()
         .route("/health", get(get_health))
         .route("/agents", get(list_agents))
@@ -109,7 +116,10 @@ pub fn build_router(state: AppState) -> Router {
         .with_state(shared.clone());
 
     if shared.auth.token.is_some() {
-        v1_router = v1_router.layer(axum::middleware::from_fn_with_state(shared, require_token));
+        v1_router = v1_router.layer(axum::middleware::from_fn_with_state(
+            shared.clone(),
+            require_token,
+        ));
     }
 
     let mut router = Router::new().nest("/v1", v1_router);
@@ -118,7 +128,11 @@
         router = router.merge(ui::router());
     }
 
-    router.layer(TraceLayer::new_for_http())
+    (router.layer(TraceLayer::new_for_http()), shared)
+}
+
+pub async fn shutdown_servers(state: &Arc<AppState>) {
+    state.session_manager.server_manager.shutdown().await;
 }
 
 #[derive(OpenApi)]
@@ -147,6 +161,8 @@ pub fn build_router(state: AppState) -> Router {
         AgentCapabilities,
         AgentInfo,
         AgentListResponse,
+        ServerStatus,
+        ServerStatusInfo,
         SessionInfo,
         SessionListResponse,
         HealthResponse,
@@ -564,19 +580,43 @@ impl SessionState {
 }
 
 #[derive(Debug)]
-struct SessionManager {
-    agent_manager: Arc<AgentManager>,
-    sessions: Mutex<Vec<SessionState>>,
-    opencode_server: Mutex<Option<OpencodeServer>>,
-    codex_server: Mutex<Option<Arc<CodexServer>>>,
-    http_client: Client,
+enum ManagedServerKind {
+    Http { base_url: String },
+    Stdio { server: Arc<CodexServer> },
 }
 
 #[derive(Debug)]
-struct OpencodeServer {
-    base_url: String,
-    #[allow(dead_code)]
-    child: Option<std::process::Child>,
+struct ManagedServer {
+    kind: ManagedServerKind,
+    child: Arc<std::sync::Mutex<Option<std::process::Child>>>,
+    status: ServerStatus,
+    start_time: Option<Instant>,
+    restart_count: u64,
+    last_error: Option<String>,
+    shutdown_requested: bool,
+    instance_id: u64,
+}
+
+#[derive(Debug)]
+struct AgentServerManager {
+    agent_manager: Arc<AgentManager>,
+    servers: Mutex<HashMap<AgentId, ManagedServer>>,
+    sessions: Mutex<HashMap<AgentId, HashSet<String>>>,
+    native_sessions: Mutex<HashMap<AgentId, HashMap<String, String>>>,
+    http_client: Client,
+    log_base_dir: PathBuf,
+    auto_restart: bool,
+    owner: std::sync::Mutex<Option<Weak<SessionManager>>>,
+    #[cfg(feature = "test-utils")]
+    restart_notifier: Mutex<Option<mpsc::UnboundedSender<AgentId>>>,
+}
+
+#[derive(Debug)]
+struct SessionManager {
+    agent_manager: Arc<AgentManager>,
+    sessions: Mutex<Vec<SessionState>>,
+    server_manager: Arc<AgentServerManager>,
+    http_client: Client,
 }
 
 /// Shared Codex app-server process that handles multiple sessions via JSON-RPC.
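
[review note, not part of the patch] The OpenAPI schema, the generated TypeScript, and the Rust types added further down all encode one wire shape. A quick sanity sketch of that shape, assuming `serde_json` is available in a test context and that the serde attributes on `ServerStatusInfo`/`ServerStatus` yield the camelCase keys and lowercase enum values that docs/openapi.json lists (the literal values here are made up):

```rust
let info = ServerStatusInfo {
    status: ServerStatus::Running,
    base_url: Some("http://127.0.0.1:4201".to_string()),
    uptime_ms: Some(12_345),
    restart_count: 1,
    last_error: None, // omitted from the JSON entirely by skip_serializing_if
};
assert_eq!(
    serde_json::to_value(&info).unwrap(),
    serde_json::json!({
        "status": "running", // lowercase, per the enum values in the schema
        "baseUrl": "http://127.0.0.1:4201",
        "uptimeMs": 12345,
        "restartCount": 1
    })
);
```
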
@@ -663,6 +703,16 @@ impl CodexServer {
     fn set_initialized(&self) {
         *self.initialized.lock().unwrap() = true;
     }
+
+    fn clear_pending(&self) {
+        let mut pending = self.pending_requests.lock().unwrap();
+        pending.clear();
+    }
+
+    fn clear_threads(&self) {
+        let mut sessions = self.thread_sessions.lock().unwrap();
+        sessions.clear();
+    }
 }
 
 struct SessionSubscription {
     receiver: broadcast::Receiver,
 }
 
@@ -670,13 +720,596 @@
+impl ManagedServer {
+    fn base_url(&self) -> Option<String> {
+        match &self.kind {
+            ManagedServerKind::Http { base_url } => Some(base_url.clone()),
+            ManagedServerKind::Stdio { .. } => None,
+        }
+    }
+
+    fn status_info(&self) -> ServerStatusInfo {
+        let uptime_ms = self
+            .start_time
+            .map(|started| started.elapsed().as_millis() as u64);
+        ServerStatusInfo {
+            status: self.status.clone(),
+            base_url: self.base_url(),
+            uptime_ms,
+            restart_count: self.restart_count,
+            last_error: self.last_error.clone(),
+        }
+    }
+}
+
+impl AgentServerManager {
+    fn new(
+        agent_manager: Arc<AgentManager>,
+        http_client: Client,
+        log_base_dir: PathBuf,
+        auto_restart: bool,
+    ) -> Self {
+        Self {
+            agent_manager,
+            servers: Mutex::new(HashMap::new()),
+            sessions: Mutex::new(HashMap::new()),
+            native_sessions: Mutex::new(HashMap::new()),
+            http_client,
+            log_base_dir,
+            auto_restart,
+            owner: std::sync::Mutex::new(None),
+            #[cfg(feature = "test-utils")]
+            restart_notifier: Mutex::new(None),
+        }
+    }
+
+    fn set_owner(&self, owner: Weak<SessionManager>) {
+        *self.owner.lock().expect("owner lock") = Some(owner);
+    }
+
+    #[cfg(feature = "test-utils")]
+    async fn set_owner_async(&self, owner: Weak<SessionManager>) {
+        *self.owner.lock().expect("owner lock") = Some(owner);
+    }
+
+    #[cfg(feature = "test-utils")]
+    async fn set_restart_notifier(&self, tx: mpsc::UnboundedSender<AgentId>) {
+        *self.restart_notifier.lock().await = Some(tx);
+    }
+
+    async fn register_session(
+        &self,
+        agent: AgentId,
+        session_id: &str,
+        native_session_id: Option<&str>,
+    ) {
+        let mut sessions = self.sessions.lock().await;
+        sessions
+            .entry(agent)
+            .or_insert_with(HashSet::new)
+            .insert(session_id.to_string());
+        drop(sessions);
+        if let Some(native_session_id) = native_session_id {
+            let mut natives = self.native_sessions.lock().await;
+            natives
+                .entry(agent)
+                .or_insert_with(HashMap::new)
+                .insert(native_session_id.to_string(), session_id.to_string());
+        }
+    }
+
+    async fn unregister_session(
+        &self,
+        agent: AgentId,
+        session_id: &str,
+        native_session_id: Option<&str>,
+    ) {
+        let mut clear_agent = false;
+        let mut sessions_map = self.sessions.lock().await;
+        if let Some(session_set) = sessions_map.get_mut(&agent) {
+            session_set.remove(session_id);
+            if session_set.is_empty() {
+                sessions_map.remove(&agent);
+                clear_agent = true;
+            }
+        }
+        drop(sessions_map);
+        if let Some(native_session_id) = native_session_id {
+            let mut natives = self.native_sessions.lock().await;
+            if let Some(natives) = natives.get_mut(&agent) {
+                natives.remove(native_session_id);
+            }
+        }
+        if clear_agent {
+            let mut natives = self.native_sessions.lock().await;
+            natives.remove(&agent);
+        }
+    }
+
+    async fn clear_mappings(&self, agent: AgentId) {
+        let mut sessions = self.sessions.lock().await;
+        sessions.remove(&agent);
+        drop(sessions);
+        let mut natives = self.native_sessions.lock().await;
+        natives.remove(&agent);
+    }
+
+    async fn status_snapshot(&self) -> HashMap<AgentId, ServerStatusInfo> {
+        let servers = self.servers.lock().await;
+        servers
+            .iter()
+            .map(|(agent, server)| (*agent, server.status_info()))
+            .collect()
+    }
+
+    async fn ensure_http_server(self: &Arc<Self>, agent: AgentId) -> Result<String, SandboxError> {
+        {
+            let servers = self.servers.lock().await;
+            if let Some(server) = servers.get(&agent) {
+                if matches!(server.status, ServerStatus::Running) {
+                    if let Some(base_url) = server.base_url() {
+                        return Ok(base_url);
+                    }
+                }
+            }
+        }
+
+        let (base_url, child) = self.spawn_http_server(agent).await?;
+        let restart_count = {
+            let servers = self.servers.lock().await;
+            servers
+                .get(&agent)
+                .map(|server| server.restart_count + 1)
+                .unwrap_or(0)
+        };
+
+        {
+            let mut servers = self.servers.lock().await;
+            if let Some(existing) = servers.get(&agent) {
+                if matches!(existing.status, ServerStatus::Running) {
+                    if let Ok(mut guard) = child.lock() {
+                        if let Some(child) = guard.as_mut() {
+                            let _ = child.kill();
+                        }
+                    }
+                    if let Some(base_url) = existing.base_url() {
+                        return Ok(base_url);
+                    }
+                }
+            }
+
+            servers.insert(
+                agent,
+                ManagedServer {
+                    kind: ManagedServerKind::Http {
+                        base_url: base_url.clone(),
+                    },
+                    child: child.clone(),
+                    status: ServerStatus::Running,
+                    start_time: Some(Instant::now()),
+                    restart_count,
+                    last_error: None,
+                    shutdown_requested: false,
+                    instance_id: restart_count,
+                },
+            );
+        }
+
+        if let Err(err) = self.wait_for_http_server(&base_url).await {
+            if let Ok(mut guard) = child.lock() {
+                if let Some(child) = guard.as_mut() {
+                    let _ = child.kill();
+                }
+            }
+            self.update_server_error(agent, err.to_string()).await;
+            return Err(err);
+        }
+
+        self.spawn_monitor_task(agent, restart_count, child);
+
+        Ok(base_url)
+    }
+
+    async fn ensure_stdio_server(
+        self: &Arc<Self>,
+        agent: AgentId,
+    ) -> Result<(Arc<CodexServer>, Option<mpsc::UnboundedReceiver<String>>), SandboxError> {
+        {
+            let servers = self.servers.lock().await;
+            if let Some(server) = servers.get(&agent) {
+                if matches!(server.status, ServerStatus::Running) {
+                    if let ManagedServerKind::Stdio { server } = &server.kind {
+                        return Ok((server.clone(), None));
+                    }
+                }
+            }
+        }
+
+        let (server, stdout_rx, child) = self.spawn_stdio_server(agent).await?;
+        let restart_count = {
+            let servers = self.servers.lock().await;
+            servers
+                .get(&agent)
+                .map(|server| server.restart_count + 1)
+                .unwrap_or(0)
+        };
+
+        {
+            let mut servers = self.servers.lock().await;
+            if let Some(existing) = servers.get(&agent) {
+                if matches!(existing.status, ServerStatus::Running) {
+                    if let Ok(mut guard) = child.lock() {
+                        if let Some(child) = guard.as_mut() {
+                            let _ = child.kill();
+                        }
+                    }
+                    if let ManagedServerKind::Stdio { server } = &existing.kind {
+                        return Ok((server.clone(), None));
+                    }
+                }
+            }
+            servers.insert(
+                agent,
+                ManagedServer {
+                    kind: ManagedServerKind::Stdio {
+                        server: server.clone(),
+                    },
+                    child: child.clone(),
+                    status: ServerStatus::Running,
+                    start_time: Some(Instant::now()),
+                    restart_count,
+                    last_error: None,
+                    shutdown_requested: false,
+                    instance_id: restart_count,
+                },
+            );
+        }
+
+        self.spawn_monitor_task(agent, restart_count, child);
+
+        Ok((server, Some(stdout_rx)))
+    }
+
+    async fn shutdown(&self) {
+        let mut servers = self.servers.lock().await;
+        for server in servers.values_mut() {
+            server.shutdown_requested = true;
+            server.status = ServerStatus::Stopped;
+            server.start_time = None;
+            if let Ok(mut guard) = server.child.lock() {
+                if let Some(child) = guard.as_mut() {
+                    let _ = child.kill();
+                }
+            }
+            if let ManagedServerKind::Stdio { server } = &server.kind {
+                server.clear_pending();
+                server.clear_threads();
+            }
+        }
+    }
+
+    async fn wait_for_http_server(&self, base_url: &str) -> Result<(), SandboxError> {
+        let endpoints = ["health", "healthz", "app/agents", "agents"];
+        for _ in 0..20 {
+            for endpoint in endpoints {
+                let url = format!("{base_url}/{endpoint}");
+                if let Ok(response) = self.http_client.get(&url).send().await {
+                    if response.status().is_success() {
+                        return Ok(());
+                    }
+                }
+            }
+            sleep(Duration::from_millis(150)).await;
+        }
+        Err(SandboxError::StreamError {
+            message: "server health check failed".to_string(),
+        })
+    }
+
+    async fn spawn_http_server(
+        self: &Arc<Self>,
+        agent: AgentId,
+    ) -> Result<(String, Arc<std::sync::Mutex<Option<std::process::Child>>>), SandboxError> {
+        let manager = self.agent_manager.clone();
+        let log_dir = self.log_base_dir.clone();
+        let (base_url, child) =
+            tokio::task::spawn_blocking(move || -> Result<(String, std::process::Child), SandboxError> {
+                let path = manager
+                    .resolve_binary(agent)
+                    .map_err(|err| map_spawn_error(agent, err))?;
+                let port = find_available_port()?;
+                let mut command = std::process::Command::new(path);
+                let stderr = AgentServerLogs::new(log_dir, agent.as_str()).open()?;
+                command
+                    .arg("serve")
+                    .arg("--port")
+                    .arg(port.to_string())
+                    .stdout(Stdio::null())
+                    .stderr(stderr);
+                let child = command.spawn().map_err(|err| SandboxError::StreamError {
+                    message: err.to_string(),
+                })?;
+                Ok((format!("http://127.0.0.1:{port}"), child))
+            })
+            .await
+            .map_err(|err| SandboxError::StreamError {
+                message: err.to_string(),
+            })??;
+
+        Ok((
+            base_url,
+            Arc::new(std::sync::Mutex::new(Some(child))),
+        ))
+    }
+
+    async fn spawn_stdio_server(
+        self: &Arc<Self>,
+        agent: AgentId,
+    ) -> Result<
+        (
+            Arc<CodexServer>,
+            mpsc::UnboundedReceiver<String>,
+            Arc<std::sync::Mutex<Option<std::process::Child>>>,
+        ),
+        SandboxError,
+    > {
+        let manager = self.agent_manager.clone();
+        let log_dir = self.log_base_dir.clone();
+        let (stdin_tx, stdin_rx) = mpsc::unbounded_channel::<String>();
+        let (stdout_tx, stdout_rx) = mpsc::unbounded_channel::<String>();
+
+        let child = tokio::task::spawn_blocking(move || -> Result<std::process::Child, SandboxError> {
+            let path = manager
+                .resolve_binary(agent)
+                .map_err(|err| map_spawn_error(agent, err))?;
+            let mut command = std::process::Command::new(path);
+            let stderr = AgentServerLogs::new(log_dir, agent.as_str()).open()?;
+            command
+                .arg("app-server")
+                .stdin(Stdio::piped())
+                .stdout(Stdio::piped())
+                .stderr(stderr);
+
+            let mut child = command.spawn().map_err(|err| SandboxError::StreamError {
+                message: err.to_string(),
+            })?;
+
+            let stdin = child.stdin.take().ok_or_else(|| SandboxError::StreamError {
+                message: "codex stdin unavailable".to_string(),
+            })?;
+            let stdout = child.stdout.take().ok_or_else(|| SandboxError::StreamError {
+                message: "codex stdout unavailable".to_string(),
+            })?;
+
+            let stdin_rx_mut = std::sync::Mutex::new(stdin_rx);
+            std::thread::spawn(move || {
+                let mut stdin = stdin;
+                let mut rx = stdin_rx_mut.lock().unwrap();
+                while let Some(line) = rx.blocking_recv() {
+                    if writeln!(stdin, "{line}").is_err() {
+                        break;
+                    }
+                    if stdin.flush().is_err() {
+                        break;
+                    }
+                }
+            });
+
+            std::thread::spawn(move || {
+                let reader = BufReader::new(stdout);
+                for line in reader.lines() {
+                    let Ok(line) = line else { break };
+                    if stdout_tx.send(line).is_err() {
+                        break;
+                    }
+                }
+            });
+
+            Ok(child)
+        })
+        .await
+        .map_err(|err| SandboxError::StreamError {
+            message: err.to_string(),
+        })??;
+
+        let server = Arc::new(CodexServer::new(stdin_tx));
+
+        Ok((
+            server,
+            stdout_rx,
+            Arc::new(std::sync::Mutex::new(Some(child))),
+        ))
+    }
+
+    fn spawn_monitor_task(
+        self: &Arc<Self>,
+        agent: AgentId,
+        instance_id: u64,
+        child: Arc<std::sync::Mutex<Option<std::process::Child>>>,
+    ) {
+        let manager = Arc::clone(self);
+        tokio::spawn(async move {
+            loop {
+                let status = {
+                    let mut guard = match child.lock() {
+                        Ok(guard) => guard,
+                        Err(_) => return,
+                    };
+                    match guard.as_mut() {
+                        Some(child) => match child.try_wait() {
+                            Ok(status) => status,
+                            Err(_) => None,
+                        },
+                        None => return,
+                    }
+                };
+
+                if let Some(status) = status {
+                    manager
+                        .handle_process_exit(agent, instance_id, status)
+                        .await;
+                    break;
+                }
+
+                sleep(Duration::from_millis(500)).await;
+            }
+        });
+    }
+
+    async fn handle_process_exit(
+        self: &Arc<Self>,
+        agent: AgentId,
+        instance_id: u64,
+        status: std::process::ExitStatus,
+    ) {
+        let exit_code = status.code();
+        let message = format!("agent server exited with status {:?}", status);
+        let mut codex_server = None;
+        let mut shutdown_requested = false;
+        {
+            let mut servers = self.servers.lock().await;
+            if let Some(server) = servers.get_mut(&agent) {
+                if server.instance_id != instance_id {
+                    return;
+                }
+                shutdown_requested = server.shutdown_requested;
+                server.status = if shutdown_requested {
+                    ServerStatus::Stopped
+                } else {
+                    ServerStatus::Error
+                };
+                server.start_time = None;
+                if !shutdown_requested {
+                    server.last_error = Some(message.clone());
+                }
+                if let Ok(mut guard) = server.child.lock() {
+                    *guard = None;
+                }
+                if let ManagedServerKind::Stdio { server } = &server.kind {
+                    codex_server = Some(server.clone());
+                }
+            }
+        }
+
+        if let Some(server) = codex_server {
+            server.clear_pending();
+            server.clear_threads();
+        }
+
+        if shutdown_requested {
+            self.clear_mappings(agent).await;
+            return;
+        }
+
+        self.notify_sessions_of_error(agent, &message, exit_code)
+            .await;
+
+        if self.auto_restart {
+            #[cfg(feature = "test-utils")]
+            {
+                if let Some(tx) = self.restart_notifier.lock().await.as_ref() {
+                    let _ = tx.send(agent);
+                }
+            }
+            let manager = Arc::clone(self);
+            tokio::spawn(async move {
+                let _ = manager.ensure_server_for_restart(agent).await;
+            });
+        }
+    }
+
+    async fn ensure_server_for_restart(self: Arc<Self>, agent: AgentId) -> Result<(), SandboxError> {
+        sleep(Duration::from_millis(500)).await;
+        match agent {
+            AgentId::Opencode => {
+                let _ = self.ensure_http_server(agent).await?;
+            }
+            AgentId::Codex => {
+                let (server, receiver) = self.ensure_stdio_server(agent).await?;
+                if let Some(stdout_rx) = receiver {
+                    let owner = self.owner.lock().expect("owner lock").clone();
+                    if let Some(owner) = owner.as_ref().and_then(|weak| weak.upgrade()) {
+                        let owner_clone = owner.clone();
+                        let server_clone = server.clone();
+                        tokio::spawn(async move {
+                            owner_clone
+                                .handle_codex_server_output(server_clone, stdout_rx)
+                                .await;
+                        });
+                        let _ = owner.codex_server_initialize(&server).await;
+                    }
+                }
+            }
+            _ => {}
+        }
+        Ok(())
+    }
+
+    async fn notify_sessions_of_error(
+        &self,
+        agent: AgentId,
+        message: &str,
+        exit_code: Option<i32>,
+    ) {
+        let session_ids = {
+            let sessions = self.sessions.lock().await;
+            sessions
+                .get(&agent)
+                .cloned()
+                .unwrap_or_default()
+                .into_iter()
+                .collect::<Vec<_>>()
+        };
+
+        let owner = { self.owner.lock().expect("owner lock").clone() };
+        if let Some(owner) = owner.and_then(|weak| weak.upgrade()) {
+            for session_id in session_ids {
+                owner
+                    .record_error(
+                        &session_id,
+                        message.to_string(),
+                        Some("server_exit".to_string()),
+                        None,
+                    )
+                    .await;
+                owner
+                    .mark_session_ended(
+                        &session_id,
+                        exit_code,
+                        message,
+                        SessionEndReason::Error,
+                        TerminatedBy::Daemon,
+                    )
+                    .await;
+            }
+        }
+
+        self.clear_mappings(agent).await;
+    }
+
+    async fn update_server_error(&self, agent: AgentId, message: String) {
+        let mut servers = self.servers.lock().await;
+        if let Some(server) = servers.get_mut(&agent) {
+            server.status = ServerStatus::Error;
+            server.start_time = None;
+            server.last_error = Some(message);
+        }
+    }
+}
+
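[review note, not part of the patch] The `instance_id` comparison in `handle_process_exit` above is a generation guard: every (re)start stamps the `ManagedServer` with a fresh id (here, `restart_count`), so a monitor task that outlives its server generation cannot clobber the state of a newer one. A minimal standalone model of that rule (names are illustrative, not from this crate):

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, Copy, PartialEq)]
enum Status {
    Running,
    Error,
}

struct Managed {
    instance_id: u64,
    status: Status,
}

/// Apply an exit report only if it belongs to the current generation.
fn handle_exit(servers: &mut HashMap<&'static str, Managed>, agent: &'static str, instance_id: u64) {
    if let Some(server) = servers.get_mut(agent) {
        if server.instance_id != instance_id {
            return; // stale monitor from a previous generation: ignore
        }
        server.status = Status::Error;
    }
}

fn main() {
    let mut servers = HashMap::new();
    servers.insert("codex", Managed { instance_id: 1, status: Status::Running });

    handle_exit(&mut servers, "codex", 0); // an old generation's monitor fires late
    assert_eq!(servers["codex"].status, Status::Running);

    handle_exit(&mut servers, "codex", 1); // current generation
    assert_eq!(servers["codex"].status, Status::Error);
}
```
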
 impl SessionManager {
     fn new(agent_manager: Arc<AgentManager>) -> Self {
+        let log_base_dir = default_log_dir();
+        let server_manager = Arc::new(AgentServerManager::new(
+            agent_manager.clone(),
+            Client::new(),
+            log_base_dir,
+            true,
+        ));
         Self {
             agent_manager,
             sessions: Mutex::new(Vec::new()),
-            opencode_server: Mutex::new(None),
-            codex_server: Mutex::new(None),
+            server_manager,
             http_client: Client::new(),
         }
     }
@@ -785,6 +1418,11 @@
         let mut sessions = self.sessions.lock().await;
         sessions.push(session);
         drop(sessions);
+        if agent_id == AgentId::Opencode || agent_id == AgentId::Codex {
+            self.server_manager
+                .register_session(agent_id, &session_id, native_session_id.as_deref())
+                .await;
+        }
 
         if agent_id == AgentId::Opencode {
             self.ensure_opencode_stream(session_id).await?;
@@ -913,6 +1551,14 @@
             .synthetic()
             .with_native_session(session.native_session_id.clone());
         session.record_conversions(vec![ended]);
+        let agent = session.agent;
+        let native_session_id = session.native_session_id.clone();
+        drop(sessions);
+        if agent == AgentId::Opencode || agent == AgentId::Codex {
+            self.server_manager
+                .unregister_session(agent, &session_id, native_session_id.as_deref())
+                .await;
+        }
 
         Ok(())
     }
@@ -1145,7 +1791,7 @@
     }
 
     async fn reply_permission(
-        &self,
+        self: &Arc<Self>,
         session_id: &str,
         permission_id: &str,
         reply: PermissionReply,
@@ -1176,12 +1822,7 @@
 
         if agent == AgentId::Codex {
             // Use the shared Codex server to send the permission reply
-            let server = {
-                let guard = self.codex_server.lock().await;
-                guard.clone().ok_or_else(|| SandboxError::InvalidRequest {
-                    message: "codex server not running".to_string(),
-                })?
-            };
+            let server = self.ensure_codex_server().await?;
 
             let pending = pending_permission
                 .clone()
@@ -1727,150 +2368,30 @@
     }
 
     async fn ensure_opencode_server(&self) -> Result<String, SandboxError> {
-        {
-            let guard = self.opencode_server.lock().await;
-            if let Some(server) = guard.as_ref() {
-                return Ok(server.base_url.clone());
-            }
-        }
-
-        let manager = self.agent_manager.clone();
-        let server =
-            tokio::task::spawn_blocking(move || -> Result<OpencodeServer, SandboxError> {
-                let path = manager
-                    .resolve_binary(AgentId::Opencode)
-                    .map_err(|err| map_spawn_error(AgentId::Opencode, err))?;
-                let port = find_available_port()?;
-                let mut command = std::process::Command::new(path);
-                command
-                    .arg("serve")
-                    .arg("--port")
-                    .arg(port.to_string())
-                    .stdout(Stdio::null())
-                    .stderr(Stdio::null());
-                let child = command.spawn().map_err(|err| SandboxError::StreamError {
-                    message: err.to_string(),
-                })?;
-                Ok(OpencodeServer {
-                    base_url: format!("http://127.0.0.1:{port}"),
-                    child: Some(child),
-                })
-            })
+        self.server_manager
+            .ensure_http_server(AgentId::Opencode)
             .await
-            .map_err(|err| SandboxError::StreamError {
-                message: err.to_string(),
-            })??;
-
-        {
-            let mut guard = self.opencode_server.lock().await;
-            if let Some(existing) = guard.as_ref() {
-                return Ok(existing.base_url.clone());
-            }
-            *guard = Some(server);
-        }
-        let guard = self.opencode_server.lock().await;
-        guard
-            .as_ref()
-            .map(|server| server.base_url.clone())
-            .ok_or_else(|| SandboxError::StreamError {
-                message: "OpenCode server missing".to_string(),
-            })
     }
 
     /// Ensures a shared Codex app-server process is running.
     /// Spawns the process if not already running, sets up stdin/stdout tasks,
     /// and performs the initialize handshake if needed.
     async fn ensure_codex_server(self: &Arc<Self>) -> Result<Arc<CodexServer>, SandboxError> {
-        // Fast path: return existing server
-        {
-            let guard = self.codex_server.lock().await;
-            if let Some(server) = guard.as_ref() {
-                return Ok(server.clone());
-            }
+        let (server, receiver) = self
+            .server_manager
+            .ensure_stdio_server(AgentId::Codex)
+            .await?;
+
+        if let Some(stdout_rx) = receiver {
+            let server_for_task = server.clone();
+            let self_for_task = Arc::clone(self);
+            tokio::spawn(async move {
+                self_for_task
+                    .handle_codex_server_output(server_for_task, stdout_rx)
+                    .await;
+            });
         }
 
-        // Spawn the codex app-server process
-        let manager = self.agent_manager.clone();
-        let (stdin_tx, stdin_rx) = mpsc::unbounded_channel::<String>();
-        let (stdout_tx, stdout_rx) = mpsc::unbounded_channel::<String>();
-
-        let _child = tokio::task::spawn_blocking(move || -> Result<std::process::Child, SandboxError> {
-            let path = manager
-                .resolve_binary(AgentId::Codex)
-                .map_err(|err| map_spawn_error(AgentId::Codex, err))?;
-            let mut command = std::process::Command::new(path);
-            command
-                .arg("app-server")
-                .stdin(Stdio::piped())
-                .stdout(Stdio::piped())
-                .stderr(Stdio::piped());
-
-            let mut child = command.spawn().map_err(|err| SandboxError::StreamError {
-                message: err.to_string(),
-            })?;
-
-            let stdin = child.stdin.take().ok_or_else(|| SandboxError::StreamError {
-                message: "codex stdin unavailable".to_string(),
-            })?;
-            let stdout = child.stdout.take().ok_or_else(|| SandboxError::StreamError {
-                message: "codex stdout unavailable".to_string(),
-            })?;
-
-            // Stdin writer task
-            let stdin_rx_mut = std::sync::Mutex::new(stdin_rx);
-            std::thread::spawn(move || {
-                let mut stdin = stdin;
-                let mut rx = stdin_rx_mut.lock().unwrap();
-                while let Some(line) = rx.blocking_recv() {
-                    if writeln!(stdin, "{line}").is_err() {
-                        break;
-                    }
-                    if stdin.flush().is_err() {
-                        break;
-                    }
-                }
-            });
-
-            // Stdout reader task
-            std::thread::spawn(move || {
-                let reader = BufReader::new(stdout);
-                for line in reader.lines() {
-                    let Ok(line) = line else { break };
-                    if stdout_tx.send(line).is_err() {
-                        break;
-                    }
-                }
-            });
-
-            Ok(child)
-        })
-        .await
-        .map_err(|err| SandboxError::StreamError {
-            message: err.to_string(),
-        })??;
-
-        let server = Arc::new(CodexServer::new(stdin_tx));
-
-        // Store server before spawning notification handler
-        {
-            let mut guard = self.codex_server.lock().await;
-            if let Some(existing) = guard.as_ref() {
-                // Another task beat us to it
-                return Ok(existing.clone());
-            }
-            *guard = Some(server.clone());
-        }
-
-        // Spawn notification routing task
-        let server_for_task = server.clone();
-        let self_for_task = Arc::clone(self);
-        tokio::spawn(async move {
-            self_for_task
-                .handle_codex_server_output(server_for_task, stdout_rx)
-                .await;
-        });
-
-        // Perform initialize handshake
         self.codex_server_initialize(&server).await?;
 
         Ok(server)
@@ -2408,6 +2929,21 @@ pub enum ServerStatus {
     Running,
     /// Server is not currently running
     Stopped,
+    /// Server exited unexpectedly or failed a health check
+    Error,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct ServerStatusInfo {
+    pub status: ServerStatus,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub base_url: Option<String>,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub uptime_ms: Option<u64>,
+    pub restart_count: u64,
+    #[serde(default, skip_serializing_if = "Option::is_none")]
+    pub last_error: Option<String>,
 }
 
 #[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)]
@@ -2422,7 +2958,7 @@ pub struct AgentInfo {
     pub capabilities: AgentCapabilities,
     /// Status of the shared server process (only present for agents with shared_process=true)
     #[serde(default, skip_serializing_if = "Option::is_none")]
-    pub server_status: Option<ServerStatus>,
+    pub server_status: Option<ServerStatusInfo>,
 }
 
 #[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)]
@@ -2628,20 +3164,7 @@ async fn list_agents(
     State(state): State<Arc<AppState>>,
 ) -> Result<Json<AgentListResponse>, ApiError> {
     let manager = state.agent_manager.clone();
-
-    // Check shared server status for agents that use them
-    let codex_server_running = state
-        .session_manager
-        .codex_server
-        .lock()
-        .await
-        .is_some();
-    let opencode_server_running = state
-        .session_manager
-        .opencode_server
-        .lock()
-        .await
-        .is_some();
+    let server_statuses = state.session_manager.server_manager.status_snapshot().await;
 
     let agents = tokio::task::spawn_blocking(move || {
         all_agents()
@@ -2654,16 +3177,18 @@ async fn list_agents(
                 // Add server_status for agents with shared processes
                 let server_status = if capabilities.shared_process {
-                    let running = match agent_id {
-                        AgentId::Codex => codex_server_running,
-                        AgentId::Opencode => opencode_server_running,
-                        _ => false,
-                    };
-                    Some(if running {
-                        ServerStatus::Running
-                    } else {
-                        ServerStatus::Stopped
-                    })
+                    Some(
+                        server_statuses
+                            .get(&agent_id)
+                            .cloned()
+                            .unwrap_or(ServerStatusInfo {
+                                status: ServerStatus::Stopped,
+                                base_url: None,
+                                uptime_ms: None,
+                                restart_count: 0,
+                                last_error: None,
+                            }),
+                    )
                 } else {
                     None
                 };
@@ -3966,6 +4491,237 @@ fn extract_nested_string(value: &Value, path: &[&str]) -> Option<String> {
     current.as_str().map(|s| s.to_string())
 }
 
+#[cfg(feature = "test-utils")]
+pub mod test_utils {
+    use super::*;
+    use std::process::Command;
+    use tempfile::TempDir;
+
+    pub struct TestHarness {
+        session_manager: Arc<SessionManager>,
+        _temp_dir: TempDir,
+    }
+
+    impl TestHarness {
+        pub async fn new() -> Self {
+            let temp_dir = TempDir::new().expect("temp dir");
+            let agent_manager = Arc::new(AgentManager::new(temp_dir.path()).expect("agent manager"));
+            let session_manager = Arc::new(SessionManager::new(agent_manager));
+            session_manager
+                .server_manager
+                .set_owner_async(Arc::downgrade(&session_manager))
+                .await;
+            Self {
+                session_manager,
+                _temp_dir: temp_dir,
+            }
+        }
+
+        pub async fn register_session(
+            &self,
+            agent: AgentId,
+            session_id: &str,
+            native_session_id: Option<&str>,
+        ) {
+            self.session_manager
+                .server_manager
+                .register_session(agent, session_id, native_session_id)
+                .await;
+        }
+
+        pub async fn unregister_session(
+            &self,
+            agent: AgentId,
+            session_id: &str,
+            native_session_id: Option<&str>,
+        ) {
+            self.session_manager
+                .server_manager
+                .unregister_session(agent, session_id, native_session_id)
+                .await;
+        }
+
+        pub async fn has_session_mapping(
+            &self,
+            agent: AgentId,
+            session_id: &str,
+        ) -> bool {
+            let sessions = self.session_manager.server_manager.sessions.lock().await;
+            sessions
+                .get(&agent)
+                .map(|set| set.contains(session_id))
+                .unwrap_or(false)
+        }
+
+        pub async fn native_mapping(
+            &self,
+            agent: AgentId,
+            native_session_id: &str,
+        ) -> Option<String> {
+            let natives = self
+                .session_manager
+                .server_manager
+                .native_sessions
+                .lock()
+                .await;
+            natives
+                .get(&agent)
+                .and_then(|map| map.get(native_session_id))
+                .cloned()
+        }
+
+        pub async fn insert_session(
+            &self,
+            session_id: &str,
+            agent: AgentId,
+            native_session_id: Option<&str>,
+        ) {
+            let request = CreateSessionRequest {
+                agent: agent.as_str().to_string(),
+                agent_mode: None,
+                permission_mode: None,
+                model: None,
+                variant: None,
+                agent_version: None,
+            };
+            let mut session = SessionState::new(session_id.to_string(), agent, &request)
+                .expect("session");
+            session.native_session_id = native_session_id.map(|id| id.to_string());
+            self.session_manager.sessions.lock().await.push(session);
+        }
+
+        pub async fn insert_stdio_server(
+            &self,
+            agent: AgentId,
+            child: Option<std::process::Child>,
+            instance_id: u64,
+        ) -> Arc<std::sync::Mutex<Option<std::process::Child>>> {
+            let (stdin_tx, _stdin_rx) = mpsc::unbounded_channel::<String>();
+            let server = Arc::new(CodexServer::new(stdin_tx));
+            let child = Arc::new(std::sync::Mutex::new(child));
+            self.session_manager.server_manager.servers.lock().await.insert(
+                agent,
+                ManagedServer {
+                    kind: ManagedServerKind::Stdio { server },
+                    child: child.clone(),
+                    status: ServerStatus::Running,
+                    start_time: Some(Instant::now()),
+                    restart_count: 0,
+                    last_error: None,
+                    shutdown_requested: false,
+                    instance_id,
+                },
+            );
+            child
+        }
+
+        pub async fn insert_http_server(&self, agent: AgentId, instance_id: u64) {
+            self.session_manager.server_manager.servers.lock().await.insert(
+                agent,
+                ManagedServer {
+                    kind: ManagedServerKind::Http {
+                        base_url: "http://127.0.0.1:1".to_string(),
+                    },
+                    child: Arc::new(std::sync::Mutex::new(None)),
+                    status: ServerStatus::Running,
+                    start_time: Some(Instant::now()),
+                    restart_count: 0,
+                    last_error: None,
+                    shutdown_requested: false,
+                    instance_id,
+                },
+            );
+        }
+
+        pub async fn handle_process_exit(
+            &self,
+            agent: AgentId,
+            instance_id: u64,
+            status: std::process::ExitStatus,
+        ) {
+            self.session_manager
+                .server_manager
+                .handle_process_exit(agent, instance_id, status)
+                .await;
+        }
+
+        pub async fn shutdown(&self) {
+            self.session_manager.server_manager.shutdown().await;
+        }
+
+        pub async fn server_status(&self, agent: AgentId) -> Option<ServerStatus> {
+            let servers = self.session_manager.server_manager.servers.lock().await;
+            servers.get(&agent).map(|server| server.status.clone())
+        }
+
+        pub async fn server_last_error(&self, agent: AgentId) -> Option<String> {
+            let servers = self.session_manager.server_manager.servers.lock().await;
+            servers
+                .get(&agent)
+                .and_then(|server| server.last_error.clone())
+        }
+
+        pub async fn session_ended(&self, session_id: &str) -> bool {
+            let sessions = self.session_manager.sessions.lock().await;
+            sessions
+                .iter()
+                .find(|session| session.session_id == session_id)
+                .map(|session| session.ended)
+                .unwrap_or(false)
+        }
+
+        pub async fn session_end_reason(&self, session_id: &str) -> Option<SessionEndReason> {
+            let sessions = self.session_manager.sessions.lock().await;
+            sessions
+                .iter()
+                .find(|session| session.session_id == session_id)
+                .and_then(|session| session.ended_reason.clone())
+        }
+
+        pub async fn set_restart_notifier(&self, tx: mpsc::UnboundedSender<AgentId>) {
+            self.session_manager
+                .server_manager
+                .set_restart_notifier(tx)
+                .await;
+        }
+    }
+
+    pub fn spawn_sleep_process() -> std::process::Child {
+        #[cfg(windows)]
+        {
+            Command::new("cmd")
+                .args(["/C", "ping", "127.0.0.1", "-n", "60"])
+                .spawn()
+                .expect("spawn sleep")
+        }
+        #[cfg(not(windows))]
+        {
+            Command::new("sh")
+                .args(["-c", "sleep 60"])
+                .spawn()
+                .expect("spawn sleep")
+        }
+    }
+
+    #[cfg(unix)]
+    pub fn exit_status(code: i32) -> std::process::ExitStatus {
+        use std::os::unix::process::ExitStatusExt;
+        std::process::ExitStatus::from_raw(code << 8)
+    }
+
+    #[cfg(windows)]
+    pub fn exit_status(code: i32) -> std::process::ExitStatus {
+        use std::os::windows::process::ExitStatusExt;
+        std::process::ExitStatus::from_raw(code as u32)
+    }
+}
+
+fn default_log_dir() -> PathBuf {
+    dirs::data_dir()
+        .map(|dir| dir.join("sandbox-agent").join("logs").join("servers"))
+        .unwrap_or_else(|| PathBuf::from(".").join(".sandbox-agent").join("logs").join("servers"))
+}
+
 fn find_available_port() -> Result<u16, SandboxError> {
     for port in 4200..=4300 {
         if TcpListener::bind(("127.0.0.1", port)).is_ok() {
@@ -4014,6 +4770,7 @@ impl SseAccumulator {
     }
 }
 
+
 fn parse_opencode_modes(value: &Value) -> Vec {
     let mut modes = Vec::new();
     let mut seen = HashSet::new();
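
[review note, not part of the patch] A sketch of how the new `test_utils` harness could drive the crash path end to end, assuming a tokio test runtime. Every call is from the module above, but the import paths are guesses, and the `session_ended` assertion leans on `mark_session_ended` setting the session's `ended` flag, which this diff does not show:

```rust
use sandbox_agent::router::test_utils::{exit_status, TestHarness};
use sandbox_agent::router::ServerStatus;
use sandbox_agent_agent_management::agents::AgentId;

#[tokio::test]
async fn codex_exit_marks_sessions_ended() {
    let harness = TestHarness::new().await;

    // A "running" shared Codex server at generation 0 with one registered session.
    let _child = harness.insert_stdio_server(AgentId::Codex, None, 0).await;
    harness.insert_session("sess-1", AgentId::Codex, Some("native-1")).await;
    harness.register_session(AgentId::Codex, "sess-1", Some("native-1")).await;

    // Report a non-zero exit for the current generation.
    harness.handle_process_exit(AgentId::Codex, 0, exit_status(1)).await;

    // The server is marked errored, the session is ended, and its mapping is cleared.
    assert!(matches!(
        harness.server_status(AgentId::Codex).await,
        Some(ServerStatus::Error)
    ));
    assert!(harness.server_last_error(AgentId::Codex).await.is_some());
    assert!(harness.session_ended("sess-1").await);
    assert!(!harness.has_session_mapping(AgentId::Codex, "sess-1").await);
}
```
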