diff --git a/docs/conversion.mdx b/docs/conversion.mdx index 8fcd38c..ac728c6 100644 --- a/docs/conversion.mdx +++ b/docs/conversion.mdx @@ -22,6 +22,21 @@ Notes: - opt-in via `include_raw=true` on events endpoints (HTTP + SSE). - If parsing fails, emit agent.unparsed (source=daemon, synthetic=true). Tests must assert zero unparsed events. +Runtime model by agent + +| Agent | Runtime model | Notes | +|---|---|---| +| Claude | Per-message subprocess streaming | Routed through `AgentManager::spawn_streaming` with Claude stream-json stdin. | +| Amp | Per-message subprocess streaming | Routed through `AgentManager::spawn_streaming` with parsed JSONL output. | +| Codex | Shared app-server (stdio JSON-RPC) | One shared server process, daemon sessions map to Codex thread IDs. | +| OpenCode | Shared HTTP server + SSE | One shared HTTP server, daemon sessions map to OpenCode session IDs. | +| Pi | Dedicated per-session RPC process | Canonical path is router-managed Pi runtime (`pi --mode rpc`), one process per daemon session. | + +Pi runtime contract: +- Session/message lifecycle for Pi must stay on router-managed per-session RPC runtime. +- `AgentManager::spawn(Pi)` is kept for one-shot utility/testing flows. +- `AgentManager::spawn_streaming(Pi)` is intentionally unsupported. + Events / Message Flow +------------------------+------------------------------+--------------------------------------------+-----------------------------------------+----------------------------------+----------------------------+ diff --git a/docs/session-transcript-schema.mdx b/docs/session-transcript-schema.mdx index 11d965d..de52162 100644 --- a/docs/session-transcript-schema.mdx +++ b/docs/session-transcript-schema.mdx @@ -29,13 +29,14 @@ This table shows which agent feature coverage appears in the universal event str | File Changes | - | ✓ | - | - | | | MCP Tools | - | ✓ | - | - | | | Streaming Deltas | ✓ | ✓ | ✓ | - | ✓ | -| Variants | | ✓ | ✓ | ✓ | | +| Variants | | ✓ | ✓ | ✓ | ✓ | Agents: [Claude Code](https://docs.anthropic.com/en/docs/agents-and-tools/claude-code/overview) · [Codex](https://github.com/openai/codex) · [OpenCode](https://github.com/opencode-ai/opencode) · [Amp](https://ampcode.com) · [Pi](https://buildwithpi.ai/pi-cli) - ✓ = Appears in session events - \- = Agent supports natively, schema conversion coming soon - (blank) = Not supported by agent +- Pi runtime model is router-managed per-session RPC (`pi --mode rpc`); it does not use generic subprocess streaming. diff --git a/server/packages/agent-management/src/agents.rs b/server/packages/agent-management/src/agents.rs index d896d39..8d5afa5 100644 --- a/server/packages/agent-management/src/agents.rs +++ b/server/packages/agent-management/src/agents.rs @@ -325,8 +325,17 @@ impl AgentManager { }); } AgentId::Pi => { - return Err(AgentError::UnsupportedAgent { - agent: agent.as_str().to_string(), + let output = spawn_pi(&path, &working_dir, &options)?; + let stdout = String::from_utf8_lossy(&output.stdout).to_string(); + let stderr = String::from_utf8_lossy(&output.stderr).to_string(); + let events = parse_jsonl_from_outputs(&stdout, &stderr); + return Ok(SpawnResult { + status: output.status, + stdout, + stderr, + session_id: extract_session_id(agent, &events), + result: extract_result_text(agent, &events), + events, }); } AgentId::Mock => { @@ -359,6 +368,15 @@ impl AgentManager { agent: AgentId, mut options: SpawnOptions, ) -> Result { + // Pi sessions are intentionally handled by the router's dedicated RPC runtime + // (one process per daemon session), not by generic subprocess streaming. + if agent == AgentId::Pi { + return Err(AgentError::UnsupportedRuntimePath { + agent, + operation: "spawn_streaming", + recommended_path: "router-managed per-session RPC runtime", + }); + } let codex_options = if agent == AgentId::Codex { Some(options.clone()) } else { @@ -596,6 +614,19 @@ impl AgentManager { } fn build_command(&self, agent: AgentId, options: &SpawnOptions) -> Result { + if agent == AgentId::Pi { + return Err(AgentError::UnsupportedRuntimePath { + agent, + operation: "build_command", + recommended_path: "router-managed per-session RPC runtime", + }); + } + if agent == AgentId::Mock { + return Err(AgentError::UnsupportedAgent { + agent: agent.as_str().to_string(), + }); + } + let path = self.resolve_binary(agent)?; let working_dir = options .working_dir @@ -665,14 +696,10 @@ impl AgentManager { return Ok(build_amp_command(&path, &working_dir, options)); } AgentId::Pi => { - return Err(AgentError::UnsupportedAgent { - agent: agent.as_str().to_string(), - }); + unreachable!("Pi is handled by router RPC runtime"); } AgentId::Mock => { - return Err(AgentError::UnsupportedAgent { - agent: agent.as_str().to_string(), - }); + unreachable!("Mock is handled above"); } } @@ -789,6 +816,12 @@ pub enum AgentError { ExtractFailed(String), #[error("resume unsupported for {agent}")] ResumeUnsupported { agent: AgentId }, + #[error("unsupported runtime path for {agent}: {operation}; use {recommended_path}")] + UnsupportedRuntimePath { + agent: AgentId, + operation: &'static str, + recommended_path: &'static str, + }, } fn parse_version_output(output: &std::process::Output) -> Option { @@ -990,7 +1023,25 @@ fn extract_session_id(agent: AgentId, events: &[Value]) -> Option { return Some(id); } } - AgentId::Pi => {} + AgentId::Pi => { + if event.get("type").and_then(Value::as_str) == Some("session") { + if let Some(id) = event.get("id").and_then(Value::as_str) { + return Some(id.to_string()); + } + } + if let Some(id) = event.get("session_id").and_then(Value::as_str) { + return Some(id.to_string()); + } + if let Some(id) = event.get("sessionId").and_then(Value::as_str) { + return Some(id.to_string()); + } + if let Some(id) = extract_nested_string(event, &["data", "sessionId"]) { + return Some(id); + } + if let Some(id) = extract_nested_string(event, &["session", "id"]) { + return Some(id); + } + } AgentId::Mock => {} } } @@ -1073,11 +1124,124 @@ fn extract_result_text(agent: AgentId, events: &[Value]) -> Option { Some(buffer) } } - AgentId::Pi => None, + AgentId::Pi => extract_pi_result_text(events), AgentId::Mock => None, } } +fn extract_text_from_content_parts(content: &Value) -> Option { + let parts = content.as_array()?; + let mut text = String::new(); + for part in parts { + if part.get("type").and_then(Value::as_str) != Some("text") { + continue; + } + if let Some(part_text) = part.get("text").and_then(Value::as_str) { + text.push_str(part_text); + } + } + if text.is_empty() { + None + } else { + Some(text) + } +} + +fn extract_assistant_message_text(message: &Value) -> Option { + if message.get("role").and_then(Value::as_str) != Some("assistant") { + return None; + } + if let Some(content) = message.get("content") { + return extract_text_from_content_parts(content); + } + None +} + +fn extract_pi_result_text(events: &[Value]) -> Option { + let mut delta_buffer = String::new(); + let mut last_full = None; + for event in events { + if event.get("type").and_then(Value::as_str) == Some("message_update") { + if let Some(delta_kind) = + extract_nested_string(event, &["assistantMessageEvent", "type"]) + { + if delta_kind == "text_delta" { + if let Some(delta) = + extract_nested_string(event, &["assistantMessageEvent", "delta"]) + { + delta_buffer.push_str(&delta); + } + if let Some(delta) = + extract_nested_string(event, &["assistantMessageEvent", "text"]) + { + delta_buffer.push_str(&delta); + } + } + } + } + if let Some(message) = event.get("message") { + if let Some(text) = extract_assistant_message_text(message) { + last_full = Some(text); + } + } + if event.get("type").and_then(Value::as_str) == Some("agent_end") { + if let Some(messages) = event.get("messages").and_then(Value::as_array) { + for message in messages { + if let Some(text) = extract_assistant_message_text(message) { + last_full = Some(text); + } + } + } + } + } + if delta_buffer.is_empty() { + last_full + } else { + Some(delta_buffer) + } +} + +fn apply_pi_model_args(command: &mut Command, model: Option<&str>) { + let Some(model) = model else { + return; + }; + if let Some((provider, model_id)) = model.split_once('/') { + command + .arg("--provider") + .arg(provider) + .arg("--model") + .arg(model_id); + return; + } + command.arg("--model").arg(model); +} + +fn spawn_pi( + path: &Path, + working_dir: &Path, + options: &SpawnOptions, +) -> Result { + if options.session_id.is_some() { + return Err(AgentError::ResumeUnsupported { agent: AgentId::Pi }); + } + + let mut command = Command::new(path); + command + .current_dir(working_dir) + .arg("--mode") + .arg("json") + .arg("--print"); + apply_pi_model_args(&mut command, options.model.as_deref()); + if let Some(variant) = options.variant.as_deref() { + command.arg("--thinking").arg(variant); + } + command.arg(&options.prompt); + for (key, value) in &options.env { + command.env(key, value); + } + command.output().map_err(AgentError::Io) +} + fn spawn_amp( path: &Path, working_dir: &Path, @@ -1505,3 +1669,87 @@ fn find_file_recursive(dir: &Path, filename: &str) -> Result, Ag } Ok(None) } + +#[cfg(test)] +mod tests { + use serde_json::json; + + use super::{ + extract_result_text, extract_session_id, AgentError, AgentId, AgentManager, SpawnOptions, + }; + + #[test] + fn pi_spawn_streaming_fails_fast_with_runtime_contract_error() { + let temp_dir = tempfile::tempdir().expect("temp dir"); + let manager = AgentManager::new(temp_dir.path().join("bin")).expect("agent manager"); + let err = manager + .spawn_streaming(AgentId::Pi, SpawnOptions::new("hello")) + .expect_err("expected Pi spawn_streaming to be rejected"); + assert!(matches!( + err, + AgentError::UnsupportedRuntimePath { + agent: AgentId::Pi, + operation: "spawn_streaming", + .. + } + )); + } + + #[test] + fn extract_pi_session_id_from_session_event() { + let events = vec![json!({ + "type": "session", + "id": "pi-session-123" + })]; + assert_eq!( + extract_session_id(AgentId::Pi, &events).as_deref(), + Some("pi-session-123") + ); + } + + #[test] + fn extract_pi_result_text_from_agent_end_message() { + let events = vec![json!({ + "type": "agent_end", + "messages": [ + { + "role": "assistant", + "content": [ + { + "type": "text", + "text": "OK" + } + ] + } + ] + })]; + assert_eq!( + extract_result_text(AgentId::Pi, &events).as_deref(), + Some("OK") + ); + } + + #[test] + fn extract_pi_result_text_from_message_update_deltas() { + let events = vec![ + json!({ + "type": "message_update", + "assistantMessageEvent": { + "type": "text_delta", + "delta": "O" + } + }), + json!({ + "type": "message_update", + "assistantMessageEvent": { + "type": "text_delta", + "delta": "K" + } + }), + ]; + assert_eq!( + extract_result_text(AgentId::Pi, &events).as_deref(), + Some("OK") + ); + } +} diff --git a/server/packages/sandbox-agent/src/router.rs b/server/packages/sandbox-agent/src/router.rs index d0908b8..d22ce4e 100644 --- a/server/packages/sandbox-agent/src/router.rs +++ b/server/packages/sandbox-agent/src/router.rs @@ -1786,6 +1786,8 @@ impl SessionManager { session.native_session_id = Some(thread_id); } if agent_id == AgentId::Pi { + // Pi uses one dedicated RPC process per daemon session. + // This is the canonical runtime path for Pi sessions. let pi = self .create_pi_session(&session_id, session.model.as_deref()) .await?; @@ -1970,6 +1972,8 @@ impl SessionManager { return Ok(()); } if session_snapshot.agent == AgentId::Pi { + // Pi bypasses generic AgentManager::spawn_streaming and stays on + // router-managed per-session RPC runtime for lifecycle isolation. self.send_pi_prompt(&session_snapshot, &message).await?; if !agent_supports_item_started(session_snapshot.agent) { let _ = self @@ -3859,11 +3863,33 @@ impl SessionManager { "message": prompt }); - runtime + let response_rx = runtime .send_request(id, &request) .ok_or_else(|| SandboxError::StreamError { message: "failed to send pi prompt request".to_string(), })?; + let response = tokio::time::timeout(Duration::from_secs(30), response_rx) + .await + .map_err(|_| SandboxError::StreamError { + message: "pi prompt request timed out".to_string(), + })? + .map_err(|_| SandboxError::StreamError { + message: "pi prompt request cancelled".to_string(), + })?; + if response + .get("success") + .and_then(Value::as_bool) + .is_some_and(|success| !success) + { + let detail = response + .get("error") + .cloned() + .or_else(|| response.get("data").and_then(|data| data.get("error")).cloned()) + .unwrap_or_else(|| response.clone()); + return Err(SandboxError::InvalidRequest { + message: format!("pi prompt failed: {detail}"), + }); + } Ok(()) } @@ -5718,6 +5744,9 @@ fn map_install_error(agent: AgentId, err: ManagerError) -> SandboxError { ManagerError::ResumeUnsupported { agent } => SandboxError::InvalidRequest { message: format!("resume unsupported for {agent}"), }, + ManagerError::UnsupportedRuntimePath { .. } => SandboxError::InvalidRequest { + message: err.to_string(), + }, ManagerError::UnsupportedPlatform { .. } | ManagerError::DownloadFailed { .. } | ManagerError::Http(_) @@ -5738,6 +5767,9 @@ fn map_spawn_error(agent: AgentId, err: ManagerError) -> SandboxError { ManagerError::ResumeUnsupported { agent } => SandboxError::InvalidRequest { message: format!("resume unsupported for {agent}"), }, + ManagerError::UnsupportedRuntimePath { .. } => SandboxError::InvalidRequest { + message: err.to_string(), + }, _ => SandboxError::AgentProcessExited { agent: agent.as_str().to_string(), exit_code: None, @@ -6670,6 +6702,33 @@ mod agent_capabilities_tests { } } +#[cfg(test)] +mod runtime_contract_tests { + use super::*; + + #[test] + fn map_spawn_error_maps_unsupported_runtime_path_to_invalid_request() { + let error = map_spawn_error( + AgentId::Pi, + ManagerError::UnsupportedRuntimePath { + agent: AgentId::Pi, + operation: "spawn_streaming", + recommended_path: "router-managed per-session RPC runtime", + }, + ); + match error { + SandboxError::InvalidRequest { message } => { + assert!(message.contains("spawn_streaming"), "{message}"); + assert!( + message.contains("router-managed per-session RPC runtime"), + "{message}" + ); + } + other => panic!("expected InvalidRequest, got {other:?}"), + } + } +} + #[cfg(test)] mod pi_runtime_tests { use super::*; @@ -6933,10 +6992,103 @@ mod pi_runtime_tests { prompt_request.get("message").and_then(Value::as_str), Some("Hello") ); + let prompt_id = prompt_request + .get("id") + .and_then(Value::as_i64) + .expect("prompt id"); + runtime.complete_request( + prompt_id, + json!({ + "type": "response", + "id": prompt_id, + "success": true + }), + ); task.await.expect("join").expect("send_pi_prompt ok"); } + #[tokio::test] + async fn send_pi_prompt_maps_explicit_rpc_error_to_invalid_request() { + let (session_manager, runtime, mut stdin_rx, _temp_dir) = + setup_pi_session_with_stdin("pi-prompt-error").await; + let snapshot = SessionSnapshot { + session_id: "pi-prompt-error".to_string(), + agent: AgentId::Pi, + agent_mode: "build".to_string(), + permission_mode: "default".to_string(), + model: None, + variant: None, + native_session_id: Some("native-pi-prompt-error".to_string()), + }; + let manager_for_task = session_manager.clone(); + let task = + tokio::spawn(async move { manager_for_task.send_pi_prompt(&snapshot, "Hello").await }); + + let prompt_line = stdin_rx.recv().await.expect("prompt request"); + let prompt_request: Value = serde_json::from_str(&prompt_line).expect("json request"); + assert_eq!(prompt_request.get("type").and_then(Value::as_str), Some("prompt")); + let prompt_id = prompt_request + .get("id") + .and_then(Value::as_i64) + .expect("prompt id"); + runtime.complete_request( + prompt_id, + json!({ + "type": "response", + "id": prompt_id, + "success": false, + "error": "turn already in progress" + }), + ); + + let err = task + .await + .expect("join") + .expect_err("send_pi_prompt should fail"); + match err { + SandboxError::InvalidRequest { message } => { + assert!(message.contains("turn already in progress"), "{message}"); + } + other => panic!("expected InvalidRequest, got {other:?}"), + } + } + + #[tokio::test] + async fn send_pi_prompt_reports_cancelled_response() { + let (session_manager, runtime, mut stdin_rx, _temp_dir) = + setup_pi_session_with_stdin("pi-prompt-cancelled").await; + let snapshot = SessionSnapshot { + session_id: "pi-prompt-cancelled".to_string(), + agent: AgentId::Pi, + agent_mode: "build".to_string(), + permission_mode: "default".to_string(), + model: None, + variant: None, + native_session_id: Some("native-pi-prompt-cancelled".to_string()), + }; + let manager_for_task = session_manager.clone(); + let task = tokio::spawn(async move { + manager_for_task + .send_pi_prompt(&snapshot, "This should cancel") + .await + }); + + let _ = stdin_rx.recv().await.expect("prompt request"); + runtime.clear_pending(); + + let err = task + .await + .expect("join") + .expect_err("send_pi_prompt should fail"); + match err { + SandboxError::StreamError { message } => { + assert!(message.contains("pi prompt request cancelled"), "{message}"); + } + other => panic!("expected StreamError, got {other:?}"), + } + } + #[tokio::test] async fn pi_runtime_output_non_json_emits_agent_unparsed() { let (session_manager, runtime, _temp_dir) = setup_pi_session("pi-unparsed").await; diff --git a/server/packages/sandbox-agent/tests/agent-management/agents.rs b/server/packages/sandbox-agent/tests/agent-management/agents.rs index 42e3e7f..572ecdb 100644 --- a/server/packages/sandbox-agent/tests/agent-management/agents.rs +++ b/server/packages/sandbox-agent/tests/agent-management/agents.rs @@ -53,6 +53,25 @@ fn pi_on_path() -> bool { false } +#[test] +fn pi_spawn_streaming_is_rejected_with_runtime_contract_error( +) -> Result<(), Box> { + let temp_dir = tempfile::tempdir()?; + let manager = AgentManager::new(temp_dir.path().join("bin"))?; + let err = manager + .spawn_streaming(AgentId::Pi, SpawnOptions::new(prompt_ok("IGNORED"))) + .expect_err("expected Pi spawn_streaming to be rejected"); + assert!(matches!( + err, + AgentError::UnsupportedRuntimePath { + agent: AgentId::Pi, + operation: "spawn_streaming", + .. + } + )); + Ok(()) +} + #[test] fn test_agents_install_version_spawn() -> Result<(), Box> { let temp_dir = tempfile::tempdir()?; diff --git a/server/packages/sandbox-agent/tests/http/snapshots/http_endpoints__agent_endpoints__agent_endpoints_snapshots@agent_install_amp.snap.new b/server/packages/sandbox-agent/tests/http/snapshots/http_endpoints__agent_endpoints__agent_endpoints_snapshots@agent_install_amp.snap similarity index 100% rename from server/packages/sandbox-agent/tests/http/snapshots/http_endpoints__agent_endpoints__agent_endpoints_snapshots@agent_install_amp.snap.new rename to server/packages/sandbox-agent/tests/http/snapshots/http_endpoints__agent_endpoints__agent_endpoints_snapshots@agent_install_amp.snap diff --git a/server/packages/sandbox-agent/tests/http/snapshots/http_endpoints__agent_endpoints__agent_endpoints_snapshots@agents_list_global.snap b/server/packages/sandbox-agent/tests/http/snapshots/http_endpoints__agent_endpoints__agent_endpoints_snapshots@agents_list_global.snap index cb83e46..259f870 100644 --- a/server/packages/sandbox-agent/tests/http/snapshots/http_endpoints__agent_endpoints__agent_endpoints_snapshots@agents_list_global.snap +++ b/server/packages/sandbox-agent/tests/http/snapshots/http_endpoints__agent_endpoints__agent_endpoints_snapshots@agents_list_global.snap @@ -1,5 +1,6 @@ --- source: server/packages/sandbox-agent/tests/http/agent_endpoints.rs +assertion_line: 129 expression: normalize_agent_list(&agents) --- agents: @@ -8,3 +9,4 @@ agents: - id: codex - id: mock - id: opencode + - id: pi diff --git a/server/packages/sandbox-agent/tests/http/snapshots/http_endpoints__agent_endpoints__agent_endpoints_snapshots@agents_list_global.snap.new b/server/packages/sandbox-agent/tests/http/snapshots/http_endpoints__agent_endpoints__agent_endpoints_snapshots@agents_list_global.snap.new deleted file mode 100644 index 259f870..0000000 --- a/server/packages/sandbox-agent/tests/http/snapshots/http_endpoints__agent_endpoints__agent_endpoints_snapshots@agents_list_global.snap.new +++ /dev/null @@ -1,12 +0,0 @@ ---- -source: server/packages/sandbox-agent/tests/http/agent_endpoints.rs -assertion_line: 129 -expression: normalize_agent_list(&agents) ---- -agents: - - id: amp - - id: claude - - id: codex - - id: mock - - id: opencode - - id: pi diff --git a/server/packages/sandbox-agent/tests/http/snapshots/http_endpoints__agent_endpoints__auth_snapshots@auth_valid_token_global.snap b/server/packages/sandbox-agent/tests/http/snapshots/http_endpoints__agent_endpoints__auth_snapshots@auth_valid_token_global.snap index 09ca205..37566aa 100644 --- a/server/packages/sandbox-agent/tests/http/snapshots/http_endpoints__agent_endpoints__auth_snapshots@auth_valid_token_global.snap +++ b/server/packages/sandbox-agent/tests/http/snapshots/http_endpoints__agent_endpoints__auth_snapshots@auth_valid_token_global.snap @@ -1,5 +1,6 @@ --- source: server/packages/sandbox-agent/tests/http/agent_endpoints.rs +assertion_line: 59 expression: "json!({\n \"status\": status.as_u16(), \"payload\": normalize_agent_list(&payload),\n})" --- payload: @@ -9,4 +10,5 @@ payload: - id: codex - id: mock - id: opencode + - id: pi status: 200 diff --git a/server/packages/sandbox-agent/tests/http/snapshots/http_endpoints__agent_endpoints__auth_snapshots@auth_valid_token_global.snap.new b/server/packages/sandbox-agent/tests/http/snapshots/http_endpoints__agent_endpoints__auth_snapshots@auth_valid_token_global.snap.new deleted file mode 100644 index 37566aa..0000000 --- a/server/packages/sandbox-agent/tests/http/snapshots/http_endpoints__agent_endpoints__auth_snapshots@auth_valid_token_global.snap.new +++ /dev/null @@ -1,14 +0,0 @@ ---- -source: server/packages/sandbox-agent/tests/http/agent_endpoints.rs -assertion_line: 59 -expression: "json!({\n \"status\": status.as_u16(), \"payload\": normalize_agent_list(&payload),\n})" ---- -payload: - agents: - - id: amp - - id: claude - - id: codex - - id: mock - - id: opencode - - id: pi -status: 200