diff --git a/docs/agent-compatibility.mdx b/docs/agent-compatibility.mdx index cd805d5..58b3045 100644 --- a/docs/agent-compatibility.mdx +++ b/docs/agent-compatibility.mdx @@ -8,7 +8,7 @@ description: "Supported agents, install methods, and streaming formats." | Agent | Provider | Binary | Install method | Session ID | Streaming format | |-------|----------|--------|----------------|------------|------------------| | Claude Code | Anthropic | `claude` | curl raw binary from GCS | `session_id` | JSONL via stdout | -| Codex | OpenAI | `codex` | curl tarball from GitHub releases | `thread_id` | JSONL via stdout | +| Codex | OpenAI | `codex` | curl tarball from GitHub releases | `thread_id` | JSON-RPC over stdio | | OpenCode | Multi-provider | `opencode` | curl tarball from GitHub releases | `session_id` | SSE or JSONL | | Amp | Sourcegraph | `amp` | curl raw binary from GCS | `session_id` | JSONL via stdout | @@ -20,7 +20,7 @@ description: "Supported agents, install methods, and streaming formats." ## Capability notes - **Questions / permissions**: OpenCode natively supports these workflows. Claude plan approval is normalized into a question event. -- **Streaming**: all agents stream events; OpenCode uses SSE while others use JSONL. +- **Streaming**: all agents stream events; OpenCode uses SSE, Codex uses JSON-RPC over stdio, others use JSONL. - **Files and images**: normalized via `UniversalMessagePart` with `File` and `Image` parts. See [Universal API](/universal-api) for feature coverage details. diff --git a/docs/architecture.mdx b/docs/architecture.mdx index 50983f4..d96c35a 100644 --- a/docs/architecture.mdx +++ b/docs/architecture.mdx @@ -11,28 +11,229 @@ Sandbox Agent SDK is built around a single daemon that runs inside the sandbox a - **Universal schema**: Shared input/output types for messages and events. - **SDKs & CLI**: Convenience wrappers around the HTTP API. 
-## Session model +## Agent Schema Pipeline + +The schema pipeline extracts type definitions from AI coding agents and converts them to a universal format. + +### Schema Extraction + +TypeScript extractors in `resources/agent-schemas/src/` pull schemas from each agent: + +| Agent | Source | Extractor | +|-------|--------|-----------| +| Claude | `claude --output-format json --json-schema` | `claude.ts` | +| Codex | `codex app-server generate-json-schema` | `codex.ts` | +| OpenCode | GitHub OpenAPI spec | `opencode.ts` | +| Amp | Scrapes ampcode.com docs | `amp.ts` | + +All extractors include fallback schemas for when CLIs or URLs are unavailable. + +**Output:** JSON schemas written to `resources/agent-schemas/artifacts/json-schema/` + +### Rust Type Generation + +The `server/packages/extracted-agent-schemas/` package generates Rust types at build time: + +- `build.rs` reads JSON schemas and uses the `typify` crate to generate Rust structs +- Generated code is written to `$OUT_DIR/{agent}.rs` +- Types are exposed via `include!()` macros in `src/lib.rs` + +``` +resources/agent-schemas/artifacts/json-schema/*.json + ↓ (build.rs + typify) +$OUT_DIR/{claude,codex,opencode,amp}.rs + ↓ (include!) 
+extracted_agent_schemas::{claude,codex,opencode,amp}::* +``` + +### Universal Schema + +The `server/packages/universal-agent-schema/` package defines agent-agnostic types: + +**Core types** (`src/lib.rs`): +- `UniversalEvent` - Wrapper with id, timestamp, session_id, agent, data +- `UniversalEventData` - Enum: Message, Started, Error, QuestionAsked, PermissionAsked, Unknown +- `UniversalMessage` - Parsed (role, parts, metadata) or Unparsed (raw JSON) +- `UniversalMessagePart` - Text, ToolCall, ToolResult, FunctionCall, FunctionResult, File, Image, Error, Unknown + +**Converters** (`src/agents/{claude,codex,opencode,amp}.rs`): +- Each agent has a converter module that transforms native events to universal format +- Conversions are best-effort; unparseable data preserved in `Unparsed` or `Unknown` variants + +## Session Management + +Sessions track agent conversations with in-memory state. + +### Session Model - **Session ID**: Client-provided primary session identifier. - **Agent session ID**: Underlying ID from the agent (thread/session). This is surfaced in events but is not the primary key. -## Event streaming +### Storage + +Sessions are stored in an in-memory `HashMap` inside `SessionManager`: + +```rust +struct SessionManager { + sessions: Mutex>, + // ... +} +``` + +There is no disk persistence. Sessions are ephemeral and lost on server restart. 
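The storage model above can be illustrated with a small TypeScript sketch. It is not the Rust `SessionManager`; the names `InMemorySessionStore`, `SessionRecord`, and `setAgentSessionId` are hypothetical, and rejecting a duplicate session ID is an assumption rather than documented behavior. The point is only that the client-provided ID is the map key, the agent's own ID is a secondary field, and nothing outlives the process.

```typescript
// Hypothetical sketch: names mirror the prose, not the Rust implementation.
type AgentKind = "claude" | "codex" | "opencode" | "amp";

interface SessionRecord {
  sessionId: string; // client-provided primary identifier
  agent: AgentKind;
  agentSessionId?: string; // underlying thread/session ID, learned from events
  ended: boolean;
}

class InMemorySessionStore {
  // A Map stands in for the in-memory HashMap; nothing is written to disk.
  private readonly sessions = new Map<string, SessionRecord>();

  create(sessionId: string, agent: AgentKind): SessionRecord {
    // Assumption: creating the same session twice is treated as an error.
    if (this.sessions.has(sessionId)) {
      throw new Error(`session already exists: ${sessionId}`);
    }
    const record: SessionRecord = { sessionId, agent, ended: false };
    this.sessions.set(sessionId, record);
    return record;
  }

  get(sessionId: string): SessionRecord | undefined {
    return this.sessions.get(sessionId);
  }

  // The agent's ID is stored alongside the session but never used as the key.
  setAgentSessionId(sessionId: string, agentSessionId: string): void {
    const record = this.sessions.get(sessionId);
    if (record === undefined) {
      throw new Error(`unknown session: ${sessionId}`);
    }
    record.agentSessionId = agentSessionId;
  }
}
```

Because the map lives only in process memory, constructing a new store (the analogue of a server restart) starts with no sessions.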
+ +### SessionState + +Each session tracks: + +| Field | Purpose | +|-------|---------| +| `session_id` | Client-provided identifier | +| `agent` | Agent type (Claude, Codex, OpenCode, Amp) | +| `agent_mode` | Operating mode (build, plan, custom) | +| `permission_mode` | Permission handling (default, plan, bypass) | +| `model` | Optional model override | +| `events: Vec` | Full event history | +| `pending_questions` | Question IDs awaiting reply | +| `pending_permissions` | Permission IDs awaiting reply | +| `broadcaster` | Tokio broadcast channel for SSE streaming | +| `ended` | Whether agent process has terminated | + +### Lifecycle + +``` +POST /v1/sessions/{sessionId} Create session, auto-install agent + ↓ +POST /v1/sessions/{id}/messages Spawn agent subprocess, stream output + ↓ +GET /v1/sessions/{id}/events Poll for new events (offset-based) +GET /v1/sessions/{id}/events/sse Subscribe to SSE stream + ↓ +POST .../questions/{id}/reply Answer agent question +POST .../permissions/{id}/reply Grant/deny permission request + ↓ +(agent process terminates) Session marked as ended +``` + +### Event Streaming - Events are stored in memory per session and assigned a monotonically increasing `id`. - `/events` returns a slice of events by offset/limit. - `/events/sse` streams new events from the same offset semantics. -## Agent integration strategies +When a message is sent: -### Subprocess per session +1. `send_message()` spawns the agent CLI as a subprocess +2. `consume_spawn()` reads stdout/stderr line by line +3. Each JSON line is parsed and converted via `parse_agent_line()` +4. Events are recorded via `record_event()` which: + - Assigns incrementing event ID + - Appends to `events` vector + - Broadcasts to SSE subscribers -Claude Code, Codex, and Amp run as subprocesses. The daemon reads JSONL output from stdout and converts each event into a UniversalEvent. 
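The recording steps listed under Event Streaming (assign an incrementing ID, append, broadcast) can be sketched in TypeScript as follows. This is an illustration, not the Rust `record_event()`: `SessionEventLog` and its methods are hypothetical names, and treating `offset` as "the last event ID already seen" is an assumption about the offset semantics.

```typescript
// Hypothetical sketch of a per-session event log.
interface RecordedEvent<T> {
  id: number; // monotonically increasing; starts at 1 in this sketch
  data: T;
}

type Listener<T> = (event: RecordedEvent<T>) => void;

class SessionEventLog<T> {
  private readonly events: RecordedEvent<T>[] = [];
  private listeners: Listener<T>[] = [];
  private nextId = 1;

  // Assign the next ID, append to history, then notify live subscribers.
  record(data: T): RecordedEvent<T> {
    const event: RecordedEvent<T> = { id: this.nextId, data };
    this.nextId += 1;
    this.events.push(event);
    this.listeners.forEach((listener) => listener(event));
    return event;
  }

  // Polling: at most `limit` events whose ID is greater than `offset`.
  list(offset: number, limit: number): RecordedEvent<T>[] {
    return this.events.filter((e) => e.id > offset).slice(0, limit);
  }

  // Streaming: replay everything after `offset`, then deliver new events.
  // Returns a function that cancels the subscription.
  subscribe(offset: number, listener: Listener<T>): () => void {
    this.events.filter((e) => e.id > offset).forEach((e) => listener(e));
    this.listeners.push(listener);
    return () => {
      this.listeners = this.listeners.filter((l) => l !== listener);
    };
  }
}
```

Sharing one offset rule between `list` and `subscribe` is what lets a client switch from polling to streaming without missing or duplicating events.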
+## Agent Execution -### Shared server (OpenCode) +Each agent has a different execution model and communication pattern. -OpenCode runs as a shared server. The daemon connects via HTTP and SSE, then converts OpenCode events to UniversalEvents. +### Overview -## Human-in-the-loop +| Agent | Execution Model | Binary Source | Session Resume | +|-------|-----------------|---------------|----------------| +| Claude Code | CLI subprocess | GCS (Anthropic) | Yes (`--resume`) | +| Codex | App Server subprocess (JSON-RPC) | GitHub releases | No | +| OpenCode | HTTP server + SSE | GitHub releases | Yes (server-side) | +| Amp | CLI subprocess | GCS (Amp) | Yes (`--continue`) | + +### Claude Code + +Spawned as a subprocess with JSONL streaming: + +```bash +claude --print --output-format stream-json --verbose \ + [--model MODEL] [--resume SESSION_ID] \ + [--permission-mode plan | --dangerously-skip-permissions] \ + PROMPT +``` + +- Streams JSON events to stdout, one per line +- Supports session resumption via `--resume` +- Permission modes: `--permission-mode plan` for approval workflow, `--dangerously-skip-permissions` for bypass + +### Codex + +Spawned as a subprocess using the App Server JSON-RPC protocol: + +```bash +codex app-server +``` + +- JSON-RPC over stdio (JSONL) +- Uses `initialize`, `thread/start`, and `turn/start` requests +- Approval requests arrive as server JSON-RPC requests + +### OpenCode + +Unique architecture - runs as a **persistent HTTP server** rather than per-message subprocess: + +```bash +opencode serve --port {4200-4300} +``` + +Then communicates via HTTP endpoints: + +| Endpoint | Purpose | +|----------|---------| +| `POST /session` | Create new session | +| `POST /session/{id}/prompt` | Send message | +| `GET /event/subscribe` | SSE event stream | +| `POST /question/reply` | Answer HITL question | +| `POST /permission/reply` | Grant/deny permission | + +The server is started once and reused across sessions. 
Events are received via Server-Sent Events (SSE) subscription. + +### Amp + +Spawned as a subprocess with dynamic flag detection: + +```bash +amp [--execute|--print] [--output-format stream-json] \ + [--model MODEL] [--continue SESSION_ID] \ + [--dangerously-skip-permissions] PROMPT +``` + +- **Dynamic flag detection**: Probes `--help` output to determine which flags the installed version supports +- **Fallback strategy**: If execution fails, retries with progressively simpler flag combinations +- Streams JSON events to stdout +- Supports session continuation via `--continue` + +### Communication Patterns + +**Subprocess agents (Claude, Codex, Amp):** +1. Agent CLI spawned with appropriate flags +2. Stdout/stderr read line-by-line +3. Each line parsed as JSON +4. Events converted via `parse_agent_line()` → agent-specific converter +5. Universal events recorded and broadcast to SSE subscribers + +**HTTP server agent (OpenCode):** +1. Server started on available port (if not running) +2. Session created via HTTP POST +3. Prompts sent via HTTP POST +4. Events received via SSE subscription +5. HITL responses forwarded via HTTP POST + +### Credential Handling + +All agents receive API keys via environment variables: + +| Agent | Environment Variables | +|-------|----------------------| +| Claude | `ANTHROPIC_API_KEY`, `CLAUDE_API_KEY` | +| Codex | `OPENAI_API_KEY`, `CODEX_API_KEY` | +| OpenCode | `OPENAI_API_KEY` | +| Amp | `ANTHROPIC_API_KEY` | + +## Human-in-the-Loop Questions and permission prompts are normalized into the universal schema: @@ -40,6 +241,76 @@ Questions and permission prompts are normalized into the universal schema: - Permission events surface as `permissionAsked` with `reply: once | always | reject`. - Claude plan approval is normalized into a question event (approve/reject). +## SDK Modes + +The TypeScript SDK supports two connection modes. + +### Embedded Mode + +Defined in `sdks/typescript/src/spawn.ts`: + +1. 
**Binary resolution**: Checks `SANDBOX_AGENT_BIN` env, then platform-specific npm package, then `PATH` +2. **Port selection**: Uses provided port or finds a free one via `net.createServer()` +3. **Token generation**: Uses provided token or generates random 24-byte hex string +4. **Spawn**: Launches `sandbox-agent server --host --port --token ` +5. **Health wait**: Polls `GET /v1/health` until server is ready (up to 15s timeout) +6. **Cleanup**: On dispose, sends SIGTERM then SIGKILL if needed; also registers process exit handlers + +```typescript +const handle = await spawnSandboxDaemon({ log: "inherit" }); +// handle.baseUrl = "http://127.0.0.1:" +// handle.token = "" +// handle.dispose() to cleanup +``` + +### Server Mode + +Defined in `sdks/typescript/src/client.ts`: + +- Direct HTTP client to a remote `sandbox-daemon` server +- Uses provided `baseUrl` and optional `token` +- No subprocess management + +```typescript +const client = new SandboxDaemonClient({ + baseUrl: "http://remote-server:8080", + token: "secret", +}); +``` + +### Auto-Detection + +`SandboxDaemonClient.connect()` chooses the mode automatically: + +```typescript +// If baseUrl provided → server mode +const client = await SandboxDaemonClient.connect({ + baseUrl: "http://remote:8080", +}); + +// If no baseUrl → embedded mode (spawns subprocess) +const client = await SandboxDaemonClient.connect({}); + +// Explicit control +const client = await SandboxDaemonClient.connect({ + spawn: { enabled: true, port: 9000 }, +}); +``` + +The `spawn` option can be: +- `true` / `false` - Enable/disable embedded mode +- `SandboxDaemonSpawnOptions` - Fine-grained control over host, port, token, binary path, timeout, logging + ## Authentication The daemon uses a **global token** configured at startup. All HTTP and CLI operations reuse the same token and are validated against the `Authorization` header (`Bearer` or `Token`). 
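As a rough illustration of the header check described above, the sketch below accepts either scheme and compares the presented value with the configured token. The function names are hypothetical, and matching the scheme case-insensitively is an assumption; the daemon's actual parsing rules are not shown in this document.

```typescript
// Hypothetical sketch of validating an Authorization header against one global token.
function extractToken(header: string | undefined): string | undefined {
  if (header === undefined) return undefined;
  // Accept "Bearer <token>" or "Token <token>"; scheme case is an assumption.
  const match = /^(Bearer|Token)\s+(\S+)$/i.exec(header.trim());
  return match === null ? undefined : match[2];
}

function isAuthorized(header: string | undefined, expectedToken: string): boolean {
  const presented = extractToken(header);
  // Plain equality keeps the sketch short; a production check would
  // typically use a constant-time comparison.
  return presented !== undefined && presented === expectedToken;
}
```

A missing header, an unknown scheme such as `Basic`, and a wrong token all fail the same way, so callers need only a single rejection path.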
+ +## Key Files + +| Component | Path | +|-----------|------| +| Agent spawn/install | `server/packages/agent-management/src/agents.rs` | +| Session routing | `server/packages/sandbox-agent/src/router.rs` | +| Event converters | `server/packages/universal-agent-schema/src/agents/*.rs` | +| Schema extractors | `resources/agent-schemas/src/*.ts` | +| TypeScript SDK | `sdks/typescript/src/` | diff --git a/research/agents/codex.md b/research/agents/codex.md index cbb757d..2686a4a 100644 --- a/research/agents/codex.md +++ b/research/agents/codex.md @@ -5,7 +5,8 @@ Research notes on OpenAI Codex's configuration, credential discovery, and runtim ## Overview - **Provider**: OpenAI -- **Execution Method**: SDK (`@openai/codex-sdk`) or CLI binary +- **Execution Method (this repo)**: Codex App Server (JSON-RPC over stdio) +- **Execution Method (alternatives)**: SDK (`@openai/codex-sdk`) or CLI binary - **Session Persistence**: Thread ID (string) - **Import**: Dynamic import to avoid bundling issues - **Binary Location**: `~/.nvm/versions/node/v24.3.0/bin/codex` (npm global install) @@ -21,7 +22,7 @@ Research notes on OpenAI Codex's configuration, credential discovery, and runtim Sources: [Codex SDK docs](https://developers.openai.com/codex/sdk/), [GitHub](https://github.com/openai/codex) -## CLI Usage (Alternative to SDK) +## CLI Usage (Alternative to App Server / SDK) You can use the `codex` binary directly instead of the SDK: @@ -129,41 +130,33 @@ for await (const event of events) { } ``` -## Event Types +## App Server Protocol (JSON-RPC) -| Event Type | Description | -|------------|-------------| -| `thread.started` | Thread initialized, contains `thread_id` | -| `item.completed` | Item finished, check for `agent_message` type | -| `turn.failed` | Turn failed with error message | +Codex App Server uses JSON-RPC 2.0 over JSONL/stdin/stdout (no port required). 
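To make the framing concrete, here is a minimal TypeScript sketch of line-delimited JSON-RPC messages. It mirrors two details visible in this change's Rust code: request IDs start at 1 and increment, and lines that fail to parse as JSON are skipped. All function names are hypothetical, and the message objects omit a `jsonrpc` field only because the examples in these notes do.

```typescript
// Hypothetical sketch of JSONL framing for JSON-RPC over stdio.
type JsonRpcId = number | string;

interface JsonRpcRequest {
  id: JsonRpcId;
  method: string;
  params: unknown;
}

// Hands out 1, 2, 3, ... so each request gets a fresh ID.
function createIdCounter(): () => number {
  let next = 1;
  return () => next++;
}

function makeRequest(nextId: () => number, method: string, params: unknown): JsonRpcRequest {
  return { id: nextId(), method, params };
}

// One message per line: serialize, then terminate with a newline.
function encodeLine(message: object): string {
  return JSON.stringify(message) + "\n";
}

// Split buffered stdout into complete lines. The trailing partial line is
// returned as `rest` so the caller can prepend it to the next chunk.
function decodeLines(buffer: string): { messages: unknown[]; rest: string } {
  const parts = buffer.split("\n");
  const rest = parts.pop() ?? "";
  const messages: unknown[] = [];
  for (const part of parts) {
    const trimmed = part.trim();
    if (trimmed === "") continue;
    try {
      messages.push(JSON.parse(trimmed));
    } catch {
      // Non-JSON output is ignored rather than treated as fatal.
    }
  }
  return { messages, rest };
}
```

Keeping the unfinished tail in `rest` matters because a pipe read can end in the middle of a message.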
-### Event Structure +### Key Requests -```typescript -// thread.started -{ - type: "thread.started", - thread_id: "thread_abc123" -} +- `initialize` → returns server info +- `thread/start` → starts a new thread +- `turn/start` → sends user input for a thread -// item.completed (agent message) -{ - type: "item.completed", - item: { - type: "agent_message", - text: "Response text" - } -} +### Event Notifications (examples) -// turn.failed -{ - type: "turn.failed", - error: { - message: "Error description" - } -} +```json +{ "method": "thread/started", "params": { "thread": { "id": "thread_abc123" } } } +{ "method": "item/completed", "params": { "item": { "type": "agentMessage", "text": "..." } } } +{ "method": "turn/completed", "params": { "threadId": "thread_abc123", "turn": { "items": [] } } } ``` +### Approval Requests (server → client) + +The server can send JSON-RPC requests (with `id`) for approvals: + +- `item/commandExecution/requestApproval` +- `item/fileChange/requestApproval` + +These require JSON-RPC responses with a decision payload. 
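A client has to tell server requests (which carry an `id` and expect a reply) apart from notifications and responses. The TypeScript sketch below shows one way to do that and to build a reply for the two approval methods named above. The type names, the `"accept" | "decline"` values, and the `{ decision }` result shape are all hypothetical placeholders; the real payload is defined by the generated Codex schema and should be checked there.

```typescript
// Hypothetical sketch: classify incoming JSON-RPC messages and answer approvals.
type JsonRpcId = number | string;

interface ServerRequest { kind: "request"; id: JsonRpcId; method: string; params: unknown }
interface ServerNotice { kind: "notification"; method: string; params: unknown }
interface ServerResponse { kind: "response"; id: JsonRpcId; result: unknown }
interface ServerFailure { kind: "error"; error: unknown }
interface Unrecognized { kind: "unknown" }
type Incoming = ServerRequest | ServerNotice | ServerResponse | ServerFailure | Unrecognized;

const APPROVAL_METHODS = [
  "item/commandExecution/requestApproval",
  "item/fileChange/requestApproval",
];

function asId(value: unknown): JsonRpcId | undefined {
  return typeof value === "number" || typeof value === "string" ? value : undefined;
}

function classify(raw: unknown): Incoming {
  if (typeof raw !== "object" || raw === null) return { kind: "unknown" };
  const msg = raw as Record<string, unknown>;
  const id = asId(msg["id"]);
  const method = msg["method"];
  if (typeof method === "string") {
    // A method with an id is a request; without one it is a notification.
    return id === undefined
      ? { kind: "notification", method, params: msg["params"] }
      : { kind: "request", id, method, params: msg["params"] };
  }
  if ("error" in msg) return { kind: "error", error: msg["error"] };
  if (id !== undefined && "result" in msg) return { kind: "response", id, result: msg["result"] };
  return { kind: "unknown" };
}

// Assumption: decision values and result shape are placeholders.
type Decision = "accept" | "decline";

interface ApprovalReply { id: JsonRpcId; result: { decision: Decision } }

// Echo the request's id so the server can match the reply to its request.
function approvalReply(request: ServerRequest, decision: Decision): ApprovalReply | undefined {
  if (APPROVAL_METHODS.indexOf(request.method) === -1) return undefined;
  return { id: request.id, result: { decision } };
}
```

Returning `undefined` for non-approval methods keeps the helper from answering requests it does not understand.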
+ ## Response Schema ```typescript diff --git a/resources/agent-schemas/artifacts/json-schema/codex.json b/resources/agent-schemas/artifacts/json-schema/codex.json index c1d5cc8..e828f8f 100644 --- a/resources/agent-schemas/artifacts/json-schema/codex.json +++ b/resources/agent-schemas/artifacts/json-schema/codex.json @@ -1978,7 +1978,7 @@ "type": { "type": "string", "enum": [ - "danger-full-access" + "dangerFullAccess" ], "title": "DangerFullAccessSandboxPolicyType" } @@ -1995,7 +1995,7 @@ "type": { "type": "string", "enum": [ - "read-only" + "readOnly" ], "title": "ReadOnlySandboxPolicyType" } @@ -2021,7 +2021,7 @@ "type": { "type": "string", "enum": [ - "external-sandbox" + "externalSandbox" ], "title": "ExternalSandboxSandboxPolicyType" } @@ -2053,7 +2053,7 @@ "type": { "type": "string", "enum": [ - "workspace-write" + "workspaceWrite" ], "title": "WorkspaceWriteSandboxPolicyType" }, @@ -2082,7 +2082,7 @@ "type": { "type": "string", "enum": [ - "danger-full-access" + "dangerFullAccess" ], "title": "DangerFullAccessSandboxPolicy2Type" } @@ -2099,7 +2099,7 @@ "type": { "type": "string", "enum": [ - "read-only" + "readOnly" ], "title": "ReadOnlySandboxPolicy2Type" } @@ -2125,7 +2125,7 @@ "type": { "type": "string", "enum": [ - "external-sandbox" + "externalSandbox" ], "title": "ExternalSandboxSandboxPolicy2Type" } @@ -2157,7 +2157,7 @@ "type": { "type": "string", "enum": [ - "workspace-write" + "workspaceWrite" ], "title": "WorkspaceWriteSandboxPolicy2Type" }, diff --git a/server/packages/agent-management/Cargo.toml b/server/packages/agent-management/Cargo.toml index 4b3bff1..d31ed26 100644 --- a/server/packages/agent-management/Cargo.toml +++ b/server/packages/agent-management/Cargo.toml @@ -7,6 +7,7 @@ license.workspace = true [dependencies] sandbox-agent-agent-credentials.workspace = true +sandbox-agent-extracted-agent-schemas.workspace = true thiserror.workspace = true serde.workspace = true serde_json.workspace = true diff --git 
a/server/packages/agent-management/src/agents.rs b/server/packages/agent-management/src/agents.rs index 5a74632..d24f33a 100644 --- a/server/packages/agent-management/src/agents.rs +++ b/server/packages/agent-management/src/agents.rs @@ -1,12 +1,22 @@ use std::collections::HashMap; use std::fmt; use std::fs; -use std::io::{self, Read}; +use std::io::{self, BufRead, BufReader, Read, Write}; use std::path::{Path, PathBuf}; -use std::process::{Child, ChildStderr, ChildStdout, Command, ExitStatus, Stdio}; +use std::process::{ + Child, + ChildStderr, + ChildStdin, + ChildStdout, + Command, + ExitStatus, + Stdio, +}; +use std::time::{Duration, Instant}; use flate2::read::GzDecoder; use reqwest::blocking::Client; +use sandbox_agent_extracted_agent_schemas::codex as codex_schema; use serde::{Deserialize, Serialize}; use serde_json::Value; use thiserror::Error; @@ -163,6 +173,9 @@ impl AgentManager { } pub fn spawn(&self, agent: AgentId, options: SpawnOptions) -> Result { + if agent == AgentId::Codex { + return self.spawn_codex_app_server(options); + } let path = self.resolve_binary(agent)?; let working_dir = options .working_dir @@ -199,23 +212,7 @@ impl AgentManager { if options.session_id.is_some() { return Err(AgentError::ResumeUnsupported { agent }); } - command - .arg("exec") - .arg("--json"); - match options.permission_mode.as_deref() { - Some("plan") => { - command.arg("--sandbox").arg("read-only"); - } - Some("bypass") => { - command.arg("--dangerously-bypass-approvals-and-sandbox"); - } - _ => {} - } - if let Some(model) = options.model.as_deref() { - command.arg("-m").arg(model); - } - let prompt = codex_prompt_for_mode(&options.prompt, options.agent_mode.as_deref()); - command.arg(prompt); + command.arg("app-server"); } AgentId::Opencode => { command @@ -275,12 +272,225 @@ impl AgentManager { agent: AgentId, options: SpawnOptions, ) -> Result { + let codex_options = if agent == AgentId::Codex { + Some(options.clone()) + } else { + None + }; let mut command = 
self.build_command(agent, &options)?; + if agent == AgentId::Codex { + command.stdin(Stdio::piped()); + } command.stdout(Stdio::piped()).stderr(Stdio::piped()); let mut child = command.spawn().map_err(AgentError::Io)?; + let stdin = child.stdin.take(); let stdout = child.stdout.take(); let stderr = child.stderr.take(); - Ok(StreamingSpawn { child, stdout, stderr }) + Ok(StreamingSpawn { + child, + stdin, + stdout, + stderr, + codex_options, + }) + } + + fn spawn_codex_app_server(&self, options: SpawnOptions) -> Result { + if options.session_id.is_some() { + return Err(AgentError::ResumeUnsupported { agent: AgentId::Codex }); + } + let mut command = self.build_command(AgentId::Codex, &options)?; + command.stdin(Stdio::piped()).stdout(Stdio::piped()).stderr(Stdio::piped()); + for (key, value) in options.env { + command.env(key, value); + } + + let mut child = command.spawn().map_err(AgentError::Io)?; + let mut stdin = child.stdin.take().ok_or_else(|| { + AgentError::Io(io::Error::new(io::ErrorKind::Other, "missing codex stdin")) + })?; + let stdout = child.stdout.take().ok_or_else(|| { + AgentError::Io(io::Error::new(io::ErrorKind::Other, "missing codex stdout")) + })?; + let stderr = child.stderr.take().ok_or_else(|| { + AgentError::Io(io::Error::new(io::ErrorKind::Other, "missing codex stderr")) + })?; + + let stderr_handle = std::thread::spawn(move || { + let mut buffer = String::new(); + let _ = BufReader::new(stderr).read_to_string(&mut buffer); + buffer + }); + + let approval_policy = codex_approval_policy(options.permission_mode.as_deref()); + let sandbox_mode = codex_sandbox_mode(options.permission_mode.as_deref()); + let sandbox_policy = codex_sandbox_policy(options.permission_mode.as_deref()); + let prompt = codex_prompt_for_mode(&options.prompt, options.agent_mode.as_deref()); + let cwd = options + .working_dir + .as_ref() + .map(|path| path.to_string_lossy().to_string()); + + let mut next_id = 1i64; + let init_id = next_request_id(&mut next_id); + 
send_json_line( + &mut stdin, + &codex_schema::ClientRequest::Initialize { + id: init_id.clone(), + params: codex_schema::InitializeParams { + client_info: codex_schema::ClientInfo { + name: "sandbox-agent".to_string(), + title: Some("sandbox-agent".to_string()), + version: env!("CARGO_PKG_VERSION").to_string(), + }, + }, + }, + )?; + + let mut init_done = false; + let mut thread_start_sent = false; + let mut thread_start_id: Option = None; + let mut turn_start_sent = false; + let mut thread_id: Option = None; + let mut stdout_buffer = String::new(); + let mut events = Vec::new(); + let mut line = String::new(); + let mut reader = BufReader::new(stdout); + let mut completed = false; + while reader.read_line(&mut line).map_err(AgentError::Io)? > 0 { + stdout_buffer.push_str(&line); + let trimmed = line.trim_end_matches(&['\r', '\n'][..]).to_string(); + line.clear(); + if trimmed.is_empty() { + continue; + } + let value: Value = match serde_json::from_str(&trimmed) { + Ok(value) => value, + Err(_) => continue, + }; + let message: codex_schema::JsonrpcMessage = + match serde_json::from_value(value.clone()) { + Ok(message) => message, + Err(_) => continue, + }; + match message { + codex_schema::JsonrpcMessage::Response(response) => { + let response_id = response.id.to_string(); + if !init_done && response_id == init_id.to_string() { + init_done = true; + send_json_line( + &mut stdin, + &codex_schema::JsonrpcNotification { + method: "initialized".to_string(), + params: None, + }, + )?; + let request_id = next_request_id(&mut next_id); + let request_id_str = request_id.to_string(); + let mut params = codex_schema::ThreadStartParams::default(); + params.approval_policy = approval_policy; + params.sandbox = sandbox_mode; + params.model = options.model.clone(); + params.cwd = cwd.clone(); + send_json_line( + &mut stdin, + &codex_schema::ClientRequest::ThreadStart { id: request_id, params }, + )?; + thread_start_id = Some(request_id_str); + thread_start_sent = true; + } else 
if thread_start_id.as_deref() == Some(&response_id) && thread_id.is_none() { + thread_id = codex_thread_id_from_response(&response.result); + } + events.push(value); + } + codex_schema::JsonrpcMessage::Notification(_) => { + if let Ok(notification) = + serde_json::from_value::(value.clone()) + { + if thread_id.is_none() { + thread_id = codex_thread_id_from_notification(&notification); + } + if matches!( + notification, + codex_schema::ServerNotification::TurnCompleted(_) + | codex_schema::ServerNotification::Error(_) + ) { + completed = true; + } + if let codex_schema::ServerNotification::ItemCompleted(params) = &notification { + if matches!(params.item, codex_schema::ThreadItem::AgentMessage { .. }) { + completed = true; + } + } + } + events.push(value); + } + codex_schema::JsonrpcMessage::Request(_) => { + events.push(value); + } + codex_schema::JsonrpcMessage::Error(_) => { + events.push(value); + completed = true; + } + } + if thread_id.is_some() && thread_start_sent && !turn_start_sent { + let request_id = next_request_id(&mut next_id); + let params = codex_schema::TurnStartParams { + approval_policy, + collaboration_mode: None, + cwd: cwd.clone(), + effort: None, + input: vec![codex_schema::UserInput::Text { + text: prompt.clone(), + text_elements: Vec::new(), + }], + model: options.model.clone(), + output_schema: None, + personality: None, + sandbox_policy: sandbox_policy.clone(), + summary: None, + thread_id: thread_id.clone().unwrap_or_default(), + }; + send_json_line( + &mut stdin, + &codex_schema::ClientRequest::TurnStart { + id: request_id, + params, + }, + )?; + turn_start_sent = true; + } + if completed { + break; + } + } + + drop(stdin); + let status = if completed { + let start = Instant::now(); + loop { + if let Some(status) = child.try_wait().map_err(AgentError::Io)? 
{ + break status; + } + if start.elapsed() > Duration::from_secs(5) { + let _ = child.kill(); + break child.wait().map_err(AgentError::Io)?; + } + std::thread::sleep(Duration::from_millis(50)); + } + } else { + child.wait().map_err(AgentError::Io)? + }; + let stderr_output = stderr_handle.join().unwrap_or_default(); + + Ok(SpawnResult { + status, + stdout: stdout_buffer, + stderr: stderr_output, + session_id: extract_session_id(AgentId::Codex, &events), + result: extract_result_text(AgentId::Codex, &events), + events, + }) } fn build_command(&self, agent: AgentId, options: &SpawnOptions) -> Result { @@ -320,21 +530,7 @@ impl AgentManager { if options.session_id.is_some() { return Err(AgentError::ResumeUnsupported { agent }); } - command.arg("exec").arg("--json"); - match options.permission_mode.as_deref() { - Some("plan") => { - command.arg("--sandbox").arg("read-only"); - } - Some("bypass") => { - command.arg("--dangerously-bypass-approvals-and-sandbox"); - } - _ => {} - } - if let Some(model) = options.model.as_deref() { - command.arg("-m").arg(model); - } - let prompt = codex_prompt_for_mode(&options.prompt, options.agent_mode.as_deref()); - command.arg(prompt); + command.arg("app-server"); } AgentId::Opencode => { command.arg("run").arg("--format").arg("json"); @@ -441,8 +637,10 @@ pub struct SpawnResult { #[derive(Debug)] pub struct StreamingSpawn { pub child: Child, + pub stdin: Option, pub stdout: Option, pub stderr: Option, + pub codex_options: Option, } #[derive(Debug, Error)] @@ -497,6 +695,83 @@ fn codex_prompt_for_mode(prompt: &str, mode: Option<&str>) -> String { } } +fn codex_approval_policy(mode: Option<&str>) -> Option { + match mode { + Some("plan") => Some(codex_schema::AskForApproval::Untrusted), + Some("bypass") => Some(codex_schema::AskForApproval::Never), + _ => None, + } +} + +fn codex_sandbox_mode(mode: Option<&str>) -> Option { + match mode { + Some("plan") => Some(codex_schema::SandboxMode::ReadOnly), + Some("bypass") => 
Some(codex_schema::SandboxMode::DangerFullAccess), + _ => None, + } +} + +fn codex_sandbox_policy(mode: Option<&str>) -> Option { + match mode { + Some("plan") => Some(codex_schema::SandboxPolicy::ReadOnly), + Some("bypass") => Some(codex_schema::SandboxPolicy::DangerFullAccess), + _ => None, + } +} + +fn next_request_id(next_id: &mut i64) -> codex_schema::RequestId { + let id = *next_id; + *next_id += 1; + codex_schema::RequestId::from(id) +} + +fn send_json_line(stdin: &mut ChildStdin, payload: &T) -> Result<(), AgentError> { + let line = serde_json::to_string(payload) + .map_err(|err| AgentError::Io(io::Error::new(io::ErrorKind::Other, err)))?; + writeln!(stdin, "{line}").map_err(AgentError::Io)?; + stdin.flush().map_err(AgentError::Io)?; + Ok(()) +} + +fn codex_thread_id_from_notification( + notification: &codex_schema::ServerNotification, +) -> Option { + match notification { + codex_schema::ServerNotification::ThreadStarted(params) => Some(params.thread.id.clone()), + codex_schema::ServerNotification::TurnStarted(params) => Some(params.thread_id.clone()), + codex_schema::ServerNotification::TurnCompleted(params) => Some(params.thread_id.clone()), + codex_schema::ServerNotification::ItemStarted(params) => Some(params.thread_id.clone()), + codex_schema::ServerNotification::ItemCompleted(params) => Some(params.thread_id.clone()), + codex_schema::ServerNotification::ItemAgentMessageDelta(params) => Some(params.thread_id.clone()), + codex_schema::ServerNotification::ItemReasoningTextDelta(params) => Some(params.thread_id.clone()), + codex_schema::ServerNotification::ItemReasoningSummaryTextDelta(params) => Some(params.thread_id.clone()), + codex_schema::ServerNotification::ItemCommandExecutionOutputDelta(params) => Some(params.thread_id.clone()), + codex_schema::ServerNotification::ItemFileChangeOutputDelta(params) => Some(params.thread_id.clone()), + codex_schema::ServerNotification::ItemMcpToolCallProgress(params) => Some(params.thread_id.clone()), + 
codex_schema::ServerNotification::ThreadTokenUsageUpdated(params) => Some(params.thread_id.clone()), + codex_schema::ServerNotification::TurnDiffUpdated(params) => Some(params.thread_id.clone()), + codex_schema::ServerNotification::TurnPlanUpdated(params) => Some(params.thread_id.clone()), + codex_schema::ServerNotification::ItemCommandExecutionTerminalInteraction(params) => Some(params.thread_id.clone()), + codex_schema::ServerNotification::ItemReasoningSummaryPartAdded(params) => Some(params.thread_id.clone()), + codex_schema::ServerNotification::ThreadCompacted(params) => Some(params.thread_id.clone()), + _ => None, + } +} + +fn codex_thread_id_from_response(result: &Value) -> Option { + if let Some(id) = result + .get("thread") + .and_then(|thread| thread.get("id")) + .and_then(Value::as_str) + { + return Some(id.to_string()); + } + if let Some(id) = result.get("threadId").and_then(Value::as_str) { + return Some(id.to_string()); + } + None +} + fn extract_nested_string(value: &Value, path: &[&str]) -> Option { let mut current = value; for key in path { @@ -517,19 +792,36 @@ fn extract_session_id(agent: AgentId, events: &[Value]) -> Option { return Some(id.to_string()); } } - AgentId::Codex => { - if event.get("type").and_then(Value::as_str) == Some("thread.started") { - if let Some(id) = event.get("thread_id").and_then(Value::as_str) { - return Some(id.to_string()); + AgentId::Codex => { + if let Ok(notification) = + serde_json::from_value::(event.clone()) + { + match notification { + codex_schema::ServerNotification::ThreadStarted(params) => { + return Some(params.thread.id); } - } - if let Some(id) = event.get("thread_id").and_then(Value::as_str) { - return Some(id.to_string()); - } - if let Some(id) = event.get("threadId").and_then(Value::as_str) { - return Some(id.to_string()); + codex_schema::ServerNotification::TurnStarted(params) => { + return Some(params.thread_id); + } + codex_schema::ServerNotification::TurnCompleted(params) => { + return 
Some(params.thread_id); + } + codex_schema::ServerNotification::ItemStarted(params) => { + return Some(params.thread_id); + } + codex_schema::ServerNotification::ItemCompleted(params) => { + return Some(params.thread_id); + } + _ => {} } } + if let Some(id) = event.get("thread_id").and_then(Value::as_str) { + return Some(id.to_string()); + } + if let Some(id) = event.get("threadId").and_then(Value::as_str) { + return Some(id.to_string()); + } + } AgentId::Opencode => { if let Some(id) = event.get("session_id").and_then(Value::as_str) { return Some(id.to_string()); @@ -574,13 +866,25 @@ fn extract_result_text(agent: AgentId, events: &[Value]) -> Option { AgentId::Codex => { let mut last = None; for event in events { - if event.get("type").and_then(Value::as_str) == Some("item.completed") { - if let Some(item) = event.get("item") { - if item.get("type").and_then(Value::as_str) == Some("agent_message") { - if let Some(text) = item.get("text").and_then(Value::as_str) { - last = Some(text.to_string()); + if let Ok(notification) = + serde_json::from_value::(event.clone()) + { + match notification { + codex_schema::ServerNotification::ItemCompleted(params) => { + if let codex_schema::ThreadItem::AgentMessage { text, .. } = params.item + { + last = Some(text); } } + codex_schema::ServerNotification::TurnCompleted(params) => { + for item in params.turn.items.iter().rev() { + if let codex_schema::ThreadItem::AgentMessage { text, .. 
} = item { + last = Some(text.clone()); + break; + } + } + } + _ => {} } } if let Some(result) = event.get("result").and_then(Value::as_str) { diff --git a/server/packages/sandbox-agent/src/router.rs b/server/packages/sandbox-agent/src/router.rs index 8900ef8..88c5855 100644 --- a/server/packages/sandbox-agent/src/router.rs +++ b/server/packages/sandbox-agent/src/router.rs @@ -1,6 +1,6 @@ use std::collections::{HashMap, HashSet}; use std::convert::Infallible; -use std::io::{BufRead, BufReader}; +use std::io::{BufRead, BufReader, Write}; use std::net::TcpListener; use std::process::Stdio; use std::sync::Arc; @@ -19,10 +19,26 @@ use tower_http::trace::TraceLayer; use reqwest::Client; use sandbox_agent_error::{AgentError, ErrorType, ProblemDetails, SandboxError}; use sandbox_agent_universal_agent_schema::{ - convert_amp, convert_claude, convert_codex, convert_opencode, AttachmentSource, CrashInfo, - EventConversion, PermissionRequest, PermissionToolRef, QuestionInfo, QuestionOption, - QuestionRequest, QuestionToolRef, Started, UniversalEvent, UniversalEventData, - UniversalMessage, UniversalMessageParsed, UniversalMessagePart, + codex as codex_schema, + convert_amp, + convert_claude, + convert_codex, + convert_opencode, + AttachmentSource, + CrashInfo, + EventConversion, + PermissionRequest, + PermissionToolRef, + QuestionInfo, + QuestionOption, + QuestionRequest, + QuestionToolRef, + Started, + UniversalEvent, + UniversalEventData, + UniversalMessage, + UniversalMessageParsed, + UniversalMessagePart, }; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -38,6 +54,7 @@ use sandbox_agent_agent_management::agents::{ use sandbox_agent_agent_management::credentials::{ extract_all_credentials, CredentialExtractionOptions, ExtractedCredentials, }; +use crate::ui; #[derive(Debug)] pub struct AppState { @@ -104,9 +121,13 @@ pub fn build_router(state: AppState) -> Router { v1_router = v1_router.layer(axum::middleware::from_fn_with_state(shared, require_token)); 
} - Router::new() - .nest("/v1", v1_router) - .layer(TraceLayer::new_for_http()) + let mut router = Router::new().nest("/v1", v1_router); + + if ui::is_enabled() { + router = router.merge(ui::router()); + } + + router.layer(TraceLayer::new_for_http()) } #[derive(OpenApi)] @@ -204,6 +225,7 @@ struct SessionState { pending_permissions: HashSet, broadcaster: broadcast::Sender, opencode_stream_started: bool, + codex_sender: Option>, } impl SessionState { @@ -236,6 +258,7 @@ impl SessionState { pending_permissions: HashSet::new(), broadcaster, opencode_stream_started: false, + codex_sender: None, }) } @@ -274,6 +297,14 @@ impl SessionState { event } + fn set_codex_sender(&mut self, sender: Option>) { + self.codex_sender = sender; + } + + fn codex_sender(&self) -> Option> { + self.codex_sender.clone() + } + fn normalize_event_data(&self, mut data: UniversalEventData) -> UniversalEventData { match &mut data { UniversalEventData::QuestionAsked { question_asked } => { @@ -627,7 +658,7 @@ impl SessionManager { permission_id: &str, reply: PermissionReply, ) -> Result<(), SandboxError> { - let (agent, agent_session_id) = { + let (agent, agent_session_id, codex_sender, codex_metadata) = { let mut sessions = self.sessions.lock().await; let session = sessions.get_mut(session_id).ok_or_else(|| SandboxError::SessionNotFound { session_id: session_id.to_string(), @@ -640,10 +671,79 @@ impl SessionManager { message: format!("unknown permission id: {permission_id}"), }); } - (session.agent, session.agent_session_id.clone()) + let codex_metadata = if session.agent == AgentId::Codex { + session.events.iter().find_map(|event| { + if let UniversalEventData::PermissionAsked { permission_asked } = &event.data { + if permission_asked.id == permission_id { + return Some(permission_asked.metadata.clone()); + } + } + None + }) + } else { + None + }; + let codex_sender = if session.agent == AgentId::Codex { + session.codex_sender() + } else { + None + }; + ( + session.agent, + 
session.agent_session_id.clone(), + codex_sender, + codex_metadata, + ) }; - if agent == AgentId::Opencode { + if agent == AgentId::Codex { + let sender = codex_sender.ok_or_else(|| SandboxError::InvalidRequest { + message: "codex session not active".to_string(), + })?; + let metadata = codex_metadata.ok_or_else(|| SandboxError::InvalidRequest { + message: "missing codex permission metadata".to_string(), + })?; + let request_id = codex_request_id_from_metadata(&metadata) + .or_else(|| codex_request_id_from_string(permission_id)) + .ok_or_else(|| SandboxError::InvalidRequest { + message: "invalid codex permission request id".to_string(), + })?; + let request_kind = metadata + .get("codexRequestKind") + .and_then(Value::as_str) + .unwrap_or(""); + let response_value = match request_kind { + "commandExecution" => { + let decision = codex_command_decision_for_reply(reply); + let response = codex_schema::CommandExecutionRequestApprovalResponse { decision }; + serde_json::to_value(response).map_err(|err| SandboxError::InvalidRequest { + message: err.to_string(), + })? + } + "fileChange" => { + let decision = codex_file_change_decision_for_reply(reply); + let response = codex_schema::FileChangeRequestApprovalResponse { decision }; + serde_json::to_value(response).map_err(|err| SandboxError::InvalidRequest { + message: err.to_string(), + })? 
+ } + _ => { + return Err(SandboxError::InvalidRequest { + message: "unsupported codex permission request".to_string(), + }); + } + }; + let response = codex_schema::JsonrpcResponse { + id: request_id, + result: response_value, + }; + let line = serde_json::to_string(&response).map_err(|err| SandboxError::InvalidRequest { + message: err.to_string(), + })?; + sender.send(line).map_err(|_| SandboxError::InvalidRequest { + message: "codex session not active".to_string(), + })?; + } else if agent == AgentId::Opencode { let agent_session_id = agent_session_id.ok_or_else(|| SandboxError::InvalidRequest { message: "missing OpenCode session id".to_string(), })?; @@ -681,10 +781,17 @@ impl SessionManager { ) { let StreamingSpawn { mut child, + stdin, stdout, stderr, + codex_options, } = spawn; let (tx, mut rx) = mpsc::unbounded_channel::(); + let mut codex_state = codex_options + .filter(|_| agent == AgentId::Codex) + .map(CodexAppServerState::new); + let mut codex_sender: Option> = None; + let mut terminate_early = false; if let Some(stdout) = stdout { let tx_stdout = tx.clone(); @@ -700,12 +807,52 @@ impl SessionManager { } drop(tx); + if agent == AgentId::Codex { + if let Some(stdin) = stdin { + let (writer_tx, writer_rx) = mpsc::unbounded_channel::(); + codex_sender = Some(writer_tx.clone()); + { + let mut sessions = self.sessions.lock().await; + if let Some(session) = sessions.get_mut(&session_id) { + session.set_codex_sender(Some(writer_tx)); + } + } + tokio::task::spawn_blocking(move || { + write_lines(stdin, writer_rx); + }); + } + if let (Some(state), Some(sender)) = (codex_state.as_mut(), codex_sender.as_ref()) { + state.start(sender); + } + } + while let Some(line) = rx.recv().await { - if let Some(conversion) = parse_agent_line(agent, &line, &session_id) { + if agent == AgentId::Codex { + if let Some(state) = codex_state.as_mut() { + let outcome = state.handle_line(&line); + if let Some(conversion) = outcome.conversion { + let _ = 
self.record_conversion(&session_id, conversion).await; + } + if outcome.should_terminate { + terminate_early = true; + break; + } + } + } else if let Some(conversion) = parse_agent_line(agent, &line, &session_id) { let _ = self.record_conversion(&session_id, conversion).await; } } + if agent == AgentId::Codex { + let mut sessions = self.sessions.lock().await; + if let Some(session) = sessions.get_mut(&session_id) { + session.set_codex_sender(None); + } + } + + if terminate_early { + let _ = child.kill(); + } let status = tokio::task::spawn_blocking(move || child.wait()).await; match status { Ok(Ok(status)) if status.success() => {} @@ -1932,6 +2079,430 @@ fn read_lines(reader: R, sender: mpsc::UnboundedSender } } +fn write_lines(mut stdin: std::process::ChildStdin, mut receiver: mpsc::UnboundedReceiver) { + while let Some(line) = receiver.blocking_recv() { + if writeln!(stdin, "{line}").is_err() { + break; + } + if stdin.flush().is_err() { + break; + } + } +} + +#[derive(Default)] +struct CodexLineOutcome { + conversion: Option, + should_terminate: bool, +} + +struct CodexAppServerState { + init_id: Option, + thread_start_id: Option, + init_done: bool, + thread_start_sent: bool, + turn_start_sent: bool, + thread_id: Option, + next_id: i64, + prompt: String, + model: Option, + cwd: Option, + approval_policy: Option, + sandbox_mode: Option, + sandbox_policy: Option, + sender: Option>, +} + +impl CodexAppServerState { + fn new(options: SpawnOptions) -> Self { + let prompt = codex_prompt_for_mode(&options.prompt, options.agent_mode.as_deref()); + let cwd = options + .working_dir + .as_ref() + .map(|path| path.to_string_lossy().to_string()); + Self { + init_id: None, + thread_start_id: None, + init_done: false, + thread_start_sent: false, + turn_start_sent: false, + thread_id: None, + next_id: 1, + prompt, + model: options.model.clone(), + cwd, + approval_policy: codex_approval_policy(options.permission_mode.as_deref()), + sandbox_mode: 
codex_sandbox_mode(options.permission_mode.as_deref()), + sandbox_policy: codex_sandbox_policy(options.permission_mode.as_deref()), + sender: None, + } + } + + fn start(&mut self, sender: &mpsc::UnboundedSender) { + self.sender = Some(sender.clone()); + let request_id = self.next_request_id(); + self.init_id = Some(request_id.to_string()); + let request = codex_schema::ClientRequest::Initialize { + id: request_id, + params: codex_schema::InitializeParams { + client_info: codex_schema::ClientInfo { + name: "sandbox-agent".to_string(), + title: Some("sandbox-agent".to_string()), + version: env!("CARGO_PKG_VERSION").to_string(), + }, + }, + }; + self.send_json(&request); + } + + fn handle_line(&mut self, line: &str) -> CodexLineOutcome { + let trimmed = line.trim(); + if trimmed.is_empty() { + return CodexLineOutcome::default(); + } + let value: Value = match serde_json::from_str(trimmed) { + Ok(value) => value, + Err(_) => return CodexLineOutcome::default(), + }; + let message: codex_schema::JsonrpcMessage = match serde_json::from_value(value.clone()) { + Ok(message) => message, + Err(_) => return CodexLineOutcome::default(), + }; + + match message { + codex_schema::JsonrpcMessage::Response(response) => { + self.handle_response(&response); + CodexLineOutcome::default() + } + codex_schema::JsonrpcMessage::Notification(_) => { + if let Ok(notification) = + serde_json::from_value::(value.clone()) + { + self.maybe_capture_thread_id(&notification); + let conversion = convert_codex::notification_to_universal(&notification); + let should_terminate = matches!( + notification, + codex_schema::ServerNotification::TurnCompleted(_) + | codex_schema::ServerNotification::Error(_) + ); + CodexLineOutcome { + conversion: Some(conversion), + should_terminate, + } + } else { + CodexLineOutcome::default() + } + } + codex_schema::JsonrpcMessage::Request(_) => { + if let Ok(request) = + serde_json::from_value::(value.clone()) + { + let conversion = codex_request_to_universal(&request); + 
CodexLineOutcome { + conversion: Some(conversion), + should_terminate: false, + } + } else { + CodexLineOutcome::default() + } + } + codex_schema::JsonrpcMessage::Error(error) => CodexLineOutcome { + conversion: Some(codex_rpc_error_to_universal(&error)), + should_terminate: true, + }, + } + } + + fn handle_response(&mut self, response: &codex_schema::JsonrpcResponse) { + let response_id = response.id.to_string(); + if !self.init_done { + if self + .init_id + .as_ref() + .is_some_and(|id| id == &response_id) + { + self.init_done = true; + self.send_initialized(); + self.send_thread_start(); + } + return; + } + if self.thread_id.is_none() + && self + .thread_start_id + .as_ref() + .is_some_and(|id| id == &response_id) + { + self.send_turn_start(); + } + } + + fn maybe_capture_thread_id(&mut self, notification: &codex_schema::ServerNotification) { + if self.thread_id.is_some() { + return; + } + let thread_id = match notification { + codex_schema::ServerNotification::ThreadStarted(params) => Some(params.thread.id.clone()), + codex_schema::ServerNotification::TurnStarted(params) => Some(params.thread_id.clone()), + codex_schema::ServerNotification::TurnCompleted(params) => Some(params.thread_id.clone()), + codex_schema::ServerNotification::ItemStarted(params) => Some(params.thread_id.clone()), + codex_schema::ServerNotification::ItemCompleted(params) => Some(params.thread_id.clone()), + codex_schema::ServerNotification::ItemAgentMessageDelta(params) => Some(params.thread_id.clone()), + codex_schema::ServerNotification::ItemReasoningTextDelta(params) => Some(params.thread_id.clone()), + codex_schema::ServerNotification::ItemReasoningSummaryTextDelta(params) => Some(params.thread_id.clone()), + codex_schema::ServerNotification::ItemCommandExecutionOutputDelta(params) => Some(params.thread_id.clone()), + codex_schema::ServerNotification::ItemFileChangeOutputDelta(params) => Some(params.thread_id.clone()), + codex_schema::ServerNotification::ItemMcpToolCallProgress(params) 
=> Some(params.thread_id.clone()), + codex_schema::ServerNotification::ThreadTokenUsageUpdated(params) => Some(params.thread_id.clone()), + codex_schema::ServerNotification::TurnDiffUpdated(params) => Some(params.thread_id.clone()), + codex_schema::ServerNotification::TurnPlanUpdated(params) => Some(params.thread_id.clone()), + codex_schema::ServerNotification::ItemCommandExecutionTerminalInteraction(params) => Some(params.thread_id.clone()), + codex_schema::ServerNotification::ItemReasoningSummaryPartAdded(params) => Some(params.thread_id.clone()), + codex_schema::ServerNotification::ThreadCompacted(params) => Some(params.thread_id.clone()), + _ => None, + }; + if let Some(thread_id) = thread_id { + self.thread_id = Some(thread_id); + self.send_turn_start(); + } + } + + fn send_initialized(&self) { + let notification = codex_schema::JsonrpcNotification { + method: "initialized".to_string(), + params: None, + }; + self.send_json(&notification); + } + + fn send_thread_start(&mut self) { + if self.thread_start_sent { + return; + } + let request_id = self.next_request_id(); + self.thread_start_id = Some(request_id.to_string()); + let mut params = codex_schema::ThreadStartParams::default(); + params.approval_policy = self.approval_policy; + params.sandbox = self.sandbox_mode; + params.model = self.model.clone(); + params.cwd = self.cwd.clone(); + let request = codex_schema::ClientRequest::ThreadStart { id: request_id, params }; + self.thread_start_sent = true; + self.send_json(&request); + } + + fn send_turn_start(&mut self) { + if self.turn_start_sent { + return; + } + let thread_id = match self.thread_id.clone() { + Some(thread_id) => thread_id, + None => return, + }; + let request_id = self.next_request_id(); + let params = codex_schema::TurnStartParams { + approval_policy: self.approval_policy, + collaboration_mode: None, + cwd: self.cwd.clone(), + effort: None, + input: vec![codex_schema::UserInput::Text { + text: self.prompt.clone(), + text_elements: Vec::new(), + 
}], + model: self.model.clone(), + output_schema: None, + personality: None, + sandbox_policy: self.sandbox_policy.clone(), + summary: None, + thread_id, + }; + let request = codex_schema::ClientRequest::TurnStart { id: request_id, params }; + self.turn_start_sent = true; + self.send_json(&request); + } + + fn next_request_id(&mut self) -> codex_schema::RequestId { + let id = self.next_id; + self.next_id += 1; + codex_schema::RequestId::from(id) + } + + fn send_json(&self, payload: &T) { + let Some(sender) = self.sender.as_ref() else { + return; + }; + let Ok(line) = serde_json::to_string(payload) else { + return; + }; + let _ = sender.send(line); + } +} + +fn codex_prompt_for_mode(prompt: &str, mode: Option<&str>) -> String { + match mode { + Some("plan") => format!("Make a plan before acting.\n\n{prompt}"), + _ => prompt.to_string(), + } +} + +fn codex_approval_policy(mode: Option<&str>) -> Option { + match mode { + Some("plan") => Some(codex_schema::AskForApproval::Untrusted), + Some("bypass") => Some(codex_schema::AskForApproval::Never), + _ => None, + } +} + +fn codex_sandbox_mode(mode: Option<&str>) -> Option { + match mode { + Some("plan") => Some(codex_schema::SandboxMode::ReadOnly), + Some("bypass") => Some(codex_schema::SandboxMode::DangerFullAccess), + _ => None, + } +} + +fn codex_sandbox_policy(mode: Option<&str>) -> Option { + match mode { + Some("plan") => Some(codex_schema::SandboxPolicy::ReadOnly), + Some("bypass") => Some(codex_schema::SandboxPolicy::DangerFullAccess), + _ => None, + } +} + +fn codex_request_to_universal(request: &codex_schema::ServerRequest) -> EventConversion { + match request { + codex_schema::ServerRequest::ItemCommandExecutionRequestApproval { id, params } => { + let mut metadata = serde_json::Map::new(); + metadata.insert( + "codexRequestKind".to_string(), + Value::String("commandExecution".to_string()), + ); + metadata.insert( + "codexRequestId".to_string(), + serde_json::to_value(id).unwrap_or(Value::Null), + ); + 
metadata.insert("threadId".to_string(), Value::String(params.thread_id.clone())); + metadata.insert("turnId".to_string(), Value::String(params.turn_id.clone())); + metadata.insert("itemId".to_string(), Value::String(params.item_id.clone())); + if let Some(command) = params.command.as_ref() { + metadata.insert("command".to_string(), Value::String(command.clone())); + } + if let Some(reason) = params.reason.as_ref() { + metadata.insert("reason".to_string(), Value::String(reason.clone())); + } + let permission = PermissionRequest { + id: id.to_string(), + session_id: params.thread_id.clone(), + permission: "commandExecution".to_string(), + patterns: params + .command + .as_ref() + .map(|command| vec![command.clone()]) + .unwrap_or_default(), + metadata, + always: Vec::new(), + tool: None, + }; + EventConversion::new(UniversalEventData::PermissionAsked { + permission_asked: permission, + }) + .with_session(Some(params.thread_id.clone())) + } + codex_schema::ServerRequest::ItemFileChangeRequestApproval { id, params } => { + let mut metadata = serde_json::Map::new(); + metadata.insert( + "codexRequestKind".to_string(), + Value::String("fileChange".to_string()), + ); + metadata.insert( + "codexRequestId".to_string(), + serde_json::to_value(id).unwrap_or(Value::Null), + ); + metadata.insert("threadId".to_string(), Value::String(params.thread_id.clone())); + metadata.insert("turnId".to_string(), Value::String(params.turn_id.clone())); + metadata.insert("itemId".to_string(), Value::String(params.item_id.clone())); + if let Some(reason) = params.reason.as_ref() { + metadata.insert("reason".to_string(), Value::String(reason.clone())); + } + if let Some(grant_root) = params.grant_root.as_ref() { + metadata.insert("grantRoot".to_string(), Value::String(grant_root.clone())); + } + let permission = PermissionRequest { + id: id.to_string(), + session_id: params.thread_id.clone(), + permission: "fileChange".to_string(), + patterns: params + .grant_root + .as_ref() + .map(|root| 
vec![root.clone()]) + .unwrap_or_default(), + metadata, + always: Vec::new(), + tool: None, + }; + EventConversion::new(UniversalEventData::PermissionAsked { + permission_asked: permission, + }) + .with_session(Some(params.thread_id.clone())) + } + _ => EventConversion::new(UniversalEventData::Unknown { + raw: serde_json::to_value(request).unwrap_or(Value::Null), + }), + } +} + +fn codex_rpc_error_to_universal(error: &codex_schema::JsonrpcError) -> EventConversion { + let message = error.error.message.clone(); + let crash = CrashInfo { + message, + kind: Some("jsonrpc.error".to_string()), + details: serde_json::to_value(error).ok(), + }; + EventConversion::new(UniversalEventData::Error { error: crash }) +} + +fn codex_request_id_from_metadata( + metadata: &serde_json::Map, +) -> Option { + let value = metadata.get("codexRequestId")?; + codex_request_id_from_value(value) +} + +fn codex_request_id_from_string(value: &str) -> Option { + if let Ok(number) = value.parse::() { + return Some(codex_schema::RequestId::from(number)); + } + Some(codex_schema::RequestId::Variant0(value.to_string())) +} + +fn codex_request_id_from_value(value: &Value) -> Option { + match value { + Value::String(value) => Some(codex_schema::RequestId::Variant0(value.clone())), + Value::Number(value) => value.as_i64().map(codex_schema::RequestId::from), + _ => None, + } +} + +fn codex_command_decision_for_reply( + reply: PermissionReply, +) -> codex_schema::CommandExecutionApprovalDecision { + match reply { + PermissionReply::Once => codex_schema::CommandExecutionApprovalDecision::Accept, + PermissionReply::Always => codex_schema::CommandExecutionApprovalDecision::AcceptForSession, + PermissionReply::Reject => codex_schema::CommandExecutionApprovalDecision::Decline, + } +} + +fn codex_file_change_decision_for_reply( + reply: PermissionReply, +) -> codex_schema::FileChangeApprovalDecision { + match reply { + PermissionReply::Once => codex_schema::FileChangeApprovalDecision::Accept, + 
PermissionReply::Always => codex_schema::FileChangeApprovalDecision::AcceptForSession, + PermissionReply::Reject => codex_schema::FileChangeApprovalDecision::Decline, + } +} + fn parse_agent_line(agent: AgentId, line: &str, session_id: &str) -> Option { let trimmed = line.trim(); if trimmed.is_empty() { diff --git a/server/packages/sandbox-agent/tests/snapshots/http_sse_snapshots__approval_flow_snapshots@question_reply_events_claude.snap b/server/packages/sandbox-agent/tests/snapshots/http_sse_snapshots__approval_flow_snapshots@question_reply_events_claude.snap index 35076ce..da5765c 100644 --- a/server/packages/sandbox-agent/tests/snapshots/http_sse_snapshots__approval_flow_snapshots@question_reply_events_claude.snap +++ b/server/packages/sandbox-agent/tests/snapshots/http_sse_snapshots__approval_flow_snapshots@question_reply_events_claude.snap @@ -1,5 +1,6 @@ --- source: server/packages/sandbox-agent/tests/http_sse_snapshots.rs +assertion_line: 1045 expression: normalize_events(&question_events) --- - agent: claude @@ -20,3 +21,11 @@ expression: normalize_events(&question_events) type: text role: assistant seq: 3 +- agent: claude + kind: message + message: + parts: + - text: "" + type: text + role: assistant + seq: 4 diff --git a/todo.md b/todo.md index 60936f5..0e5f8a6 100644 --- a/todo.md +++ b/todo.md @@ -21,6 +21,8 @@ - [x] Implement SSE endpoint for events with same semantics as JSON endpoint - [x] Replace in-memory session store with sandbox session manager (questions/permissions routing, long-lived processes) - [x] Remove legacy token header support +- [x] Embed inspector frontend and serve it at `/ui` +- [x] Log inspector URL when starting the HTTP server ## CLI - [x] Implement clap CLI flags: `--token`, `--no-token`, `--host`, `--port`, CORS flags @@ -28,6 +30,8 @@ - [x] Update `CLAUDE.md` to keep CLI endpoints in sync with HTTP API changes - [x] Prefix CLI API requests with `/v1` - [x] Add CLI credentials extractor subcommand +- [x] Move daemon startup 
to `server` subcommand +- [x] Add `sandbox-daemon` CLI alias ## HTTP API Endpoints - [x] POST `/agents/{}/install` with `reinstall` handling @@ -46,6 +50,7 @@ - [x] Implement install/version/spawn basics for Claude/Codex/OpenCode/Amp - [x] Implement agent install URL patterns + platform mappings for supported OS/arch - [x] Parse JSONL output for subprocess agents and extract session/result metadata +- [x] Migrate Codex subprocess to App Server JSON-RPC protocol - [x] Map permissionMode to agent CLI flags (Claude/Codex/Amp) - [x] Implement session resume flags for Claude/OpenCode/Amp (Codex unsupported) - [x] Replace sandbox-agent core agent modules with new agent-management crate (delete originals) @@ -74,6 +79,7 @@ - [ ] Add tests for question/permission flows using deterministic prompts - [x] Add HTTP/SSE snapshot tests for real agents (env-configured) - [x] Add snapshot coverage for auth, CORS, and concurrent sessions +- [x] Add inspector UI route test ## Frontend (frontend/packages/inspector) - [x] Build Vite + React app with connect screen (endpoint + optional token) @@ -81,6 +87,7 @@ - [x] Implement full agent UI covering all features - [x] Add HTTP request log with copyable curl command - [x] Add Content-Type header to CORS callout command +- [x] Default inspector endpoint to current origin and auto-connect via health check ## TypeScript SDK - [x] Generate OpenAPI from utoipa and run `openapi-typescript`