diff --git a/docs/agent-compatibility.mdx b/docs/agent-compatibility.mdx index 3f38bd8..e4e37be 100644 --- a/docs/agent-compatibility.mdx +++ b/docs/agent-compatibility.mdx @@ -8,14 +8,14 @@ The universal API normalizes different coding agents into a consistent interface ## Feature Matrix -| Feature | [Claude Code](https://docs.anthropic.com/en/docs/agents-and-tools/claude-code/overview)* | [Codex](https://github.com/openai/codex) | [OpenCode](https://github.com/opencode-ai/opencode) | [Amp](https://ampcode.com) | +| Feature | [Claude Code](https://docs.anthropic.com/en/docs/agents-and-tools/claude-code/overview) | [Codex](https://github.com/openai/codex) | [OpenCode](https://github.com/opencode-ai/opencode) | [Amp](https://ampcode.com) | |---------|:-----------:|:-----:|:--------:|:---:| | Stability | Stable | Stable | Experimental | Experimental | | Text Messages | ✓ | ✓ | ✓ | ✓ | -| Tool Calls | —* | ✓ | ✓ | ✓ | -| Tool Results | —* | ✓ | ✓ | ✓ | -| Questions (HITL) | —* | | ✓ | | -| Permissions (HITL) | —* | | ✓ | | +| Tool Calls | ✓ | ✓ | ✓ | ✓ | +| Tool Results | ✓ | ✓ | ✓ | ✓ | +| Questions (HITL) | ✓ | | ✓ | | +| Permissions (HITL) | ✓ | | ✓ | | | Images | | ✓ | ✓ | | | File Attachments | | ✓ | ✓ | | | Session Lifecycle | | ✓ | ✓ | | @@ -24,9 +24,7 @@ The universal API normalizes different coding agents into a consistent interface | Command Execution | | ✓ | | | | File Changes | | ✓ | | | | MCP Tools | | ✓ | | | -| Streaming Deltas | | ✓ | ✓ | | - -\* Coming imminently +| Streaming Deltas | ✓ | ✓ | ✓ | | ## Feature Descriptions diff --git a/docs/conversion.mdx b/docs/conversion.mdx index 97b5792..d155ab4 100644 --- a/docs/conversion.mdx +++ b/docs/conversion.mdx @@ -31,13 +31,13 @@ Events / Message Flow | session.ended | SDKMessage.type=result | no explicit session end (turn/completed) | no explicit session end (session.deleted)| type=done | | message (user) | SDKMessage.type=user | item/completed (ThreadItem.type=userMessage)| message.updated (Message.role=user) | type=message | | message (assistant) | SDKMessage.type=assistant | item/completed (ThreadItem.type=agentMessage)| message.updated (Message.role=assistant)| type=message | -| message.delta | synthetic | method=item/agentMessage/delta | type=message.part.updated (delta) | synthetic | -| tool call | synthetic from tool usage | method=item/mcpToolCall/progress | message.part.updated (part.type=tool) | type=tool_call | -| tool result | synthetic from tool usage | item/completed (tool result ThreadItem variants) | message.part.updated (part.type=tool, state=completed) | type=tool_result | -| permission.requested | none | none | type=permission.asked | none | -| permission.resolved | none | none | type=permission.replied | none | -| question.requested | ExitPlanMode tool (synthetic)| experimental request_user_input (payload) | type=question.asked | none | -| question.resolved | ExitPlanMode reply (synthetic)| experimental request_user_input (payload) | type=question.replied / question.rejected | none | +| message.delta | stream_event (partial) or synthetic | method=item/agentMessage/delta | type=message.part.updated (delta) | synthetic | +| tool call | type=tool_use | method=item/mcpToolCall/progress | message.part.updated (part.type=tool) | type=tool_call | +| tool result | user.message.content.tool_result | item/completed (tool result ThreadItem variants) | message.part.updated (part.type=tool, state=completed) | type=tool_result | +| permission.requested | control_request.can_use_tool | none | type=permission.asked | none | +| permission.resolved | daemon reply to can_use_tool | none | type=permission.replied | none | +| question.requested | tool_use (AskUserQuestion) | experimental request_user_input (payload) | type=question.asked | none | +| question.resolved | tool_result (AskUserQuestion) | experimental request_user_input (payload) | type=question.replied / question.rejected | none | | error | SDKResultMessage.error | method=error | type=session.error (or message error) | type=error | +------------------------+------------------------------+--------------------------------------------+-----------------------------------------+----------------------------------+ @@ -50,10 +50,11 @@ Synthetics | session.ended | When agent emits no explicit end | session.ended event | Mark source=daemon; reason may be inferred | | item_id (Claude) | Claude provides no item IDs | item_id | Maintain provider_item_id map when possible | | user message (Claude) | Claude emits only assistant output | item.completed | Mark source=daemon; preserve raw input in event metadata | -| question events (Claude) | Plan mode ExitPlanMode tool usage | question.requested/resolved | Synthetic mapping from tool call/result | +| question events (Claude) | AskUserQuestion tool usage | question.requested/resolved | Derived from tool_use blocks (source=agent) | | native_session_id (Codex) | Codex uses threadId | native_session_id | Intentionally merged threadId into native_session_id | +------------------------------+------------------------+--------------------------+--------------------------------------------------------------+ -| message.delta (Claude/Amp) | No native deltas | item.delta | Synthetic delta with full message content; source=daemon | +| message.delta (Claude) | No native deltas emitted | item.delta | Synthetic delta with full message content; source=daemon | +| message.delta (Amp) | No native deltas | item.delta | Synthetic delta with full message content; source=daemon | +------------------------------+------------------------+--------------------------+--------------------------------------------------------------+ | message.delta (OpenCode) | part delta before message | item.delta | If part arrives first, create item.started stub then delta | +------------------------------+------------------------+--------------------------+--------------------------------------------------------------+ @@ -62,11 +63,12 @@ Delta handling - Codex emits agent message and other deltas (e.g., item/agentMessage/delta). - OpenCode emits part deltas via message.part.updated with a delta string. -- Claude and Amp do not emit deltas in their schemas. +- Claude can emit stream_event deltas when partial streaming is enabled; Amp does not emit deltas. Policy: - Always emit item.delta across all providers. - For providers without native deltas, emit a single synthetic delta containing the full content prior to item.completed. +- For Claude when partial streaming is enabled, forward native deltas and skip the synthetic full-content delta. - For providers with native deltas, forward as-is; also emit item.completed when final content is known. Message normalization notes diff --git a/research/agents/claude.md b/research/agents/claude.md index 49da983..42a552b 100644 --- a/research/agents/claude.md +++ b/research/agents/claude.md @@ -160,6 +160,36 @@ Claude conflates agent mode and permission mode - `plan` is a permission restric | `plan` | `--permission-mode plan` | Read-only, must ExitPlanMode to execute | | `bypassPermissions` | `--dangerously-skip-permissions` | Skip all permission checks | +### Root Restrictions + +**Claude refuses to run with `--dangerously-skip-permissions` when running as root (uid 0).** + +This is a security measure built into Claude Code. When running as root: +- The CLI outputs: `--dangerously-skip-permissions cannot be used with root/sudo privileges for security reasons` +- The process exits immediately without executing + +This affects container environments (Docker, Daytona, E2B, etc.) which commonly run as root. + +**Workarounds:** +1. Run as a non-root user in the container +2. Use `default` permission mode (but this requires interactive approval) +3. Use `acceptEdits` mode for file operations (still requires Bash approval) + +### Headless Permission Behavior + +When permissions are denied in headless mode (`--print --output-format stream-json`): + +1. Claude emits a `tool_use` event for the tool (e.g., Write, Bash) +2. A `user` event follows with `tool_result` containing `is_error: true` +3. Error message: `"Claude requested permissions to X, but you haven't granted it yet."` +4. Final `result` event includes `permission_denials` array listing all denied tools + +```json +{"type":"assistant","message":{"content":[{"type":"tool_use","name":"Write","input":{...}}]}} +{"type":"user","message":{"content":[{"type":"tool_result","is_error":true,"content":"Claude requested permissions to write to /tmp/test.txt, but you haven't granted it yet."}]}} +{"type":"result","permission_denials":[{"tool_name":"Write","tool_use_id":"...","tool_input":{...}}]} +``` + ### Subagent Types Claude supports spawning subagents via the `Task` tool with `subagent_type`: diff --git a/server/packages/agent-management/src/agents.rs b/server/packages/agent-management/src/agents.rs index 91aa143..65c7051 100644 --- a/server/packages/agent-management/src/agents.rs +++ b/server/packages/agent-management/src/agents.rs @@ -217,7 +217,6 @@ impl AgentManager { match agent { AgentId::Claude => { command - .arg("--print") .arg("--output-format") .arg("stream-json") .arg("--verbose"); @@ -234,9 +233,21 @@ impl AgentManager { Some("bypass") => { command.arg("--dangerously-skip-permissions"); } + Some("acceptEdits") => { + command.arg("--permission-mode").arg("acceptEdits"); + } _ => {} } - command.arg(&options.prompt); + if options.streaming_input { + command + .arg("--input-format") + .arg("stream-json") + .arg("--permission-prompt-tool") + .arg("stdio") + .arg("--include-partial-messages"); + } else { + command.arg("--print").arg("--").arg(&options.prompt); + } } AgentId::Codex => { if options.session_id.is_some() { @@ -305,15 +316,18 @@ impl AgentManager { pub fn spawn_streaming( &self, agent: AgentId, - options: SpawnOptions, + mut options: SpawnOptions, ) -> Result { let codex_options = if agent == AgentId::Codex { Some(options.clone()) } else { None }; + if agent == AgentId::Claude { + options.streaming_input = true; + } let mut command = self.build_command(agent, &options)?; - if agent == AgentId::Codex { + if matches!(agent, AgentId::Codex | AgentId::Claude) { command.stdin(Stdio::piped()); } command.stdout(Stdio::piped()).stderr(Stdio::piped()); @@ -539,7 +553,6 @@ impl AgentManager { match agent { AgentId::Claude => { command - .arg("--print") .arg("--output-format") .arg("stream-json") .arg("--verbose"); @@ -556,9 +569,21 @@ impl AgentManager { Some("bypass") => { command.arg("--dangerously-skip-permissions"); } + Some("acceptEdits") => { + command.arg("--permission-mode").arg("acceptEdits"); + } _ => {} } - command.arg(&options.prompt); + if options.streaming_input { + command + .arg("--input-format") + .arg("stream-json") + .arg("--permission-prompt-tool") + .arg("stdio") + .arg("--include-partial-messages"); + } else { + command.arg(&options.prompt); + } } AgentId::Codex => { if options.session_id.is_some() { @@ -646,6 +671,8 @@ pub struct SpawnOptions { pub session_id: Option, pub working_dir: Option, pub env: HashMap, + /// Use stream-json input via stdin (Claude only). + pub streaming_input: bool, } impl SpawnOptions { @@ -659,6 +686,7 @@ impl SpawnOptions { session_id: None, working_dir: None, env: HashMap::new(), + streaming_input: false, } } } diff --git a/server/packages/sandbox-agent/Cargo.toml b/server/packages/sandbox-agent/Cargo.toml index e6a9375..028362a 100644 --- a/server/packages/sandbox-agent/Cargo.toml +++ b/server/packages/sandbox-agent/Cargo.toml @@ -36,6 +36,9 @@ tracing-subscriber.workspace = true include_dir.workspace = true tempfile = { workspace = true, optional = true } +[target.'cfg(unix)'.dependencies] +libc = "0.2" + [dev-dependencies] http-body-util.workspace = true insta.workspace = true diff --git a/server/packages/sandbox-agent/src/router.rs b/server/packages/sandbox-agent/src/router.rs index c98388e..36feb4c 100644 --- a/server/packages/sandbox-agent/src/router.rs +++ b/server/packages/sandbox-agent/src/router.rs @@ -265,6 +265,7 @@ struct SessionState { broadcaster: broadcast::Sender, opencode_stream_started: bool, codex_sender: Option>, + claude_sender: Option>, session_started_emitted: bool, last_claude_message_id: Option, claude_message_counter: u64, @@ -322,6 +323,7 @@ impl SessionState { broadcaster, opencode_stream_started: false, codex_sender: None, + claude_sender: None, session_started_emitted: false, last_claude_message_id: None, claude_message_counter: 0, @@ -382,6 +384,15 @@ impl SessionState { self.codex_sender.clone() } + fn set_claude_sender(&mut self, sender: Option>) { + self.claude_sender = sender; + } + + #[allow(dead_code)] + fn claude_sender(&self) -> Option> { + self.claude_sender.clone() + } + fn normalize_conversion(&mut self, mut conversion: EventConversion) -> Vec { if self.native_session_id.is_none() && conversion.native_session_id.is_some() { self.native_session_id = conversion.native_session_id.clone(); @@ -1621,6 +1632,11 @@ impl SessionManager { let manager = self.agent_manager.clone(); let prompt = message; + let initial_input = if session_snapshot.agent == AgentId::Claude { + Some(claude_user_message_line(&session_snapshot, &prompt)) + } else { + None + }; let credentials = tokio::task::spawn_blocking(move || { let options = CredentialExtractionOptions::new(); extract_all_credentials(&options) @@ -1630,7 +1646,7 @@ impl SessionManager { message: err.to_string(), })?; - let spawn_options = build_spawn_options(&session_snapshot, prompt, credentials); + let spawn_options = build_spawn_options(&session_snapshot, prompt.clone(), credentials); let agent_id = session_snapshot.agent; let spawn_result = tokio::task::spawn_blocking(move || manager.spawn_streaming(agent_id, spawn_options)) @@ -1649,7 +1665,7 @@ impl SessionManager { let manager = Arc::clone(self); tokio::spawn(async move { manager - .consume_spawn(session_id, agent_id, spawn_result) + .consume_spawn(session_id, agent_id, spawn_result, initial_input) .await; }); @@ -1847,7 +1863,7 @@ impl SessionManager { question_id: &str, answers: Vec>, ) -> Result<(), SandboxError> { - let (agent, native_session_id, pending_question) = { + let (agent, native_session_id, pending_question, claude_sender) = { let mut sessions = self.sessions.lock().await; let session = Self::session_mut(&mut sessions, session_id).ok_or_else(|| { SandboxError::SessionNotFound { @@ -1863,7 +1879,12 @@ impl SessionManager { if let Some(err) = session.ended_error() { return Err(err); } - (session.agent, session.native_session_id.clone(), pending) + ( + session.agent, + session.native_session_id.clone(), + pending, + session.claude_sender(), + ) }; let response = answers.first().and_then(|inner| inner.first()).cloned(); @@ -1877,6 +1898,16 @@ impl SessionManager { })?; self.opencode_question_reply(&agent_session_id, question_id, answers) .await?; + } else if agent == AgentId::Claude { + let sender = claude_sender.ok_or_else(|| SandboxError::InvalidRequest { + message: "Claude session is not active".to_string(), + })?; + let session_id = native_session_id.clone().unwrap_or_else(|| session_id.to_string()); + let response_text = response.clone().unwrap_or_default(); + let line = claude_tool_result_line(&session_id, question_id, &response_text, false); + sender.send(line).map_err(|_| SandboxError::InvalidRequest { + message: "Claude session is not active".to_string(), + })?; } else { // TODO: Forward question replies to subprocess agents. } @@ -1905,7 +1936,7 @@ impl SessionManager { session_id: &str, question_id: &str, ) -> Result<(), SandboxError> { - let (agent, native_session_id, pending_question) = { + let (agent, native_session_id, pending_question, claude_sender) = { let mut sessions = self.sessions.lock().await; let session = Self::session_mut(&mut sessions, session_id).ok_or_else(|| { SandboxError::SessionNotFound { @@ -1921,7 +1952,12 @@ impl SessionManager { if let Some(err) = session.ended_error() { return Err(err); } - (session.agent, session.native_session_id.clone(), pending) + ( + session.agent, + session.native_session_id.clone(), + pending, + session.claude_sender(), + ) }; if agent == AgentId::Opencode { @@ -1933,6 +1969,20 @@ impl SessionManager { })?; self.opencode_question_reject(&agent_session_id, question_id) .await?; + } else if agent == AgentId::Claude { + let sender = claude_sender.ok_or_else(|| SandboxError::InvalidRequest { + message: "Claude session is not active".to_string(), + })?; + let session_id = native_session_id.clone().unwrap_or_else(|| session_id.to_string()); + let line = claude_tool_result_line( + &session_id, + question_id, + "User rejected the question.", + true, + ); + sender.send(line).map_err(|_| SandboxError::InvalidRequest { + message: "Claude session is not active".to_string(), + })?; } else { // TODO: Forward question rejections to subprocess agents. } @@ -1963,7 +2013,7 @@ impl SessionManager { reply: PermissionReply, ) -> Result<(), SandboxError> { let reply_for_status = reply.clone(); - let (agent, native_session_id, pending_permission) = { + let (agent, native_session_id, pending_permission, claude_sender) = { let mut sessions = self.sessions.lock().await; let session = Self::session_mut(&mut sessions, session_id).ok_or_else(|| { SandboxError::SessionNotFound { @@ -1983,6 +2033,7 @@ impl SessionManager { session.agent, session.native_session_id.clone(), pending, + session.claude_sender(), ) }; @@ -2050,6 +2101,44 @@ impl SessionManager { })?; self.opencode_permission_reply(&agent_session_id, permission_id, reply.clone()) .await?; + } else if agent == AgentId::Claude { + let sender = claude_sender.ok_or_else(|| SandboxError::InvalidRequest { + message: "Claude session is not active".to_string(), + })?; + let metadata = pending_permission + .as_ref() + .and_then(|pending| pending.metadata.as_ref()) + .and_then(Value::as_object); + let updated_input = metadata + .and_then(|map| map.get("input")) + .cloned() + .unwrap_or(Value::Null); + + let mut response_map = serde_json::Map::new(); + match reply { + PermissionReply::Reject => { + response_map.insert( + "message".to_string(), + Value::String("Permission denied.".to_string()), + ); + } + PermissionReply::Once | PermissionReply::Always => { + if !updated_input.is_null() { + response_map.insert("updatedInput".to_string(), updated_input); + } + } + } + let response_value = Value::Object(response_map); + + let behavior = match reply { + PermissionReply::Reject => "deny", + PermissionReply::Once | PermissionReply::Always => "allow", + }; + + let line = claude_control_response_line(permission_id, behavior, response_value); + sender.send(line).map_err(|_| SandboxError::InvalidRequest { + message: "Claude session is not active".to_string(), + })?; } else { // TODO: Forward permission replies to subprocess agents. } @@ -2151,6 +2240,7 @@ impl SessionManager { session_id: String, agent: AgentId, spawn: StreamingSpawn, + initial_input: Option, ) { let StreamingSpawn { mut child, @@ -2197,6 +2287,22 @@ impl SessionManager { if let (Some(state), Some(sender)) = (codex_state.as_mut(), codex_sender.as_ref()) { state.start(sender); } + } else if agent == AgentId::Claude { + if let Some(stdin) = stdin { + let (writer_tx, writer_rx) = mpsc::unbounded_channel::(); + { + let mut sessions = self.sessions.lock().await; + if let Some(session) = Self::session_mut(&mut sessions, &session_id) { + session.set_claude_sender(Some(writer_tx.clone())); + } + } + if let Some(initial) = initial_input { + let _ = writer_tx.send(initial); + } + tokio::task::spawn_blocking(move || { + write_lines(stdin, writer_rx); + }); + } } while let Some(line) = rx.recv().await { @@ -2214,6 +2320,14 @@ impl SessionManager { } } } else if agent == AgentId::Claude { + if let Ok(value) = serde_json::from_str::(&line) { + if value.get("type").and_then(Value::as_str) == Some("result") { + let mut sessions = self.sessions.lock().await; + if let Some(session) = Self::session_mut(&mut sessions, &session_id) { + session.set_claude_sender(None); + } + } + } let conversions = self.parse_claude_line(&line, &session_id).await; if !conversions.is_empty() { let _ = self.record_conversions(&session_id, conversions).await; @@ -2231,6 +2345,11 @@ impl SessionManager { if let Some(session) = Self::session_mut(&mut sessions, &session_id) { session.set_codex_sender(None); } + } else if agent == AgentId::Claude { + let mut sessions = self.sessions.lock().await; + if let Some(session) = Self::session_mut(&mut sessions, &session_id) { + session.set_claude_sender(None); + } } if terminate_early { @@ -3779,14 +3898,14 @@ fn agent_supports_item_started(agent: AgentId) -> bool { fn agent_capabilities_for(agent: AgentId) -> AgentCapabilities { match agent { - // Headless Claude CLI does not expose AskUserQuestion and does not emit tool_result, - // so we keep these capabilities off until we switch to an SDK-backed wrapper. + // Claude CLI supports tool calls/results and permission prompts via the SDK control protocol, + // but we still emit synthetic item.started events. AgentId::Claude => AgentCapabilities { plan_mode: false, - permissions: false, - questions: false, - tool_calls: false, - tool_results: false, + permissions: true, + questions: true, + tool_calls: true, + tool_results: true, text_messages: true, images: false, file_attachments: false, @@ -3797,7 +3916,7 @@ fn agent_capabilities_for(agent: AgentId) -> AgentCapabilities { command_execution: false, file_changes: false, mcp_tools: false, - streaming_deltas: false, + streaming_deltas: true, item_started: false, shared_process: false, // per-turn subprocess with --resume }, @@ -3990,12 +4109,24 @@ fn normalize_agent_mode(agent: AgentId, agent_mode: Option<&str>) -> Result bool { + #[cfg(unix)] + { + unsafe { libc::getuid() == 0 } + } + #[cfg(not(unix))] + { + false + } +} + fn normalize_permission_mode( agent: AgentId, permission_mode: Option<&str>, ) -> Result { let mode = match permission_mode.unwrap_or("default") { - "default" | "plan" | "bypass" => permission_mode.unwrap_or("default"), + "default" | "plan" | "bypass" | "acceptEdits" => permission_mode.unwrap_or("default"), value => { return Err(SandboxError::InvalidRequest { message: format!("invalid permission mode: {value}"), @@ -4004,14 +4135,20 @@ fn normalize_permission_mode( } }; if agent == AgentId::Claude { - if mode == "plan" { - return Err(SandboxError::ModeNotSupported { - agent: agent.as_str().to_string(), - mode: mode.to_string(), + // Claude refuses --dangerously-skip-permissions when running as root, + // which is common in container environments (Docker, Daytona, E2B). + // Return an error if user explicitly requests bypass while running as root. + if mode == "bypass" && is_running_as_root() { + return Err(SandboxError::InvalidRequest { + message: "permission mode 'bypass' is not supported when running as root (Claude refuses --dangerously-skip-permissions with root privileges)".to_string(), } .into()); } - return Ok("bypass".to_string()); + // Pass through bypass/acceptEdits/plan if explicitly requested, otherwise use default + if mode == "bypass" || mode == "acceptEdits" || mode == "plan" { + return Ok(mode.to_string()); + } + return Ok("default".to_string()); } let supported = match agent { AgentId::Claude => false, @@ -4117,6 +4254,87 @@ fn build_spawn_options( options } +fn claude_input_session_id(session: &SessionSnapshot) -> String { + session + .native_session_id + .clone() + .unwrap_or_else(|| session.session_id.clone()) +} + +fn claude_user_message_line(session: &SessionSnapshot, message: &str) -> String { + let session_id = claude_input_session_id(session); + serde_json::json!({ + "type": "user", + "message": { + "role": "user", + "content": message, + }, + "parent_tool_use_id": null, + "session_id": session_id, + }) + .to_string() +} + +fn claude_tool_result_line( + session_id: &str, + tool_use_id: &str, + content: &str, + is_error: bool, +) -> String { + serde_json::json!({ + "type": "user", + "message": { + "role": "user", + "content": [{ + "type": "tool_result", + "tool_use_id": tool_use_id, + "content": content, + "is_error": is_error, + }], + }, + "parent_tool_use_id": null, + "session_id": session_id, + }) + .to_string() +} + +fn claude_control_response_line( + request_id: &str, + behavior: &str, + response: Value, +) -> String { + let mut response_obj = serde_json::Map::new(); + response_obj.insert( + "behavior".to_string(), + Value::String(behavior.to_string()), + ); + if let Some(message) = response.get("message") { + response_obj.insert("message".to_string(), message.clone()); + } + if let Some(updated_input) = response.get("updatedInput") { + response_obj.insert("updatedInput".to_string(), updated_input.clone()); + } + if let Some(updated_permissions) = response.get("updatedPermissions") { + response_obj.insert( + "updatedPermissions".to_string(), + updated_permissions.clone(), + ); + } + if let Some(interrupt) = response.get("interrupt") { + response_obj.insert("interrupt".to_string(), interrupt.clone()); + } + + serde_json::json!({ + "type": "control_response", + "response": { + "subtype": "success", + "request_id": request_id, + "response": Value::Object(response_obj), + } + }) + .to_string() +} + fn read_lines(reader: R, sender: mpsc::UnboundedSender) { let mut reader = BufReader::new(reader); let mut line = String::new(); diff --git a/server/packages/universal-agent-schema/src/agents/claude.rs b/server/packages/universal-agent-schema/src/agents/claude.rs index 0d085b4..3a62285 100644 --- a/server/packages/universal-agent-schema/src/agents/claude.rs +++ b/server/packages/universal-agent-schema/src/agents/claude.rs @@ -6,9 +6,12 @@ use crate::{ ContentPart, EventConversion, ItemEventData, + ItemDeltaData, ItemKind, ItemRole, ItemStatus, + PermissionEventData, + PermissionStatus, QuestionEventData, QuestionStatus, SessionStartedData, @@ -31,11 +34,14 @@ pub fn event_to_universal_with_session( let event_type = event.get("type").and_then(Value::as_str).unwrap_or(""); let mut conversions = match event_type { "system" => vec![system_event_to_universal(event)], - "user" => Vec::new(), + "user" => user_event_to_universal(event), "assistant" => assistant_event_to_universal(event, &session_id), "tool_use" => tool_use_event_to_universal(event, &session_id), "tool_result" => tool_result_event_to_universal(event), "result" => result_event_to_universal(event, &session_id), + "stream_event" => stream_event_to_universal(event), + "control_request" => control_request_to_universal(event)?, + "control_response" | "keep_alive" | "update_environment_variables" => Vec::new(), _ => return Err(format!("unsupported Claude event type: {event_type}")), }; @@ -85,6 +91,44 @@ fn assistant_event_to_universal(event: &Value, session_id: &str) -> Vec Vec Vec { + let mut conversions = Vec::new(); + let content = event + .get("message") + .and_then(|msg| msg.get("content")) + .and_then(Value::as_array) + .cloned() + .unwrap_or_default(); + + for block in content { + let block_type = block.get("type").and_then(Value::as_str).unwrap_or(""); + if block_type != "tool_result" { + continue; + } + + let tool_use_id = block + .get("tool_use_id") + .or_else(|| block.get("toolUseId")) + .and_then(Value::as_str) + .map(|s| s.to_string()) + .unwrap_or_else(|| next_temp_id("tmp_claude_tool")); + let output = block.get("content").cloned().unwrap_or(Value::Null); + let output_text = serde_json::to_string(&output).unwrap_or_default(); + + let tool_item = UniversalItem { + item_id: next_temp_id("tmp_claude_tool_result"), + native_item_id: Some(tool_use_id.clone()), + parent_id: None, + kind: ItemKind::ToolResult, + role: Some(ItemRole::Tool), + content: vec![ContentPart::ToolResult { + call_id: tool_use_id, + output: output_text, + }], + status: ItemStatus::Completed, + }; + conversions.extend(item_events(tool_item, true)); + } + conversions } @@ -141,10 +227,14 @@ fn tool_use_event_to_universal(event: &Value, session_id: &str) -> Vec Vec Vec { conversions } +fn stream_event_to_universal(event: &Value) -> Vec { + let mut conversions = Vec::new(); + let Some(raw_event) = event.get("event").and_then(Value::as_object) else { + return conversions; + }; + let event_type = raw_event.get("type").and_then(Value::as_str).unwrap_or(""); + if event_type != "content_block_delta" { + return conversions; + } + let delta_text = raw_event + .get("delta") + .and_then(|delta| delta.get("text")) + .and_then(Value::as_str) + .unwrap_or(""); + if delta_text.is_empty() { + return conversions; + } + + conversions.push(EventConversion::new( + UniversalEventType::ItemDelta, + UniversalEventData::ItemDelta(ItemDeltaData { + item_id: String::new(), + native_item_id: None, + delta: delta_text.to_string(), + }), + )); + + conversions +} + +fn control_request_to_universal(event: &Value) -> Result, String> { + let request_id = event + .get("request_id") + .and_then(Value::as_str) + .ok_or_else(|| "missing request_id".to_string())?; + let request = event + .get("request") + .and_then(Value::as_object) + .ok_or_else(|| "missing request".to_string())?; + let subtype = request + .get("subtype") + .and_then(Value::as_str) + .unwrap_or(""); + + if subtype != "can_use_tool" { + return Err(format!("unsupported Claude control_request subtype: {subtype}")); + } + + let tool_name = request + .get("tool_name") + .and_then(Value::as_str) + .unwrap_or("unknown"); + let input = request.get("input").cloned().unwrap_or(Value::Null); + let permission_suggestions = request + .get("permission_suggestions") + .cloned() + .unwrap_or(Value::Null); + let blocked_path = request + .get("blocked_path") + .cloned() + .unwrap_or(Value::Null); + + let metadata = serde_json::json!({ + "toolName": tool_name, + "input": input, + "permissionSuggestions": permission_suggestions, + "blockedPath": blocked_path, + }); + + let permission = PermissionEventData { + permission_id: request_id.to_string(), + action: tool_name.to_string(), + status: PermissionStatus::Requested, + metadata: Some(metadata), + }; + + Ok(vec![EventConversion::new( + UniversalEventType::PermissionRequested, + UniversalEventData::Permission(permission), + )]) +} + fn result_event_to_universal(event: &Value, session_id: &str) -> Vec { // The `result` event completes the message started by `assistant`. // Use the same native_item_id so they link to the same universal item. @@ -285,7 +471,7 @@ fn item_events(item: UniversalItem, synthetic_start: bool) -> Vec) -> Vec { +fn message_started_events(item: UniversalItem) -> Vec { let mut events = Vec::new(); // Emit item.started (in-progress) @@ -293,25 +479,6 @@ fn message_started_events(item: UniversalItem, parts: Vec) -> Vec