diff --git a/server/packages/sandbox-agent/src/router.rs b/server/packages/sandbox-agent/src/router.rs index d61c494..a2d1efe 100644 --- a/server/packages/sandbox-agent/src/router.rs +++ b/server/packages/sandbox-agent/src/router.rs @@ -978,6 +978,7 @@ pub(crate) struct SessionManager { struct ModelCatalogState { models: HashMap, in_flight: HashMap>, + codex_unavailable_models: HashSet, } /// Shared Codex app-server process that handles multiple sessions via JSON-RPC. @@ -987,11 +988,15 @@ struct CodexServer { /// Sender for writing to the process stdin stdin_sender: mpsc::UnboundedSender, /// Pending JSON-RPC requests awaiting responses, keyed by request ID - pending_requests: std::sync::Mutex>>, + pending_requests: std::sync::Mutex>>, + /// Optional mapping from request ID to session ID for routing request-scoped errors + request_sessions: std::sync::Mutex>, /// Next request ID for JSON-RPC next_id: AtomicI64, /// Whether initialize/initialized handshake has completed initialized: std::sync::Mutex, + /// Serializes initialize handshakes so only one request is in flight at a time. 
+ initialize_lock: Mutex<()>, /// Mapping from thread_id to session_id for routing notifications thread_sessions: std::sync::Mutex>, } @@ -1009,8 +1014,10 @@ impl CodexServer { Self { stdin_sender, pending_requests: std::sync::Mutex::new(HashMap::new()), + request_sessions: std::sync::Mutex::new(HashMap::new()), next_id: AtomicI64::new(1), initialized: std::sync::Mutex::new(false), + initialize_lock: Mutex::new(()), thread_sessions: std::sync::Mutex::new(HashMap::new()), } } @@ -1019,14 +1026,37 @@ impl CodexServer { self.next_id.fetch_add(1, Ordering::SeqCst) } - fn send_request(&self, id: i64, request: &impl Serialize) -> Option> { + fn send_request( + &self, + id: i64, + request: &impl Serialize, + ) -> Option> { + self.send_request_with_session(id, request, None) + } + + fn send_request_with_session( + &self, + id: i64, + request: &impl Serialize, + session_id: Option, + ) -> Option> { let (tx, rx) = oneshot::channel(); { let mut pending = self.pending_requests.lock().unwrap(); pending.insert(id, tx); } + if let Some(session_id) = session_id { + let mut sessions = self.request_sessions.lock().unwrap(); + sessions.insert(id, session_id); + } let line = serde_json::to_string(request).ok()?; - self.stdin_sender.send(line).ok()?; + if self.stdin_sender.send(line).is_err() { + let mut pending = self.pending_requests.lock().unwrap(); + pending.remove(&id); + let mut sessions = self.request_sessions.lock().unwrap(); + sessions.remove(&id); + return None; + } Some(rx) } @@ -1037,7 +1067,7 @@ impl CodexServer { self.stdin_sender.send(line).is_ok() } - fn complete_request(&self, id: i64, result: Value) { + fn complete_request(&self, id: i64, result: CodexRequestResult) { let tx = { let mut pending = self.pending_requests.lock().unwrap(); pending.remove(&id) @@ -1047,6 +1077,11 @@ impl CodexServer { } } + fn take_request_session(&self, id: i64) -> Option { + let mut sessions = self.request_sessions.lock().unwrap(); + sessions.remove(&id) + } + fn register_thread(&self, 
thread_id: String, session_id: String) { let mut sessions = self.thread_sessions.lock().unwrap(); sessions.insert(thread_id, session_id); @@ -1068,6 +1103,8 @@ impl CodexServer { fn clear_pending(&self) { let mut pending = self.pending_requests.lock().unwrap(); pending.clear(); + let mut sessions = self.request_sessions.lock().unwrap(); + sessions.clear(); } fn clear_threads(&self) { @@ -1076,6 +1113,12 @@ impl CodexServer { } } +#[derive(Debug, Clone)] +enum CodexRequestResult { + Response(Value), + Error(codex_schema::JsonrpcErrorError), +} + pub(crate) struct SessionSubscription { pub(crate) initial_events: Vec, pub(crate) receiver: broadcast::Receiver, @@ -1885,6 +1928,80 @@ impl SessionManager { Ok(()) } + async fn mark_codex_model_unavailable(&self, model_id: &str) -> bool { + let mut catalog = self.model_catalog.lock().await; + let inserted = catalog + .codex_unavailable_models + .insert(model_id.to_string()); + if inserted { + // Force a fresh fetch so provider/model lists drop unavailable models. 
+ catalog.models.remove(&AgentId::Codex); + } + inserted + } + + async fn clear_codex_session_model_if_unavailable( + &self, + session_id: &str, + model_id: &str, + ) -> bool { + let mut sessions = self.sessions.lock().await; + let Some(session) = SessionManager::session_mut(&mut sessions, session_id) else { + return false; + }; + if session.agent == AgentId::Codex && session.model.as_deref() == Some(model_id) { + session.model = None; + session.updated_at = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_millis() as i64) + .unwrap_or(session.updated_at); + return true; + } + false + } + + async fn codex_native_session_id(&self, session_id: &str) -> Option { + let sessions = self.sessions.lock().await; + let session = SessionManager::session_ref(&sessions, session_id)?; + if session.agent != AgentId::Codex { + return None; + } + session.native_session_id.clone() + } + + async fn handle_codex_model_unavailable( + &self, + session_id: &str, + model_id: &str, + native_session_id: Option, + ) { + let newly_marked = self.mark_codex_model_unavailable(model_id).await; + if newly_marked { + tracing::warn!( + model_id = %model_id, + "codex model marked unavailable after runtime error" + ); + } + if self + .clear_codex_session_model_if_unavailable(session_id, model_id) + .await + { + let native_session_id = match native_session_id { + Some(native_session_id) => Some(native_session_id), + None => self.codex_native_session_id(session_id).await, + }; + let _ = self + .record_conversions( + session_id, + vec![codex_model_unavailable_status_event( + native_session_id, + model_id, + )], + ) + .await; + } + } + pub(crate) async fn delete_session(&self, session_id: &str) -> Result<(), SandboxError> { let (agent, native_session_id) = { let mut sessions = self.sessions.lock().await; @@ -3548,7 +3665,8 @@ impl SessionManager { codex_schema::JsonrpcMessage::Response(response) => { // Route response to waiting request if let Some(id) = 
codex_request_id_to_i64(&response.id) { - server.complete_request(id, response.result.clone()); + server.take_request_session(id); + server.complete_request(id, CodexRequestResult::Response(response.result)); } } codex_schema::JsonrpcMessage::Notification(_) => { @@ -3560,6 +3678,20 @@ impl SessionManager { codex_thread_id_from_server_notification(&notification) { if let Some(session_id) = server.session_for_thread(&thread_id) { + if let codex_schema::ServerNotification::Error(params) = + &notification + { + if let Some(model_id) = + codex_unavailable_model_from_message(&params.error.message) + { + self.handle_codex_model_unavailable( + &session_id, + &model_id, + Some(thread_id.clone()), + ) + .await; + } + } let conversions = match convert_codex::notification_to_universal(&notification) { Ok(c) => c, @@ -3601,8 +3733,28 @@ impl SessionManager { } } codex_schema::JsonrpcMessage::Error(error) => { - // Log error but don't have a session to route to - eprintln!("Codex server error: {:?}", error); + if let Some(id) = codex_request_id_to_i64(&error.id) { + let session_id = server.take_request_session(id); + server.complete_request(id, CodexRequestResult::Error(error.error.clone())); + if let Some(session_id) = session_id { + if let Some(model_id) = + codex_unavailable_model_from_rpc_error(&error.error) + { + self.handle_codex_model_unavailable(&session_id, &model_id, None) + .await; + } + let _ = self + .record_conversions( + &session_id, + vec![codex_rpc_error_to_universal(&error)], + ) + .await; + } else { + eprintln!("Codex server error: {:?}", error); + } + } else { + eprintln!("Codex server error: {:?}", error); + } } } } @@ -3610,6 +3762,7 @@ impl SessionManager { /// Performs the initialize/initialized handshake with the Codex server. 
async fn codex_server_initialize(&self, server: &CodexServer) -> Result<(), SandboxError> { + let _initialize_guard = server.initialize_lock.lock().await; if server.is_initialized() { return Ok(()); } @@ -3635,7 +3788,7 @@ impl SessionManager { // Wait for initialize response with timeout let result = tokio::time::timeout(Duration::from_secs(30), rx).await; match result { - Ok(Ok(_)) => { + Ok(Ok(CodexRequestResult::Response(_))) => { // Send initialized notification let notification = codex_schema::JsonrpcNotification { method: "initialized".to_string(), @@ -3645,6 +3798,10 @@ impl SessionManager { server.set_initialized(); Ok(()) } + Ok(Ok(CodexRequestResult::Error(error))) => Err(codex_request_error_to_sandbox( + "initialize request failed", + &error, + )), Ok(Err(_)) => Err(SandboxError::StreamError { message: "initialize request cancelled".to_string(), }), @@ -3682,7 +3839,7 @@ impl SessionManager { // Wait for thread/start response let result = tokio::time::timeout(Duration::from_secs(30), rx).await; match result { - Ok(Ok(response)) => { + Ok(Ok(CodexRequestResult::Response(response))) => { // Extract thread_id from response let thread_id = response .get("thread") @@ -3699,6 +3856,10 @@ impl SessionManager { Ok(thread_id) } + Ok(Ok(CodexRequestResult::Error(error))) => Err(codex_request_error_to_sandbox( + "thread/start request failed", + &error, + )), Ok(Err(_)) => Err(SandboxError::StreamError { message: "thread/start request cancelled".to_string(), }), @@ -3726,6 +3887,22 @@ impl SessionManager { let id = server.next_request_id(); let prompt_text = codex_prompt_for_mode(prompt, Some(&session.agent_mode)); + let mut model = session.model.clone(); + if let Some(model_id) = model.clone() { + let is_unavailable = { + let catalog = self.model_catalog.lock().await; + catalog.codex_unavailable_models.contains(&model_id) + }; + if is_unavailable { + self.handle_codex_model_unavailable( + &session.session_id, + &model_id, + session.native_session_id.clone(), + ) + 
.await; + model = None; + } + } let params = codex_schema::TurnStartParams { approval_policy: codex_approval_policy(Some(&session.permission_mode)), collaboration_mode: None, @@ -3735,7 +3912,7 @@ impl SessionManager { text: prompt_text, text_elements: Vec::new(), }], - model: session.model.clone(), + model, output_schema: None, sandbox_policy: codex_sandbox_policy(Some(&session.permission_mode)), summary: None, @@ -3749,7 +3926,7 @@ impl SessionManager { // Send but don't wait for response - notifications will stream back server - .send_request(id, &request) + .send_request_with_session(id, &request, Some(session.session_id.clone())) .ok_or_else(|| SandboxError::StreamError { message: "failed to send turn/start request".to_string(), })?; @@ -3899,6 +4076,10 @@ impl SessionManager { async fn fetch_codex_models(self: &Arc) -> Result { let started = Instant::now(); + let unavailable_models = { + let catalog = self.model_catalog.lock().await; + catalog.codex_unavailable_models.clone() + }; let server = self.ensure_codex_server().await?; tracing::info!( elapsed_ms = started.elapsed().as_millis() as u64, @@ -3932,7 +4113,19 @@ impl SessionManager { let result = tokio::time::timeout(Duration::from_secs(CODEX_MODEL_LIST_TIMEOUT_SECS), rx).await; let value = match result { - Ok(Ok(value)) => value, + Ok(Ok(CodexRequestResult::Response(value))) => value, + Ok(Ok(CodexRequestResult::Error(error))) => { + tracing::warn!( + elapsed_ms = started.elapsed().as_millis() as u64, + page = pages + 1, + error = %error.message, + "codex model/list request failed" + ); + return Err(codex_request_error_to_sandbox( + "model/list request failed", + &error, + )); + } Ok(Err(_)) => { tracing::warn!( elapsed_ms = started.elapsed().as_millis() as u64, @@ -3977,6 +4170,9 @@ impl SessionManager { let Some(model_id) = model_id else { continue; }; + if unavailable_models.contains(model_id) { + continue; + } if !seen.insert(model_id.to_string()) { continue; } @@ -4039,6 +4235,12 @@ impl 
SessionManager { } models.sort_by(|a, b| a.id.cmp(&b.id)); + if default_model + .as_ref() + .is_some_and(|model_id| unavailable_models.contains(model_id)) + { + default_model = None; + } if default_model.is_none() { default_model = models.first().map(|model| model.id.clone()); } @@ -5713,6 +5915,46 @@ mod tests { assert!(!options.env.contains_key("ANTHROPIC_API_KEY")); assert!(!options.env.contains_key("CLAUDE_API_KEY")); } + + #[test] + fn codex_unavailable_model_parser_handles_requested_model_message() { + let message = "The requested model 'gpt-5.3-codex' does not exist."; + assert_eq!( + codex_unavailable_model_from_message(message), + Some("gpt-5.3-codex".to_string()) + ); + } + + #[test] + fn codex_unavailable_model_parser_handles_chatgpt_account_message() { + let message = "The 'gpt-5.3-codex-NOTREAL' model is not supported when using Codex with a ChatGPT account."; + assert_eq!( + codex_unavailable_model_from_message(message), + Some("gpt-5.3-codex-NOTREAL".to_string()) + ); + } + + #[test] + fn codex_unavailable_model_parser_ignores_non_model_messages() { + let message = "Network error while contacting provider."; + assert_eq!(codex_unavailable_model_from_message(message), None); + } + + #[test] + fn codex_unavailable_model_parser_ignores_non_unavailable_model_messages() { + let message = "using model 'gpt-5.3-codex' for this turn"; + assert_eq!(codex_unavailable_model_from_message(message), None); + } + + #[test] + fn codex_unavailable_model_parser_handles_embedded_json_detail_message() { + let message = "http 400 Bad Request: Some(\"{\\\"detail\\\":\\\"The 'gpt-5.3-codex-NOTREAL' model is not supported when using Codex with a ChatGPT account.\\\"}\")"; + assert_eq!( + codex_unavailable_model_from_message(message), + Some("gpt-5.3-codex-NOTREAL".to_string()) + ); + } + } fn claude_input_session_id(session: &SessionSnapshot) -> String { @@ -6403,6 +6645,111 @@ fn codex_rpc_error_to_universal(error: &codex_schema::JsonrpcError) -> EventConv 
EventConversion::new(UniversalEventType::Error, UniversalEventData::Error(data)) } +fn codex_request_error_to_sandbox( + context: &str, + error: &codex_schema::JsonrpcErrorError, +) -> SandboxError { + SandboxError::StreamError { + message: format!("{context}: {} (code {})", error.message, error.code), + } +} + +fn codex_model_unavailable_status_event( + native_session_id: Option, + model_id: &str, +) -> EventConversion { + EventConversion::new( + UniversalEventType::ItemCompleted, + UniversalEventData::Item(ItemEventData { + item: UniversalItem { + item_id: String::new(), + native_item_id: None, + parent_id: None, + kind: ItemKind::Status, + role: Some(ItemRole::System), + content: vec![ContentPart::Status { + label: "codex.model.unavailable".to_string(), + detail: Some(format!( + "Model '{}' was rejected by provider; falling back to default for this session.", + model_id + )), + }], + status: ItemStatus::Completed, + }, + }), + ) + .synthetic() + .with_native_session(native_session_id) +} + +fn codex_unavailable_model_from_message(message: &str) -> Option { + let normalized = message.to_ascii_lowercase(); + if !normalized.contains("model") { + return None; + } + let is_known_unavailable_shape = normalized.contains("does not exist") + || normalized.contains("model_not_found") + || normalized.contains("requested model") + || normalized.contains("not supported when using codex with a chatgpt account"); + if !is_known_unavailable_shape { + return None; + } + for token in extract_quoted_tokens(message, '\'') + .into_iter() + .chain(extract_quoted_tokens(message, '"').into_iter()) + { + if is_likely_model_id(token) { + return Some(token.to_string()); + } + } + None +} + +fn codex_unavailable_model_from_rpc_error( + error: &codex_schema::JsonrpcErrorError, +) -> Option { + codex_unavailable_model_from_message(&error.message).or_else(|| { + error + .data + .as_ref() + .and_then(|data| codex_unavailable_model_from_message(&data.to_string())) + }) +} + +fn 
extract_quoted_tokens<'a>(message: &'a str, quote: char) -> Vec<&'a str> { + let mut out = Vec::new(); + let mut start: Option = None; + for (idx, ch) in message.char_indices() { + if ch != quote { + continue; + } + if let Some(open) = start.take() { + if open < idx { + out.push(&message[open..idx]); + } + } else { + start = Some(idx + ch.len_utf8()); + } + } + out +} + +fn is_likely_model_id(candidate: &str) -> bool { + if candidate.len() < 3 || candidate.len() > 128 { + return false; + } + if candidate.chars().any(|ch| ch.is_whitespace()) { + return false; + } + if !candidate + .chars() + .all(|ch| ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_' | '.')) + { + return false; + } + candidate.contains('-') +} + fn codex_permission_response_line( permission_id: &str, pending: &PendingPermission,