pi working

This commit is contained in:
Franklin 2026-02-06 18:18:43 -05:00
parent 9a26604001
commit e2e7f11b9a
10 changed files with 451 additions and 38 deletions

View file

@ -22,6 +22,21 @@ Notes:
- opt-in via `include_raw=true` on events endpoints (HTTP + SSE). - opt-in via `include_raw=true` on events endpoints (HTTP + SSE).
- If parsing fails, emit agent.unparsed (source=daemon, synthetic=true). Tests must assert zero unparsed events. - If parsing fails, emit agent.unparsed (source=daemon, synthetic=true). Tests must assert zero unparsed events.
Runtime model by agent
| Agent | Runtime model | Notes |
|---|---|---|
| Claude | Per-message subprocess streaming | Routed through `AgentManager::spawn_streaming` with Claude stream-json stdin. |
| Amp | Per-message subprocess streaming | Routed through `AgentManager::spawn_streaming` with parsed JSONL output. |
| Codex | Shared app-server (stdio JSON-RPC) | One shared server process, daemon sessions map to Codex thread IDs. |
| OpenCode | Shared HTTP server + SSE | One shared HTTP server, daemon sessions map to OpenCode session IDs. |
| Pi | Dedicated per-session RPC process | Canonical path is router-managed Pi runtime (`pi --mode rpc`), one process per daemon session. |
Pi runtime contract:
- Session/message lifecycle for Pi must stay on router-managed per-session RPC runtime.
- `AgentManager::spawn(Pi)` is kept for one-shot utility/testing flows.
- `AgentManager::spawn_streaming(Pi)` is intentionally unsupported.
Events / Message Flow Events / Message Flow
+------------------------+------------------------------+--------------------------------------------+-----------------------------------------+----------------------------------+----------------------------+ +------------------------+------------------------------+--------------------------------------------+-----------------------------------------+----------------------------------+----------------------------+

View file

@ -29,13 +29,14 @@ This table shows which agent feature coverage appears in the universal event str
| File Changes | - | ✓ | - | - | | | File Changes | - | ✓ | - | - | |
| MCP Tools | - | ✓ | - | - | | | MCP Tools | - | ✓ | - | - | |
| Streaming Deltas | ✓ | ✓ | ✓ | - | ✓ | | Streaming Deltas | ✓ | ✓ | ✓ | - | ✓ |
| Variants | | ✓ | ✓ | ✓ | | | Variants | | ✓ | ✓ | ✓ | |
Agents: [Claude Code](https://docs.anthropic.com/en/docs/agents-and-tools/claude-code/overview) · [Codex](https://github.com/openai/codex) · [OpenCode](https://github.com/opencode-ai/opencode) · [Amp](https://ampcode.com) · [Pi](https://buildwithpi.ai/pi-cli) Agents: [Claude Code](https://docs.anthropic.com/en/docs/agents-and-tools/claude-code/overview) · [Codex](https://github.com/openai/codex) · [OpenCode](https://github.com/opencode-ai/opencode) · [Amp](https://ampcode.com) · [Pi](https://buildwithpi.ai/pi-cli)
- ✓ = Appears in session events - ✓ = Appears in session events
- \- = Agent supports natively, schema conversion coming soon - \- = Agent supports natively, schema conversion coming soon
- (blank) = Not supported by agent - (blank) = Not supported by agent
- Pi runtime model is router-managed per-session RPC (`pi --mode rpc`); it does not use generic subprocess streaming.
<AccordionGroup> <AccordionGroup>
<Accordion title="Text Messages"> <Accordion title="Text Messages">

View file

@ -325,8 +325,17 @@ impl AgentManager {
}); });
} }
AgentId::Pi => { AgentId::Pi => {
return Err(AgentError::UnsupportedAgent { let output = spawn_pi(&path, &working_dir, &options)?;
agent: agent.as_str().to_string(), let stdout = String::from_utf8_lossy(&output.stdout).to_string();
let stderr = String::from_utf8_lossy(&output.stderr).to_string();
let events = parse_jsonl_from_outputs(&stdout, &stderr);
return Ok(SpawnResult {
status: output.status,
stdout,
stderr,
session_id: extract_session_id(agent, &events),
result: extract_result_text(agent, &events),
events,
}); });
} }
AgentId::Mock => { AgentId::Mock => {
@ -359,6 +368,15 @@ impl AgentManager {
agent: AgentId, agent: AgentId,
mut options: SpawnOptions, mut options: SpawnOptions,
) -> Result<StreamingSpawn, AgentError> { ) -> Result<StreamingSpawn, AgentError> {
// Pi sessions are intentionally handled by the router's dedicated RPC runtime
// (one process per daemon session), not by generic subprocess streaming.
if agent == AgentId::Pi {
return Err(AgentError::UnsupportedRuntimePath {
agent,
operation: "spawn_streaming",
recommended_path: "router-managed per-session RPC runtime",
});
}
let codex_options = if agent == AgentId::Codex { let codex_options = if agent == AgentId::Codex {
Some(options.clone()) Some(options.clone())
} else { } else {
@ -596,6 +614,19 @@ impl AgentManager {
} }
fn build_command(&self, agent: AgentId, options: &SpawnOptions) -> Result<Command, AgentError> { fn build_command(&self, agent: AgentId, options: &SpawnOptions) -> Result<Command, AgentError> {
if agent == AgentId::Pi {
return Err(AgentError::UnsupportedRuntimePath {
agent,
operation: "build_command",
recommended_path: "router-managed per-session RPC runtime",
});
}
if agent == AgentId::Mock {
return Err(AgentError::UnsupportedAgent {
agent: agent.as_str().to_string(),
});
}
let path = self.resolve_binary(agent)?; let path = self.resolve_binary(agent)?;
let working_dir = options let working_dir = options
.working_dir .working_dir
@ -665,14 +696,10 @@ impl AgentManager {
return Ok(build_amp_command(&path, &working_dir, options)); return Ok(build_amp_command(&path, &working_dir, options));
} }
AgentId::Pi => { AgentId::Pi => {
return Err(AgentError::UnsupportedAgent { unreachable!("Pi is handled by router RPC runtime");
agent: agent.as_str().to_string(),
});
} }
AgentId::Mock => { AgentId::Mock => {
return Err(AgentError::UnsupportedAgent { unreachable!("Mock is handled above");
agent: agent.as_str().to_string(),
});
} }
} }
@ -789,6 +816,12 @@ pub enum AgentError {
ExtractFailed(String), ExtractFailed(String),
#[error("resume unsupported for {agent}")] #[error("resume unsupported for {agent}")]
ResumeUnsupported { agent: AgentId }, ResumeUnsupported { agent: AgentId },
#[error("unsupported runtime path for {agent}: {operation}; use {recommended_path}")]
UnsupportedRuntimePath {
agent: AgentId,
operation: &'static str,
recommended_path: &'static str,
},
} }
fn parse_version_output(output: &std::process::Output) -> Option<String> { fn parse_version_output(output: &std::process::Output) -> Option<String> {
@ -990,7 +1023,25 @@ fn extract_session_id(agent: AgentId, events: &[Value]) -> Option<String> {
return Some(id); return Some(id);
} }
} }
AgentId::Pi => {} AgentId::Pi => {
if event.get("type").and_then(Value::as_str) == Some("session") {
if let Some(id) = event.get("id").and_then(Value::as_str) {
return Some(id.to_string());
}
}
if let Some(id) = event.get("session_id").and_then(Value::as_str) {
return Some(id.to_string());
}
if let Some(id) = event.get("sessionId").and_then(Value::as_str) {
return Some(id.to_string());
}
if let Some(id) = extract_nested_string(event, &["data", "sessionId"]) {
return Some(id);
}
if let Some(id) = extract_nested_string(event, &["session", "id"]) {
return Some(id);
}
}
AgentId::Mock => {} AgentId::Mock => {}
} }
} }
@ -1073,11 +1124,124 @@ fn extract_result_text(agent: AgentId, events: &[Value]) -> Option<String> {
Some(buffer) Some(buffer)
} }
} }
AgentId::Pi => None, AgentId::Pi => extract_pi_result_text(events),
AgentId::Mock => None, AgentId::Mock => None,
} }
} }
fn extract_text_from_content_parts(content: &Value) -> Option<String> {
let parts = content.as_array()?;
let mut text = String::new();
for part in parts {
if part.get("type").and_then(Value::as_str) != Some("text") {
continue;
}
if let Some(part_text) = part.get("text").and_then(Value::as_str) {
text.push_str(part_text);
}
}
if text.is_empty() {
None
} else {
Some(text)
}
}
fn extract_assistant_message_text(message: &Value) -> Option<String> {
if message.get("role").and_then(Value::as_str) != Some("assistant") {
return None;
}
if let Some(content) = message.get("content") {
return extract_text_from_content_parts(content);
}
None
}
fn extract_pi_result_text(events: &[Value]) -> Option<String> {
let mut delta_buffer = String::new();
let mut last_full = None;
for event in events {
if event.get("type").and_then(Value::as_str) == Some("message_update") {
if let Some(delta_kind) =
extract_nested_string(event, &["assistantMessageEvent", "type"])
{
if delta_kind == "text_delta" {
if let Some(delta) =
extract_nested_string(event, &["assistantMessageEvent", "delta"])
{
delta_buffer.push_str(&delta);
}
if let Some(delta) =
extract_nested_string(event, &["assistantMessageEvent", "text"])
{
delta_buffer.push_str(&delta);
}
}
}
}
if let Some(message) = event.get("message") {
if let Some(text) = extract_assistant_message_text(message) {
last_full = Some(text);
}
}
if event.get("type").and_then(Value::as_str) == Some("agent_end") {
if let Some(messages) = event.get("messages").and_then(Value::as_array) {
for message in messages {
if let Some(text) = extract_assistant_message_text(message) {
last_full = Some(text);
}
}
}
}
}
if delta_buffer.is_empty() {
last_full
} else {
Some(delta_buffer)
}
}
fn apply_pi_model_args(command: &mut Command, model: Option<&str>) {
let Some(model) = model else {
return;
};
if let Some((provider, model_id)) = model.split_once('/') {
command
.arg("--provider")
.arg(provider)
.arg("--model")
.arg(model_id);
return;
}
command.arg("--model").arg(model);
}
fn spawn_pi(
path: &Path,
working_dir: &Path,
options: &SpawnOptions,
) -> Result<std::process::Output, AgentError> {
if options.session_id.is_some() {
return Err(AgentError::ResumeUnsupported { agent: AgentId::Pi });
}
let mut command = Command::new(path);
command
.current_dir(working_dir)
.arg("--mode")
.arg("json")
.arg("--print");
apply_pi_model_args(&mut command, options.model.as_deref());
if let Some(variant) = options.variant.as_deref() {
command.arg("--thinking").arg(variant);
}
command.arg(&options.prompt);
for (key, value) in &options.env {
command.env(key, value);
}
command.output().map_err(AgentError::Io)
}
fn spawn_amp( fn spawn_amp(
path: &Path, path: &Path,
working_dir: &Path, working_dir: &Path,
@ -1505,3 +1669,87 @@ fn find_file_recursive(dir: &Path, filename: &str) -> Result<Option<PathBuf>, Ag
} }
Ok(None) Ok(None)
} }
#[cfg(test)]
mod tests {
use serde_json::json;
use super::{
extract_result_text, extract_session_id, AgentError, AgentId, AgentManager, SpawnOptions,
};
#[test]
fn pi_spawn_streaming_fails_fast_with_runtime_contract_error() {
let temp_dir = tempfile::tempdir().expect("temp dir");
let manager = AgentManager::new(temp_dir.path().join("bin")).expect("agent manager");
let err = manager
.spawn_streaming(AgentId::Pi, SpawnOptions::new("hello"))
.expect_err("expected Pi spawn_streaming to be rejected");
assert!(matches!(
err,
AgentError::UnsupportedRuntimePath {
agent: AgentId::Pi,
operation: "spawn_streaming",
..
}
));
}
#[test]
fn extract_pi_session_id_from_session_event() {
let events = vec![json!({
"type": "session",
"id": "pi-session-123"
})];
assert_eq!(
extract_session_id(AgentId::Pi, &events).as_deref(),
Some("pi-session-123")
);
}
#[test]
fn extract_pi_result_text_from_agent_end_message() {
let events = vec![json!({
"type": "agent_end",
"messages": [
{
"role": "assistant",
"content": [
{
"type": "text",
"text": "OK"
}
]
}
]
})];
assert_eq!(
extract_result_text(AgentId::Pi, &events).as_deref(),
Some("OK")
);
}
#[test]
fn extract_pi_result_text_from_message_update_deltas() {
let events = vec![
json!({
"type": "message_update",
"assistantMessageEvent": {
"type": "text_delta",
"delta": "O"
}
}),
json!({
"type": "message_update",
"assistantMessageEvent": {
"type": "text_delta",
"delta": "K"
}
}),
];
assert_eq!(
extract_result_text(AgentId::Pi, &events).as_deref(),
Some("OK")
);
}
}

View file

@ -1786,6 +1786,8 @@ impl SessionManager {
session.native_session_id = Some(thread_id); session.native_session_id = Some(thread_id);
} }
if agent_id == AgentId::Pi { if agent_id == AgentId::Pi {
// Pi uses one dedicated RPC process per daemon session.
// This is the canonical runtime path for Pi sessions.
let pi = self let pi = self
.create_pi_session(&session_id, session.model.as_deref()) .create_pi_session(&session_id, session.model.as_deref())
.await?; .await?;
@ -1970,6 +1972,8 @@ impl SessionManager {
return Ok(()); return Ok(());
} }
if session_snapshot.agent == AgentId::Pi { if session_snapshot.agent == AgentId::Pi {
// Pi bypasses generic AgentManager::spawn_streaming and stays on
// router-managed per-session RPC runtime for lifecycle isolation.
self.send_pi_prompt(&session_snapshot, &message).await?; self.send_pi_prompt(&session_snapshot, &message).await?;
if !agent_supports_item_started(session_snapshot.agent) { if !agent_supports_item_started(session_snapshot.agent) {
let _ = self let _ = self
@ -3859,11 +3863,33 @@ impl SessionManager {
"message": prompt "message": prompt
}); });
runtime let response_rx = runtime
.send_request(id, &request) .send_request(id, &request)
.ok_or_else(|| SandboxError::StreamError { .ok_or_else(|| SandboxError::StreamError {
message: "failed to send pi prompt request".to_string(), message: "failed to send pi prompt request".to_string(),
})?; })?;
let response = tokio::time::timeout(Duration::from_secs(30), response_rx)
.await
.map_err(|_| SandboxError::StreamError {
message: "pi prompt request timed out".to_string(),
})?
.map_err(|_| SandboxError::StreamError {
message: "pi prompt request cancelled".to_string(),
})?;
if response
.get("success")
.and_then(Value::as_bool)
.is_some_and(|success| !success)
{
let detail = response
.get("error")
.cloned()
.or_else(|| response.get("data").and_then(|data| data.get("error")).cloned())
.unwrap_or_else(|| response.clone());
return Err(SandboxError::InvalidRequest {
message: format!("pi prompt failed: {detail}"),
});
}
Ok(()) Ok(())
} }
@ -5718,6 +5744,9 @@ fn map_install_error(agent: AgentId, err: ManagerError) -> SandboxError {
ManagerError::ResumeUnsupported { agent } => SandboxError::InvalidRequest { ManagerError::ResumeUnsupported { agent } => SandboxError::InvalidRequest {
message: format!("resume unsupported for {agent}"), message: format!("resume unsupported for {agent}"),
}, },
ManagerError::UnsupportedRuntimePath { .. } => SandboxError::InvalidRequest {
message: err.to_string(),
},
ManagerError::UnsupportedPlatform { .. } ManagerError::UnsupportedPlatform { .. }
| ManagerError::DownloadFailed { .. } | ManagerError::DownloadFailed { .. }
| ManagerError::Http(_) | ManagerError::Http(_)
@ -5738,6 +5767,9 @@ fn map_spawn_error(agent: AgentId, err: ManagerError) -> SandboxError {
ManagerError::ResumeUnsupported { agent } => SandboxError::InvalidRequest { ManagerError::ResumeUnsupported { agent } => SandboxError::InvalidRequest {
message: format!("resume unsupported for {agent}"), message: format!("resume unsupported for {agent}"),
}, },
ManagerError::UnsupportedRuntimePath { .. } => SandboxError::InvalidRequest {
message: err.to_string(),
},
_ => SandboxError::AgentProcessExited { _ => SandboxError::AgentProcessExited {
agent: agent.as_str().to_string(), agent: agent.as_str().to_string(),
exit_code: None, exit_code: None,
@ -6670,6 +6702,33 @@ mod agent_capabilities_tests {
} }
} }
#[cfg(test)]
mod runtime_contract_tests {
use super::*;
#[test]
fn map_spawn_error_maps_unsupported_runtime_path_to_invalid_request() {
let error = map_spawn_error(
AgentId::Pi,
ManagerError::UnsupportedRuntimePath {
agent: AgentId::Pi,
operation: "spawn_streaming",
recommended_path: "router-managed per-session RPC runtime",
},
);
match error {
SandboxError::InvalidRequest { message } => {
assert!(message.contains("spawn_streaming"), "{message}");
assert!(
message.contains("router-managed per-session RPC runtime"),
"{message}"
);
}
other => panic!("expected InvalidRequest, got {other:?}"),
}
}
}
#[cfg(test)] #[cfg(test)]
mod pi_runtime_tests { mod pi_runtime_tests {
use super::*; use super::*;
@ -6933,10 +6992,103 @@ mod pi_runtime_tests {
prompt_request.get("message").and_then(Value::as_str), prompt_request.get("message").and_then(Value::as_str),
Some("Hello") Some("Hello")
); );
let prompt_id = prompt_request
.get("id")
.and_then(Value::as_i64)
.expect("prompt id");
runtime.complete_request(
prompt_id,
json!({
"type": "response",
"id": prompt_id,
"success": true
}),
);
task.await.expect("join").expect("send_pi_prompt ok"); task.await.expect("join").expect("send_pi_prompt ok");
} }
#[tokio::test]
async fn send_pi_prompt_maps_explicit_rpc_error_to_invalid_request() {
let (session_manager, runtime, mut stdin_rx, _temp_dir) =
setup_pi_session_with_stdin("pi-prompt-error").await;
let snapshot = SessionSnapshot {
session_id: "pi-prompt-error".to_string(),
agent: AgentId::Pi,
agent_mode: "build".to_string(),
permission_mode: "default".to_string(),
model: None,
variant: None,
native_session_id: Some("native-pi-prompt-error".to_string()),
};
let manager_for_task = session_manager.clone();
let task =
tokio::spawn(async move { manager_for_task.send_pi_prompt(&snapshot, "Hello").await });
let prompt_line = stdin_rx.recv().await.expect("prompt request");
let prompt_request: Value = serde_json::from_str(&prompt_line).expect("json request");
assert_eq!(prompt_request.get("type").and_then(Value::as_str), Some("prompt"));
let prompt_id = prompt_request
.get("id")
.and_then(Value::as_i64)
.expect("prompt id");
runtime.complete_request(
prompt_id,
json!({
"type": "response",
"id": prompt_id,
"success": false,
"error": "turn already in progress"
}),
);
let err = task
.await
.expect("join")
.expect_err("send_pi_prompt should fail");
match err {
SandboxError::InvalidRequest { message } => {
assert!(message.contains("turn already in progress"), "{message}");
}
other => panic!("expected InvalidRequest, got {other:?}"),
}
}
#[tokio::test]
async fn send_pi_prompt_reports_cancelled_response() {
let (session_manager, runtime, mut stdin_rx, _temp_dir) =
setup_pi_session_with_stdin("pi-prompt-cancelled").await;
let snapshot = SessionSnapshot {
session_id: "pi-prompt-cancelled".to_string(),
agent: AgentId::Pi,
agent_mode: "build".to_string(),
permission_mode: "default".to_string(),
model: None,
variant: None,
native_session_id: Some("native-pi-prompt-cancelled".to_string()),
};
let manager_for_task = session_manager.clone();
let task = tokio::spawn(async move {
manager_for_task
.send_pi_prompt(&snapshot, "This should cancel")
.await
});
let _ = stdin_rx.recv().await.expect("prompt request");
runtime.clear_pending();
let err = task
.await
.expect("join")
.expect_err("send_pi_prompt should fail");
match err {
SandboxError::StreamError { message } => {
assert!(message.contains("pi prompt request cancelled"), "{message}");
}
other => panic!("expected StreamError, got {other:?}"),
}
}
#[tokio::test] #[tokio::test]
async fn pi_runtime_output_non_json_emits_agent_unparsed() { async fn pi_runtime_output_non_json_emits_agent_unparsed() {
let (session_manager, runtime, _temp_dir) = setup_pi_session("pi-unparsed").await; let (session_manager, runtime, _temp_dir) = setup_pi_session("pi-unparsed").await;

View file

@ -53,6 +53,25 @@ fn pi_on_path() -> bool {
false false
} }
#[test]
fn pi_spawn_streaming_is_rejected_with_runtime_contract_error(
) -> Result<(), Box<dyn std::error::Error>> {
let temp_dir = tempfile::tempdir()?;
let manager = AgentManager::new(temp_dir.path().join("bin"))?;
let err = manager
.spawn_streaming(AgentId::Pi, SpawnOptions::new(prompt_ok("IGNORED")))
.expect_err("expected Pi spawn_streaming to be rejected");
assert!(matches!(
err,
AgentError::UnsupportedRuntimePath {
agent: AgentId::Pi,
operation: "spawn_streaming",
..
}
));
Ok(())
}
#[test] #[test]
fn test_agents_install_version_spawn() -> Result<(), Box<dyn std::error::Error>> { fn test_agents_install_version_spawn() -> Result<(), Box<dyn std::error::Error>> {
let temp_dir = tempfile::tempdir()?; let temp_dir = tempfile::tempdir()?;

View file

@ -1,5 +1,6 @@
--- ---
source: server/packages/sandbox-agent/tests/http/agent_endpoints.rs source: server/packages/sandbox-agent/tests/http/agent_endpoints.rs
assertion_line: 129
expression: normalize_agent_list(&agents) expression: normalize_agent_list(&agents)
--- ---
agents: agents:
@ -8,3 +9,4 @@ agents:
- id: codex - id: codex
- id: mock - id: mock
- id: opencode - id: opencode
- id: pi

View file

@ -1,12 +0,0 @@
---
source: server/packages/sandbox-agent/tests/http/agent_endpoints.rs
assertion_line: 129
expression: normalize_agent_list(&agents)
---
agents:
- id: amp
- id: claude
- id: codex
- id: mock
- id: opencode
- id: pi

View file

@ -1,5 +1,6 @@
--- ---
source: server/packages/sandbox-agent/tests/http/agent_endpoints.rs source: server/packages/sandbox-agent/tests/http/agent_endpoints.rs
assertion_line: 59
expression: "json!({\n \"status\": status.as_u16(), \"payload\": normalize_agent_list(&payload),\n})" expression: "json!({\n \"status\": status.as_u16(), \"payload\": normalize_agent_list(&payload),\n})"
--- ---
payload: payload:
@ -9,4 +10,5 @@ payload:
- id: codex - id: codex
- id: mock - id: mock
- id: opencode - id: opencode
- id: pi
status: 200 status: 200

View file

@ -1,14 +0,0 @@
---
source: server/packages/sandbox-agent/tests/http/agent_endpoints.rs
assertion_line: 59
expression: "json!({\n \"status\": status.as_u16(), \"payload\": normalize_agent_list(&payload),\n})"
---
payload:
agents:
- id: amp
- id: claude
- id: codex
- id: mock
- id: opencode
- id: pi
status: 200