support pi

This commit is contained in:
Franklin 2026-02-05 17:06:53 -05:00
parent cc5a9e0d73
commit 843498e9db
41 changed files with 2654 additions and 102 deletions

View file

@ -7,7 +7,6 @@ use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command, ExitSta
use std::time::{Duration, Instant};
use flate2::read::GzDecoder;
use reqwest::blocking::Client;
use sandbox_agent_extracted_agent_schemas::codex as codex_schema;
use serde::{Deserialize, Serialize};
use serde_json::Value;
@ -21,6 +20,7 @@ pub enum AgentId {
Codex,
Opencode,
Amp,
Pi,
Mock,
}
@ -31,17 +31,55 @@ impl AgentId {
AgentId::Codex => "codex",
AgentId::Opencode => "opencode",
AgentId::Amp => "amp",
AgentId::Pi => "pi",
AgentId::Mock => "mock",
}
}
pub fn binary_name(self) -> &'static str {
match self {
AgentId::Claude => "claude",
AgentId::Codex => "codex",
AgentId::Opencode => "opencode",
AgentId::Amp => "amp",
AgentId::Mock => "mock",
AgentId::Claude => {
if cfg!(windows) {
"claude.exe"
} else {
"claude"
}
}
AgentId::Codex => {
if cfg!(windows) {
"codex.exe"
} else {
"codex"
}
}
AgentId::Opencode => {
if cfg!(windows) {
"opencode.exe"
} else {
"opencode"
}
}
AgentId::Amp => {
if cfg!(windows) {
"amp.exe"
} else {
"amp"
}
}
AgentId::Pi => {
if cfg!(windows) {
"pi.exe"
} else {
"pi"
}
}
AgentId::Mock => {
if cfg!(windows) {
"mock.exe"
} else {
"mock"
}
}
}
}
@ -51,6 +89,7 @@ impl AgentId {
"codex" => Some(AgentId::Codex),
"opencode" => Some(AgentId::Opencode),
"amp" => Some(AgentId::Amp),
"pi" => Some(AgentId::Pi),
"mock" => Some(AgentId::Mock),
_ => None,
}
@ -151,6 +190,7 @@ impl AgentManager {
install_opencode(&install_path, self.platform, options.version.as_deref())?
}
AgentId::Amp => install_amp(&install_path, self.platform, options.version.as_deref())?,
AgentId::Pi => install_pi(&install_path, self.platform, options.version.as_deref())?,
AgentId::Mock => {
if !install_path.exists() {
fs::write(&install_path, b"mock")?;
@ -284,6 +324,11 @@ impl AgentManager {
events,
});
}
AgentId::Pi => {
return Err(AgentError::UnsupportedAgent {
agent: agent.as_str().to_string(),
});
}
AgentId::Mock => {
return Err(AgentError::UnsupportedAgent {
agent: agent.as_str().to_string(),
@ -619,6 +664,11 @@ impl AgentManager {
AgentId::Amp => {
return Ok(build_amp_command(&path, &working_dir, options));
}
AgentId::Pi => {
return Err(AgentError::UnsupportedAgent {
agent: agent.as_str().to_string(),
});
}
AgentId::Mock => {
return Err(AgentError::UnsupportedAgent {
agent: agent.as_str().to_string(),
@ -940,6 +990,7 @@ fn extract_session_id(agent: AgentId, events: &[Value]) -> Option<String> {
return Some(id);
}
}
AgentId::Pi => {}
AgentId::Mock => {}
}
}
@ -1022,6 +1073,7 @@ fn extract_result_text(agent: AgentId, events: &[Value]) -> Option<String> {
Some(buffer)
}
}
AgentId::Pi => None,
AgentId::Mock => None,
}
}
@ -1200,7 +1252,7 @@ fn default_install_dir() -> PathBuf {
}
fn download_bytes(url: &Url) -> Result<Vec<u8>, AgentError> {
let client = Client::builder().build()?;
let client = crate::http_client::blocking_client_builder().build()?;
let mut response = client.get(url.clone()).send()?;
if !response.status().is_success() {
return Err(AgentError::DownloadFailed { url: url.clone() });
@ -1210,6 +1262,28 @@ fn download_bytes(url: &Url) -> Result<Vec<u8>, AgentError> {
Ok(bytes)
}
fn install_pi(path: &Path, platform: Platform, version: Option<&str>) -> Result<(), AgentError> {
let asset = match platform {
Platform::LinuxX64 | Platform::LinuxX64Musl => "pi-linux-x64",
Platform::LinuxArm64 => "pi-linux-arm64",
Platform::MacosArm64 => "pi-darwin-arm64",
Platform::MacosX64 => "pi-darwin-x64",
}
.to_string();
let url = match version {
Some(version) => Url::parse(&format!(
"https://upd.dev/badlogic/pi-mono/releases/download/{version}/{asset}"
))?,
None => Url::parse(&format!(
"https://upd.dev/badlogic/pi-mono/releases/latest/download/{asset}"
))?,
};
let bytes = download_bytes(&url)?;
write_executable(path, &bytes)?;
Ok(())
}
fn install_claude(
path: &Path,
platform: Platform,
@ -1329,7 +1403,7 @@ fn install_opencode(
};
install_zip_binary(path, &url, "opencode")
}
_ => {
Platform::LinuxX64 | Platform::LinuxX64Musl | Platform::LinuxArm64 => {
let platform_segment = match platform {
Platform::LinuxX64 => "linux-x64",
Platform::LinuxX64Musl => "linux-x64-musl",

View file

@ -0,0 +1,20 @@
use std::env;
use reqwest::blocking::ClientBuilder;
const NO_SYSTEM_PROXY_ENV: &str = "SANDBOX_AGENT_NO_SYSTEM_PROXY";
fn disable_system_proxy() -> bool {
env::var(NO_SYSTEM_PROXY_ENV)
.map(|value| matches!(value.as_str(), "1" | "true" | "TRUE" | "yes" | "YES"))
.unwrap_or(false)
}
pub(crate) fn blocking_client_builder() -> ClientBuilder {
let builder = reqwest::blocking::Client::builder();
if disable_system_proxy() {
builder.no_proxy()
} else {
builder
}
}

View file

@ -1,3 +1,4 @@
pub mod agents;
pub mod credentials;
mod http_client;
pub mod testing;

View file

@ -2,7 +2,6 @@ use std::env;
use std::path::PathBuf;
use std::time::Duration;
use reqwest::blocking::Client;
use reqwest::header::{HeaderMap, HeaderValue, AUTHORIZATION, CONTENT_TYPE};
use reqwest::StatusCode;
use thiserror::Error;
@ -36,6 +35,7 @@ pub enum TestAgentConfigError {
const AGENTS_ENV: &str = "SANDBOX_TEST_AGENTS";
const ANTHROPIC_ENV: &str = "SANDBOX_TEST_ANTHROPIC_API_KEY";
const OPENAI_ENV: &str = "SANDBOX_TEST_OPENAI_API_KEY";
const PI_ENV: &str = "SANDBOX_TEST_PI";
const ANTHROPIC_MODELS_URL: &str = "https://api.anthropic.com/v1/models";
const OPENAI_MODELS_URL: &str = "https://api.openai.com/v1/models";
const ANTHROPIC_VERSION: &str = "2023-06-01";
@ -63,6 +63,7 @@ pub fn test_agents_from_env() -> Result<Vec<TestAgentConfig>, TestAgentConfigErr
AgentId::Codex,
AgentId::Opencode,
AgentId::Amp,
AgentId::Pi,
]);
continue;
}
@ -73,6 +74,12 @@ pub fn test_agents_from_env() -> Result<Vec<TestAgentConfig>, TestAgentConfigErr
agents
};
let include_pi = pi_tests_enabled() && find_in_path(AgentId::Pi.binary_name());
if !include_pi && agents.iter().any(|agent| *agent == AgentId::Pi) {
eprintln!("Skipping Pi tests: set {PI_ENV}=1 and ensure pi is on PATH.");
}
agents.retain(|agent| *agent != AgentId::Pi || include_pi);
agents.sort_by(|a, b| a.as_str().cmp(b.as_str()));
agents.dedup();
@ -137,6 +144,21 @@ pub fn test_agents_from_env() -> Result<Vec<TestAgentConfig>, TestAgentConfigErr
}
credentials_with(anthropic_cred.clone(), openai_cred.clone())
}
AgentId::Pi => {
if anthropic_cred.is_none() && openai_cred.is_none() {
return Err(TestAgentConfigError::MissingCredentials {
agent,
missing: format!("{ANTHROPIC_ENV} or {OPENAI_ENV}"),
});
}
if let Some(cred) = anthropic_cred.as_ref() {
ensure_anthropic_ok(&mut health_cache, cred)?;
}
if let Some(cred) = openai_cred.as_ref() {
ensure_openai_ok(&mut health_cache, cred)?;
}
credentials_with(anthropic_cred.clone(), openai_cred.clone())
}
AgentId::Mock => credentials_with(None, None),
};
configs.push(TestAgentConfig { agent, credentials });
@ -172,7 +194,7 @@ fn ensure_openai_ok(
fn health_check_anthropic(credentials: &ProviderCredentials) -> Result<(), TestAgentConfigError> {
let credentials = credentials.clone();
run_blocking_check("anthropic", move || {
let client = Client::builder()
let client = crate::http_client::blocking_client_builder()
.timeout(Duration::from_secs(10))
.build()
.map_err(|err| TestAgentConfigError::HealthCheckFailed {
@ -226,7 +248,7 @@ fn health_check_anthropic(credentials: &ProviderCredentials) -> Result<(), TestA
fn health_check_openai(credentials: &ProviderCredentials) -> Result<(), TestAgentConfigError> {
let credentials = credentials.clone();
run_blocking_check("openai", move || {
let client = Client::builder()
let client = crate::http_client::blocking_client_builder()
.timeout(Duration::from_secs(10))
.build()
.map_err(|err| TestAgentConfigError::HealthCheckFailed {
@ -298,12 +320,15 @@ where
}
fn detect_system_agents() -> Vec<AgentId> {
let candidates = [
let mut candidates = vec![
AgentId::Claude,
AgentId::Codex,
AgentId::Opencode,
AgentId::Amp,
];
if pi_tests_enabled() && find_in_path(AgentId::Pi.binary_name()) {
candidates.push(AgentId::Pi);
}
let install_dir = default_install_dir();
candidates
.into_iter()
@ -345,6 +370,15 @@ fn read_env_key(name: &str) -> Option<String> {
})
}
fn pi_tests_enabled() -> bool {
env::var(PI_ENV)
.map(|value| {
let value = value.trim().to_ascii_lowercase();
value == "1" || value == "true" || value == "yes"
})
.unwrap_or(false)
}
fn credentials_with(
anthropic_cred: Option<ProviderCredentials>,
openai_cred: Option<ProviderCredentials>,

View file

@ -11,6 +11,7 @@ fn main() {
("claude", "claude.json"),
("codex", "codex.json"),
("amp", "amp.json"),
("pi", "pi.json"),
];
for (name, file) in schemas {

View file

@ -5,6 +5,7 @@
//! - Claude Code SDK
//! - Codex SDK
//! - AMP Code SDK
//! - Pi RPC
pub mod opencode {
//! OpenCode SDK types extracted from OpenAPI 3.1.1 spec.
@ -25,3 +26,8 @@ pub mod amp {
//! AMP Code SDK types.
include!(concat!(env!("OUT_DIR"), "/amp.rs"));
}
pub mod pi {
//! Pi RPC types.
include!(concat!(env!("OUT_DIR"), "/pi.rs"));
}

View file

@ -0,0 +1,30 @@
use std::env;
use reqwest::blocking::ClientBuilder as BlockingClientBuilder;
use reqwest::ClientBuilder;
const NO_SYSTEM_PROXY_ENV: &str = "SANDBOX_AGENT_NO_SYSTEM_PROXY";
fn disable_system_proxy() -> bool {
env::var(NO_SYSTEM_PROXY_ENV)
.map(|value| matches!(value.as_str(), "1" | "true" | "TRUE" | "yes" | "YES"))
.unwrap_or(false)
}
pub fn client_builder() -> ClientBuilder {
let builder = reqwest::Client::builder();
if disable_system_proxy() {
builder.no_proxy()
} else {
builder
}
}
pub fn blocking_client_builder() -> BlockingClientBuilder {
let builder = reqwest::blocking::Client::builder();
if disable_system_proxy() {
builder.no_proxy()
} else {
builder
}
}

View file

@ -2,6 +2,7 @@
mod agent_server_logs;
pub mod credentials;
pub mod http_client;
pub mod router;
pub mod telemetry;
pub mod ui;

View file

@ -11,6 +11,7 @@ mod build_version {
}
use reqwest::blocking::Client as HttpClient;
use reqwest::Method;
use sandbox_agent::http_client;
use sandbox_agent::router::{build_router_with_state, shutdown_servers};
use sandbox_agent::router::{
AgentInstallRequest, AppState, AuthConfig, CreateSessionRequest, MessageRequest,
@ -687,6 +688,7 @@ enum CredentialAgent {
Codex,
Opencode,
Amp,
Pi,
}
fn credentials_to_output(credentials: ExtractedCredentials, reveal: bool) -> CredentialsOutput {
@ -806,6 +808,31 @@ fn select_token_for_agent(
)))
}
}
CredentialAgent::Pi => {
if let Some(provider) = provider {
return select_token_for_provider(credentials, provider);
}
if let Some(openai) = credentials.openai.as_ref() {
return Ok(openai.api_key.clone());
}
if let Some(anthropic) = credentials.anthropic.as_ref() {
return Ok(anthropic.api_key.clone());
}
if credentials.other.len() == 1 {
if let Some((_, cred)) = credentials.other.iter().next() {
return Ok(cred.api_key.clone());
}
}
let available = available_providers(credentials);
if available.is_empty() {
Err(CliError::Server("no credentials found for pi".to_string()))
} else {
Err(CliError::Server(format!(
"multiple providers available for pi: {} (use --provider)",
available.join(", ")
)))
}
}
}
}
@ -919,7 +946,7 @@ impl ClientContext {
} else {
cli.token.clone()
};
let client = HttpClient::builder().build()?;
let client = http_client::blocking_client_builder().build()?;
Ok(Self {
endpoint,
token,

File diff suppressed because it is too large Load diff

View file

@ -10,6 +10,8 @@ use serde::Serialize;
use time::OffsetDateTime;
use tokio::time::Instant;
use crate::http_client;
const TELEMETRY_URL: &str = "https://tc.rivet.dev";
const TELEMETRY_ENV_DEBUG: &str = "SANDBOX_AGENT_TELEMETRY_DEBUG";
const TELEMETRY_ID_FILE: &str = "telemetry_id";
@ -77,7 +79,7 @@ pub fn log_enabled_message() {
pub fn spawn_telemetry_task() {
tokio::spawn(async move {
let client = match Client::builder()
let client = match http_client::client_builder()
.timeout(Duration::from_millis(TELEMETRY_TIMEOUT_MS))
.build()
{

View file

@ -5,3 +5,4 @@ mod agent_permission_flow;
mod agent_question_flow;
mod agent_termination;
mod agent_tool_flow;
mod pi_rpc_integration;

View file

@ -0,0 +1,61 @@
// Pi RPC integration tests (gated via SANDBOX_TEST_PI + PATH).
include!("../common/http.rs");
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn pi_rpc_session_and_stream() {
let configs = match test_agents_from_env() {
Ok(configs) => configs,
Err(err) => {
eprintln!("Skipping Pi RPC integration test: {err}");
return;
}
};
let Some(config) = configs.iter().find(|config| config.agent == AgentId::Pi) else {
return;
};
let app = TestApp::new();
let _guard = apply_credentials(&config.credentials);
install_agent(&app.app, config.agent).await;
let session_id = "pi-rpc-session".to_string();
let (status, payload) = send_json(
&app.app,
Method::POST,
&format!("/v1/sessions/{session_id}"),
Some(json!({
"agent": "pi",
"permissionMode": test_permission_mode(AgentId::Pi),
})),
)
.await;
assert_eq!(status, StatusCode::OK, "create pi session");
let native_session_id = payload
.get("native_session_id")
.and_then(Value::as_str)
.unwrap_or("");
assert!(
!native_session_id.is_empty(),
"expected native_session_id for pi session"
);
let events = read_turn_stream_events(&app.app, &session_id, Duration::from_secs(120)).await;
assert!(!events.is_empty(), "no events from pi stream");
assert!(
!events.iter().any(is_unparsed_event),
"agent.unparsed event encountered"
);
let mut last_sequence = 0u64;
for event in events {
let sequence = event
.get("sequence")
.and_then(Value::as_u64)
.expect("missing sequence");
assert!(
sequence > last_sequence,
"sequence did not increase (prev {last_sequence}, next {sequence})"
);
last_sequence = sequence;
}
}

View file

@ -1,4 +1,5 @@
use std::collections::HashMap;
use std::env;
use sandbox_agent_agent_management::agents::{
AgentError, AgentId, AgentManager, InstallOptions, SpawnOptions,
@ -29,6 +30,29 @@ fn prompt_ok(label: &str) -> String {
format!("Respond with exactly the text {label} and nothing else.")
}
fn pi_tests_enabled() -> bool {
env::var("SANDBOX_TEST_PI")
.map(|value| {
let value = value.trim().to_ascii_lowercase();
value == "1" || value == "true" || value == "yes"
})
.unwrap_or(false)
}
fn pi_on_path() -> bool {
let binary = AgentId::Pi.binary_name();
let path_var = match env::var_os("PATH") {
Some(path) => path,
None => return false,
};
for path in env::split_paths(&path_var) {
if path.join(binary).exists() {
return true;
}
}
false
}
#[test]
fn test_agents_install_version_spawn() -> Result<(), Box<dyn std::error::Error>> {
let temp_dir = tempfile::tempdir()?;
@ -36,12 +60,15 @@ fn test_agents_install_version_spawn() -> Result<(), Box<dyn std::error::Error>>
let env = build_env();
assert!(!env.is_empty(), "expected credentials to be available");
let agents = [
let mut agents = vec![
AgentId::Claude,
AgentId::Codex,
AgentId::Opencode,
AgentId::Amp,
];
if pi_tests_enabled() && pi_on_path() {
agents.push(AgentId::Pi);
}
for agent in agents {
let install = manager.install(agent, InstallOptions::default())?;
assert!(install.path.exists(), "expected install for {agent}");

View file

@ -178,7 +178,7 @@ async fn install_agent(app: &Router, agent: AgentId) {
/// while other agents support "bypass" which skips tool approval.
fn test_permission_mode(agent: AgentId) -> &'static str {
match agent {
AgentId::Opencode => "default",
AgentId::Opencode | AgentId::Pi => "default",
_ => "bypass",
}
}

View file

@ -182,7 +182,7 @@ pub async fn create_session_with_mode(
pub fn test_permission_mode(agent: AgentId) -> &'static str {
match agent {
AgentId::Opencode => "default",
AgentId::Opencode | AgentId::Pi => "default",
_ => "bypass",
}
}

View file

@ -2,3 +2,4 @@ pub mod amp;
pub mod claude;
pub mod codex;
pub mod opencode;
pub mod pi;

View file

@ -0,0 +1,674 @@
use std::collections::{HashMap, HashSet};
use serde_json::Value;
use crate::pi as schema;
use crate::{
ContentPart, EventConversion, ItemDeltaData, ItemEventData, ItemKind, ItemRole, ItemStatus,
ReasoningVisibility, UniversalEventData, UniversalEventType, UniversalItem,
};
#[derive(Default)]
pub struct PiEventConverter {
tool_result_buffers: HashMap<String, String>,
tool_result_started: HashSet<String>,
message_errors: HashSet<String>,
message_reasoning: HashMap<String, String>,
message_text: HashMap<String, String>,
last_message_id: Option<String>,
message_started: HashSet<String>,
message_counter: u64,
}
impl PiEventConverter {
fn next_synthetic_message_id(&mut self) -> String {
self.message_counter += 1;
format!("pi_msg_{}", self.message_counter)
}
fn ensure_message_id(&mut self, message_id: Option<String>) -> String {
if let Some(id) = message_id {
self.last_message_id = Some(id.clone());
return id;
}
if let Some(id) = self.last_message_id.clone() {
return id;
}
let id = self.next_synthetic_message_id();
self.last_message_id = Some(id.clone());
id
}
fn ensure_message_started(&mut self, message_id: &str) -> Option<EventConversion> {
if !self.message_started.insert(message_id.to_string()) {
return None;
}
let item = UniversalItem {
item_id: String::new(),
native_item_id: Some(message_id.to_string()),
parent_id: None,
kind: ItemKind::Message,
role: Some(ItemRole::Assistant),
content: Vec::new(),
status: ItemStatus::InProgress,
};
Some(
EventConversion::new(
UniversalEventType::ItemStarted,
UniversalEventData::Item(ItemEventData { item }),
)
.synthetic(),
)
}
fn clear_last_message_id(&mut self, message_id: Option<&str>) {
if message_id.is_none() || self.last_message_id.as_deref() == message_id {
self.last_message_id = None;
}
}
pub fn event_to_universal(
&mut self,
event: &schema::RpcEvent,
) -> Result<Vec<EventConversion>, String> {
let raw = serde_json::to_value(event).map_err(|err| err.to_string())?;
let event_type = raw
.get("type")
.and_then(Value::as_str)
.ok_or_else(|| "missing event type".to_string())?;
let native_session_id = extract_session_id(&raw);
let conversions = match event_type {
"message_start" => self.message_start(&raw),
"message_update" => self.message_update(&raw),
"message_end" => self.message_end(&raw),
"tool_execution_start" => self.tool_execution_start(&raw),
"tool_execution_update" => self.tool_execution_update(&raw),
"tool_execution_end" => self.tool_execution_end(&raw),
"agent_start"
| "agent_end"
| "turn_start"
| "turn_end"
| "auto_compaction"
| "auto_compaction_start"
| "auto_compaction_end"
| "auto_retry"
| "auto_retry_start"
| "auto_retry_end"
| "hook_error" => Ok(vec![status_event(event_type, &raw)]),
"extension_ui_request" | "extension_ui_response" | "extension_error" => {
Ok(vec![status_event(event_type, &raw)])
}
other => Err(format!("unsupported Pi event type: {other}")),
}?;
Ok(conversions
.into_iter()
.map(|conversion| attach_metadata(conversion, &native_session_id, &raw))
.collect())
}
fn message_start(&mut self, raw: &Value) -> Result<Vec<EventConversion>, String> {
let message = raw.get("message");
if is_user_role(message) {
return Ok(Vec::new());
}
let message_id = self.ensure_message_id(extract_message_id(raw));
self.message_started.insert(message_id.clone());
let content = message.and_then(parse_message_content).unwrap_or_default();
let entry = self.message_text.entry(message_id.clone()).or_default();
for part in &content {
if let ContentPart::Text { text } = part {
entry.push_str(text);
}
}
let item = UniversalItem {
item_id: String::new(),
native_item_id: Some(message_id),
parent_id: None,
kind: ItemKind::Message,
role: Some(ItemRole::Assistant),
content,
status: ItemStatus::InProgress,
};
Ok(vec![EventConversion::new(
UniversalEventType::ItemStarted,
UniversalEventData::Item(ItemEventData { item }),
)])
}
fn message_update(&mut self, raw: &Value) -> Result<Vec<EventConversion>, String> {
let assistant_event = raw
.get("assistantMessageEvent")
.or_else(|| raw.get("assistant_message_event"))
.ok_or_else(|| "missing assistantMessageEvent".to_string())?;
let event_type = assistant_event
.get("type")
.and_then(Value::as_str)
.unwrap_or("");
let message_id = extract_message_id(raw)
.or_else(|| extract_message_id(assistant_event))
.or_else(|| self.last_message_id.clone());
match event_type {
"start" => {
if let Some(id) = message_id {
self.last_message_id = Some(id);
}
Ok(Vec::new())
}
"text_start" | "text_delta" | "text_end" => {
let Some(delta) = extract_delta_text(assistant_event) else {
return Ok(Vec::new());
};
let message_id = self.ensure_message_id(message_id);
let entry = self.message_text.entry(message_id.clone()).or_default();
entry.push_str(&delta);
let mut conversions = Vec::new();
if let Some(start) = self.ensure_message_started(&message_id) {
conversions.push(start);
}
conversions.push(item_delta(Some(message_id), delta));
Ok(conversions)
}
"thinking_start" | "thinking_delta" | "thinking_end" => {
let Some(delta) = extract_delta_text(assistant_event) else {
return Ok(Vec::new());
};
let message_id = self.ensure_message_id(message_id);
let entry = self
.message_reasoning
.entry(message_id.clone())
.or_default();
entry.push_str(&delta);
let mut conversions = Vec::new();
if let Some(start) = self.ensure_message_started(&message_id) {
conversions.push(start);
}
conversions.push(item_delta(Some(message_id), delta));
Ok(conversions)
}
"toolcall_start"
| "toolcall_delta"
| "toolcall_end"
| "toolcall_args_start"
| "toolcall_args_delta"
| "toolcall_args_end" => Ok(Vec::new()),
"done" => {
let message_id = self.ensure_message_id(message_id);
if self.message_errors.remove(&message_id) {
self.message_text.remove(&message_id);
self.message_reasoning.remove(&message_id);
self.message_started.remove(&message_id);
self.clear_last_message_id(Some(&message_id));
return Ok(Vec::new());
}
let message = raw
.get("message")
.or_else(|| assistant_event.get("message"));
let conversion = self.complete_message(Some(message_id.clone()), message);
self.clear_last_message_id(Some(&message_id));
Ok(vec![conversion])
}
"error" => {
let message_id = self.ensure_message_id(message_id);
let error_text = assistant_event
.get("error")
.or_else(|| raw.get("error"))
.map(value_to_string)
.unwrap_or_else(|| "Pi message error".to_string());
self.message_reasoning.remove(&message_id);
self.message_text.remove(&message_id);
self.message_errors.insert(message_id.clone());
self.message_started.remove(&message_id);
self.clear_last_message_id(Some(&message_id));
let item = UniversalItem {
item_id: String::new(),
native_item_id: Some(message_id),
parent_id: None,
kind: ItemKind::Message,
role: Some(ItemRole::Assistant),
content: vec![ContentPart::Text { text: error_text }],
status: ItemStatus::Failed,
};
Ok(vec![EventConversion::new(
UniversalEventType::ItemCompleted,
UniversalEventData::Item(ItemEventData { item }),
)])
}
other => Err(format!("unsupported assistantMessageEvent: {other}")),
}
}
fn message_end(&mut self, raw: &Value) -> Result<Vec<EventConversion>, String> {
let message = raw.get("message");
if is_user_role(message) {
return Ok(Vec::new());
}
let message_id = self
.ensure_message_id(extract_message_id(raw).or_else(|| self.last_message_id.clone()));
if self.message_errors.remove(&message_id) {
self.message_text.remove(&message_id);
self.message_reasoning.remove(&message_id);
self.message_started.remove(&message_id);
self.clear_last_message_id(Some(&message_id));
return Ok(Vec::new());
}
let conversion = self.complete_message(Some(message_id.clone()), message);
self.clear_last_message_id(Some(&message_id));
Ok(vec![conversion])
}
fn complete_message(
&mut self,
message_id: Option<String>,
message: Option<&Value>,
) -> EventConversion {
let mut content = message.and_then(parse_message_content).unwrap_or_default();
if let Some(id) = message_id.clone() {
if content.is_empty() {
if let Some(text) = self.message_text.remove(&id) {
if !text.is_empty() {
content.push(ContentPart::Text { text });
}
}
} else {
self.message_text.remove(&id);
}
if let Some(reasoning) = self.message_reasoning.remove(&id) {
if !reasoning.trim().is_empty()
&& !content
.iter()
.any(|part| matches!(part, ContentPart::Reasoning { .. }))
{
content.push(ContentPart::Reasoning {
text: reasoning,
visibility: ReasoningVisibility::Private,
});
}
}
self.message_started.remove(&id);
}
let item = UniversalItem {
item_id: String::new(),
native_item_id: message_id,
parent_id: None,
kind: ItemKind::Message,
role: Some(ItemRole::Assistant),
content,
status: ItemStatus::Completed,
};
EventConversion::new(
UniversalEventType::ItemCompleted,
UniversalEventData::Item(ItemEventData { item }),
)
}
fn tool_execution_start(&mut self, raw: &Value) -> Result<Vec<EventConversion>, String> {
let tool_call_id =
extract_tool_call_id(raw).ok_or_else(|| "missing toolCallId".to_string())?;
let tool_name = extract_tool_name(raw).unwrap_or_else(|| "tool".to_string());
let arguments = raw
.get("args")
.or_else(|| raw.get("arguments"))
.map(value_to_string)
.unwrap_or_else(|| "{}".to_string());
let item = UniversalItem {
item_id: String::new(),
native_item_id: Some(tool_call_id.clone()),
parent_id: None,
kind: ItemKind::ToolCall,
role: Some(ItemRole::Assistant),
content: vec![ContentPart::ToolCall {
name: tool_name,
arguments,
call_id: tool_call_id,
}],
status: ItemStatus::InProgress,
};
Ok(vec![EventConversion::new(
UniversalEventType::ItemStarted,
UniversalEventData::Item(ItemEventData { item }),
)])
}
fn tool_execution_update(&mut self, raw: &Value) -> Result<Vec<EventConversion>, String> {
let tool_call_id = match extract_tool_call_id(raw) {
Some(id) => id,
None => return Ok(Vec::new()),
};
let partial = match raw
.get("partialResult")
.or_else(|| raw.get("partial_result"))
{
Some(value) => value_to_string(value),
None => return Ok(Vec::new()),
};
let prior = self
.tool_result_buffers
.get(&tool_call_id)
.cloned()
.unwrap_or_default();
let delta = delta_from_partial(&prior, &partial);
self.tool_result_buffers
.insert(tool_call_id.clone(), partial);
let mut conversions = Vec::new();
if self.tool_result_started.insert(tool_call_id.clone()) {
let item = UniversalItem {
item_id: String::new(),
native_item_id: Some(tool_call_id.clone()),
parent_id: None,
kind: ItemKind::ToolResult,
role: Some(ItemRole::Tool),
content: vec![ContentPart::ToolResult {
call_id: tool_call_id.clone(),
output: String::new(),
}],
status: ItemStatus::InProgress,
};
conversions.push(
EventConversion::new(
UniversalEventType::ItemStarted,
UniversalEventData::Item(ItemEventData { item }),
)
.synthetic(),
);
}
if !delta.is_empty() {
conversions.push(
EventConversion::new(
UniversalEventType::ItemDelta,
UniversalEventData::ItemDelta(ItemDeltaData {
item_id: String::new(),
native_item_id: Some(tool_call_id.clone()),
delta,
}),
)
.synthetic(),
);
}
Ok(conversions)
}
fn tool_execution_end(&mut self, raw: &Value) -> Result<Vec<EventConversion>, String> {
let tool_call_id =
extract_tool_call_id(raw).ok_or_else(|| "missing toolCallId".to_string())?;
self.tool_result_buffers.remove(&tool_call_id);
self.tool_result_started.remove(&tool_call_id);
let output = raw
.get("result")
.and_then(extract_result_content)
.unwrap_or_default();
let is_error = raw.get("isError").and_then(Value::as_bool).unwrap_or(false);
let item = UniversalItem {
item_id: String::new(),
native_item_id: Some(tool_call_id.clone()),
parent_id: None,
kind: ItemKind::ToolResult,
role: Some(ItemRole::Tool),
content: vec![ContentPart::ToolResult {
call_id: tool_call_id,
output,
}],
status: if is_error {
ItemStatus::Failed
} else {
ItemStatus::Completed
},
};
Ok(vec![EventConversion::new(
UniversalEventType::ItemCompleted,
UniversalEventData::Item(ItemEventData { item }),
)])
}
}
pub fn event_to_universal(event: &schema::RpcEvent) -> Result<Vec<EventConversion>, String> {
PiEventConverter::default().event_to_universal(event)
}
fn attach_metadata(
conversion: EventConversion,
native_session_id: &Option<String>,
raw: &Value,
) -> EventConversion {
conversion
.with_native_session(native_session_id.clone())
.with_raw(Some(raw.clone()))
}
fn status_event(label: &str, raw: &Value) -> EventConversion {
let detail = raw
.get("error")
.or_else(|| raw.get("message"))
.map(value_to_string);
let item = UniversalItem {
item_id: String::new(),
native_item_id: None,
parent_id: None,
kind: ItemKind::Status,
role: Some(ItemRole::System),
content: vec![ContentPart::Status {
label: format!("pi.{label}"),
detail,
}],
status: ItemStatus::Completed,
};
EventConversion::new(
UniversalEventType::ItemCompleted,
UniversalEventData::Item(ItemEventData { item }),
)
}
fn item_delta(message_id: Option<String>, delta: String) -> EventConversion {
EventConversion::new(
UniversalEventType::ItemDelta,
UniversalEventData::ItemDelta(ItemDeltaData {
item_id: String::new(),
native_item_id: message_id,
delta,
}),
)
}
fn is_user_role(message: Option<&Value>) -> bool {
message
.and_then(|msg| msg.get("role"))
.and_then(Value::as_str)
.is_some_and(|role| role == "user")
}
fn extract_session_id(value: &Value) -> Option<String> {
extract_string(value, &["sessionId"])
.or_else(|| extract_string(value, &["session_id"]))
.or_else(|| extract_string(value, &["session", "id"]))
.or_else(|| extract_string(value, &["message", "sessionId"]))
}
fn extract_message_id(value: &Value) -> Option<String> {
extract_string(value, &["messageId"])
.or_else(|| extract_string(value, &["message_id"]))
.or_else(|| extract_string(value, &["message", "id"]))
.or_else(|| extract_string(value, &["message", "messageId"]))
.or_else(|| extract_string(value, &["assistantMessageEvent", "messageId"]))
}
fn extract_tool_call_id(value: &Value) -> Option<String> {
extract_string(value, &["toolCallId"]).or_else(|| extract_string(value, &["tool_call_id"]))
}
fn extract_tool_name(value: &Value) -> Option<String> {
extract_string(value, &["toolName"]).or_else(|| extract_string(value, &["tool_name"]))
}
fn extract_string(value: &Value, path: &[&str]) -> Option<String> {
let mut current = value;
for key in path {
current = current.get(*key)?;
}
current.as_str().map(|value| value.to_string())
}
fn extract_delta_text(event: &Value) -> Option<String> {
if let Some(value) = event.get("delta") {
return Some(value_to_string(value));
}
if let Some(value) = event.get("text") {
return Some(value_to_string(value));
}
if let Some(value) = event.get("partial") {
return extract_text_from_value(value);
}
if let Some(value) = event.get("content") {
return extract_text_from_value(value);
}
None
}
fn extract_text_from_value(value: &Value) -> Option<String> {
if let Some(text) = value.as_str() {
return Some(text.to_string());
}
if let Some(text) = value.get("text").and_then(Value::as_str) {
return Some(text.to_string());
}
if let Some(text) = value.get("content").and_then(Value::as_str) {
return Some(text.to_string());
}
None
}
fn extract_result_content(value: &Value) -> Option<String> {
let content = value.get("content").and_then(Value::as_str);
let text = value.get("text").and_then(Value::as_str);
content
.or(text)
.map(|value| value.to_string())
.or_else(|| Some(value_to_string(value)))
}
fn parse_message_content(message: &Value) -> Option<Vec<ContentPart>> {
if let Some(text) = message.as_str() {
return Some(vec![ContentPart::Text {
text: text.to_string(),
}]);
}
let content_value = message
.get("content")
.or_else(|| message.get("text"))
.or_else(|| message.get("value"))?;
let mut parts = Vec::new();
match content_value {
Value::String(text) => parts.push(ContentPart::Text { text: text.clone() }),
Value::Array(items) => {
for item in items {
if let Some(part) = content_part_from_value(item) {
parts.push(part);
}
}
}
Value::Object(_) => {
if let Some(part) = content_part_from_value(content_value) {
parts.push(part);
}
}
_ => {}
}
Some(parts)
}
fn content_part_from_value(value: &Value) -> Option<ContentPart> {
if let Some(text) = value.as_str() {
return Some(ContentPart::Text {
text: text.to_string(),
});
}
let part_type = value.get("type").and_then(Value::as_str);
match part_type {
Some("text") | Some("markdown") => {
extract_text_from_value(value).map(|text| ContentPart::Text { text })
}
Some("thinking") | Some("reasoning") => {
extract_text_from_value(value).map(|text| ContentPart::Reasoning {
text,
visibility: ReasoningVisibility::Private,
})
}
Some("image") => value
.get("path")
.or_else(|| value.get("url"))
.and_then(|path| {
path.as_str().map(|path| ContentPart::Image {
path: path.to_string(),
mime: value
.get("mime")
.or_else(|| value.get("mimeType"))
.and_then(Value::as_str)
.map(|mime| mime.to_string()),
})
}),
Some("tool_call") | Some("toolcall") => {
let name = value
.get("name")
.and_then(Value::as_str)
.unwrap_or("tool")
.to_string();
let arguments = value
.get("arguments")
.or_else(|| value.get("args"))
.map(value_to_string)
.unwrap_or_else(|| "{}".to_string());
let call_id = value
.get("call_id")
.or_else(|| value.get("callId"))
.and_then(Value::as_str)
.unwrap_or_default()
.to_string();
Some(ContentPart::ToolCall {
name,
arguments,
call_id,
})
}
Some("tool_result") => {
let call_id = value
.get("call_id")
.or_else(|| value.get("callId"))
.and_then(Value::as_str)
.unwrap_or_default()
.to_string();
let output = value
.get("output")
.or_else(|| value.get("content"))
.map(value_to_string)
.unwrap_or_default();
Some(ContentPart::ToolResult { call_id, output })
}
_ => Some(ContentPart::Json {
json: value.clone(),
}),
}
}
fn value_to_string(value: &Value) -> String {
if let Some(text) = value.as_str() {
text.to_string()
} else {
value.to_string()
}
}
fn delta_from_partial(previous: &str, next: &str) -> String {
if next.starts_with(previous) {
next[previous.len()..].to_string()
} else {
next.to_string()
}
}

View file

@ -3,13 +3,13 @@ use serde::{Deserialize, Serialize};
use serde_json::Value;
use utoipa::ToSchema;
pub use sandbox_agent_extracted_agent_schemas::{amp, claude, codex, opencode};
pub use sandbox_agent_extracted_agent_schemas::{amp, claude, codex, opencode, pi};
pub mod agents;
pub use agents::{
amp as convert_amp, claude as convert_claude, codex as convert_codex,
opencode as convert_opencode,
amp as convert_amp, claude as convert_claude, codex as convert_codex, opencode as convert_opencode,
pi as convert_pi,
};
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
@ -204,7 +204,7 @@ pub enum ItemKind {
Unknown,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum ItemRole {
User,
@ -213,7 +213,7 @@ pub enum ItemRole {
Tool,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum ItemStatus {
InProgress,

View file

@ -0,0 +1,264 @@
use sandbox_agent_universal_agent_schema::convert_pi::PiEventConverter;
use sandbox_agent_universal_agent_schema::pi as pi_schema;
use sandbox_agent_universal_agent_schema::{
ContentPart, ItemKind, ItemRole, ItemStatus, UniversalEventData, UniversalEventType,
};
use serde_json::json;
fn parse_event(value: serde_json::Value) -> pi_schema::RpcEvent {
serde_json::from_value(value).expect("pi event")
}
#[test]
fn pi_message_flow_converts() {
let mut converter = PiEventConverter::default();
let start_event = parse_event(json!({
"type": "message_start",
"sessionId": "session-1",
"messageId": "msg-1",
"message": {
"role": "assistant",
"content": [{ "type": "text", "text": "Hello" }]
}
}));
let start_events = converter
.event_to_universal(&start_event)
.expect("start conversions");
assert_eq!(start_events[0].event_type, UniversalEventType::ItemStarted);
if let UniversalEventData::Item(item) = &start_events[0].data {
assert_eq!(item.item.kind, ItemKind::Message);
assert_eq!(item.item.role, Some(ItemRole::Assistant));
assert_eq!(item.item.status, ItemStatus::InProgress);
} else {
panic!("expected item event");
}
let update_event = parse_event(json!({
"type": "message_update",
"sessionId": "session-1",
"messageId": "msg-1",
"assistantMessageEvent": { "type": "text_delta", "delta": " world" }
}));
let update_events = converter
.event_to_universal(&update_event)
.expect("update conversions");
assert_eq!(update_events[0].event_type, UniversalEventType::ItemDelta);
let end_event = parse_event(json!({
"type": "message_end",
"sessionId": "session-1",
"messageId": "msg-1",
"message": {
"role": "assistant",
"content": [{ "type": "text", "text": "Hello world" }]
}
}));
let end_events = converter
.event_to_universal(&end_event)
.expect("end conversions");
assert_eq!(end_events[0].event_type, UniversalEventType::ItemCompleted);
if let UniversalEventData::Item(item) = &end_events[0].data {
assert_eq!(item.item.kind, ItemKind::Message);
assert_eq!(item.item.role, Some(ItemRole::Assistant));
assert_eq!(item.item.status, ItemStatus::Completed);
} else {
panic!("expected item event");
}
}
#[test]
fn pi_user_message_echo_is_skipped() {
let mut converter = PiEventConverter::default();
// Pi may echo the user message as a message_start with role "user".
// The daemon already records synthetic user events, so the converter
// must skip these to avoid a duplicate assistant-looking bubble.
let start_event = parse_event(json!({
"type": "message_start",
"sessionId": "session-1",
"messageId": "user-msg-1",
"message": {
"role": "user",
"content": [{ "type": "text", "text": "hello!" }]
}
}));
let events = converter
.event_to_universal(&start_event)
.expect("user message_start should not error");
assert!(
events.is_empty(),
"user message_start should produce no events, got {}",
events.len()
);
let end_event = parse_event(json!({
"type": "message_end",
"sessionId": "session-1",
"messageId": "user-msg-1",
"message": {
"role": "user",
"content": [{ "type": "text", "text": "hello!" }]
}
}));
let events = converter
.event_to_universal(&end_event)
.expect("user message_end should not error");
assert!(
events.is_empty(),
"user message_end should produce no events, got {}",
events.len()
);
// A subsequent assistant message should still work normally.
let assistant_start = parse_event(json!({
"type": "message_start",
"sessionId": "session-1",
"messageId": "msg-1",
"message": {
"role": "assistant",
"content": [{ "type": "text", "text": "Hello! How can I help?" }]
}
}));
let events = converter
.event_to_universal(&assistant_start)
.expect("assistant message_start");
assert_eq!(events.len(), 1);
assert_eq!(events[0].event_type, UniversalEventType::ItemStarted);
if let UniversalEventData::Item(item) = &events[0].data {
assert_eq!(item.item.role, Some(ItemRole::Assistant));
} else {
panic!("expected item event");
}
}
#[test]
fn pi_tool_execution_converts_with_partial_deltas() {
let mut converter = PiEventConverter::default();
let start_event = parse_event(json!({
"type": "tool_execution_start",
"sessionId": "session-1",
"toolCallId": "call-1",
"toolName": "bash",
"args": { "command": "ls" }
}));
let start_events = converter
.event_to_universal(&start_event)
.expect("tool start");
assert_eq!(start_events[0].event_type, UniversalEventType::ItemStarted);
if let UniversalEventData::Item(item) = &start_events[0].data {
assert_eq!(item.item.kind, ItemKind::ToolCall);
assert_eq!(item.item.role, Some(ItemRole::Assistant));
match &item.item.content[0] {
ContentPart::ToolCall { name, .. } => assert_eq!(name, "bash"),
_ => panic!("expected tool call content"),
}
}
let update_event = parse_event(json!({
"type": "tool_execution_update",
"sessionId": "session-1",
"toolCallId": "call-1",
"partialResult": "foo"
}));
let update_events = converter
.event_to_universal(&update_event)
.expect("tool update");
assert!(update_events
.iter()
.any(|event| event.event_type == UniversalEventType::ItemDelta));
let update_event2 = parse_event(json!({
"type": "tool_execution_update",
"sessionId": "session-1",
"toolCallId": "call-1",
"partialResult": "foobar"
}));
let update_events2 = converter
.event_to_universal(&update_event2)
.expect("tool update 2");
let delta = update_events2
.iter()
.find_map(|event| match &event.data {
UniversalEventData::ItemDelta(data) => Some(data.delta.clone()),
_ => None,
})
.unwrap_or_default();
assert_eq!(delta, "bar");
let end_event = parse_event(json!({
"type": "tool_execution_end",
"sessionId": "session-1",
"toolCallId": "call-1",
"result": { "type": "text", "content": "done" },
"isError": false
}));
let end_events = converter.event_to_universal(&end_event).expect("tool end");
assert_eq!(end_events[0].event_type, UniversalEventType::ItemCompleted);
if let UniversalEventData::Item(item) = &end_events[0].data {
assert_eq!(item.item.kind, ItemKind::ToolResult);
assert_eq!(item.item.role, Some(ItemRole::Tool));
match &item.item.content[0] {
ContentPart::ToolResult { output, .. } => assert_eq!(output, "done"),
_ => panic!("expected tool result content"),
}
}
}
#[test]
fn pi_unknown_event_returns_error() {
let mut converter = PiEventConverter::default();
let event = parse_event(json!({
"type": "unknown_event",
"sessionId": "session-1"
}));
assert!(converter.event_to_universal(&event).is_err());
}
#[test]
fn pi_message_done_completes_without_message_end() {
let mut converter = PiEventConverter::default();
let start_event = parse_event(json!({
"type": "message_start",
"sessionId": "session-1",
"messageId": "msg-1",
"message": {
"role": "assistant",
"content": [{ "type": "text", "text": "Hello" }]
}
}));
let _start_events = converter
.event_to_universal(&start_event)
.expect("start conversions");
let update_event = parse_event(json!({
"type": "message_update",
"sessionId": "session-1",
"messageId": "msg-1",
"assistantMessageEvent": { "type": "text_delta", "delta": " world" }
}));
let _update_events = converter
.event_to_universal(&update_event)
.expect("update conversions");
let done_event = parse_event(json!({
"type": "message_update",
"sessionId": "session-1",
"messageId": "msg-1",
"assistantMessageEvent": { "type": "done" }
}));
let done_events = converter
.event_to_universal(&done_event)
.expect("done conversions");
assert_eq!(done_events[0].event_type, UniversalEventType::ItemCompleted);
if let UniversalEventData::Item(item) = &done_events[0].data {
assert_eq!(item.item.status, ItemStatus::Completed);
assert!(
matches!(item.item.content.get(0), Some(ContentPart::Text { text }) if text == "Hello world")
);
} else {
panic!("expected item event");
}
}