fix: stabilize codex model handling and initialization

This commit is contained in:
Nathan Flurry 2026-02-08 11:47:16 -08:00
parent 91cac052b8
commit a97b15e19a

View file

@ -978,6 +978,7 @@ pub(crate) struct SessionManager {
/// Model-catalog bookkeeping kept behind the session manager's `model_catalog` lock.
struct ModelCatalogState {
/// Model list responses keyed by agent. Removing an agent's entry forces the
/// next lookup to fetch a fresh list.
models: HashMap<AgentId, AgentModelsResponse>,
/// NOTE(review): presumably one `Notify` per agent whose model fetch is
/// currently running, so concurrent callers can wait instead of refetching —
/// confirm against the fetch path.
in_flight: HashMap<AgentId, Arc<Notify>>,
/// Codex model ids recorded as unavailable after a runtime error. These ids
/// are filtered out of Codex model listings and are not sent on new turns.
codex_unavailable_models: HashSet<String>,
}
/// Shared Codex app-server process that handles multiple sessions via JSON-RPC.
@ -987,11 +988,15 @@ struct CodexServer {
/// Sender for writing to the process stdin
stdin_sender: mpsc::UnboundedSender<String>,
/// Pending JSON-RPC requests awaiting responses, keyed by request ID
pending_requests: std::sync::Mutex<HashMap<i64, oneshot::Sender<Value>>>,
pending_requests: std::sync::Mutex<HashMap<i64, oneshot::Sender<CodexRequestResult>>>,
/// Optional mapping from request ID to session ID for routing request-scoped errors
request_sessions: std::sync::Mutex<HashMap<i64, String>>,
/// Next request ID for JSON-RPC
next_id: AtomicI64,
/// Whether initialize/initialized handshake has completed
initialized: std::sync::Mutex<bool>,
/// Serializes initialize handshakes so only one request is in flight at a time.
initialize_lock: Mutex<()>,
/// Mapping from thread_id to session_id for routing notifications
thread_sessions: std::sync::Mutex<HashMap<String, String>>,
}
@ -1009,8 +1014,10 @@ impl CodexServer {
Self {
stdin_sender,
pending_requests: std::sync::Mutex::new(HashMap::new()),
request_sessions: std::sync::Mutex::new(HashMap::new()),
next_id: AtomicI64::new(1),
initialized: std::sync::Mutex::new(false),
initialize_lock: Mutex::new(()),
thread_sessions: std::sync::Mutex::new(HashMap::new()),
}
}
@ -1019,14 +1026,37 @@ impl CodexServer {
self.next_id.fetch_add(1, Ordering::SeqCst)
}
fn send_request(&self, id: i64, request: &impl Serialize) -> Option<oneshot::Receiver<Value>> {
/// Sends a JSON-RPC request that is not associated with any session.
///
/// Thin wrapper over `send_request_with_session` with no session id, so a
/// request-scoped error for `id` cannot be routed back to a session.
///
/// Returns the receiver for the eventual result, or `None` if the request
/// could not be sent.
fn send_request(
    &self,
    id: i64,
    request: &impl Serialize,
) -> Option<oneshot::Receiver<CodexRequestResult>> {
    let no_session: Option<String> = None;
    self.send_request_with_session(id, request, no_session)
}
/// Registers a pending JSON-RPC request and writes it to the Codex process stdin.
///
/// * `id` - request id used to match the eventual response or error.
/// * `request` - payload serialized to a single JSON line.
/// * `session_id` - when present, recorded against `id` so a request-scoped
///   error can be routed to that session.
///
/// Returns the receiver that resolves with the request result, or `None` when
/// the request could not be serialized or written. On `None`, no bookkeeping
/// for `id` is left behind.
fn send_request_with_session(
    &self,
    id: i64,
    request: &impl Serialize,
    session_id: Option<String>,
) -> Option<oneshot::Receiver<CodexRequestResult>> {
    // Serialize first: failing here must not leave a pending sender or a
    // session mapping registered for a request that was never written.
    let line = serde_json::to_string(request).ok()?;

    let (tx, rx) = oneshot::channel();
    // Register the waiter before writing so the sender is already in
    // `pending_requests` by the time a response for `id` can be looked up.
    self.pending_requests.lock().unwrap().insert(id, tx);
    if let Some(session_id) = session_id {
        self.request_sessions.lock().unwrap().insert(id, session_id);
    }

    if self.stdin_sender.send(line).is_err() {
        // The writer side is gone; roll back both registrations.
        self.pending_requests.lock().unwrap().remove(&id);
        self.request_sessions.lock().unwrap().remove(&id);
        return None;
    }
    Some(rx)
}
@ -1037,7 +1067,7 @@ impl CodexServer {
self.stdin_sender.send(line).is_ok()
}
fn complete_request(&self, id: i64, result: Value) {
fn complete_request(&self, id: i64, result: CodexRequestResult) {
let tx = {
let mut pending = self.pending_requests.lock().unwrap();
pending.remove(&id)
@ -1047,6 +1077,11 @@ impl CodexServer {
}
}
/// Removes and returns the session id recorded for request `id`, if any.
///
/// The mapping is consumed: a second call with the same `id` yields `None`.
fn take_request_session(&self, id: i64) -> Option<String> {
    let mut request_sessions = self.request_sessions.lock().unwrap();
    let owner = request_sessions.remove(&id);
    owner
}
fn register_thread(&self, thread_id: String, session_id: String) {
let mut sessions = self.thread_sessions.lock().unwrap();
sessions.insert(thread_id, session_id);
@ -1068,6 +1103,8 @@ impl CodexServer {
/// Drops every pending request sender and every request-to-session mapping.
///
/// Both guards stay alive until the function returns, so the two maps are
/// emptied under the same pair of locks (pending first, then sessions).
fn clear_pending(&self) {
    let mut pending_guard = self.pending_requests.lock().unwrap();
    // Dropping the stored senders is what releases any task awaiting a result.
    pending_guard.clear();

    let mut session_guard = self.request_sessions.lock().unwrap();
    session_guard.clear();
}
fn clear_threads(&self) {
@ -1076,6 +1113,12 @@ impl CodexServer {
}
}
/// Outcome delivered to the task awaiting a Codex JSON-RPC request.
#[derive(Debug, Clone)]
enum CodexRequestResult {
/// The server answered with a JSON-RPC response; carries its `result` value.
Response(Value),
/// The server answered with a JSON-RPC error; carries the error payload.
Error(codex_schema::JsonrpcErrorError),
}
pub(crate) struct SessionSubscription {
pub(crate) initial_events: Vec<UniversalEvent>,
pub(crate) receiver: broadcast::Receiver<UniversalEvent>,
@ -1885,6 +1928,80 @@ impl SessionManager {
Ok(())
}
/// Records `model_id` as unavailable for Codex.
///
/// Returns `true` only the first time a given id is recorded. In that case
/// the cached Codex model list is dropped as well, so the next lookup
/// refetches and the provider/model lists no longer include the model.
async fn mark_codex_model_unavailable(&self, model_id: &str) -> bool {
    let mut catalog = self.model_catalog.lock().await;
    let newly_recorded = catalog
        .codex_unavailable_models
        .insert(model_id.to_string());
    if !newly_recorded {
        return false;
    }
    // Force a fresh fetch so provider/model lists drop unavailable models.
    catalog.models.remove(&AgentId::Codex);
    true
}
/// Clears the model of a Codex session if it is currently set to `model_id`.
///
/// Returns `true` when the session exists, belongs to the Codex agent and had
/// exactly `model_id` selected; the model is then reset to `None` and
/// `updated_at` is refreshed to the current wall-clock time in milliseconds.
/// Returns `false` (and changes nothing) otherwise.
async fn clear_codex_session_model_if_unavailable(
    &self,
    session_id: &str,
    model_id: &str,
) -> bool {
    let mut sessions = self.sessions.lock().await;
    let Some(session) = SessionManager::session_mut(&mut sessions, session_id) else {
        return false;
    };

    let uses_rejected_model =
        session.agent == AgentId::Codex && session.model.as_deref() == Some(model_id);
    if !uses_rejected_model {
        return false;
    }

    session.model = None;
    // If the clock reads earlier than the Unix epoch, keep the old timestamp.
    let now_ms = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|elapsed| elapsed.as_millis() as i64);
    if let Ok(now_ms) = now_ms {
        session.updated_at = now_ms;
    }
    true
}
/// Looks up the native session id stored on a Codex session.
///
/// Returns `None` when the session is unknown, belongs to a different agent,
/// or has no native session id recorded.
async fn codex_native_session_id(&self, session_id: &str) -> Option<String> {
    let sessions = self.sessions.lock().await;
    let native_id = SessionManager::session_ref(&sessions, session_id)
        .filter(|session| session.agent == AgentId::Codex)
        .and_then(|session| session.native_session_id.clone());
    native_id
}
/// Reacts to the provider rejecting `model_id` for a session.
///
/// Steps, in order:
/// 1. Record the model as unavailable (logging a warning the first time).
/// 2. Clear the session's model if it is still set to `model_id`.
/// 3. Only when step 2 changed the session, record a synthetic status event
///    on it. `native_session_id` is used when supplied; otherwise it is looked
///    up from the session.
///
/// Failures while recording the status event are deliberately ignored.
async fn handle_codex_model_unavailable(
    &self,
    session_id: &str,
    model_id: &str,
    native_session_id: Option<String>,
) {
    if self.mark_codex_model_unavailable(model_id).await {
        tracing::warn!(
            model_id = %model_id,
            "codex model marked unavailable after runtime error"
        );
    }

    let session_was_reset = self
        .clear_codex_session_model_if_unavailable(session_id, model_id)
        .await;
    if !session_was_reset {
        return;
    }

    let native_session_id = if native_session_id.is_some() {
        native_session_id
    } else {
        self.codex_native_session_id(session_id).await
    };
    let status_event = codex_model_unavailable_status_event(native_session_id, model_id);
    let _ = self
        .record_conversions(session_id, vec![status_event])
        .await;
}
pub(crate) async fn delete_session(&self, session_id: &str) -> Result<(), SandboxError> {
let (agent, native_session_id) = {
let mut sessions = self.sessions.lock().await;
@ -3548,7 +3665,8 @@ impl SessionManager {
codex_schema::JsonrpcMessage::Response(response) => {
// Route response to waiting request
if let Some(id) = codex_request_id_to_i64(&response.id) {
server.complete_request(id, response.result.clone());
server.take_request_session(id);
server.complete_request(id, CodexRequestResult::Response(response.result));
}
}
codex_schema::JsonrpcMessage::Notification(_) => {
@ -3560,6 +3678,20 @@ impl SessionManager {
codex_thread_id_from_server_notification(&notification)
{
if let Some(session_id) = server.session_for_thread(&thread_id) {
if let codex_schema::ServerNotification::Error(params) =
&notification
{
if let Some(model_id) =
codex_unavailable_model_from_message(&params.error.message)
{
self.handle_codex_model_unavailable(
&session_id,
&model_id,
Some(thread_id.clone()),
)
.await;
}
}
let conversions =
match convert_codex::notification_to_universal(&notification) {
Ok(c) => c,
@ -3601,8 +3733,28 @@ impl SessionManager {
}
}
codex_schema::JsonrpcMessage::Error(error) => {
// Log error but don't have a session to route to
eprintln!("Codex server error: {:?}", error);
if let Some(id) = codex_request_id_to_i64(&error.id) {
let session_id = server.take_request_session(id);
server.complete_request(id, CodexRequestResult::Error(error.error.clone()));
if let Some(session_id) = session_id {
if let Some(model_id) =
codex_unavailable_model_from_rpc_error(&error.error)
{
self.handle_codex_model_unavailable(&session_id, &model_id, None)
.await;
}
let _ = self
.record_conversions(
&session_id,
vec![codex_rpc_error_to_universal(&error)],
)
.await;
} else {
eprintln!("Codex server error: {:?}", error);
}
} else {
eprintln!("Codex server error: {:?}", error);
}
}
}
}
@ -3610,6 +3762,7 @@ impl SessionManager {
/// Performs the initialize/initialized handshake with the Codex server.
async fn codex_server_initialize(&self, server: &CodexServer) -> Result<(), SandboxError> {
let _initialize_guard = server.initialize_lock.lock().await;
if server.is_initialized() {
return Ok(());
}
@ -3635,7 +3788,7 @@ impl SessionManager {
// Wait for initialize response with timeout
let result = tokio::time::timeout(Duration::from_secs(30), rx).await;
match result {
Ok(Ok(_)) => {
Ok(Ok(CodexRequestResult::Response(_))) => {
// Send initialized notification
let notification = codex_schema::JsonrpcNotification {
method: "initialized".to_string(),
@ -3645,6 +3798,10 @@ impl SessionManager {
server.set_initialized();
Ok(())
}
Ok(Ok(CodexRequestResult::Error(error))) => Err(codex_request_error_to_sandbox(
"initialize request failed",
&error,
)),
Ok(Err(_)) => Err(SandboxError::StreamError {
message: "initialize request cancelled".to_string(),
}),
@ -3682,7 +3839,7 @@ impl SessionManager {
// Wait for thread/start response
let result = tokio::time::timeout(Duration::from_secs(30), rx).await;
match result {
Ok(Ok(response)) => {
Ok(Ok(CodexRequestResult::Response(response))) => {
// Extract thread_id from response
let thread_id = response
.get("thread")
@ -3699,6 +3856,10 @@ impl SessionManager {
Ok(thread_id)
}
Ok(Ok(CodexRequestResult::Error(error))) => Err(codex_request_error_to_sandbox(
"thread/start request failed",
&error,
)),
Ok(Err(_)) => Err(SandboxError::StreamError {
message: "thread/start request cancelled".to_string(),
}),
@ -3726,6 +3887,22 @@ impl SessionManager {
let id = server.next_request_id();
let prompt_text = codex_prompt_for_mode(prompt, Some(&session.agent_mode));
let mut model = session.model.clone();
if let Some(model_id) = model.clone() {
let is_unavailable = {
let catalog = self.model_catalog.lock().await;
catalog.codex_unavailable_models.contains(&model_id)
};
if is_unavailable {
self.handle_codex_model_unavailable(
&session.session_id,
&model_id,
session.native_session_id.clone(),
)
.await;
model = None;
}
}
let params = codex_schema::TurnStartParams {
approval_policy: codex_approval_policy(Some(&session.permission_mode)),
collaboration_mode: None,
@ -3735,7 +3912,7 @@ impl SessionManager {
text: prompt_text,
text_elements: Vec::new(),
}],
model: session.model.clone(),
model,
output_schema: None,
sandbox_policy: codex_sandbox_policy(Some(&session.permission_mode)),
summary: None,
@ -3749,7 +3926,7 @@ impl SessionManager {
// Send but don't wait for response - notifications will stream back
server
.send_request(id, &request)
.send_request_with_session(id, &request, Some(session.session_id.clone()))
.ok_or_else(|| SandboxError::StreamError {
message: "failed to send turn/start request".to_string(),
})?;
@ -3899,6 +4076,10 @@ impl SessionManager {
async fn fetch_codex_models(self: &Arc<Self>) -> Result<AgentModelsResponse, SandboxError> {
let started = Instant::now();
let unavailable_models = {
let catalog = self.model_catalog.lock().await;
catalog.codex_unavailable_models.clone()
};
let server = self.ensure_codex_server().await?;
tracing::info!(
elapsed_ms = started.elapsed().as_millis() as u64,
@ -3932,7 +4113,19 @@ impl SessionManager {
let result =
tokio::time::timeout(Duration::from_secs(CODEX_MODEL_LIST_TIMEOUT_SECS), rx).await;
let value = match result {
Ok(Ok(value)) => value,
Ok(Ok(CodexRequestResult::Response(value))) => value,
Ok(Ok(CodexRequestResult::Error(error))) => {
tracing::warn!(
elapsed_ms = started.elapsed().as_millis() as u64,
page = pages + 1,
error = %error.message,
"codex model/list request failed"
);
return Err(codex_request_error_to_sandbox(
"model/list request failed",
&error,
));
}
Ok(Err(_)) => {
tracing::warn!(
elapsed_ms = started.elapsed().as_millis() as u64,
@ -3977,6 +4170,9 @@ impl SessionManager {
let Some(model_id) = model_id else {
continue;
};
if unavailable_models.contains(model_id) {
continue;
}
if !seen.insert(model_id.to_string()) {
continue;
}
@ -4039,6 +4235,12 @@ impl SessionManager {
}
models.sort_by(|a, b| a.id.cmp(&b.id));
if default_model
.as_ref()
.is_some_and(|model_id| unavailable_models.contains(model_id))
{
default_model = None;
}
if default_model.is_none() {
default_model = models.first().map(|model| model.id.clone());
}
@ -5713,6 +5915,46 @@ mod tests {
assert!(!options.env.contains_key("ANTHROPIC_API_KEY"));
assert!(!options.env.contains_key("CLAUDE_API_KEY"));
}
// The "requested model ... does not exist" wording yields the quoted model id.
#[test]
fn codex_unavailable_model_parser_handles_requested_model_message() {
    let parsed = codex_unavailable_model_from_message(
        "The requested model 'gpt-5.3-codex' does not exist.",
    );
    assert_eq!(parsed.as_deref(), Some("gpt-5.3-codex"));
}
// The ChatGPT-account "not supported" wording yields the quoted model id.
#[test]
fn codex_unavailable_model_parser_handles_chatgpt_account_message() {
    let parsed = codex_unavailable_model_from_message(
        "The 'gpt-5.3-codex-NOTREAL' model is not supported when using Codex with a ChatGPT account.",
    );
    assert_eq!(parsed.as_deref(), Some("gpt-5.3-codex-NOTREAL"));
}
// Errors that never mention a model must not produce a model id.
#[test]
fn codex_unavailable_model_parser_ignores_non_model_messages() {
    let parsed =
        codex_unavailable_model_from_message("Network error while contacting provider.");
    assert!(parsed.is_none());
}
// A quoted model id alone is not enough; the text must match an unavailable shape.
#[test]
fn codex_unavailable_model_parser_ignores_non_unavailable_model_messages() {
    let parsed =
        codex_unavailable_model_from_message("using model 'gpt-5.3-codex' for this turn");
    assert!(parsed.is_none());
}
// The provider detail may arrive as escaped JSON embedded in an HTTP error string.
#[test]
fn codex_unavailable_model_parser_handles_embedded_json_detail_message() {
    let parsed = codex_unavailable_model_from_message(
        "http 400 Bad Request: Some(\"{\\\"detail\\\":\\\"The 'gpt-5.3-codex-NOTREAL' model is not supported when using Codex with a ChatGPT account.\\\"}\")",
    );
    assert_eq!(parsed.as_deref(), Some("gpt-5.3-codex-NOTREAL"));
}
}
fn claude_input_session_id(session: &SessionSnapshot) -> String {
@ -6403,6 +6645,111 @@ fn codex_rpc_error_to_universal(error: &codex_schema::JsonrpcError) -> EventConv
EventConversion::new(UniversalEventType::Error, UniversalEventData::Error(data))
}
/// Converts a Codex JSON-RPC error into a `SandboxError::StreamError`.
///
/// The message has the form `"<context>: <error message> (code <error code>)"`.
fn codex_request_error_to_sandbox(
    context: &str,
    error: &codex_schema::JsonrpcErrorError,
) -> SandboxError {
    let message = format!("{context}: {} (code {})", error.message, error.code);
    SandboxError::StreamError { message }
}
/// Builds the synthetic, already-completed status item announcing that
/// `model_id` was rejected and the session falls back to the default model.
///
/// The item carries an empty `item_id`, the system role and a single status
/// part labelled `codex.model.unavailable`. The event is tagged with
/// `native_session_id`.
fn codex_model_unavailable_status_event(
    native_session_id: Option<String>,
    model_id: &str,
) -> EventConversion {
    let detail = format!(
        "Model '{}' was rejected by provider; falling back to default for this session.",
        model_id
    );
    let status_part = ContentPart::Status {
        label: "codex.model.unavailable".to_string(),
        detail: Some(detail),
    };
    let item = UniversalItem {
        item_id: String::new(),
        native_item_id: None,
        parent_id: None,
        kind: ItemKind::Status,
        role: Some(ItemRole::System),
        content: vec![status_part],
        status: ItemStatus::Completed,
    };
    let data = UniversalEventData::Item(ItemEventData { item });

    EventConversion::new(UniversalEventType::ItemCompleted, data)
        .synthetic()
        .with_native_session(native_session_id)
}
/// Extracts the model id from an error message that reports a model as
/// unavailable.
///
/// The message must (case-insensitively) mention "model" and contain one of
/// the known unavailable phrasings. The first quoted token that looks like a
/// model id is returned, with single-quoted tokens considered before
/// double-quoted ones. The token keeps its original casing.
fn codex_unavailable_model_from_message(message: &str) -> Option<String> {
    const UNAVAILABLE_MARKERS: [&str; 4] = [
        "does not exist",
        "model_not_found",
        "requested model",
        "not supported when using codex with a chatgpt account",
    ];

    let lowered = message.to_ascii_lowercase();
    if !lowered.contains("model") {
        return None;
    }
    let matches_known_shape = UNAVAILABLE_MARKERS
        .iter()
        .any(|marker| lowered.contains(*marker));
    if !matches_known_shape {
        return None;
    }

    // Tokens are taken from the original message, not the lowercased copy.
    extract_quoted_tokens(message, '\'')
        .into_iter()
        .chain(extract_quoted_tokens(message, '"'))
        .find(|token| is_likely_model_id(token))
        .map(|token| token.to_string())
}
fn codex_unavailable_model_from_rpc_error(
error: &codex_schema::JsonrpcErrorError,
) -> Option<String> {
codex_unavailable_model_from_message(&error.message).or_else(|| {
error
.data
.as_ref()
.and_then(|data| codex_unavailable_model_from_message(&data.to_string()))
})
}
/// Returns the non-empty substrings of `message` enclosed by `quote`.
///
/// Quote characters are paired in order of appearance (1st with 2nd, 3rd with
/// 4th, ...). A trailing unpaired quote is ignored, and so are empty pairs
/// such as `''`.
fn extract_quoted_tokens<'a>(message: &'a str, quote: char) -> Vec<&'a str> {
    let quote_width = quote.len_utf8();
    // Byte offsets of every occurrence of the quote character.
    let marks: Vec<usize> = message.match_indices(quote).map(|(at, _)| at).collect();

    marks
        .chunks_exact(2)
        .map(|pair| (pair[0] + quote_width, pair[1]))
        .filter(|&(body_start, body_end)| body_start < body_end)
        .map(|(body_start, body_end)| &message[body_start..body_end])
        .collect()
}
/// Heuristic check that a quoted token looks like a model identifier.
///
/// A candidate qualifies when it is 3 to 128 bytes long, consists solely of
/// ASCII letters, digits, `-`, `_` or `.`, and contains at least one `-`.
fn is_likely_model_id(candidate: &str) -> bool {
    let length_ok = (3..=128).contains(&candidate.len());
    // Whitespace and any non-ASCII character fail this allow-list.
    let charset_ok = candidate
        .bytes()
        .all(|byte| byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'_' | b'.'));
    length_ok && charset_ok && candidate.contains('-')
}
fn codex_permission_response_line(
permission_id: &str,
pending: &PendingPermission,