feat: add Claude adapter improvements for HITL support (#30)

* feat: add Claude adapter improvements for HITL support

- Add question and permission handling for Claude sessions
- Add Claude sender channel for interactive communication
- Add stream event and control request handling
- Update agent compatibility documentation

* fix: restore Claude HITL streaming input and permission handling

- Add streaming_input field to SpawnOptions for Claude stdin streaming
- Enable --input-format stream-json, --permission-prompt-tool stdio flags
- Pipe stdin for Claude (not just Codex) in spawn_streaming
- Update Claude capabilities: permissions, questions, tool_calls, tool_results, streaming_deltas
- Fix permission mode normalization to respect user's choice instead of forcing bypass
- Add acceptEdits permission mode support
- Add libc dependency for is_running_as_root check
This commit is contained in:
Nathan Flurry 2026-01-29 07:19:10 -08:00 committed by GitHub
parent c7d6482fd4
commit 0ee60920c8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 513 additions and 67 deletions

View file

@ -36,6 +36,9 @@ tracing-subscriber.workspace = true
include_dir.workspace = true
tempfile = { workspace = true, optional = true }
[target.'cfg(unix)'.dependencies]
libc = "0.2"
[dev-dependencies]
http-body-util.workspace = true
insta.workspace = true

View file

@ -265,6 +265,7 @@ struct SessionState {
broadcaster: broadcast::Sender<UniversalEvent>,
opencode_stream_started: bool,
codex_sender: Option<mpsc::UnboundedSender<String>>,
claude_sender: Option<mpsc::UnboundedSender<String>>,
session_started_emitted: bool,
last_claude_message_id: Option<String>,
claude_message_counter: u64,
@ -322,6 +323,7 @@ impl SessionState {
broadcaster,
opencode_stream_started: false,
codex_sender: None,
claude_sender: None,
session_started_emitted: false,
last_claude_message_id: None,
claude_message_counter: 0,
@ -382,6 +384,15 @@ impl SessionState {
self.codex_sender.clone()
}
fn set_claude_sender(&mut self, sender: Option<mpsc::UnboundedSender<String>>) {
self.claude_sender = sender;
}
#[allow(dead_code)]
fn claude_sender(&self) -> Option<mpsc::UnboundedSender<String>> {
self.claude_sender.clone()
}
fn normalize_conversion(&mut self, mut conversion: EventConversion) -> Vec<EventConversion> {
if self.native_session_id.is_none() && conversion.native_session_id.is_some() {
self.native_session_id = conversion.native_session_id.clone();
@ -1621,6 +1632,11 @@ impl SessionManager {
let manager = self.agent_manager.clone();
let prompt = message;
let initial_input = if session_snapshot.agent == AgentId::Claude {
Some(claude_user_message_line(&session_snapshot, &prompt))
} else {
None
};
let credentials = tokio::task::spawn_blocking(move || {
let options = CredentialExtractionOptions::new();
extract_all_credentials(&options)
@ -1630,7 +1646,7 @@ impl SessionManager {
message: err.to_string(),
})?;
let spawn_options = build_spawn_options(&session_snapshot, prompt, credentials);
let spawn_options = build_spawn_options(&session_snapshot, prompt.clone(), credentials);
let agent_id = session_snapshot.agent;
let spawn_result =
tokio::task::spawn_blocking(move || manager.spawn_streaming(agent_id, spawn_options))
@ -1649,7 +1665,7 @@ impl SessionManager {
let manager = Arc::clone(self);
tokio::spawn(async move {
manager
.consume_spawn(session_id, agent_id, spawn_result)
.consume_spawn(session_id, agent_id, spawn_result, initial_input)
.await;
});
@ -1847,7 +1863,7 @@ impl SessionManager {
question_id: &str,
answers: Vec<Vec<String>>,
) -> Result<(), SandboxError> {
let (agent, native_session_id, pending_question) = {
let (agent, native_session_id, pending_question, claude_sender) = {
let mut sessions = self.sessions.lock().await;
let session = Self::session_mut(&mut sessions, session_id).ok_or_else(|| {
SandboxError::SessionNotFound {
@ -1863,7 +1879,12 @@ impl SessionManager {
if let Some(err) = session.ended_error() {
return Err(err);
}
(session.agent, session.native_session_id.clone(), pending)
(
session.agent,
session.native_session_id.clone(),
pending,
session.claude_sender(),
)
};
let response = answers.first().and_then(|inner| inner.first()).cloned();
@ -1877,6 +1898,16 @@ impl SessionManager {
})?;
self.opencode_question_reply(&agent_session_id, question_id, answers)
.await?;
} else if agent == AgentId::Claude {
let sender = claude_sender.ok_or_else(|| SandboxError::InvalidRequest {
message: "Claude session is not active".to_string(),
})?;
let session_id = native_session_id.clone().unwrap_or_else(|| session_id.to_string());
let response_text = response.clone().unwrap_or_default();
let line = claude_tool_result_line(&session_id, question_id, &response_text, false);
sender.send(line).map_err(|_| SandboxError::InvalidRequest {
message: "Claude session is not active".to_string(),
})?;
} else {
// TODO: Forward question replies to subprocess agents.
}
@ -1905,7 +1936,7 @@ impl SessionManager {
session_id: &str,
question_id: &str,
) -> Result<(), SandboxError> {
let (agent, native_session_id, pending_question) = {
let (agent, native_session_id, pending_question, claude_sender) = {
let mut sessions = self.sessions.lock().await;
let session = Self::session_mut(&mut sessions, session_id).ok_or_else(|| {
SandboxError::SessionNotFound {
@ -1921,7 +1952,12 @@ impl SessionManager {
if let Some(err) = session.ended_error() {
return Err(err);
}
(session.agent, session.native_session_id.clone(), pending)
(
session.agent,
session.native_session_id.clone(),
pending,
session.claude_sender(),
)
};
if agent == AgentId::Opencode {
@ -1933,6 +1969,20 @@ impl SessionManager {
})?;
self.opencode_question_reject(&agent_session_id, question_id)
.await?;
} else if agent == AgentId::Claude {
let sender = claude_sender.ok_or_else(|| SandboxError::InvalidRequest {
message: "Claude session is not active".to_string(),
})?;
let session_id = native_session_id.clone().unwrap_or_else(|| session_id.to_string());
let line = claude_tool_result_line(
&session_id,
question_id,
"User rejected the question.",
true,
);
sender.send(line).map_err(|_| SandboxError::InvalidRequest {
message: "Claude session is not active".to_string(),
})?;
} else {
// TODO: Forward question rejections to subprocess agents.
}
@ -1963,7 +2013,7 @@ impl SessionManager {
reply: PermissionReply,
) -> Result<(), SandboxError> {
let reply_for_status = reply.clone();
let (agent, native_session_id, pending_permission) = {
let (agent, native_session_id, pending_permission, claude_sender) = {
let mut sessions = self.sessions.lock().await;
let session = Self::session_mut(&mut sessions, session_id).ok_or_else(|| {
SandboxError::SessionNotFound {
@ -1983,6 +2033,7 @@ impl SessionManager {
session.agent,
session.native_session_id.clone(),
pending,
session.claude_sender(),
)
};
@ -2050,6 +2101,44 @@ impl SessionManager {
})?;
self.opencode_permission_reply(&agent_session_id, permission_id, reply.clone())
.await?;
} else if agent == AgentId::Claude {
let sender = claude_sender.ok_or_else(|| SandboxError::InvalidRequest {
message: "Claude session is not active".to_string(),
})?;
let metadata = pending_permission
.as_ref()
.and_then(|pending| pending.metadata.as_ref())
.and_then(Value::as_object);
let updated_input = metadata
.and_then(|map| map.get("input"))
.cloned()
.unwrap_or(Value::Null);
let mut response_map = serde_json::Map::new();
match reply {
PermissionReply::Reject => {
response_map.insert(
"message".to_string(),
Value::String("Permission denied.".to_string()),
);
}
PermissionReply::Once | PermissionReply::Always => {
if !updated_input.is_null() {
response_map.insert("updatedInput".to_string(), updated_input);
}
}
}
let response_value = Value::Object(response_map);
let behavior = match reply {
PermissionReply::Reject => "deny",
PermissionReply::Once | PermissionReply::Always => "allow",
};
let line = claude_control_response_line(permission_id, behavior, response_value);
sender.send(line).map_err(|_| SandboxError::InvalidRequest {
message: "Claude session is not active".to_string(),
})?;
} else {
// TODO: Forward permission replies to subprocess agents.
}
@ -2151,6 +2240,7 @@ impl SessionManager {
session_id: String,
agent: AgentId,
spawn: StreamingSpawn,
initial_input: Option<String>,
) {
let StreamingSpawn {
mut child,
@ -2197,6 +2287,22 @@ impl SessionManager {
if let (Some(state), Some(sender)) = (codex_state.as_mut(), codex_sender.as_ref()) {
state.start(sender);
}
} else if agent == AgentId::Claude {
if let Some(stdin) = stdin {
let (writer_tx, writer_rx) = mpsc::unbounded_channel::<String>();
{
let mut sessions = self.sessions.lock().await;
if let Some(session) = Self::session_mut(&mut sessions, &session_id) {
session.set_claude_sender(Some(writer_tx.clone()));
}
}
if let Some(initial) = initial_input {
let _ = writer_tx.send(initial);
}
tokio::task::spawn_blocking(move || {
write_lines(stdin, writer_rx);
});
}
}
while let Some(line) = rx.recv().await {
@ -2214,6 +2320,14 @@ impl SessionManager {
}
}
} else if agent == AgentId::Claude {
if let Ok(value) = serde_json::from_str::<Value>(&line) {
if value.get("type").and_then(Value::as_str) == Some("result") {
let mut sessions = self.sessions.lock().await;
if let Some(session) = Self::session_mut(&mut sessions, &session_id) {
session.set_claude_sender(None);
}
}
}
let conversions = self.parse_claude_line(&line, &session_id).await;
if !conversions.is_empty() {
let _ = self.record_conversions(&session_id, conversions).await;
@ -2231,6 +2345,11 @@ impl SessionManager {
if let Some(session) = Self::session_mut(&mut sessions, &session_id) {
session.set_codex_sender(None);
}
} else if agent == AgentId::Claude {
let mut sessions = self.sessions.lock().await;
if let Some(session) = Self::session_mut(&mut sessions, &session_id) {
session.set_claude_sender(None);
}
}
if terminate_early {
@ -3779,14 +3898,14 @@ fn agent_supports_item_started(agent: AgentId) -> bool {
fn agent_capabilities_for(agent: AgentId) -> AgentCapabilities {
match agent {
// Headless Claude CLI does not expose AskUserQuestion and does not emit tool_result,
// so we keep these capabilities off until we switch to an SDK-backed wrapper.
// Claude CLI supports tool calls/results and permission prompts via the SDK control protocol,
// but we still emit synthetic item.started events.
AgentId::Claude => AgentCapabilities {
plan_mode: false,
permissions: false,
questions: false,
tool_calls: false,
tool_results: false,
permissions: true,
questions: true,
tool_calls: true,
tool_results: true,
text_messages: true,
images: false,
file_attachments: false,
@ -3797,7 +3916,7 @@ fn agent_capabilities_for(agent: AgentId) -> AgentCapabilities {
command_execution: false,
file_changes: false,
mcp_tools: false,
streaming_deltas: false,
streaming_deltas: true,
item_started: false,
shared_process: false, // per-turn subprocess with --resume
},
@ -3990,12 +4109,24 @@ fn normalize_agent_mode(agent: AgentId, agent_mode: Option<&str>) -> Result<Stri
}
}
/// Check if the current process is running as root (uid 0)
fn is_running_as_root() -> bool {
#[cfg(unix)]
{
unsafe { libc::getuid() == 0 }
}
#[cfg(not(unix))]
{
false
}
}
fn normalize_permission_mode(
agent: AgentId,
permission_mode: Option<&str>,
) -> Result<String, SandboxError> {
let mode = match permission_mode.unwrap_or("default") {
"default" | "plan" | "bypass" => permission_mode.unwrap_or("default"),
"default" | "plan" | "bypass" | "acceptEdits" => permission_mode.unwrap_or("default"),
value => {
return Err(SandboxError::InvalidRequest {
message: format!("invalid permission mode: {value}"),
@ -4004,14 +4135,20 @@ fn normalize_permission_mode(
}
};
if agent == AgentId::Claude {
if mode == "plan" {
return Err(SandboxError::ModeNotSupported {
agent: agent.as_str().to_string(),
mode: mode.to_string(),
// Claude refuses --dangerously-skip-permissions when running as root,
// which is common in container environments (Docker, Daytona, E2B).
// Return an error if user explicitly requests bypass while running as root.
if mode == "bypass" && is_running_as_root() {
return Err(SandboxError::InvalidRequest {
message: "permission mode 'bypass' is not supported when running as root (Claude refuses --dangerously-skip-permissions with root privileges)".to_string(),
}
.into());
}
return Ok("bypass".to_string());
// Pass through bypass/acceptEdits/plan if explicitly requested, otherwise use default
if mode == "bypass" || mode == "acceptEdits" || mode == "plan" {
return Ok(mode.to_string());
}
return Ok("default".to_string());
}
let supported = match agent {
AgentId::Claude => false,
@ -4117,6 +4254,87 @@ fn build_spawn_options(
options
}
fn claude_input_session_id(session: &SessionSnapshot) -> String {
session
.native_session_id
.clone()
.unwrap_or_else(|| session.session_id.clone())
}
fn claude_user_message_line(session: &SessionSnapshot, message: &str) -> String {
let session_id = claude_input_session_id(session);
serde_json::json!({
"type": "user",
"message": {
"role": "user",
"content": message,
},
"parent_tool_use_id": null,
"session_id": session_id,
})
.to_string()
}
fn claude_tool_result_line(
session_id: &str,
tool_use_id: &str,
content: &str,
is_error: bool,
) -> String {
serde_json::json!({
"type": "user",
"message": {
"role": "user",
"content": [{
"type": "tool_result",
"tool_use_id": tool_use_id,
"content": content,
"is_error": is_error,
}],
},
"parent_tool_use_id": null,
"session_id": session_id,
})
.to_string()
}
fn claude_control_response_line(
request_id: &str,
behavior: &str,
response: Value,
) -> String {
let mut response_obj = serde_json::Map::new();
response_obj.insert(
"behavior".to_string(),
Value::String(behavior.to_string()),
);
if let Some(message) = response.get("message") {
response_obj.insert("message".to_string(), message.clone());
}
if let Some(updated_input) = response.get("updatedInput") {
response_obj.insert("updatedInput".to_string(), updated_input.clone());
}
if let Some(updated_permissions) = response.get("updatedPermissions") {
response_obj.insert(
"updatedPermissions".to_string(),
updated_permissions.clone(),
);
}
if let Some(interrupt) = response.get("interrupt") {
response_obj.insert("interrupt".to_string(), interrupt.clone());
}
serde_json::json!({
"type": "control_response",
"response": {
"subtype": "success",
"request_id": request_id,
"response": Value::Object(response_obj),
}
})
.to_string()
}
fn read_lines<R: std::io::Read>(reader: R, sender: mpsc::UnboundedSender<String>) {
let mut reader = BufReader::new(reader);
let mut line = String::new();