feat: migrate codex app server

This commit is contained in:
Nathan Flurry 2026-01-26 21:50:37 -08:00
parent c91595d338
commit 4b5b390b7f
9 changed files with 1266 additions and 110 deletions

View file

@ -1,6 +1,6 @@
use std::collections::{HashMap, HashSet};
use std::convert::Infallible;
use std::io::{BufRead, BufReader};
use std::io::{BufRead, BufReader, Write};
use std::net::TcpListener;
use std::process::Stdio;
use std::sync::Arc;
@ -19,10 +19,26 @@ use tower_http::trace::TraceLayer;
use reqwest::Client;
use sandbox_agent_error::{AgentError, ErrorType, ProblemDetails, SandboxError};
use sandbox_agent_universal_agent_schema::{
convert_amp, convert_claude, convert_codex, convert_opencode, AttachmentSource, CrashInfo,
EventConversion, PermissionRequest, PermissionToolRef, QuestionInfo, QuestionOption,
QuestionRequest, QuestionToolRef, Started, UniversalEvent, UniversalEventData,
UniversalMessage, UniversalMessageParsed, UniversalMessagePart,
codex as codex_schema,
convert_amp,
convert_claude,
convert_codex,
convert_opencode,
AttachmentSource,
CrashInfo,
EventConversion,
PermissionRequest,
PermissionToolRef,
QuestionInfo,
QuestionOption,
QuestionRequest,
QuestionToolRef,
Started,
UniversalEvent,
UniversalEventData,
UniversalMessage,
UniversalMessageParsed,
UniversalMessagePart,
};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
@ -38,6 +54,7 @@ use sandbox_agent_agent_management::agents::{
use sandbox_agent_agent_management::credentials::{
extract_all_credentials, CredentialExtractionOptions, ExtractedCredentials,
};
use crate::ui;
#[derive(Debug)]
pub struct AppState {
@ -104,9 +121,13 @@ pub fn build_router(state: AppState) -> Router {
v1_router = v1_router.layer(axum::middleware::from_fn_with_state(shared, require_token));
}
Router::new()
.nest("/v1", v1_router)
.layer(TraceLayer::new_for_http())
let mut router = Router::new().nest("/v1", v1_router);
if ui::is_enabled() {
router = router.merge(ui::router());
}
router.layer(TraceLayer::new_for_http())
}
#[derive(OpenApi)]
@ -204,6 +225,7 @@ struct SessionState {
pending_permissions: HashSet<String>,
broadcaster: broadcast::Sender<UniversalEvent>,
opencode_stream_started: bool,
codex_sender: Option<mpsc::UnboundedSender<String>>,
}
impl SessionState {
@ -236,6 +258,7 @@ impl SessionState {
pending_permissions: HashSet::new(),
broadcaster,
opencode_stream_started: false,
codex_sender: None,
})
}
@ -274,6 +297,14 @@ impl SessionState {
event
}
fn set_codex_sender(&mut self, sender: Option<mpsc::UnboundedSender<String>>) {
self.codex_sender = sender;
}
fn codex_sender(&self) -> Option<mpsc::UnboundedSender<String>> {
self.codex_sender.clone()
}
fn normalize_event_data(&self, mut data: UniversalEventData) -> UniversalEventData {
match &mut data {
UniversalEventData::QuestionAsked { question_asked } => {
@ -627,7 +658,7 @@ impl SessionManager {
permission_id: &str,
reply: PermissionReply,
) -> Result<(), SandboxError> {
let (agent, agent_session_id) = {
let (agent, agent_session_id, codex_sender, codex_metadata) = {
let mut sessions = self.sessions.lock().await;
let session = sessions.get_mut(session_id).ok_or_else(|| SandboxError::SessionNotFound {
session_id: session_id.to_string(),
@ -640,10 +671,79 @@ impl SessionManager {
message: format!("unknown permission id: {permission_id}"),
});
}
(session.agent, session.agent_session_id.clone())
let codex_metadata = if session.agent == AgentId::Codex {
session.events.iter().find_map(|event| {
if let UniversalEventData::PermissionAsked { permission_asked } = &event.data {
if permission_asked.id == permission_id {
return Some(permission_asked.metadata.clone());
}
}
None
})
} else {
None
};
let codex_sender = if session.agent == AgentId::Codex {
session.codex_sender()
} else {
None
};
(
session.agent,
session.agent_session_id.clone(),
codex_sender,
codex_metadata,
)
};
if agent == AgentId::Opencode {
if agent == AgentId::Codex {
let sender = codex_sender.ok_or_else(|| SandboxError::InvalidRequest {
message: "codex session not active".to_string(),
})?;
let metadata = codex_metadata.ok_or_else(|| SandboxError::InvalidRequest {
message: "missing codex permission metadata".to_string(),
})?;
let request_id = codex_request_id_from_metadata(&metadata)
.or_else(|| codex_request_id_from_string(permission_id))
.ok_or_else(|| SandboxError::InvalidRequest {
message: "invalid codex permission request id".to_string(),
})?;
let request_kind = metadata
.get("codexRequestKind")
.and_then(Value::as_str)
.unwrap_or("");
let response_value = match request_kind {
"commandExecution" => {
let decision = codex_command_decision_for_reply(reply);
let response = codex_schema::CommandExecutionRequestApprovalResponse { decision };
serde_json::to_value(response).map_err(|err| SandboxError::InvalidRequest {
message: err.to_string(),
})?
}
"fileChange" => {
let decision = codex_file_change_decision_for_reply(reply);
let response = codex_schema::FileChangeRequestApprovalResponse { decision };
serde_json::to_value(response).map_err(|err| SandboxError::InvalidRequest {
message: err.to_string(),
})?
}
_ => {
return Err(SandboxError::InvalidRequest {
message: "unsupported codex permission request".to_string(),
});
}
};
let response = codex_schema::JsonrpcResponse {
id: request_id,
result: response_value,
};
let line = serde_json::to_string(&response).map_err(|err| SandboxError::InvalidRequest {
message: err.to_string(),
})?;
sender.send(line).map_err(|_| SandboxError::InvalidRequest {
message: "codex session not active".to_string(),
})?;
} else if agent == AgentId::Opencode {
let agent_session_id = agent_session_id.ok_or_else(|| SandboxError::InvalidRequest {
message: "missing OpenCode session id".to_string(),
})?;
@ -681,10 +781,17 @@ impl SessionManager {
) {
let StreamingSpawn {
mut child,
stdin,
stdout,
stderr,
codex_options,
} = spawn;
let (tx, mut rx) = mpsc::unbounded_channel::<String>();
let mut codex_state = codex_options
.filter(|_| agent == AgentId::Codex)
.map(CodexAppServerState::new);
let mut codex_sender: Option<mpsc::UnboundedSender<String>> = None;
let mut terminate_early = false;
if let Some(stdout) = stdout {
let tx_stdout = tx.clone();
@ -700,12 +807,52 @@ impl SessionManager {
}
drop(tx);
if agent == AgentId::Codex {
if let Some(stdin) = stdin {
let (writer_tx, writer_rx) = mpsc::unbounded_channel::<String>();
codex_sender = Some(writer_tx.clone());
{
let mut sessions = self.sessions.lock().await;
if let Some(session) = sessions.get_mut(&session_id) {
session.set_codex_sender(Some(writer_tx));
}
}
tokio::task::spawn_blocking(move || {
write_lines(stdin, writer_rx);
});
}
if let (Some(state), Some(sender)) = (codex_state.as_mut(), codex_sender.as_ref()) {
state.start(sender);
}
}
while let Some(line) = rx.recv().await {
if let Some(conversion) = parse_agent_line(agent, &line, &session_id) {
if agent == AgentId::Codex {
if let Some(state) = codex_state.as_mut() {
let outcome = state.handle_line(&line);
if let Some(conversion) = outcome.conversion {
let _ = self.record_conversion(&session_id, conversion).await;
}
if outcome.should_terminate {
terminate_early = true;
break;
}
}
} else if let Some(conversion) = parse_agent_line(agent, &line, &session_id) {
let _ = self.record_conversion(&session_id, conversion).await;
}
}
if agent == AgentId::Codex {
let mut sessions = self.sessions.lock().await;
if let Some(session) = sessions.get_mut(&session_id) {
session.set_codex_sender(None);
}
}
if terminate_early {
let _ = child.kill();
}
let status = tokio::task::spawn_blocking(move || child.wait()).await;
match status {
Ok(Ok(status)) if status.success() => {}
@ -1932,6 +2079,430 @@ fn read_lines<R: std::io::Read>(reader: R, sender: mpsc::UnboundedSender<String>
}
}
fn write_lines(mut stdin: std::process::ChildStdin, mut receiver: mpsc::UnboundedReceiver<String>) {
while let Some(line) = receiver.blocking_recv() {
if writeln!(stdin, "{line}").is_err() {
break;
}
if stdin.flush().is_err() {
break;
}
}
}
#[derive(Default)]
struct CodexLineOutcome {
conversion: Option<EventConversion>,
should_terminate: bool,
}
struct CodexAppServerState {
init_id: Option<String>,
thread_start_id: Option<String>,
init_done: bool,
thread_start_sent: bool,
turn_start_sent: bool,
thread_id: Option<String>,
next_id: i64,
prompt: String,
model: Option<String>,
cwd: Option<String>,
approval_policy: Option<codex_schema::AskForApproval>,
sandbox_mode: Option<codex_schema::SandboxMode>,
sandbox_policy: Option<codex_schema::SandboxPolicy>,
sender: Option<mpsc::UnboundedSender<String>>,
}
impl CodexAppServerState {
fn new(options: SpawnOptions) -> Self {
let prompt = codex_prompt_for_mode(&options.prompt, options.agent_mode.as_deref());
let cwd = options
.working_dir
.as_ref()
.map(|path| path.to_string_lossy().to_string());
Self {
init_id: None,
thread_start_id: None,
init_done: false,
thread_start_sent: false,
turn_start_sent: false,
thread_id: None,
next_id: 1,
prompt,
model: options.model.clone(),
cwd,
approval_policy: codex_approval_policy(options.permission_mode.as_deref()),
sandbox_mode: codex_sandbox_mode(options.permission_mode.as_deref()),
sandbox_policy: codex_sandbox_policy(options.permission_mode.as_deref()),
sender: None,
}
}
fn start(&mut self, sender: &mpsc::UnboundedSender<String>) {
self.sender = Some(sender.clone());
let request_id = self.next_request_id();
self.init_id = Some(request_id.to_string());
let request = codex_schema::ClientRequest::Initialize {
id: request_id,
params: codex_schema::InitializeParams {
client_info: codex_schema::ClientInfo {
name: "sandbox-agent".to_string(),
title: Some("sandbox-agent".to_string()),
version: env!("CARGO_PKG_VERSION").to_string(),
},
},
};
self.send_json(&request);
}
fn handle_line(&mut self, line: &str) -> CodexLineOutcome {
let trimmed = line.trim();
if trimmed.is_empty() {
return CodexLineOutcome::default();
}
let value: Value = match serde_json::from_str(trimmed) {
Ok(value) => value,
Err(_) => return CodexLineOutcome::default(),
};
let message: codex_schema::JsonrpcMessage = match serde_json::from_value(value.clone()) {
Ok(message) => message,
Err(_) => return CodexLineOutcome::default(),
};
match message {
codex_schema::JsonrpcMessage::Response(response) => {
self.handle_response(&response);
CodexLineOutcome::default()
}
codex_schema::JsonrpcMessage::Notification(_) => {
if let Ok(notification) =
serde_json::from_value::<codex_schema::ServerNotification>(value.clone())
{
self.maybe_capture_thread_id(&notification);
let conversion = convert_codex::notification_to_universal(&notification);
let should_terminate = matches!(
notification,
codex_schema::ServerNotification::TurnCompleted(_)
| codex_schema::ServerNotification::Error(_)
);
CodexLineOutcome {
conversion: Some(conversion),
should_terminate,
}
} else {
CodexLineOutcome::default()
}
}
codex_schema::JsonrpcMessage::Request(_) => {
if let Ok(request) =
serde_json::from_value::<codex_schema::ServerRequest>(value.clone())
{
let conversion = codex_request_to_universal(&request);
CodexLineOutcome {
conversion: Some(conversion),
should_terminate: false,
}
} else {
CodexLineOutcome::default()
}
}
codex_schema::JsonrpcMessage::Error(error) => CodexLineOutcome {
conversion: Some(codex_rpc_error_to_universal(&error)),
should_terminate: true,
},
}
}
fn handle_response(&mut self, response: &codex_schema::JsonrpcResponse) {
let response_id = response.id.to_string();
if !self.init_done {
if self
.init_id
.as_ref()
.is_some_and(|id| id == &response_id)
{
self.init_done = true;
self.send_initialized();
self.send_thread_start();
}
return;
}
if self.thread_id.is_none()
&& self
.thread_start_id
.as_ref()
.is_some_and(|id| id == &response_id)
{
self.send_turn_start();
}
}
fn maybe_capture_thread_id(&mut self, notification: &codex_schema::ServerNotification) {
if self.thread_id.is_some() {
return;
}
let thread_id = match notification {
codex_schema::ServerNotification::ThreadStarted(params) => Some(params.thread.id.clone()),
codex_schema::ServerNotification::TurnStarted(params) => Some(params.thread_id.clone()),
codex_schema::ServerNotification::TurnCompleted(params) => Some(params.thread_id.clone()),
codex_schema::ServerNotification::ItemStarted(params) => Some(params.thread_id.clone()),
codex_schema::ServerNotification::ItemCompleted(params) => Some(params.thread_id.clone()),
codex_schema::ServerNotification::ItemAgentMessageDelta(params) => Some(params.thread_id.clone()),
codex_schema::ServerNotification::ItemReasoningTextDelta(params) => Some(params.thread_id.clone()),
codex_schema::ServerNotification::ItemReasoningSummaryTextDelta(params) => Some(params.thread_id.clone()),
codex_schema::ServerNotification::ItemCommandExecutionOutputDelta(params) => Some(params.thread_id.clone()),
codex_schema::ServerNotification::ItemFileChangeOutputDelta(params) => Some(params.thread_id.clone()),
codex_schema::ServerNotification::ItemMcpToolCallProgress(params) => Some(params.thread_id.clone()),
codex_schema::ServerNotification::ThreadTokenUsageUpdated(params) => Some(params.thread_id.clone()),
codex_schema::ServerNotification::TurnDiffUpdated(params) => Some(params.thread_id.clone()),
codex_schema::ServerNotification::TurnPlanUpdated(params) => Some(params.thread_id.clone()),
codex_schema::ServerNotification::ItemCommandExecutionTerminalInteraction(params) => Some(params.thread_id.clone()),
codex_schema::ServerNotification::ItemReasoningSummaryPartAdded(params) => Some(params.thread_id.clone()),
codex_schema::ServerNotification::ThreadCompacted(params) => Some(params.thread_id.clone()),
_ => None,
};
if let Some(thread_id) = thread_id {
self.thread_id = Some(thread_id);
self.send_turn_start();
}
}
fn send_initialized(&self) {
let notification = codex_schema::JsonrpcNotification {
method: "initialized".to_string(),
params: None,
};
self.send_json(&notification);
}
fn send_thread_start(&mut self) {
if self.thread_start_sent {
return;
}
let request_id = self.next_request_id();
self.thread_start_id = Some(request_id.to_string());
let mut params = codex_schema::ThreadStartParams::default();
params.approval_policy = self.approval_policy;
params.sandbox = self.sandbox_mode;
params.model = self.model.clone();
params.cwd = self.cwd.clone();
let request = codex_schema::ClientRequest::ThreadStart { id: request_id, params };
self.thread_start_sent = true;
self.send_json(&request);
}
fn send_turn_start(&mut self) {
if self.turn_start_sent {
return;
}
let thread_id = match self.thread_id.clone() {
Some(thread_id) => thread_id,
None => return,
};
let request_id = self.next_request_id();
let params = codex_schema::TurnStartParams {
approval_policy: self.approval_policy,
collaboration_mode: None,
cwd: self.cwd.clone(),
effort: None,
input: vec![codex_schema::UserInput::Text {
text: self.prompt.clone(),
text_elements: Vec::new(),
}],
model: self.model.clone(),
output_schema: None,
personality: None,
sandbox_policy: self.sandbox_policy.clone(),
summary: None,
thread_id,
};
let request = codex_schema::ClientRequest::TurnStart { id: request_id, params };
self.turn_start_sent = true;
self.send_json(&request);
}
fn next_request_id(&mut self) -> codex_schema::RequestId {
let id = self.next_id;
self.next_id += 1;
codex_schema::RequestId::from(id)
}
fn send_json<T: Serialize>(&self, payload: &T) {
let Some(sender) = self.sender.as_ref() else {
return;
};
let Ok(line) = serde_json::to_string(payload) else {
return;
};
let _ = sender.send(line);
}
}
fn codex_prompt_for_mode(prompt: &str, mode: Option<&str>) -> String {
match mode {
Some("plan") => format!("Make a plan before acting.\n\n{prompt}"),
_ => prompt.to_string(),
}
}
fn codex_approval_policy(mode: Option<&str>) -> Option<codex_schema::AskForApproval> {
match mode {
Some("plan") => Some(codex_schema::AskForApproval::Untrusted),
Some("bypass") => Some(codex_schema::AskForApproval::Never),
_ => None,
}
}
fn codex_sandbox_mode(mode: Option<&str>) -> Option<codex_schema::SandboxMode> {
match mode {
Some("plan") => Some(codex_schema::SandboxMode::ReadOnly),
Some("bypass") => Some(codex_schema::SandboxMode::DangerFullAccess),
_ => None,
}
}
fn codex_sandbox_policy(mode: Option<&str>) -> Option<codex_schema::SandboxPolicy> {
match mode {
Some("plan") => Some(codex_schema::SandboxPolicy::ReadOnly),
Some("bypass") => Some(codex_schema::SandboxPolicy::DangerFullAccess),
_ => None,
}
}
fn codex_request_to_universal(request: &codex_schema::ServerRequest) -> EventConversion {
match request {
codex_schema::ServerRequest::ItemCommandExecutionRequestApproval { id, params } => {
let mut metadata = serde_json::Map::new();
metadata.insert(
"codexRequestKind".to_string(),
Value::String("commandExecution".to_string()),
);
metadata.insert(
"codexRequestId".to_string(),
serde_json::to_value(id).unwrap_or(Value::Null),
);
metadata.insert("threadId".to_string(), Value::String(params.thread_id.clone()));
metadata.insert("turnId".to_string(), Value::String(params.turn_id.clone()));
metadata.insert("itemId".to_string(), Value::String(params.item_id.clone()));
if let Some(command) = params.command.as_ref() {
metadata.insert("command".to_string(), Value::String(command.clone()));
}
if let Some(reason) = params.reason.as_ref() {
metadata.insert("reason".to_string(), Value::String(reason.clone()));
}
let permission = PermissionRequest {
id: id.to_string(),
session_id: params.thread_id.clone(),
permission: "commandExecution".to_string(),
patterns: params
.command
.as_ref()
.map(|command| vec![command.clone()])
.unwrap_or_default(),
metadata,
always: Vec::new(),
tool: None,
};
EventConversion::new(UniversalEventData::PermissionAsked {
permission_asked: permission,
})
.with_session(Some(params.thread_id.clone()))
}
codex_schema::ServerRequest::ItemFileChangeRequestApproval { id, params } => {
let mut metadata = serde_json::Map::new();
metadata.insert(
"codexRequestKind".to_string(),
Value::String("fileChange".to_string()),
);
metadata.insert(
"codexRequestId".to_string(),
serde_json::to_value(id).unwrap_or(Value::Null),
);
metadata.insert("threadId".to_string(), Value::String(params.thread_id.clone()));
metadata.insert("turnId".to_string(), Value::String(params.turn_id.clone()));
metadata.insert("itemId".to_string(), Value::String(params.item_id.clone()));
if let Some(reason) = params.reason.as_ref() {
metadata.insert("reason".to_string(), Value::String(reason.clone()));
}
if let Some(grant_root) = params.grant_root.as_ref() {
metadata.insert("grantRoot".to_string(), Value::String(grant_root.clone()));
}
let permission = PermissionRequest {
id: id.to_string(),
session_id: params.thread_id.clone(),
permission: "fileChange".to_string(),
patterns: params
.grant_root
.as_ref()
.map(|root| vec![root.clone()])
.unwrap_or_default(),
metadata,
always: Vec::new(),
tool: None,
};
EventConversion::new(UniversalEventData::PermissionAsked {
permission_asked: permission,
})
.with_session(Some(params.thread_id.clone()))
}
_ => EventConversion::new(UniversalEventData::Unknown {
raw: serde_json::to_value(request).unwrap_or(Value::Null),
}),
}
}
fn codex_rpc_error_to_universal(error: &codex_schema::JsonrpcError) -> EventConversion {
let message = error.error.message.clone();
let crash = CrashInfo {
message,
kind: Some("jsonrpc.error".to_string()),
details: serde_json::to_value(error).ok(),
};
EventConversion::new(UniversalEventData::Error { error: crash })
}
fn codex_request_id_from_metadata(
metadata: &serde_json::Map<String, Value>,
) -> Option<codex_schema::RequestId> {
let value = metadata.get("codexRequestId")?;
codex_request_id_from_value(value)
}
fn codex_request_id_from_string(value: &str) -> Option<codex_schema::RequestId> {
if let Ok(number) = value.parse::<i64>() {
return Some(codex_schema::RequestId::from(number));
}
Some(codex_schema::RequestId::Variant0(value.to_string()))
}
fn codex_request_id_from_value(value: &Value) -> Option<codex_schema::RequestId> {
match value {
Value::String(value) => Some(codex_schema::RequestId::Variant0(value.clone())),
Value::Number(value) => value.as_i64().map(codex_schema::RequestId::from),
_ => None,
}
}
fn codex_command_decision_for_reply(
reply: PermissionReply,
) -> codex_schema::CommandExecutionApprovalDecision {
match reply {
PermissionReply::Once => codex_schema::CommandExecutionApprovalDecision::Accept,
PermissionReply::Always => codex_schema::CommandExecutionApprovalDecision::AcceptForSession,
PermissionReply::Reject => codex_schema::CommandExecutionApprovalDecision::Decline,
}
}
fn codex_file_change_decision_for_reply(
reply: PermissionReply,
) -> codex_schema::FileChangeApprovalDecision {
match reply {
PermissionReply::Once => codex_schema::FileChangeApprovalDecision::Accept,
PermissionReply::Always => codex_schema::FileChangeApprovalDecision::AcceptForSession,
PermissionReply::Reject => codex_schema::FileChangeApprovalDecision::Decline,
}
}
fn parse_agent_line(agent: AgentId, line: &str, session_id: &str) -> Option<EventConversion> {
let trimmed = line.trim();
if trimmed.is_empty() {

View file

@ -1,5 +1,6 @@
---
source: server/packages/sandbox-agent/tests/http_sse_snapshots.rs
assertion_line: 1045
expression: normalize_events(&question_events)
---
- agent: claude
@ -20,3 +21,11 @@ expression: normalize_events(&question_events)
type: text
role: assistant
seq: 3
- agent: claude
kind: message
message:
parts:
- text: "<redacted>"
type: text
role: assistant
seq: 4