From 843498e9db5e9edaf9ea19d21af4008bbec92b3a Mon Sep 17 00:00:00 2001 From: Franklin Date: Thu, 5 Feb 2026 17:06:53 -0500 Subject: [PATCH] support pi --- Cargo.toml | 2 +- README.md | 10 +- docs/cli.mdx | 2 +- docs/conversion.mdx | 53 +- docs/openapi.json | 2 +- docs/pi-support-plan.md | 200 +++++ docs/session-transcript-schema.mdx | 36 +- frontend/packages/inspector/src/App.tsx | 20 +- .../src/components/SessionSidebar.tsx | 1 + .../src/components/chat/ChatPanel.tsx | 1 + frontend/packages/website/public/logos/pi.svg | 22 + .../website/src/components/CTASection.tsx | 2 +- .../packages/website/src/components/FAQ.tsx | 4 +- .../website/src/components/FeatureGrid.tsx | 2 +- .../packages/website/src/components/Hero.tsx | 11 +- .../website/src/components/PainPoints.tsx | 2 +- .../website/src/components/ProblemsSolved.tsx | 2 +- .../artifacts/json-schema/pi.json | 124 +++ resources/agent-schemas/package.json | 1 + resources/agent-schemas/src/index.ts | 4 +- resources/agent-schemas/src/pi.ts | 191 +++++ .../packages/agent-management/src/agents.rs | 90 +- .../agent-management/src/http_client.rs | 20 + server/packages/agent-management/src/lib.rs | 1 + .../packages/agent-management/src/testing.rs | 42 +- .../packages/extracted-agent-schemas/build.rs | 1 + .../extracted-agent-schemas/src/lib.rs | 6 + .../packages/sandbox-agent/src/http_client.rs | 30 + server/packages/sandbox-agent/src/lib.rs | 1 + server/packages/sandbox-agent/src/main.rs | 29 +- server/packages/sandbox-agent/src/router.rs | 796 +++++++++++++++++- .../packages/sandbox-agent/src/telemetry.rs | 4 +- .../sandbox-agent/tests/agent-flows/mod.rs | 1 + .../tests/agent-flows/pi_rpc_integration.rs | 61 ++ .../tests/agent-management/agents.rs | 29 +- .../sandbox-agent/tests/common/http.rs | 2 +- .../sandbox-agent/tests/common/mod.rs | 2 +- .../universal-agent-schema/src/agents/mod.rs | 1 + .../universal-agent-schema/src/agents/pi.rs | 674 +++++++++++++++ .../universal-agent-schema/src/lib.rs | 10 +- 
.../tests/pi_conversion.rs | 264 ++++++ 41 files changed, 2654 insertions(+), 102 deletions(-) create mode 100644 docs/pi-support-plan.md create mode 100644 frontend/packages/website/public/logos/pi.svg create mode 100644 resources/agent-schemas/artifacts/json-schema/pi.json create mode 100644 resources/agent-schemas/src/pi.ts create mode 100644 server/packages/agent-management/src/http_client.rs create mode 100644 server/packages/sandbox-agent/src/http_client.rs create mode 100644 server/packages/sandbox-agent/tests/agent-flows/pi_rpc_integration.rs create mode 100644 server/packages/universal-agent-schema/src/agents/pi.rs create mode 100644 server/packages/universal-agent-schema/tests/pi_conversion.rs diff --git a/Cargo.toml b/Cargo.toml index a30a351..f7b9861 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ edition = "2021" authors = [ "Rivet Gaming, LLC " ] license = "Apache-2.0" repository = "https://github.com/rivet-dev/sandbox-agent" -description = "Universal API for automatic coding agents in sandboxes. Supprots Claude Code, Codex, OpenCode, and Amp." +description = "Universal API for automatic coding agents in sandboxes. Supports Claude Code, Codex, OpenCode, Amp, and Pi." [workspace.dependencies] # Internal crates diff --git a/README.md b/README.md index f5d7b77..ffc0728 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@

Run Coding Agents in Sandboxes. Control Them Over HTTP.

- A server that runs inside your sandbox. Your app connects remotely to control Claude Code, Codex, OpenCode, or Amp — streaming events, handling permissions, managing sessions. + A server that runs inside your sandbox. Your app connects remotely to control Claude Code, Codex, OpenCode, Amp, or Pi — streaming events, handling permissions, managing sessions.

@@ -20,13 +20,13 @@ Sandbox Agent solves three problems: 1. **Coding agents need sandboxes** — You can't let AI execute arbitrary code on your production servers. Coding agents need isolated environments, but existing SDKs assume local execution. Sandbox Agent is a server that runs inside the sandbox and exposes HTTP/SSE. -2. **Every coding agent is different** — Claude Code, Codex, OpenCode, and Amp each have proprietary APIs, event formats, and behaviors. Swapping agents means rewriting your integration. Sandbox Agent provides one HTTP API — write your code once, swap agents with a config change. +2. **Every coding agent is different** — Claude Code, Codex, OpenCode, Amp, and Pi each have proprietary APIs, event formats, and behaviors. Swapping agents means rewriting your integration. Sandbox Agent provides one HTTP API — write your code once, swap agents with a config change. 3. **Sessions are ephemeral** — Agent transcripts live in the sandbox. When the process ends, you lose everything. Sandbox Agent streams events in a universal schema to your storage. Persist to Postgres, ClickHouse, or [Rivet](https://rivet.dev). Replay later, audit everything. ## Features -- **Universal Agent API**: Single interface to control Claude Code, Codex, OpenCode, and Amp with full feature coverage +- **Universal Agent API**: Single interface to control Claude Code, Codex, OpenCode, Amp, and Pi with full feature coverage - **Streaming Events**: Real-time SSE stream of everything the agent does — tool calls, permission requests, file edits, and more - **Universal Session Schema**: [Standardized schema](https://sandboxagent.dev/docs/session-transcript-schema) that normalizes all agent event formats for storage and replay - **Human-in-the-Loop**: Approve or deny tool executions and answer agent questions remotely over HTTP @@ -227,7 +227,7 @@ No, they're complementary. AI SDK is for building chat interfaces and calling LL

Which coding agents are supported? -Claude Code, Codex, OpenCode, and Amp. The SDK normalizes their APIs so you can swap between them without changing your code. +Claude Code, Codex, OpenCode, Amp, and Pi. The SDK normalizes their APIs so you can swap between them without changing your code.
@@ -251,7 +251,7 @@ The server is a single Rust binary that runs anywhere with a curl install. If yo
Can I use this with my personal API keys? -Yes. Use `sandbox-agent credentials extract-env` to extract API keys from your local agent configs (Claude Code, Codex, OpenCode, Amp) and pass them to the sandbox environment. +Yes. Use `sandbox-agent credentials extract-env` to extract API keys from your local agent configs (Claude Code, Codex, OpenCode, Amp, Pi) and pass them to the sandbox environment.
diff --git a/docs/cli.mdx b/docs/cli.mdx index 855bd44..4a6e118 100644 --- a/docs/cli.mdx +++ b/docs/cli.mdx @@ -61,7 +61,7 @@ sandbox-agent credentials extract [OPTIONS] | Option | Description | |--------|-------------| -| `-a, --agent ` | Filter by agent (`claude`, `codex`, `opencode`, `amp`) | +| `-a, --agent ` | Filter by agent (`claude`, `codex`, `opencode`, `amp`, `pi`) | | `-p, --provider ` | Filter by provider (`anthropic`, `openai`) | | `-d, --home-dir ` | Custom home directory for credential search | | `-r, --reveal` | Show full credential values (default: redacted) | diff --git a/docs/conversion.mdx b/docs/conversion.mdx index d155ab4..647c1b5 100644 --- a/docs/conversion.mdx +++ b/docs/conversion.mdx @@ -4,14 +4,14 @@ Source of truth: generated agent schemas in `resources/agent-schemas/artifacts/j Identifiers -+----------------------+------------------------+------------------------------------------+-----------------------------+------------------------+ -| Universal term | Claude | Codex (app-server) | OpenCode | Amp | -+----------------------+------------------------+------------------------------------------+-----------------------------+------------------------+ -| session_id | n/a (daemon-only) | n/a (daemon-only) | n/a (daemon-only) | n/a (daemon-only) | -| native_session_id | none | threadId | sessionID | none | -| item_id | synthetic | ThreadItem.id | Message.id | StreamJSONMessage.id | -| native_item_id | none | ThreadItem.id | Message.id | StreamJSONMessage.id | -+----------------------+------------------------+------------------------------------------+-----------------------------+------------------------+ ++----------------------+------------------------+------------------------------------------+-----------------------------+------------------------+----------------------+ +| Universal term | Claude | Codex (app-server) | OpenCode | Amp | Pi (RPC) | 
++----------------------+------------------------+------------------------------------------+-----------------------------+------------------------+----------------------+ +| session_id | n/a (daemon-only) | n/a (daemon-only) | n/a (daemon-only) | n/a (daemon-only) | n/a (daemon-only) | +| native_session_id | none | threadId | sessionID | none | sessionId | +| item_id | synthetic | ThreadItem.id | Message.id | StreamJSONMessage.id | messageId/toolCallId | +| native_item_id | none | ThreadItem.id | Message.id | StreamJSONMessage.id | messageId/toolCallId | ++----------------------+------------------------+------------------------------------------+-----------------------------+------------------------+----------------------+ Notes: - When a provider does not supply IDs (Claude), we synthesize item_id values and keep native_item_id null. @@ -24,22 +24,22 @@ Notes: Events / Message Flow -+------------------------+------------------------------+--------------------------------------------+-----------------------------------------+----------------------------------+ -| Universal term | Claude | Codex (app-server) | OpenCode | Amp | -+------------------------+------------------------------+--------------------------------------------+-----------------------------------------+----------------------------------+ -| session.started | none | method=thread/started | type=session.created | none | -| session.ended | SDKMessage.type=result | no explicit session end (turn/completed) | no explicit session end (session.deleted)| type=done | -| message (user) | SDKMessage.type=user | item/completed (ThreadItem.type=userMessage)| message.updated (Message.role=user) | type=message | -| message (assistant) | SDKMessage.type=assistant | item/completed (ThreadItem.type=agentMessage)| message.updated (Message.role=assistant)| type=message | -| message.delta | stream_event (partial) or synthetic | method=item/agentMessage/delta | type=message.part.updated (delta) | synthetic | -| tool call 
| type=tool_use | method=item/mcpToolCall/progress | message.part.updated (part.type=tool) | type=tool_call | -| tool result | user.message.content.tool_result | item/completed (tool result ThreadItem variants) | message.part.updated (part.type=tool, state=completed) | type=tool_result | -| permission.requested | control_request.can_use_tool | none | type=permission.asked | none | -| permission.resolved | daemon reply to can_use_tool | none | type=permission.replied | none | -| question.requested | tool_use (AskUserQuestion) | experimental request_user_input (payload) | type=question.asked | none | -| question.resolved | tool_result (AskUserQuestion) | experimental request_user_input (payload) | type=question.replied / question.rejected | none | -| error | SDKResultMessage.error | method=error | type=session.error (or message error) | type=error | -+------------------------+------------------------------+--------------------------------------------+-----------------------------------------+----------------------------------+ ++------------------------+------------------------------+--------------------------------------------+-----------------------------------------+----------------------------------+----------------------------+ +| Universal term | Claude | Codex (app-server) | OpenCode | Amp | Pi (RPC) | ++------------------------+------------------------------+--------------------------------------------+-----------------------------------------+----------------------------------+----------------------------+ +| session.started | none | method=thread/started | type=session.created | none | none | +| session.ended | SDKMessage.type=result | no explicit session end (turn/completed) | no explicit session end (session.deleted)| type=done | none (daemon synthetic) | +| message (user) | SDKMessage.type=user | item/completed (ThreadItem.type=userMessage)| message.updated (Message.role=user) | type=message | none (daemon synthetic) | +| message (assistant) | 
SDKMessage.type=assistant | item/completed (ThreadItem.type=agentMessage)| message.updated (Message.role=assistant)| type=message | message_start/message_end | +| message.delta | stream_event (partial) or synthetic | method=item/agentMessage/delta | type=message.part.updated (delta) | synthetic | message_update (text_delta/thinking_delta) | +| tool call | type=tool_use | method=item/mcpToolCall/progress | message.part.updated (part.type=tool) | type=tool_call | tool_execution_start | +| tool result | user.message.content.tool_result | item/completed (tool result ThreadItem variants) | message.part.updated (part.type=tool, state=completed) | type=tool_result | tool_execution_end | +| permission.requested | control_request.can_use_tool | none | type=permission.asked | none | none | +| permission.resolved | daemon reply to can_use_tool | none | type=permission.replied | none | none | +| question.requested | tool_use (AskUserQuestion) | experimental request_user_input (payload) | type=question.asked | none | none | +| question.resolved | tool_result (AskUserQuestion) | experimental request_user_input (payload) | type=question.replied / question.rejected | none | none | +| error | SDKResultMessage.error | method=error | type=session.error (or message error) | type=error | hook_error (status item) | ++------------------------+------------------------------+--------------------------------------------+-----------------------------------------+----------------------------------+----------------------------+ Synthetics @@ -64,6 +64,7 @@ Delta handling - Codex emits agent message and other deltas (e.g., item/agentMessage/delta). - OpenCode emits part deltas via message.part.updated with a delta string. - Claude can emit stream_event deltas when partial streaming is enabled; Amp does not emit deltas. +- Pi emits message_update deltas and cumulative tool_execution_update partialResult values (we diff to produce deltas). Policy: - Always emit item.delta across all providers. 
@@ -80,3 +81,7 @@ Message normalization notes - OpenCode unrolling: message.updated creates/updates the parent message item; tool-related parts emit separate tool item events (item.started/ item.completed) with parent_id pointing to the message item. - If a message.part.updated arrives before message.updated, we create a stub item.started (source=daemon) so deltas have a parent. - Tool calls/results are always emitted as separate tool items to keep behavior consistent across agents. +- If Pi message_update events omit messageId, we synthesize a stable message id and emit a synthetic item.started before the first delta so streaming text stays grouped. +- Pi auto_compaction_start/auto_compaction_end and auto_retry_start/auto_retry_end events are mapped to status items (label `pi.*`). +- Pi extension_ui_request/extension_error events are mapped to status items. +- Pi RPC from pi-coding-agent does not include sessionId in events; we route events to the current Pi session (single-session semantics). diff --git a/docs/openapi.json b/docs/openapi.json index 76c76f0..06b7e41 100644 --- a/docs/openapi.json +++ b/docs/openapi.json @@ -2,7 +2,7 @@ "openapi": "3.0.3", "info": { "title": "sandbox-agent", - "description": "Universal API for automatic coding agents in sandboxes. Supprots Claude Code, Codex, OpenCode, and Amp.", + "description": "Universal API for automatic coding agents in sandboxes. Supports Claude Code, Codex, OpenCode, Amp, and Pi.", "contact": { "name": "Rivet Gaming, LLC", "email": "developer@rivet.gg" diff --git a/docs/pi-support-plan.md b/docs/pi-support-plan.md new file mode 100644 index 0000000..43f404e --- /dev/null +++ b/docs/pi-support-plan.md @@ -0,0 +1,200 @@ +# Pi Agent Support Plan (pi-mono) + +## Investigation Summary + +### Pi CLI modes and RPC protocol +- Pi supports multiple modes including interactive, print/JSON output, RPC, and SDK usage. 
JSON mode outputs a stream of JSON events suitable for parsing, and RPC mode is intended for programmatic control over stdin/stdout. +- RPC mode is started with `pi --mode rpc` and supports options like `--provider`, `--model`, `--no-session`, and `--session-dir`. +- The RPC protocol is newline-delimited JSON over stdin/stdout: + - Commands are JSON objects written to stdin. + - Responses are JSON objects with `type: "response"` and optional `id`. + - Events are JSON objects without `id`. +- `prompt` can include images using `ImageContent` (base64 or URL) alongside text. +- JSON/print mode (`pi -p` or `pi --print --mode json`) produces JSONL for non-interactive parsing and can resume sessions with a token. + +### RPC commands +RPC commands listed in `rpc.md` include: +- `new_session`, `get_state`, `list_sessions`, `delete_session`, `rename_session`, `clear_session` +- `prompt`, `queue_message`, `abort`, `get_queued_messages` + +### RPC event types +RPC events listed in `rpc.md` include: +- `agent_start`, `agent_end` +- `turn_start`, `turn_end` +- `message_start`, `message_update`, `message_end` +- `tool_execution_start`, `tool_execution_update`, `tool_execution_end` +- `auto_compaction`, `auto_retry`, `hook_error` + +`message_update` uses `assistantMessageEvent` deltas such as: +- `start`, `text_start`, `text_delta`, `text_end` +- `thinking_start`, `thinking_delta`, `thinking_end` +- `toolcall_start`, `toolcall_delta`, `toolcall_end` +- `toolcall_args_start`, `toolcall_args_delta`, `toolcall_args_end` +- `done`, `error` + +`tool_execution_update` includes `partialResult`, which is described as accumulated output so far. 
+ +### Schema source locations (pi-mono) +RPC types are documented as living in: +- `packages/ai/src/types.ts` (Model types) +- `packages/agent/src/types.ts` (AgentResponse types) +- `packages/coding-agent/src/core/messages.ts` (message types) +- `packages/coding-agent/src/modes/rpc/rpc-types.ts` (RPC protocol types) + +### Distribution assets +Pi releases provide platform-specific binaries such as: +- `pi-darwin-arm64`, `pi-darwin-x64` +- `pi-linux-arm64`, `pi-linux-x64` +- `pi-win-x64.zip` + +## Integration Decisions +- Follow the OpenCode pattern: a shared long-running process (stdio RPC) with session multiplexing. +- Primary integration path is RPC streaming (`pi --mode rpc`). +- JSON/print mode is a fallback only (diagnostics or non-interactive runs). +- Create sessions via `new_session`; store the returned `sessionId` as `native_session_id`. +- Use `get_state` as a re-sync path after server restarts. +- Use `prompt` for send-message, with optional image content. +- Convert Pi events into universal events; emit daemon synthetic `session.started` on session creation and `session.ended` only on errors/termination. + +## Implementation Plan + +### 1) Agent Identity + Capabilities +Files: +- `server/packages/agent-management/src/agents.rs` +- `server/packages/sandbox-agent/src/router.rs` +- `docs/cli.mdx`, `docs/conversion.mdx`, `docs/session-transcript-schema.mdx` +- `README.md`, `frontend/packages/website/src/components/FAQ.tsx` + +Tasks: +- Add `AgentId::Pi` with string/binary name `"pi"` and parsing rules. +- Add Pi to `all_agents()` and agent lists. 
+- Define `AgentCapabilities` for Pi: + - `tool_calls=true`, `tool_results=true` + - `text_messages=true`, `streaming_deltas=true`, `item_started=true` + - `reasoning=true` (from `thinking_*` deltas) + - `images=true` (ImageContent in `prompt`) + - `permissions=false`, `questions=false`, `mcp_tools=false` + - `shared_process=true`, `session_lifecycle=false` (no native session events) + - `error_events=true` (hook_error) + - `command_execution=false`, `file_changes=false`, `file_attachments=false` + +### 2) Installer and Binary Resolution +Files: +- `server/packages/agent-management/src/agents.rs` + +Tasks: +- Add `install_pi()` that: + - Downloads the correct release asset per platform (`pi-<os>-<arch>`, e.g. `pi-linux-x64`). + - Handles `.zip` on Windows and raw binaries elsewhere. + - Marks binary executable. +- Add Pi to `AgentManager::install`, `is_installed`, `version`. +- Version detection: try `--version`, `version`, `-V`. + +### 3) Schema Extraction for Pi +Files: +- `resources/agent-schemas/src/pi.ts` (new) +- `resources/agent-schemas/src/index.ts` +- `resources/agent-schemas/artifacts/json-schema/pi.json` +- `server/packages/extracted-agent-schemas/build.rs` +- `server/packages/extracted-agent-schemas/src/lib.rs` + +Tasks: +- Implement `extractPiSchema()`: + - Download pi-mono sources (zip/tarball) into a temp dir. + - Use `ts-json-schema-generator` against `packages/coding-agent/src/modes/rpc/rpc-types.ts`. + - Include dependent files per `rpc.md` (ai/types, agent/types, core/messages). + - Extract `RpcEvent`, `RpcResponse`, `RpcCommand` unions (exact type names from source). +- Add fallback schema if remote fetch fails (minimal union with event/response fields). +- Wire pi into extractor index and artifact generation.
+ +### 4) Universal Schema Conversion (Pi -> Universal) +Files: +- `server/packages/universal-agent-schema/src/agents/pi.rs` (new) +- `server/packages/universal-agent-schema/src/agents/mod.rs` +- `server/packages/universal-agent-schema/src/lib.rs` +- `server/packages/sandbox-agent/src/router.rs` + +Mapping rules: +- `message_start` -> `item.started` (kind=message, role=assistant, native_item_id=messageId) +- `message_update`: + - `text_*` -> `item.delta` (assistant text delta) + - `thinking_*` -> `item.delta` with `ContentPart::Reasoning` (visibility=Private) + - `toolcall_*` and `toolcall_args_*` -> ignore for now (tool_execution_* is authoritative) + - `error` -> `item.completed` with `ItemStatus::Failed` (if no later message_end) +- `message_end` -> `item.completed` (finalize assistant message) +- `tool_execution_start` -> `item.started` (kind=tool_call, ContentPart::ToolCall) +- `tool_execution_update` -> `item.delta` for a synthetic tool_result item: + - Maintain a per-toolCallId buffer to compute delta from accumulated `partialResult`. +- `tool_execution_end` -> `item.completed` (kind=tool_result, output from `result.content`) + - If `isError=true`, set item status to failed. +- `agent_start`, `turn_start`, `turn_end`, `agent_end`, `auto_compaction`, `auto_retry`, `hook_error`: + - Map to `ItemKind::Status` with a label like `pi.agent_start`, `pi.auto_retry`, etc. + - Do not emit `session.ended` for these events. +- If event parsing fails, emit `agent.unparsed` (source=daemon, synthetic=true) and fail tests. + +### 5) Shared RPC Server Integration +Files: +- `server/packages/sandbox-agent/src/router.rs` + +Tasks: +- Add a new managed stdio server type for Pi, similar to Codex: + - Create `PiServer` struct with: + - stdin sender + - pending request map keyed by request id + - per-session native session id mapping + - Extend `ManagedServerKind` to include Pi. + - Add `ensure_pi_server()` and `spawn_pi_server()` using `pi --mode rpc`. 
+ - Add a `handle_pi_server_output()` loop to parse stdout lines into events/responses. +- Session creation: + - On `create_session`, ensure Pi server is running, send `new_session`, store sessionId. + - Register session with `server_manager.register_session` for native mapping. +- Sending messages: + - Use `prompt` command; include sessionId and optional images. + - Emit synthetic `item.started` only if Pi does not emit `message_start`. + +### 6) Router + Streaming Path Changes +Files: +- `server/packages/sandbox-agent/src/router.rs` + +Tasks: +- Add Pi handling to: + - `create_session` (new_session) + - `send_message` (prompt) + - `parse_agent_line` (Pi event conversion) + - `agent_modes` (default to `default` unless Pi exposes a mode list) + - `agent_supports_resume` (true if Pi supports session resume) + +### 7) Tests +Files: +- `server/packages/sandbox-agent/tests/...` +- `server/packages/universal-agent-schema/tests/...` (if present) + +Tasks: +- Unit tests for conversion: + - `message_start/update/end` -> item.started/delta/completed + - `tool_execution_*` -> tool call/result mapping with partialResult delta + - failure -> agent.unparsed +- Integration tests: + - Start Pi RPC server, create session, send prompt, stream events. + - Validate `native_session_id` mapping and event ordering. +- Update HTTP/SSE test coverage to include Pi agent if relevant. + +## Risk Areas / Edge Cases +- `tool_execution_update.partialResult` is cumulative; must compute deltas. +- `message_update` may emit `done`/`error` without `message_end`; handle both paths. +- No native session lifecycle events; rely on daemon synthetic events. +- Session recovery after RPC server restart requires `get_state` + re-register sessions. + +## Acceptance Criteria +- Pi appears in `/v1/agents`, CLI list, and docs. +- `create_session` returns `native_session_id` from Pi `new_session`. 
+- Streaming prompt yields universal events with proper ordering: + - message -> item.started/delta/completed + - tool execution -> tool call + tool result +- Tests pass and no synthetic data is used in test fixtures. + +## Sources +- https://upd.dev/badlogic/pi-mono/src/commit/d36e0ea07303d8a76d51b4a7bd5f0d6d3c490860/packages/coding-agent/docs/rpc.md +- https://buildwithpi.ai/pi-cli +- https://takopi.dev/docs/pi-cli/ +- https://upd.dev/badlogic/pi-mono/releases diff --git a/docs/session-transcript-schema.mdx b/docs/session-transcript-schema.mdx index a5a0158..7b87029 100644 --- a/docs/session-transcript-schema.mdx +++ b/docs/session-transcript-schema.mdx @@ -12,25 +12,25 @@ The schema is defined in [OpenAPI format](https://github.com/rivet-dev/sandbox-a This table shows which agent feature coverage appears in the universal event stream. All agents retain their full native feature coverage—this only reflects what's normalized into the schema. -| Feature | Claude | Codex | OpenCode | Amp | -|--------------------|:------:|:-----:|:------------:|:------------:| -| Stability | Stable | Stable| Experimental | Experimental | -| Text Messages | ✓ | ✓ | ✓ | ✓ | -| Tool Calls | ✓ | ✓ | ✓ | ✓ | -| Tool Results | ✓ | ✓ | ✓ | ✓ | -| Questions (HITL) | ✓ | | ✓ | | -| Permissions (HITL) | ✓ | ✓ | ✓ | - | -| Images | - | ✓ | ✓ | - | -| File Attachments | - | ✓ | ✓ | - | -| Session Lifecycle | - | ✓ | ✓ | - | -| Error Events | - | ✓ | ✓ | ✓ | -| Reasoning/Thinking | - | ✓ | - | - | -| Command Execution | - | ✓ | - | - | -| File Changes | - | ✓ | - | - | -| MCP Tools | - | ✓ | - | - | -| Streaming Deltas | ✓ | ✓ | ✓ | - | +| Feature | Claude | Codex | OpenCode | Amp | Pi (RPC) | +|--------------------|:------:|:-----:|:------------:|:------------:|:------------:| +| Stability | Stable | Stable| Experimental | Experimental | Experimental | +| Text Messages | ✓ | ✓ | ✓ | ✓ | ✓ | +| Tool Calls | ✓ | ✓ | ✓ | ✓ | ✓ | +| Tool Results | ✓ | ✓ | ✓ | ✓ | ✓ | +| Questions (HITL) | ✓ | | ✓ | 
| | +| Permissions (HITL) | ✓ | ✓ | ✓ | - | | +| Images | - | ✓ | ✓ | - | ✓ | +| File Attachments | - | ✓ | ✓ | - | | +| Session Lifecycle | - | ✓ | ✓ | - | | +| Error Events | - | ✓ | ✓ | ✓ | ✓ | +| Reasoning/Thinking | - | ✓ | - | - | ✓ | +| Command Execution | - | ✓ | - | - | | +| File Changes | - | ✓ | - | - | | +| MCP Tools | - | ✓ | - | - | | +| Streaming Deltas | ✓ | ✓ | ✓ | - | ✓ | -Agents: [Claude Code](https://docs.anthropic.com/en/docs/agents-and-tools/claude-code/overview) · [Codex](https://github.com/openai/codex) · [OpenCode](https://github.com/opencode-ai/opencode) · [Amp](https://ampcode.com) +Agents: [Claude Code](https://docs.anthropic.com/en/docs/agents-and-tools/claude-code/overview) · [Codex](https://github.com/openai/codex) · [OpenCode](https://github.com/opencode-ai/opencode) · [Amp](https://ampcode.com) · [Pi](https://buildwithpi.ai/pi-cli) - ✓ = Appears in session events - \- = Agent supports natively, schema conversion coming soon diff --git a/frontend/packages/inspector/src/App.tsx b/frontend/packages/inspector/src/App.tsx index cd4e118..767e2bb 100644 --- a/frontend/packages/inspector/src/App.tsx +++ b/frontend/packages/inspector/src/App.tsx @@ -19,7 +19,7 @@ import type { RequestLog } from "./types/requestLog"; import { buildCurl } from "./utils/http"; const logoUrl = `${import.meta.env.BASE_URL}logos/sandboxagent.svg`; -const defaultAgents = ["claude", "codex", "opencode", "amp", "mock"]; +const defaultAgents = ["claude", "codex", "opencode", "amp", "pi", "mock"]; type ItemEventData = { item: UniversalItem; @@ -31,6 +31,18 @@ type ItemDeltaEventData = { delta: string; }; +const shouldHidePiStatusItem = (item: UniversalItem) => { + if (item.kind !== "status") return false; + const statusParts = (item.content ?? []).filter( + (part) => (part as { type?: string }).type === "status" + ) as Array<{ label?: string }>; + if (statusParts.length === 0) return false; + return statusParts.every((part) => { + const label = part.label ?? 
""; + return label.startsWith("pi.turn_") || label.startsWith("pi.agent_"); + }); +}; + const buildStubItem = (itemId: string, nativeItemId?: string | null): UniversalItem => { return { item_id: itemId, @@ -734,7 +746,10 @@ export default function App() { } } - return entries; + return entries.filter((entry) => { + if (entry.kind !== "item" || !entry.item) return true; + return !shouldHidePiStatusItem(entry.item); + }); }, [events]); useEffect(() => { @@ -841,6 +856,7 @@ export default function App() { codex: "Codex", opencode: "OpenCode", amp: "Amp", + pi: "Pi", mock: "Mock" }; const agentLabel = agentDisplayNames[agentId] ?? agentId; diff --git a/frontend/packages/inspector/src/components/SessionSidebar.tsx b/frontend/packages/inspector/src/components/SessionSidebar.tsx index 4d2a172..996044d 100644 --- a/frontend/packages/inspector/src/components/SessionSidebar.tsx +++ b/frontend/packages/inspector/src/components/SessionSidebar.tsx @@ -45,6 +45,7 @@ const SessionSidebar = ({ codex: "Codex", opencode: "OpenCode", amp: "Amp", + pi: "Pi", mock: "Mock" }; diff --git a/frontend/packages/inspector/src/components/chat/ChatPanel.tsx b/frontend/packages/inspector/src/components/chat/ChatPanel.tsx index 4faea51..7d1a35e 100644 --- a/frontend/packages/inspector/src/components/chat/ChatPanel.tsx +++ b/frontend/packages/inspector/src/components/chat/ChatPanel.tsx @@ -112,6 +112,7 @@ const ChatPanel = ({ codex: "Codex", opencode: "OpenCode", amp: "Amp", + pi: "Pi", mock: "Mock" }; diff --git a/frontend/packages/website/public/logos/pi.svg b/frontend/packages/website/public/logos/pi.svg new file mode 100644 index 0000000..ed14b63 --- /dev/null +++ b/frontend/packages/website/public/logos/pi.svg @@ -0,0 +1,22 @@ + + + + + + + diff --git a/frontend/packages/website/src/components/CTASection.tsx b/frontend/packages/website/src/components/CTASection.tsx index 5328b84..f47f14e 100644 --- a/frontend/packages/website/src/components/CTASection.tsx +++ 
b/frontend/packages/website/src/components/CTASection.tsx @@ -7,7 +7,7 @@ import { ArrowRight, Terminal, Check } from 'lucide-react'; const CTA_TITLES = [ 'Run coding agents in sandboxes. Control them over HTTP.', 'A server inside your sandbox. An API for your app.', - 'Claude Code, Codex, OpenCode, Amp — one HTTP API.', + 'Claude Code, Codex, OpenCode, Amp, Pi — one HTTP API.', 'Your app connects remotely. The coding agent runs isolated.', 'Streaming events. Handling permissions. Managing sessions.', 'Install with curl. Connect over HTTP. Control any coding agent.', diff --git a/frontend/packages/website/src/components/FAQ.tsx b/frontend/packages/website/src/components/FAQ.tsx index 5525b8e..4000c22 100644 --- a/frontend/packages/website/src/components/FAQ.tsx +++ b/frontend/packages/website/src/components/FAQ.tsx @@ -13,7 +13,7 @@ const faqs = [ { question: 'Which coding agents are supported?', answer: - 'Claude Code, Codex, OpenCode, and Amp. The SDK normalizes their APIs so you can swap between them without changing your code.', + 'Claude Code, Codex, OpenCode, Amp, and Pi. The SDK normalizes their APIs so you can swap between them without changing your code.', }, { question: 'How is session data persisted?', @@ -33,7 +33,7 @@ const faqs = [ { question: 'Can I use this with my personal API keys?', answer: - "Yes. Use sandbox-agent credentials extract-env to extract API keys from your local agent configs (Claude Code, Codex, OpenCode, Amp) and pass them to the sandbox environment.", + "Yes. 
Use sandbox-agent credentials extract-env to extract API keys from your local agent configs (Claude Code, Codex, OpenCode, Amp, Pi) and pass them to the sandbox environment.", }, { question: 'Why Rust and not [language]?', diff --git a/frontend/packages/website/src/components/FeatureGrid.tsx b/frontend/packages/website/src/components/FeatureGrid.tsx index be17c62..f2ea1ff 100644 --- a/frontend/packages/website/src/components/FeatureGrid.tsx +++ b/frontend/packages/website/src/components/FeatureGrid.tsx @@ -38,7 +38,7 @@ export function FeatureGrid() {

Universal Agent API

- Claude Code, Codex, OpenCode, and Amp each have different APIs. We provide a single, + Claude Code, Codex, OpenCode, Amp, and Pi each have different APIs. We provide a single, unified interface to control them all.

diff --git a/frontend/packages/website/src/components/Hero.tsx b/frontend/packages/website/src/components/Hero.tsx index 1401aa9..72c30bc 100644 --- a/frontend/packages/website/src/components/Hero.tsx +++ b/frontend/packages/website/src/components/Hero.tsx @@ -4,10 +4,11 @@ import { useState, useEffect } from 'react'; import { Terminal, Check, ArrowRight } from 'lucide-react'; const ADAPTERS = [ - { label: 'Claude Code', color: '#D97757', x: 35, y: 70, logo: '/logos/claude.svg' }, - { label: 'Codex', color: '#10A37F', x: 185, y: 70, logo: 'openai' }, - { label: 'Amp', color: '#F59E0B', x: 35, y: 155, logo: '/logos/amp.svg' }, - { label: 'OpenCode', color: '#8B5CF6', x: 185, y: 155, logo: 'opencode' }, + { label: 'Claude Code', color: '#D97757', x: 35, y: 30, logo: '/logos/claude.svg' }, + { label: 'Codex', color: '#10A37F', x: 185, y: 30, logo: 'openai' }, + { label: 'Amp', color: '#F59E0B', x: 35, y: 115, logo: '/logos/amp.svg' }, + { label: 'OpenCode', color: '#8B5CF6', x: 185, y: 115, logo: 'opencode' }, + { label: 'Pi', color: '#38BDF8', x: 110, y: 200, logo: '/logos/pi.svg' }, ]; function UniversalAPIDiagram() { @@ -187,7 +188,7 @@ export function Hero() { Control Them Over HTTP.

- The Sandbox Agent SDK is a server that runs inside your sandbox. Your app connects remotely to control Claude Code, Codex, OpenCode, or Amp — streaming events, handling permissions, managing sessions. + The Sandbox Agent SDK is a server that runs inside your sandbox. Your app connects remotely to control Claude Code, Codex, OpenCode, Amp, or Pi — streaming events, handling permissions, managing sessions.

diff --git a/frontend/packages/website/src/components/PainPoints.tsx b/frontend/packages/website/src/components/PainPoints.tsx index 7b26db4..654286b 100644 --- a/frontend/packages/website/src/components/PainPoints.tsx +++ b/frontend/packages/website/src/components/PainPoints.tsx @@ -16,7 +16,7 @@ const frictions = [ number: '02', title: 'Every Coding Agent is Different', problem: - 'Claude Code, Codex, OpenCode, and Amp each have proprietary APIs, event formats, and behaviors. Swapping coding agents means rewriting your entire integration.', + 'Claude Code, Codex, OpenCode, Amp, and Pi each have proprietary APIs, event formats, and behaviors. Swapping coding agents means rewriting your entire integration.', solution: 'One HTTP API. Write your code once, swap coding agents with a config change.', accentColor: 'purple', }, diff --git a/frontend/packages/website/src/components/ProblemsSolved.tsx b/frontend/packages/website/src/components/ProblemsSolved.tsx index cf6fc4c..6ebcfbb 100644 --- a/frontend/packages/website/src/components/ProblemsSolved.tsx +++ b/frontend/packages/website/src/components/ProblemsSolved.tsx @@ -6,7 +6,7 @@ import { FeatureIcon } from './ui/FeatureIcon'; const problems = [ { title: 'Universal Agent API', - desc: 'Claude Code, Codex, OpenCode, and Amp each have different APIs. We provide a single interface to control them all.', + desc: 'Claude Code, Codex, OpenCode, Amp, and Pi each have different APIs. 
We provide a single interface to control them all.', icon: Workflow, color: 'text-accent', }, diff --git a/resources/agent-schemas/artifacts/json-schema/pi.json b/resources/agent-schemas/artifacts/json-schema/pi.json new file mode 100644 index 0000000..3c92cfc --- /dev/null +++ b/resources/agent-schemas/artifacts/json-schema/pi.json @@ -0,0 +1,124 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "$id": "https://sandbox-agent/schemas/pi.json", + "title": "Pi RPC Schema", + "definitions": { + "RpcEvent": { + "type": "object", + "properties": { + "type": { + "type": "string" + }, + "sessionId": { + "type": "string" + }, + "messageId": { + "type": "string" + }, + "message": { + "$ref": "#/definitions/RpcMessage" + }, + "assistantMessageEvent": { + "$ref": "#/definitions/AssistantMessageEvent" + }, + "toolCallId": { + "type": "string" + }, + "toolName": { + "type": "string" + }, + "args": {}, + "partialResult": {}, + "result": { + "$ref": "#/definitions/ToolResult" + }, + "isError": { + "type": "boolean" + }, + "error": {} + }, + "required": [ + "type" + ] + }, + "RpcMessage": { + "type": "object", + "properties": { + "role": { + "type": "string" + }, + "content": {} + } + }, + "AssistantMessageEvent": { + "type": "object", + "properties": { + "type": { + "type": "string" + }, + "delta": { + "type": "string" + }, + "content": {}, + "partial": {}, + "messageId": { + "type": "string" + } + } + }, + "ToolResult": { + "type": "object", + "properties": { + "type": { + "type": "string" + }, + "content": { + "type": "string" + }, + "text": { + "type": "string" + } + } + }, + "RpcResponse": { + "type": "object", + "properties": { + "type": { + "type": "string", + "const": "response" + }, + "id": { + "type": "integer" + }, + "success": { + "type": "boolean" + }, + "data": {}, + "error": {} + }, + "required": [ + "type" + ] + }, + "RpcCommand": { + "type": "object", + "properties": { + "type": { + "type": "string", + "const": "command" + }, + "id": { + "type": 
"integer" + }, + "command": { + "type": "string" + }, + "params": {} + }, + "required": [ + "type", + "command" + ] + } + } +} \ No newline at end of file diff --git a/resources/agent-schemas/package.json b/resources/agent-schemas/package.json index 8d8f0c5..0272939 100644 --- a/resources/agent-schemas/package.json +++ b/resources/agent-schemas/package.json @@ -9,6 +9,7 @@ "extract:claude": "tsx src/index.ts --agent=claude", "extract:codex": "tsx src/index.ts --agent=codex", "extract:amp": "tsx src/index.ts --agent=amp", + "extract:pi": "tsx src/index.ts --agent=pi", "extract:claude-events": "tsx src/claude-event-types.ts", "extract:claude-events:sdk": "tsx src/claude-event-types-sdk.ts", "extract:claude-events:cli": "tsx src/claude-event-types-cli.ts", diff --git a/resources/agent-schemas/src/index.ts b/resources/agent-schemas/src/index.ts index 877e695..106eb2b 100644 --- a/resources/agent-schemas/src/index.ts +++ b/resources/agent-schemas/src/index.ts @@ -4,18 +4,20 @@ import { extractOpenCodeSchema } from "./opencode.js"; import { extractClaudeSchema } from "./claude.js"; import { extractCodexSchema } from "./codex.js"; import { extractAmpSchema } from "./amp.js"; +import { extractPiSchema } from "./pi.js"; import { validateSchema, type NormalizedSchema } from "./normalize.js"; const RESOURCE_DIR = join(import.meta.dirname, ".."); const DIST_DIR = join(RESOURCE_DIR, "artifacts", "json-schema"); -type AgentName = "opencode" | "claude" | "codex" | "amp"; +type AgentName = "opencode" | "claude" | "codex" | "amp" | "pi"; const EXTRACTORS: Record Promise> = { opencode: extractOpenCodeSchema, claude: extractClaudeSchema, codex: extractCodexSchema, amp: extractAmpSchema, + pi: extractPiSchema, }; function parseArgs(): { agents: AgentName[] } { diff --git a/resources/agent-schemas/src/pi.ts b/resources/agent-schemas/src/pi.ts new file mode 100644 index 0000000..5c72c75 --- /dev/null +++ b/resources/agent-schemas/src/pi.ts @@ -0,0 +1,191 @@ +import { execSync } from 
"child_process"; +import { existsSync, mkdtempSync, readdirSync, rmSync, writeFileSync } from "fs"; +import { join } from "path"; +import { tmpdir } from "os"; +import { createGenerator, type Config } from "ts-json-schema-generator"; +import { createNormalizedSchema, type NormalizedSchema } from "./normalize.js"; +import type { JSONSchema7 } from "json-schema"; + +const PI_SOURCE_URL = "https://codeload.github.com/badlogic/pi-mono/tar.gz/refs/heads/main"; +const RPC_TYPES_PATH = "packages/coding-agent/src/modes/rpc/rpc-types.ts"; +const TARGET_TYPES = ["RpcEvent", "RpcResponse", "RpcCommand"] as const; + +export async function extractPiSchema(): Promise { + console.log("Extracting Pi schema from pi-mono sources..."); + + const tempDir = mkdtempSync(join(tmpdir(), "pi-schema-")); + try { + const archivePath = join(tempDir, "pi-mono.tar.gz"); + await downloadToFile(PI_SOURCE_URL, archivePath); + + execSync(`tar -xzf "${archivePath}" -C "${tempDir}"`, { + stdio: ["ignore", "ignore", "ignore"], + }); + + const repoRoot = findRepoRoot(tempDir); + const rpcTypesPath = join(repoRoot, RPC_TYPES_PATH); + if (!existsSync(rpcTypesPath)) { + throw new Error(`rpc-types.ts not found at ${rpcTypesPath}`); + } + + const tsconfig = resolveTsconfig(repoRoot); + const definitions = generateDefinitions(rpcTypesPath, tsconfig); + if (Object.keys(definitions).length === 0) { + console.log(" [warn] No schemas extracted from source, using fallback"); + return createFallbackSchema(); + } + + console.log(` [ok] Extracted ${Object.keys(definitions).length} types from source`); + return createNormalizedSchema("pi", "Pi RPC Schema", definitions); + } catch (error) { + const errorMessage = error instanceof Error ? 
error.message : String(error); + console.log(` [warn] Pi schema extraction failed: ${errorMessage}`); + console.log(" [fallback] Using embedded schema definitions"); + return createFallbackSchema(); + } finally { + rmSync(tempDir, { recursive: true, force: true }); + } +} + +async function downloadToFile(url: string, filePath: string): Promise { + const response = await fetch(url); + if (!response.ok) { + throw new Error(`HTTP ${response.status}: ${response.statusText}`); + } + const buffer = Buffer.from(await response.arrayBuffer()); + writeFileSync(filePath, buffer); +} + +function findRepoRoot(root: string): string { + const entries = readdirSync(root, { withFileTypes: true }).filter((entry) => entry.isDirectory()); + const repoDir = entries.find((entry) => entry.name.startsWith("pi-mono")); + if (!repoDir) { + throw new Error("pi-mono source directory not found after extraction"); + } + return join(root, repoDir.name); +} + +function resolveTsconfig(root: string): string | undefined { + const candidates = [ + join(root, "tsconfig.json"), + join(root, "tsconfig.base.json"), + join(root, "packages", "coding-agent", "tsconfig.json"), + ]; + return candidates.find((path) => existsSync(path)); +} + +function generateDefinitions( + rpcTypesPath: string, + tsconfigPath?: string +): Record { + const definitions: Record = {}; + + for (const typeName of TARGET_TYPES) { + const config: Config = { + path: rpcTypesPath, + type: typeName, + expose: "all", + skipTypeCheck: false, + topRef: false, + ...(tsconfigPath ? 
{ tsconfig: tsconfigPath } : {}), + }; + const schema = createGenerator(config).createSchema(typeName) as JSONSchema7; + mergeDefinitions(definitions, schema, typeName); + } + + return definitions; +} + +function mergeDefinitions( + target: Record, + schema: JSONSchema7, + typeName: string +): void { + if (schema.definitions) { + for (const [name, def] of Object.entries(schema.definitions)) { + target[name] = def as JSONSchema7; + } + } else if (schema.$defs) { + for (const [name, def] of Object.entries(schema.$defs)) { + target[name] = def as JSONSchema7; + } + } else { + target[typeName] = schema; + } + + if (!target[typeName]) { + target[typeName] = schema; + } +} + +function createFallbackSchema(): NormalizedSchema { + const definitions: Record = { + RpcEvent: { + type: "object", + properties: { + type: { type: "string" }, + sessionId: { type: "string" }, + messageId: { type: "string" }, + message: { $ref: "#/definitions/RpcMessage" }, + assistantMessageEvent: { $ref: "#/definitions/AssistantMessageEvent" }, + toolCallId: { type: "string" }, + toolName: { type: "string" }, + args: {}, + partialResult: {}, + result: { $ref: "#/definitions/ToolResult" }, + isError: { type: "boolean" }, + error: {}, + }, + required: ["type"], + }, + RpcMessage: { + type: "object", + properties: { + role: { type: "string" }, + content: {}, + }, + }, + AssistantMessageEvent: { + type: "object", + properties: { + type: { type: "string" }, + delta: { type: "string" }, + content: {}, + partial: {}, + messageId: { type: "string" }, + }, + }, + ToolResult: { + type: "object", + properties: { + type: { type: "string" }, + content: { type: "string" }, + text: { type: "string" }, + }, + }, + RpcResponse: { + type: "object", + properties: { + type: { type: "string", const: "response" }, + id: { type: "integer" }, + success: { type: "boolean" }, + data: {}, + error: {}, + }, + required: ["type"], + }, + RpcCommand: { + type: "object", + properties: { + type: { type: "string", const: "command" 
}, + id: { type: "integer" }, + command: { type: "string" }, + params: {}, + }, + required: ["type", "command"], + }, + }; + + console.log(` [ok] Using fallback schema with ${Object.keys(definitions).length} definitions`); + return createNormalizedSchema("pi", "Pi RPC Schema", definitions); +} diff --git a/server/packages/agent-management/src/agents.rs b/server/packages/agent-management/src/agents.rs index e110c96..d896d39 100644 --- a/server/packages/agent-management/src/agents.rs +++ b/server/packages/agent-management/src/agents.rs @@ -7,7 +7,6 @@ use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command, ExitSta use std::time::{Duration, Instant}; use flate2::read::GzDecoder; -use reqwest::blocking::Client; use sandbox_agent_extracted_agent_schemas::codex as codex_schema; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -21,6 +20,7 @@ pub enum AgentId { Codex, Opencode, Amp, + Pi, Mock, } @@ -31,17 +31,55 @@ impl AgentId { AgentId::Codex => "codex", AgentId::Opencode => "opencode", AgentId::Amp => "amp", + AgentId::Pi => "pi", AgentId::Mock => "mock", } } pub fn binary_name(self) -> &'static str { match self { - AgentId::Claude => "claude", - AgentId::Codex => "codex", - AgentId::Opencode => "opencode", - AgentId::Amp => "amp", - AgentId::Mock => "mock", + AgentId::Claude => { + if cfg!(windows) { + "claude.exe" + } else { + "claude" + } + } + AgentId::Codex => { + if cfg!(windows) { + "codex.exe" + } else { + "codex" + } + } + AgentId::Opencode => { + if cfg!(windows) { + "opencode.exe" + } else { + "opencode" + } + } + AgentId::Amp => { + if cfg!(windows) { + "amp.exe" + } else { + "amp" + } + } + AgentId::Pi => { + if cfg!(windows) { + "pi.exe" + } else { + "pi" + } + } + AgentId::Mock => { + if cfg!(windows) { + "mock.exe" + } else { + "mock" + } + } } } @@ -51,6 +89,7 @@ impl AgentId { "codex" => Some(AgentId::Codex), "opencode" => Some(AgentId::Opencode), "amp" => Some(AgentId::Amp), + "pi" => Some(AgentId::Pi), "mock" => 
Some(AgentId::Mock), _ => None, } @@ -151,6 +190,7 @@ impl AgentManager { install_opencode(&install_path, self.platform, options.version.as_deref())? } AgentId::Amp => install_amp(&install_path, self.platform, options.version.as_deref())?, + AgentId::Pi => install_pi(&install_path, self.platform, options.version.as_deref())?, AgentId::Mock => { if !install_path.exists() { fs::write(&install_path, b"mock")?; @@ -284,6 +324,11 @@ impl AgentManager { events, }); } + AgentId::Pi => { + return Err(AgentError::UnsupportedAgent { + agent: agent.as_str().to_string(), + }); + } AgentId::Mock => { return Err(AgentError::UnsupportedAgent { agent: agent.as_str().to_string(), @@ -619,6 +664,11 @@ impl AgentManager { AgentId::Amp => { return Ok(build_amp_command(&path, &working_dir, options)); } + AgentId::Pi => { + return Err(AgentError::UnsupportedAgent { + agent: agent.as_str().to_string(), + }); + } AgentId::Mock => { return Err(AgentError::UnsupportedAgent { agent: agent.as_str().to_string(), @@ -940,6 +990,7 @@ fn extract_session_id(agent: AgentId, events: &[Value]) -> Option { return Some(id); } } + AgentId::Pi => {} AgentId::Mock => {} } } @@ -1022,6 +1073,7 @@ fn extract_result_text(agent: AgentId, events: &[Value]) -> Option { Some(buffer) } } + AgentId::Pi => None, AgentId::Mock => None, } } @@ -1200,7 +1252,7 @@ fn default_install_dir() -> PathBuf { } fn download_bytes(url: &Url) -> Result, AgentError> { - let client = Client::builder().build()?; + let client = crate::http_client::blocking_client_builder().build()?; let mut response = client.get(url.clone()).send()?; if !response.status().is_success() { return Err(AgentError::DownloadFailed { url: url.clone() }); @@ -1210,6 +1262,28 @@ fn download_bytes(url: &Url) -> Result, AgentError> { Ok(bytes) } +fn install_pi(path: &Path, platform: Platform, version: Option<&str>) -> Result<(), AgentError> { + let asset = match platform { + Platform::LinuxX64 | Platform::LinuxX64Musl => "pi-linux-x64", + Platform::LinuxArm64 => 
"pi-linux-arm64", + Platform::MacosArm64 => "pi-darwin-arm64", + Platform::MacosX64 => "pi-darwin-x64", + } + .to_string(); + let url = match version { + Some(version) => Url::parse(&format!( + "https://upd.dev/badlogic/pi-mono/releases/download/{version}/{asset}" + ))?, + None => Url::parse(&format!( + "https://upd.dev/badlogic/pi-mono/releases/latest/download/{asset}" + ))?, + }; + + let bytes = download_bytes(&url)?; + write_executable(path, &bytes)?; + Ok(()) +} + fn install_claude( path: &Path, platform: Platform, @@ -1329,7 +1403,7 @@ fn install_opencode( }; install_zip_binary(path, &url, "opencode") } - _ => { + Platform::LinuxX64 | Platform::LinuxX64Musl | Platform::LinuxArm64 => { let platform_segment = match platform { Platform::LinuxX64 => "linux-x64", Platform::LinuxX64Musl => "linux-x64-musl", diff --git a/server/packages/agent-management/src/http_client.rs b/server/packages/agent-management/src/http_client.rs new file mode 100644 index 0000000..e9c8bd6 --- /dev/null +++ b/server/packages/agent-management/src/http_client.rs @@ -0,0 +1,20 @@ +use std::env; + +use reqwest::blocking::ClientBuilder; + +const NO_SYSTEM_PROXY_ENV: &str = "SANDBOX_AGENT_NO_SYSTEM_PROXY"; + +fn disable_system_proxy() -> bool { + env::var(NO_SYSTEM_PROXY_ENV) + .map(|value| matches!(value.as_str(), "1" | "true" | "TRUE" | "yes" | "YES")) + .unwrap_or(false) +} + +pub(crate) fn blocking_client_builder() -> ClientBuilder { + let builder = reqwest::blocking::Client::builder(); + if disable_system_proxy() { + builder.no_proxy() + } else { + builder + } +} diff --git a/server/packages/agent-management/src/lib.rs b/server/packages/agent-management/src/lib.rs index 9ef62da..7d776af 100644 --- a/server/packages/agent-management/src/lib.rs +++ b/server/packages/agent-management/src/lib.rs @@ -1,3 +1,4 @@ pub mod agents; pub mod credentials; +mod http_client; pub mod testing; diff --git a/server/packages/agent-management/src/testing.rs b/server/packages/agent-management/src/testing.rs 
index 6ba411d..34d872d 100644 --- a/server/packages/agent-management/src/testing.rs +++ b/server/packages/agent-management/src/testing.rs @@ -2,7 +2,6 @@ use std::env; use std::path::PathBuf; use std::time::Duration; -use reqwest::blocking::Client; use reqwest::header::{HeaderMap, HeaderValue, AUTHORIZATION, CONTENT_TYPE}; use reqwest::StatusCode; use thiserror::Error; @@ -36,6 +35,7 @@ pub enum TestAgentConfigError { const AGENTS_ENV: &str = "SANDBOX_TEST_AGENTS"; const ANTHROPIC_ENV: &str = "SANDBOX_TEST_ANTHROPIC_API_KEY"; const OPENAI_ENV: &str = "SANDBOX_TEST_OPENAI_API_KEY"; +const PI_ENV: &str = "SANDBOX_TEST_PI"; const ANTHROPIC_MODELS_URL: &str = "https://api.anthropic.com/v1/models"; const OPENAI_MODELS_URL: &str = "https://api.openai.com/v1/models"; const ANTHROPIC_VERSION: &str = "2023-06-01"; @@ -63,6 +63,7 @@ pub fn test_agents_from_env() -> Result, TestAgentConfigErr AgentId::Codex, AgentId::Opencode, AgentId::Amp, + AgentId::Pi, ]); continue; } @@ -73,6 +74,12 @@ pub fn test_agents_from_env() -> Result, TestAgentConfigErr agents }; + let include_pi = pi_tests_enabled() && find_in_path(AgentId::Pi.binary_name()); + if !include_pi && agents.iter().any(|agent| *agent == AgentId::Pi) { + eprintln!("Skipping Pi tests: set {PI_ENV}=1 and ensure pi is on PATH."); + } + agents.retain(|agent| *agent != AgentId::Pi || include_pi); + agents.sort_by(|a, b| a.as_str().cmp(b.as_str())); agents.dedup(); @@ -137,6 +144,21 @@ pub fn test_agents_from_env() -> Result, TestAgentConfigErr } credentials_with(anthropic_cred.clone(), openai_cred.clone()) } + AgentId::Pi => { + if anthropic_cred.is_none() && openai_cred.is_none() { + return Err(TestAgentConfigError::MissingCredentials { + agent, + missing: format!("{ANTHROPIC_ENV} or {OPENAI_ENV}"), + }); + } + if let Some(cred) = anthropic_cred.as_ref() { + ensure_anthropic_ok(&mut health_cache, cred)?; + } + if let Some(cred) = openai_cred.as_ref() { + ensure_openai_ok(&mut health_cache, cred)?; + } + 
credentials_with(anthropic_cred.clone(), openai_cred.clone()) + } AgentId::Mock => credentials_with(None, None), }; configs.push(TestAgentConfig { agent, credentials }); @@ -172,7 +194,7 @@ fn ensure_openai_ok( fn health_check_anthropic(credentials: &ProviderCredentials) -> Result<(), TestAgentConfigError> { let credentials = credentials.clone(); run_blocking_check("anthropic", move || { - let client = Client::builder() + let client = crate::http_client::blocking_client_builder() .timeout(Duration::from_secs(10)) .build() .map_err(|err| TestAgentConfigError::HealthCheckFailed { @@ -226,7 +248,7 @@ fn health_check_anthropic(credentials: &ProviderCredentials) -> Result<(), TestA fn health_check_openai(credentials: &ProviderCredentials) -> Result<(), TestAgentConfigError> { let credentials = credentials.clone(); run_blocking_check("openai", move || { - let client = Client::builder() + let client = crate::http_client::blocking_client_builder() .timeout(Duration::from_secs(10)) .build() .map_err(|err| TestAgentConfigError::HealthCheckFailed { @@ -298,12 +320,15 @@ where } fn detect_system_agents() -> Vec { - let candidates = [ + let mut candidates = vec![ AgentId::Claude, AgentId::Codex, AgentId::Opencode, AgentId::Amp, ]; + if pi_tests_enabled() && find_in_path(AgentId::Pi.binary_name()) { + candidates.push(AgentId::Pi); + } let install_dir = default_install_dir(); candidates .into_iter() @@ -345,6 +370,15 @@ fn read_env_key(name: &str) -> Option { }) } +fn pi_tests_enabled() -> bool { + env::var(PI_ENV) + .map(|value| { + let value = value.trim().to_ascii_lowercase(); + value == "1" || value == "true" || value == "yes" + }) + .unwrap_or(false) +} + fn credentials_with( anthropic_cred: Option, openai_cred: Option, diff --git a/server/packages/extracted-agent-schemas/build.rs b/server/packages/extracted-agent-schemas/build.rs index 5807bfc..7b2f9ef 100644 --- a/server/packages/extracted-agent-schemas/build.rs +++ b/server/packages/extracted-agent-schemas/build.rs @@ 
-11,6 +11,7 @@ fn main() { ("claude", "claude.json"), ("codex", "codex.json"), ("amp", "amp.json"), + ("pi", "pi.json"), ]; for (name, file) in schemas { diff --git a/server/packages/extracted-agent-schemas/src/lib.rs b/server/packages/extracted-agent-schemas/src/lib.rs index f72a064..2c2ea8a 100644 --- a/server/packages/extracted-agent-schemas/src/lib.rs +++ b/server/packages/extracted-agent-schemas/src/lib.rs @@ -5,6 +5,7 @@ //! - Claude Code SDK //! - Codex SDK //! - AMP Code SDK +//! - Pi RPC pub mod opencode { //! OpenCode SDK types extracted from OpenAPI 3.1.1 spec. @@ -25,3 +26,8 @@ pub mod amp { //! AMP Code SDK types. include!(concat!(env!("OUT_DIR"), "/amp.rs")); } + +pub mod pi { + //! Pi RPC types. + include!(concat!(env!("OUT_DIR"), "/pi.rs")); +} diff --git a/server/packages/sandbox-agent/src/http_client.rs b/server/packages/sandbox-agent/src/http_client.rs new file mode 100644 index 0000000..aed5a71 --- /dev/null +++ b/server/packages/sandbox-agent/src/http_client.rs @@ -0,0 +1,30 @@ +use std::env; + +use reqwest::blocking::ClientBuilder as BlockingClientBuilder; +use reqwest::ClientBuilder; + +const NO_SYSTEM_PROXY_ENV: &str = "SANDBOX_AGENT_NO_SYSTEM_PROXY"; + +fn disable_system_proxy() -> bool { + env::var(NO_SYSTEM_PROXY_ENV) + .map(|value| matches!(value.as_str(), "1" | "true" | "TRUE" | "yes" | "YES")) + .unwrap_or(false) +} + +pub fn client_builder() -> ClientBuilder { + let builder = reqwest::Client::builder(); + if disable_system_proxy() { + builder.no_proxy() + } else { + builder + } +} + +pub fn blocking_client_builder() -> BlockingClientBuilder { + let builder = reqwest::blocking::Client::builder(); + if disable_system_proxy() { + builder.no_proxy() + } else { + builder + } +} diff --git a/server/packages/sandbox-agent/src/lib.rs b/server/packages/sandbox-agent/src/lib.rs index 098d0b3..2bf872f 100644 --- a/server/packages/sandbox-agent/src/lib.rs +++ b/server/packages/sandbox-agent/src/lib.rs @@ -2,6 +2,7 @@ mod agent_server_logs; pub 
mod credentials; +pub mod http_client; pub mod router; pub mod telemetry; pub mod ui; diff --git a/server/packages/sandbox-agent/src/main.rs b/server/packages/sandbox-agent/src/main.rs index 92587c2..f198dbe 100644 --- a/server/packages/sandbox-agent/src/main.rs +++ b/server/packages/sandbox-agent/src/main.rs @@ -11,6 +11,7 @@ mod build_version { } use reqwest::blocking::Client as HttpClient; use reqwest::Method; +use sandbox_agent::http_client; use sandbox_agent::router::{build_router_with_state, shutdown_servers}; use sandbox_agent::router::{ AgentInstallRequest, AppState, AuthConfig, CreateSessionRequest, MessageRequest, @@ -687,6 +688,7 @@ enum CredentialAgent { Codex, Opencode, Amp, + Pi, } fn credentials_to_output(credentials: ExtractedCredentials, reveal: bool) -> CredentialsOutput { @@ -806,6 +808,31 @@ fn select_token_for_agent( ))) } } + CredentialAgent::Pi => { + if let Some(provider) = provider { + return select_token_for_provider(credentials, provider); + } + if let Some(openai) = credentials.openai.as_ref() { + return Ok(openai.api_key.clone()); + } + if let Some(anthropic) = credentials.anthropic.as_ref() { + return Ok(anthropic.api_key.clone()); + } + if credentials.other.len() == 1 { + if let Some((_, cred)) = credentials.other.iter().next() { + return Ok(cred.api_key.clone()); + } + } + let available = available_providers(credentials); + if available.is_empty() { + Err(CliError::Server("no credentials found for pi".to_string())) + } else { + Err(CliError::Server(format!( + "multiple providers available for pi: {} (use --provider)", + available.join(", ") + ))) + } + } } } @@ -919,7 +946,7 @@ impl ClientContext { } else { cli.token.clone() }; - let client = HttpClient::builder().build()?; + let client = http_client::blocking_client_builder().build()?; Ok(Self { endpoint, token, diff --git a/server/packages/sandbox-agent/src/router.rs b/server/packages/sandbox-agent/src/router.rs index 5f16582..69540e1 100644 --- 
a/server/packages/sandbox-agent/src/router.rs +++ b/server/packages/sandbox-agent/src/router.rs @@ -1,6 +1,6 @@ use std::collections::{HashMap, HashSet, VecDeque}; use std::convert::Infallible; -use std::io::{BufRead, BufReader, Write}; +use std::io::{BufRead, BufReader, Read, Write}; use std::net::TcpListener; use std::path::PathBuf; use std::process::Stdio; @@ -21,6 +21,7 @@ use reqwest::Client; use sandbox_agent_error::{AgentError, ErrorType, ProblemDetails, SandboxError}; use sandbox_agent_universal_agent_schema::{ codex as codex_schema, convert_amp, convert_claude, convert_codex, convert_opencode, + convert_pi, pi as pi_schema, AgentUnparsedData, ContentPart, ErrorData, EventConversion, EventSource, FileAction, ItemDeltaData, ItemEventData, ItemKind, ItemRole, ItemStatus, PermissionEventData, PermissionStatus, QuestionEventData, QuestionStatus, ReasoningVisibility, SessionEndReason, @@ -37,6 +38,7 @@ use tower_http::trace::TraceLayer; use utoipa::{Modify, OpenApi, ToSchema}; use crate::agent_server_logs::AgentServerLogs; +use crate::http_client; use crate::ui; use sandbox_agent_agent_management::agents::{ AgentError as ManagerError, AgentId, AgentManager, InstallOptions, SpawnOptions, StreamingSpawn, @@ -714,7 +716,8 @@ impl SessionState { #[derive(Debug)] enum ManagedServerKind { Http { base_url: String }, - Stdio { server: Arc }, + StdioCodex { server: Arc }, + StdioPi { server: Arc }, } #[derive(Debug)] @@ -847,6 +850,165 @@ impl CodexServer { } } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum PiRpcDialect { + Mono, + CodingAgent, +} + +fn detect_pi_dialect(path: &PathBuf) -> PiRpcDialect { + let mut file = match std::fs::File::open(path) { + Ok(file) => file, + Err(_) => return PiRpcDialect::Mono, + }; + let mut buffer = [0u8; 256]; + let read = match file.read(&mut buffer) { + Ok(read) => read, + Err(_) => return PiRpcDialect::Mono, + }; + if read == 0 { + return PiRpcDialect::Mono; + } + let sample = &buffer[..read]; + if sample.starts_with(b"#!") { 
+ return PiRpcDialect::CodingAgent; + } + if sample.iter().any(|byte| *byte == 0) { + return PiRpcDialect::Mono; + } + let is_text = sample.iter().all(|byte| byte.is_ascii()); + if is_text { + return PiRpcDialect::CodingAgent; + } + PiRpcDialect::Mono +} + +/// Shared Pi RPC process that multiplexes sessions via newline-delimited JSON. +struct PiServer { + /// Sender for writing to the process stdin + stdin_sender: mpsc::UnboundedSender, + /// Pending RPC requests awaiting responses, keyed by request ID + pending_requests: std::sync::Mutex>>, + /// Next request ID for RPC + next_id: AtomicI64, + /// Mapping from native session ID to daemon session ID + session_map: std::sync::Mutex>, + /// Per-session conversion state (partial tool results, reasoning buffers) + converters: std::sync::Mutex>, + /// RPC dialect used by the Pi binary + dialect: PiRpcDialect, + /// Current daemon session id for coding-agent (no session id in events) + current_session_id: std::sync::Mutex>, + /// Current native session id for coding-agent (for metadata) + current_native_session_id: std::sync::Mutex>, +} + +impl std::fmt::Debug for PiServer { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PiServer") + .field("next_id", &self.next_id.load(Ordering::SeqCst)) + .finish() + } +} + +impl PiServer { + fn new(stdin_sender: mpsc::UnboundedSender, dialect: PiRpcDialect) -> Self { + Self { + stdin_sender, + pending_requests: std::sync::Mutex::new(HashMap::new()), + next_id: AtomicI64::new(1), + session_map: std::sync::Mutex::new(HashMap::new()), + converters: std::sync::Mutex::new(HashMap::new()), + dialect, + current_session_id: std::sync::Mutex::new(None), + current_native_session_id: std::sync::Mutex::new(None), + } + } + + fn dialect(&self) -> PiRpcDialect { + self.dialect + } + + fn current_session_id(&self) -> Option { + self.current_session_id.lock().unwrap().clone() + } + + fn current_native_session_id(&self) -> Option { + 
self.current_native_session_id.lock().unwrap().clone() + } + + fn next_request_id(&self) -> i64 { + self.next_id.fetch_add(1, Ordering::SeqCst) + } + + fn send_request(&self, id: i64, request: &impl Serialize) -> Option> { + let (tx, rx) = oneshot::channel(); + { + let mut pending = self.pending_requests.lock().unwrap(); + pending.insert(id, tx); + } + let line = serde_json::to_string(request).ok()?; + self.stdin_sender.send(line).ok()?; + Some(rx) + } + + fn complete_request(&self, id: i64, result: Value) { + let tx = { + let mut pending = self.pending_requests.lock().unwrap(); + pending.remove(&id) + }; + if let Some(tx) = tx { + let _ = tx.send(result); + } + } + + fn register_session(&self, native_session_id: String, session_id: String) { + let mut sessions = self.session_map.lock().unwrap(); + sessions.insert(native_session_id.clone(), session_id.clone()); + let mut converters = self.converters.lock().unwrap(); + converters + .entry(native_session_id.clone()) + .or_insert_with(convert_pi::PiEventConverter::default); + if self.dialect == PiRpcDialect::CodingAgent { + *self.current_session_id.lock().unwrap() = Some(session_id); + *self.current_native_session_id.lock().unwrap() = Some(native_session_id); + } + } + + fn unregister_session(&self, native_session_id: &str) { + let mut sessions = self.session_map.lock().unwrap(); + sessions.remove(native_session_id); + let mut converters = self.converters.lock().unwrap(); + converters.remove(native_session_id); + if self.dialect == PiRpcDialect::CodingAgent { + let current_native = self.current_native_session_id.lock().unwrap().clone(); + if current_native.as_deref() == Some(native_session_id) { + *self.current_session_id.lock().unwrap() = None; + *self.current_native_session_id.lock().unwrap() = None; + } + } + } + + fn session_for_native(&self, native_session_id: &str) -> Option { + let sessions = self.session_map.lock().unwrap(); + sessions.get(native_session_id).cloned() + } + + fn clear_pending(&self) { + let mut 
pending = self.pending_requests.lock().unwrap(); + pending.clear(); + } + + fn clear_sessions(&self) { + let mut sessions = self.session_map.lock().unwrap(); + sessions.clear(); + let mut converters = self.converters.lock().unwrap(); + converters.clear(); + *self.current_session_id.lock().unwrap() = None; + *self.current_native_session_id.lock().unwrap() = None; + } +} + struct SessionSubscription { initial_events: Vec, receiver: broadcast::Receiver, @@ -856,7 +1018,7 @@ impl ManagedServer { fn base_url(&self) -> Option { match &self.kind { ManagedServerKind::Http { base_url } => Some(base_url.clone()), - ManagedServerKind::Stdio { .. } => None, + ManagedServerKind::StdioCodex { .. } | ManagedServerKind::StdioPi { .. } => None, } } @@ -956,6 +1118,24 @@ impl AgentServerManager { let mut natives = self.native_sessions.lock().await; natives.remove(&agent); } + + if agent == AgentId::Pi { + if let Some(native_session_id) = native_session_id { + let server = { + let servers = self.servers.lock().await; + servers.get(&AgentId::Pi).and_then(|server| { + if let ManagedServerKind::StdioPi { server } = &server.kind { + Some(server.clone()) + } else { + None + } + }) + }; + if let Some(server) = server { + server.unregister_session(native_session_id); + } + } + } } async fn clear_mappings(&self, agent: AgentId) { @@ -1050,7 +1230,7 @@ impl AgentServerManager { let servers = self.servers.lock().await; if let Some(server) = servers.get(&agent) { if matches!(server.status, ServerStatus::Running) { - if let ManagedServerKind::Stdio { server } = &server.kind { + if let ManagedServerKind::StdioCodex { server } = &server.kind { return Ok((server.clone(), None)); } } @@ -1075,7 +1255,7 @@ impl AgentServerManager { let _ = child.kill(); } } - if let ManagedServerKind::Stdio { server } = &existing.kind { + if let ManagedServerKind::StdioCodex { server } = &existing.kind { return Ok((server.clone(), None)); } } @@ -1083,7 +1263,7 @@ impl AgentServerManager { servers.insert( agent, 
ManagedServer { - kind: ManagedServerKind::Stdio { + kind: ManagedServerKind::StdioCodex { server: server.clone(), }, child: child.clone(), @@ -1102,6 +1282,65 @@ impl AgentServerManager { Ok((server, Some(stdout_rx))) } + async fn ensure_pi_server( + self: &Arc, + ) -> Result<(Arc, Option>), SandboxError> { + { + let servers = self.servers.lock().await; + if let Some(server) = servers.get(&AgentId::Pi) { + if matches!(server.status, ServerStatus::Running) { + if let ManagedServerKind::StdioPi { server } = &server.kind { + return Ok((server.clone(), None)); + } + } + } + } + + let (server, stdout_rx, child) = self.spawn_pi_server().await?; + let restart_count = { + let servers = self.servers.lock().await; + servers + .get(&AgentId::Pi) + .map(|server| server.restart_count + 1) + .unwrap_or(0) + }; + + { + let mut servers = self.servers.lock().await; + if let Some(existing) = servers.get(&AgentId::Pi) { + if matches!(existing.status, ServerStatus::Running) { + if let Ok(mut guard) = child.lock() { + if let Some(child) = guard.as_mut() { + let _ = child.kill(); + } + } + if let ManagedServerKind::StdioPi { server } = &existing.kind { + return Ok((server.clone(), None)); + } + } + } + servers.insert( + AgentId::Pi, + ManagedServer { + kind: ManagedServerKind::StdioPi { + server: server.clone(), + }, + child: child.clone(), + status: ServerStatus::Running, + start_time: Some(Instant::now()), + restart_count, + last_error: None, + shutdown_requested: false, + instance_id: restart_count, + }, + ); + } + + self.spawn_monitor_task(AgentId::Pi, restart_count, child); + + Ok((server, Some(stdout_rx))) + } + async fn shutdown(&self) { let mut servers = self.servers.lock().await; for server in servers.values_mut() { @@ -1113,10 +1352,14 @@ impl AgentServerManager { let _ = child.kill(); } } - if let ManagedServerKind::Stdio { server } = &server.kind { + if let ManagedServerKind::StdioCodex { server } = &server.kind { server.clear_pending(); server.clear_threads(); } + if let 
ManagedServerKind::StdioPi { server } = &server.kind { + server.clear_pending(); + server.clear_sessions(); + } } } @@ -1258,6 +1501,95 @@ impl AgentServerManager { )) } + async fn spawn_pi_server( + self: &Arc, + ) -> Result< + ( + Arc, + mpsc::UnboundedReceiver, + Arc>>, + ), + SandboxError, + > { + let manager = self.agent_manager.clone(); + let log_dir = self.log_base_dir.clone(); + let (stdin_tx, stdin_rx) = mpsc::unbounded_channel::(); + let (stdout_tx, stdout_rx) = mpsc::unbounded_channel::(); + + let child = tokio::task::spawn_blocking( + move || -> Result<(std::process::Child, PiRpcDialect), SandboxError> { + let path = manager + .resolve_binary(AgentId::Pi) + .map_err(|err| map_spawn_error(AgentId::Pi, err))?; + let dialect = detect_pi_dialect(&path); + let mut command = std::process::Command::new(path); + let stderr = AgentServerLogs::new(log_dir, AgentId::Pi.as_str()).open()?; + command + .arg("--mode") + .arg("rpc") + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(stderr); + + let mut child = command.spawn().map_err(|err| SandboxError::StreamError { + message: err.to_string(), + })?; + + let stdin = child + .stdin + .take() + .ok_or_else(|| SandboxError::StreamError { + message: "pi stdin unavailable".to_string(), + })?; + let stdout = child + .stdout + .take() + .ok_or_else(|| SandboxError::StreamError { + message: "pi stdout unavailable".to_string(), + })?; + + let stdin_rx_mut = std::sync::Mutex::new(stdin_rx); + std::thread::spawn(move || { + let mut stdin = stdin; + let mut rx = stdin_rx_mut.lock().unwrap(); + while let Some(line) = rx.blocking_recv() { + if writeln!(stdin, "{line}").is_err() { + break; + } + if stdin.flush().is_err() { + break; + } + } + }); + + std::thread::spawn(move || { + let reader = BufReader::new(stdout); + for line in reader.lines() { + let Ok(line) = line else { break }; + if stdout_tx.send(line).is_err() { + break; + } + } + }); + + Ok((child, dialect)) + }, + ) + .await + .map_err(|err| 
SandboxError::StreamError { + message: err.to_string(), + })??; + + let (child, dialect) = child; + let server = Arc::new(PiServer::new(stdin_tx, dialect)); + + Ok(( + server, + stdout_rx, + Arc::new(std::sync::Mutex::new(Some(child))), + )) + } + fn spawn_monitor_task( self: &Arc, agent: AgentId, @@ -1302,6 +1634,7 @@ impl AgentServerManager { let exit_code = status.code(); let message = format!("agent server exited with status {:?}", status); let mut codex_server = None; + let mut pi_server = None; let mut shutdown_requested = false; { let mut servers = self.servers.lock().await; @@ -1322,9 +1655,12 @@ impl AgentServerManager { if let Ok(mut guard) = server.child.lock() { *guard = None; } - if let ManagedServerKind::Stdio { server } = &server.kind { + if let ManagedServerKind::StdioCodex { server } = &server.kind { codex_server = Some(server.clone()); } + if let ManagedServerKind::StdioPi { server } = &server.kind { + pi_server = Some(server.clone()); + } } } @@ -1332,6 +1668,10 @@ impl AgentServerManager { server.clear_pending(); server.clear_threads(); } + if let Some(server) = pi_server { + server.clear_pending(); + server.clear_sessions(); + } if shutdown_requested { self.clear_mappings(agent).await; @@ -1380,6 +1720,21 @@ impl AgentServerManager { } } } + AgentId::Pi => { + let (server, receiver) = self.ensure_pi_server().await?; + if let Some(stdout_rx) = receiver { + let owner = self.owner.lock().expect("owner lock").clone(); + if let Some(owner) = owner.as_ref().and_then(|weak| weak.upgrade()) { + let owner_clone = owner.clone(); + let server_clone = server.clone(); + tokio::spawn(async move { + owner_clone + .handle_pi_server_output(server_clone, stdout_rx) + .await; + }); + } + } + } _ => {} } Ok(()) @@ -1442,9 +1797,12 @@ impl AgentServerManager { impl SessionManager { fn new(agent_manager: Arc) -> Self { let log_base_dir = default_log_dir(); + let http_client = http_client::client_builder() + .build() + .expect("failed to build http client"); let 
server_manager = Arc::new(AgentServerManager::new( agent_manager.clone(), - Client::new(), + http_client.clone(), log_base_dir, true, )); @@ -1452,7 +1810,7 @@ impl SessionManager { agent_manager, sessions: Mutex::new(Vec::new()), server_manager, - http_client: Client::new(), + http_client, } } @@ -1533,6 +1891,19 @@ impl SessionManager { let thread_id = self.create_codex_thread(&session_id, &snapshot).await?; session.native_session_id = Some(thread_id); } + if agent_id == AgentId::Pi { + let snapshot = SessionSnapshot { + session_id: session_id.clone(), + agent: agent_id, + agent_mode: session.agent_mode.clone(), + permission_mode: session.permission_mode.clone(), + model: session.model.clone(), + variant: session.variant.clone(), + native_session_id: None, + }; + let native_id = self.create_pi_session(&session_id, &snapshot).await?; + session.native_session_id = Some(native_id); + } if agent_id == AgentId::Mock { session.native_session_id = Some(format!("mock-{session_id}")); } @@ -1569,7 +1940,7 @@ impl SessionManager { let mut sessions = self.sessions.lock().await; sessions.push(session); drop(sessions); - if agent_id == AgentId::Opencode || agent_id == AgentId::Codex { + if agent_id == AgentId::Opencode || agent_id == AgentId::Codex || agent_id == AgentId::Pi { self.server_manager .register_session(agent_id, &session_id, native_session_id.as_deref()) .await; @@ -1615,7 +1986,10 @@ impl SessionManager { self.send_mock_message(session_id, message).await?; return Ok(()); } - if matches!(session_snapshot.agent, AgentId::Claude | AgentId::Amp) { + if matches!( + session_snapshot.agent, + AgentId::Claude | AgentId::Amp | AgentId::Pi + ) { let _ = self .record_conversions(&session_id, user_message_conversions(&message)) .await; @@ -1641,6 +2015,15 @@ impl SessionManager { } return Ok(()); } + if session_snapshot.agent == AgentId::Pi { + self.send_pi_prompt(&session_snapshot, &message).await?; + if !agent_supports_item_started(session_snapshot.agent) { + let _ = self 
+ .emit_synthetic_assistant_start(&session_snapshot.session_id) + .await; + } + return Ok(()); + } // Reopen the session if it was ended (for resumable agents) self.reopen_session_if_ended(&session_id).await; @@ -1750,7 +2133,7 @@ impl SessionManager { let agent = session.agent; let native_session_id = session.native_session_id.clone(); drop(sessions); - if agent == AgentId::Opencode || agent == AgentId::Codex { + if agent == AgentId::Opencode || agent == AgentId::Codex || agent == AgentId::Pi { self.server_manager .unregister_session(agent, &session_id, native_session_id.as_deref()) .await; @@ -3051,6 +3434,325 @@ impl SessionManager { Ok(()) } + /// Ensures a shared Pi RPC process is running. + async fn ensure_pi_server(self: &Arc) -> Result, SandboxError> { + let (server, receiver) = self.server_manager.ensure_pi_server().await?; + + if let Some(stdout_rx) = receiver { + let server_for_task = server.clone(); + let self_for_task = Arc::clone(self); + tokio::spawn(async move { + self_for_task + .handle_pi_server_output(server_for_task, stdout_rx) + .await; + }); + } + + Ok(server) + } + + /// Handles output from the Pi RPC server, routing responses and events. 
+ async fn handle_pi_server_output( + self: Arc, + server: Arc, + mut stdout_rx: mpsc::UnboundedReceiver, + ) { + while let Some(line) = stdout_rx.recv().await { + let trimmed = line.trim(); + if trimmed.is_empty() { + continue; + } + + let value: Value = match serde_json::from_str(trimmed) { + Ok(v) => v, + Err(err) => { + self.record_pi_unparsed(None, &err.to_string(), Value::String(trimmed.to_string())) + .await; + continue; + } + }; + + let message_type = value.get("type").and_then(Value::as_str).unwrap_or(""); + if message_type == "response" { + let id = value + .get("id") + .and_then(Value::as_i64) + .or_else(|| value.get("id").and_then(Value::as_str)?.parse::().ok()); + if let Some(id) = id { + server.complete_request(id, value.clone()); + } + continue; + } + + let native_session_id = extract_pi_session_id(&value).or_else(|| { + if server.dialect() == PiRpcDialect::CodingAgent { + server.current_native_session_id() + } else { + None + } + }); + let session_id = native_session_id + .as_ref() + .and_then(|id| server.session_for_native(id)) + .or_else(|| { + if server.dialect() == PiRpcDialect::CodingAgent { + server.current_session_id() + } else { + None + } + }); + let Some(session_id) = session_id else { + self.record_pi_unparsed( + None, + "pi event missing session id", + value.clone(), + ) + .await; + continue; + }; + + let event: pi_schema::RpcEvent = match serde_json::from_value(value.clone()) { + Ok(event) => event, + Err(err) => { + self.record_pi_unparsed(Some(session_id.clone()), &err.to_string(), value.clone()) + .await; + continue; + } + }; + + let conversions = { + let mut converters = server.converters.lock().unwrap(); + let key = native_session_id + .clone() + .unwrap_or_else(|| session_id.clone()); + let converter = converters + .entry(key) + .or_insert_with(convert_pi::PiEventConverter::default); + converter.event_to_universal(&event) + }; + + let mut conversions = match conversions { + Ok(conversions) => conversions, + Err(err) => { + 
self.record_pi_unparsed(Some(session_id.clone()), &err, value.clone()) + .await; + continue; + } + }; + + for conversion in &mut conversions { + if conversion.native_session_id.is_none() { + conversion.native_session_id = native_session_id.clone(); + } + conversion.raw = Some(value.clone()); + } + + let _ = self.record_conversions(&session_id, conversions).await; + } + } + + async fn create_pi_session( + self: &Arc, + session_id: &str, + _session: &SessionSnapshot, + ) -> Result { + let server = self.ensure_pi_server().await?; + if server.dialect() == PiRpcDialect::CodingAgent && server.current_session_id().is_some() { + return Err(SandboxError::InvalidRequest { + message: + "pi-coding-agent supports a single active session; terminate it before creating a new session" + .to_string(), + }); + } + + let id = server.next_request_id(); + let request = match server.dialect() { + PiRpcDialect::Mono => json!({ + "type": "command", + "id": id, + "command": "new_session", + "params": { "sessionName": session_id } + }), + PiRpcDialect::CodingAgent => json!({ + "type": "new_session", + "id": id + }), + }; + + let rx = server + .send_request(id, &request) + .ok_or_else(|| SandboxError::StreamError { + message: "failed to send pi new_session request".to_string(), + })?; + + let result = tokio::time::timeout(Duration::from_secs(30), rx).await; + match result { + Ok(Ok(response)) => { + if response + .get("success") + .and_then(Value::as_bool) + .is_some_and(|success| !success) + { + return Err(SandboxError::StreamError { + message: format!("pi new_session failed: {response}"), + }); + } + if response + .get("data") + .and_then(|value| value.get("cancelled")) + .and_then(Value::as_bool) + .is_some_and(|cancelled| cancelled) + { + return Err(SandboxError::StreamError { + message: "pi new_session request cancelled".to_string(), + }); + } + + let native_session_id = if server.dialect() == PiRpcDialect::CodingAgent { + let state_id = server.next_request_id(); + let request = json!({ 
+ "type": "get_state", + "id": state_id + }); + let rx = server + .send_request(state_id, &request) + .ok_or_else(|| SandboxError::StreamError { + message: "failed to send pi get_state request".to_string(), + })?; + let result = tokio::time::timeout(Duration::from_secs(30), rx).await; + let response = match result { + Ok(Ok(response)) => response, + Ok(Err(_)) => { + return Err(SandboxError::StreamError { + message: "pi get_state request cancelled".to_string(), + }) + } + Err(_) => { + return Err(SandboxError::StreamError { + message: "pi get_state request timed out".to_string(), + }) + } + }; + if response + .get("success") + .and_then(Value::as_bool) + .is_some_and(|success| !success) + { + return Err(SandboxError::StreamError { + message: format!("pi get_state failed: {response}"), + }); + } + let session_value = response.get("data").unwrap_or(&response); + session_value + .get("sessionId") + .or_else(|| session_value.get("session_id")) + .and_then(Value::as_str) + .ok_or_else(|| SandboxError::StreamError { + message: "pi get_state response missing session id".to_string(), + })? + .to_string() + } else { + let session_value = response.get("data").unwrap_or(&response); + session_value + .get("sessionId") + .or_else(|| session_value.get("session_id")) + .and_then(Value::as_str) + .ok_or_else(|| SandboxError::StreamError { + message: "pi new_session response missing session id".to_string(), + })? 
+ .to_string() + }; + + server.register_session(native_session_id.clone(), session_id.to_string()); + + Ok(native_session_id) + } + Ok(Err(_)) => Err(SandboxError::StreamError { + message: "pi new_session request cancelled".to_string(), + }), + Err(_) => Err(SandboxError::StreamError { + message: "pi new_session request timed out".to_string(), + }), + } + } + + async fn send_pi_prompt( + self: &Arc, + session: &SessionSnapshot, + prompt: &str, + ) -> Result<(), SandboxError> { + let server = self.ensure_pi_server().await?; + let native_session_id = + session + .native_session_id + .as_ref() + .ok_or_else(|| SandboxError::InvalidRequest { + message: "missing Pi session id".to_string(), + })?; + if server.dialect() == PiRpcDialect::CodingAgent { + if let Some(current) = server.current_session_id() { + if current != session.session_id { + return Err(SandboxError::InvalidRequest { + message: "pi-coding-agent supports a single active session; prompt must target the current session" + .to_string(), + }); + } + } + } + + let id = server.next_request_id(); + let request = match server.dialect() { + PiRpcDialect::Mono => json!({ + "type": "command", + "id": id, + "command": "prompt", + "params": { + "sessionId": native_session_id, + "message": { + "role": "user", + "content": [{ "type": "text", "text": prompt }] + } + } + }), + PiRpcDialect::CodingAgent => json!({ + "type": "prompt", + "id": id, + "message": prompt + }), + }; + + server + .send_request(id, &request) + .ok_or_else(|| SandboxError::StreamError { + message: "failed to send pi prompt request".to_string(), + })?; + + Ok(()) + } + + async fn record_pi_unparsed(&self, session_id: Option, error: &str, raw: Value) { + if let Some(session_id) = session_id { + let _ = self + .record_conversions(&session_id, vec![agent_unparsed("pi", error, raw)]) + .await; + return; + } + let session_ids = { + let sessions = self.server_manager.sessions.lock().await; + sessions + .get(&AgentId::Pi) + .cloned() + .unwrap_or_default() + 
.into_iter() + .collect::>() + }; + for session_id in session_ids { + let _ = self + .record_conversions(&session_id, vec![agent_unparsed("pi", error, raw.clone())]) + .await; + } + } + async fn fetch_opencode_modes(&self) -> Result, SandboxError> { let base_url = self.ensure_opencode_server().await?; let endpoints = [ @@ -3933,12 +4635,13 @@ async fn reply_permission( Ok(StatusCode::NO_CONTENT) } -fn all_agents() -> [AgentId; 5] { +fn all_agents() -> [AgentId; 6] { [ AgentId::Claude, AgentId::Codex, AgentId::Opencode, AgentId::Amp, + AgentId::Pi, AgentId::Mock, ] } @@ -3948,7 +4651,7 @@ fn all_agents() -> [AgentId; 5] { fn agent_supports_resume(agent: AgentId) -> bool { matches!( agent, - AgentId::Claude | AgentId::Amp | AgentId::Opencode | AgentId::Codex + AgentId::Claude | AgentId::Amp | AgentId::Opencode | AgentId::Codex | AgentId::Pi ) } @@ -4040,6 +4743,26 @@ fn agent_capabilities_for(agent: AgentId) -> AgentCapabilities { item_started: false, shared_process: false, // per-turn subprocess with --continue }, + AgentId::Pi => AgentCapabilities { + plan_mode: false, + permissions: false, + questions: false, + tool_calls: true, + tool_results: true, + text_messages: true, + images: true, + file_attachments: false, + session_lifecycle: false, + error_events: true, + reasoning: true, + status: true, + command_execution: false, + file_changes: false, + mcp_tools: false, + streaming_deltas: true, + item_started: true, + shared_process: true, // shared stdio RPC + }, AgentId::Mock => AgentCapabilities { plan_mode: true, permissions: true, @@ -4117,6 +4840,11 @@ fn agent_modes_for(agent: AgentId) -> Vec { name: "Build".to_string(), description: "Default build mode".to_string(), }], + AgentId::Pi => vec![AgentModeInfo { + id: "build".to_string(), + name: "Build".to_string(), + description: "Default build mode".to_string(), + }], AgentId::Mock => vec![ AgentModeInfo { id: "build".to_string(), @@ -4160,6 +4888,14 @@ fn normalize_agent_mode(agent: AgentId, agent_mode: 
Option<&str>) -> Result match mode { + "build" => Ok("build".to_string()), + value => Err(SandboxError::ModeNotSupported { + agent: agent.as_str().to_string(), + mode: value.to_string(), + } + .into()), + }, AgentId::Mock => match mode { "build" | "plan" => Ok(mode.to_string()), value => Err(SandboxError::ModeNotSupported { @@ -4217,6 +4953,7 @@ fn normalize_permission_mode( AgentId::Codex => matches!(mode, "default" | "plan" | "bypass"), AgentId::Amp => matches!(mode, "default" | "bypass"), AgentId::Opencode => matches!(mode, "default"), + AgentId::Pi => matches!(mode, "default"), AgentId::Mock => matches!(mode, "default" | "plan" | "bypass"), }; if !supported { @@ -5002,6 +5739,11 @@ fn parse_agent_line(agent: AgentId, line: &str, session_id: &str) -> Vec vec![agent_unparsed("amp", &err.to_string(), value)], }, + AgentId::Pi => match serde_json::from_value(value.clone()) { + Ok(event) => convert_pi::event_to_universal(&event) + .unwrap_or_else(|err| vec![agent_unparsed("pi", &err, value)]), + Err(err) => vec![agent_unparsed("pi", &err.to_string(), value)], + }, AgentId::Mock => vec![agent_unparsed( "mock", "mock agent does not parse streaming output", @@ -5042,6 +5784,28 @@ fn extract_opencode_session_id(value: &Value) -> Option { None } +fn extract_pi_session_id(value: &Value) -> Option { + if let Some(id) = value.get("sessionId").and_then(Value::as_str) { + return Some(id.to_string()); + } + if let Some(id) = value.get("session_id").and_then(Value::as_str) { + return Some(id.to_string()); + } + if let Some(id) = extract_nested_string(value, &["session", "id"]) { + return Some(id); + } + if let Some(id) = extract_nested_string(value, &["message", "sessionId"]) { + return Some(id); + } + if let Some(id) = extract_nested_string(value, &["message", "session_id"]) { + return Some(id); + } + if let Some(id) = extract_nested_string(value, &["params", "sessionId"]) { + return Some(id); + } + None +} + fn extract_nested_string(value: &Value, path: &[&str]) -> Option { 
let mut current = value; for key in path { @@ -5167,7 +5931,7 @@ pub mod test_utils { .insert( agent, ManagedServer { - kind: ManagedServerKind::Stdio { server }, + kind: ManagedServerKind::StdioCodex { server }, child: child.clone(), status: ServerStatus::Running, start_time: Some(Instant::now()), diff --git a/server/packages/sandbox-agent/src/telemetry.rs b/server/packages/sandbox-agent/src/telemetry.rs index 6ff221b..20d0633 100644 --- a/server/packages/sandbox-agent/src/telemetry.rs +++ b/server/packages/sandbox-agent/src/telemetry.rs @@ -10,6 +10,8 @@ use serde::Serialize; use time::OffsetDateTime; use tokio::time::Instant; +use crate::http_client; + const TELEMETRY_URL: &str = "https://tc.rivet.dev"; const TELEMETRY_ENV_DEBUG: &str = "SANDBOX_AGENT_TELEMETRY_DEBUG"; const TELEMETRY_ID_FILE: &str = "telemetry_id"; @@ -77,7 +79,7 @@ pub fn log_enabled_message() { pub fn spawn_telemetry_task() { tokio::spawn(async move { - let client = match Client::builder() + let client = match http_client::client_builder() .timeout(Duration::from_millis(TELEMETRY_TIMEOUT_MS)) .build() { diff --git a/server/packages/sandbox-agent/tests/agent-flows/mod.rs b/server/packages/sandbox-agent/tests/agent-flows/mod.rs index 9b11b7c..41a57ca 100644 --- a/server/packages/sandbox-agent/tests/agent-flows/mod.rs +++ b/server/packages/sandbox-agent/tests/agent-flows/mod.rs @@ -5,3 +5,4 @@ mod agent_permission_flow; mod agent_question_flow; mod agent_termination; mod agent_tool_flow; +mod pi_rpc_integration; diff --git a/server/packages/sandbox-agent/tests/agent-flows/pi_rpc_integration.rs b/server/packages/sandbox-agent/tests/agent-flows/pi_rpc_integration.rs new file mode 100644 index 0000000..630c373 --- /dev/null +++ b/server/packages/sandbox-agent/tests/agent-flows/pi_rpc_integration.rs @@ -0,0 +1,61 @@ +// Pi RPC integration tests (gated via SANDBOX_TEST_PI + PATH). 
+include!("../common/http.rs"); + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn pi_rpc_session_and_stream() { + let configs = match test_agents_from_env() { + Ok(configs) => configs, + Err(err) => { + eprintln!("Skipping Pi RPC integration test: {err}"); + return; + } + }; + let Some(config) = configs.iter().find(|config| config.agent == AgentId::Pi) else { + return; + }; + + let app = TestApp::new(); + let _guard = apply_credentials(&config.credentials); + install_agent(&app.app, config.agent).await; + + let session_id = "pi-rpc-session".to_string(); + let (status, payload) = send_json( + &app.app, + Method::POST, + &format!("/v1/sessions/{session_id}"), + Some(json!({ + "agent": "pi", + "permissionMode": test_permission_mode(AgentId::Pi), + })), + ) + .await; + assert_eq!(status, StatusCode::OK, "create pi session"); + let native_session_id = payload + .get("native_session_id") + .and_then(Value::as_str) + .unwrap_or(""); + assert!( + !native_session_id.is_empty(), + "expected native_session_id for pi session" + ); + + let events = read_turn_stream_events(&app.app, &session_id, Duration::from_secs(120)).await; + assert!(!events.is_empty(), "no events from pi stream"); + assert!( + !events.iter().any(is_unparsed_event), + "agent.unparsed event encountered" + ); + + let mut last_sequence = 0u64; + for event in events { + let sequence = event + .get("sequence") + .and_then(Value::as_u64) + .expect("missing sequence"); + assert!( + sequence > last_sequence, + "sequence did not increase (prev {last_sequence}, next {sequence})" + ); + last_sequence = sequence; + } +} diff --git a/server/packages/sandbox-agent/tests/agent-management/agents.rs b/server/packages/sandbox-agent/tests/agent-management/agents.rs index ab1e7ae..42e3e7f 100644 --- a/server/packages/sandbox-agent/tests/agent-management/agents.rs +++ b/server/packages/sandbox-agent/tests/agent-management/agents.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::env; use 
sandbox_agent_agent_management::agents::{ AgentError, AgentId, AgentManager, InstallOptions, SpawnOptions, @@ -29,6 +30,29 @@ fn prompt_ok(label: &str) -> String { format!("Respond with exactly the text {label} and nothing else.") } +fn pi_tests_enabled() -> bool { + env::var("SANDBOX_TEST_PI") + .map(|value| { + let value = value.trim().to_ascii_lowercase(); + value == "1" || value == "true" || value == "yes" + }) + .unwrap_or(false) +} + +fn pi_on_path() -> bool { + let binary = AgentId::Pi.binary_name(); + let path_var = match env::var_os("PATH") { + Some(path) => path, + None => return false, + }; + for path in env::split_paths(&path_var) { + if path.join(binary).exists() { + return true; + } + } + false +} + #[test] fn test_agents_install_version_spawn() -> Result<(), Box> { let temp_dir = tempfile::tempdir()?; @@ -36,12 +60,15 @@ fn test_agents_install_version_spawn() -> Result<(), Box> let env = build_env(); assert!(!env.is_empty(), "expected credentials to be available"); - let agents = [ + let mut agents = vec![ AgentId::Claude, AgentId::Codex, AgentId::Opencode, AgentId::Amp, ]; + if pi_tests_enabled() && pi_on_path() { + agents.push(AgentId::Pi); + } for agent in agents { let install = manager.install(agent, InstallOptions::default())?; assert!(install.path.exists(), "expected install for {agent}"); diff --git a/server/packages/sandbox-agent/tests/common/http.rs b/server/packages/sandbox-agent/tests/common/http.rs index 8910e62..6ff9ff4 100644 --- a/server/packages/sandbox-agent/tests/common/http.rs +++ b/server/packages/sandbox-agent/tests/common/http.rs @@ -178,7 +178,7 @@ async fn install_agent(app: &Router, agent: AgentId) { /// while other agents support "bypass" which skips tool approval. 
fn test_permission_mode(agent: AgentId) -> &'static str { match agent { - AgentId::Opencode => "default", + AgentId::Opencode | AgentId::Pi => "default", _ => "bypass", } } diff --git a/server/packages/sandbox-agent/tests/common/mod.rs b/server/packages/sandbox-agent/tests/common/mod.rs index ea98b47..92bf266 100644 --- a/server/packages/sandbox-agent/tests/common/mod.rs +++ b/server/packages/sandbox-agent/tests/common/mod.rs @@ -182,7 +182,7 @@ pub async fn create_session_with_mode( pub fn test_permission_mode(agent: AgentId) -> &'static str { match agent { - AgentId::Opencode => "default", + AgentId::Opencode | AgentId::Pi => "default", _ => "bypass", } } diff --git a/server/packages/universal-agent-schema/src/agents/mod.rs b/server/packages/universal-agent-schema/src/agents/mod.rs index 8108098..458b91d 100644 --- a/server/packages/universal-agent-schema/src/agents/mod.rs +++ b/server/packages/universal-agent-schema/src/agents/mod.rs @@ -2,3 +2,4 @@ pub mod amp; pub mod claude; pub mod codex; pub mod opencode; +pub mod pi; diff --git a/server/packages/universal-agent-schema/src/agents/pi.rs b/server/packages/universal-agent-schema/src/agents/pi.rs new file mode 100644 index 0000000..ef91071 --- /dev/null +++ b/server/packages/universal-agent-schema/src/agents/pi.rs @@ -0,0 +1,674 @@ +use std::collections::{HashMap, HashSet}; + +use serde_json::Value; + +use crate::pi as schema; +use crate::{ + ContentPart, EventConversion, ItemDeltaData, ItemEventData, ItemKind, ItemRole, ItemStatus, + ReasoningVisibility, UniversalEventData, UniversalEventType, UniversalItem, +}; + +#[derive(Default)] +pub struct PiEventConverter { + tool_result_buffers: HashMap, + tool_result_started: HashSet, + message_errors: HashSet, + message_reasoning: HashMap, + message_text: HashMap, + last_message_id: Option, + message_started: HashSet, + message_counter: u64, +} + +impl PiEventConverter { + fn next_synthetic_message_id(&mut self) -> String { + self.message_counter += 1; + 
format!("pi_msg_{}", self.message_counter) + } + + fn ensure_message_id(&mut self, message_id: Option) -> String { + if let Some(id) = message_id { + self.last_message_id = Some(id.clone()); + return id; + } + if let Some(id) = self.last_message_id.clone() { + return id; + } + let id = self.next_synthetic_message_id(); + self.last_message_id = Some(id.clone()); + id + } + + fn ensure_message_started(&mut self, message_id: &str) -> Option { + if !self.message_started.insert(message_id.to_string()) { + return None; + } + let item = UniversalItem { + item_id: String::new(), + native_item_id: Some(message_id.to_string()), + parent_id: None, + kind: ItemKind::Message, + role: Some(ItemRole::Assistant), + content: Vec::new(), + status: ItemStatus::InProgress, + }; + Some( + EventConversion::new( + UniversalEventType::ItemStarted, + UniversalEventData::Item(ItemEventData { item }), + ) + .synthetic(), + ) + } + + fn clear_last_message_id(&mut self, message_id: Option<&str>) { + if message_id.is_none() || self.last_message_id.as_deref() == message_id { + self.last_message_id = None; + } + } + + pub fn event_to_universal( + &mut self, + event: &schema::RpcEvent, + ) -> Result, String> { + let raw = serde_json::to_value(event).map_err(|err| err.to_string())?; + let event_type = raw + .get("type") + .and_then(Value::as_str) + .ok_or_else(|| "missing event type".to_string())?; + let native_session_id = extract_session_id(&raw); + + let conversions = match event_type { + "message_start" => self.message_start(&raw), + "message_update" => self.message_update(&raw), + "message_end" => self.message_end(&raw), + "tool_execution_start" => self.tool_execution_start(&raw), + "tool_execution_update" => self.tool_execution_update(&raw), + "tool_execution_end" => self.tool_execution_end(&raw), + "agent_start" + | "agent_end" + | "turn_start" + | "turn_end" + | "auto_compaction" + | "auto_compaction_start" + | "auto_compaction_end" + | "auto_retry" + | "auto_retry_start" + | 
"auto_retry_end" + | "hook_error" => Ok(vec![status_event(event_type, &raw)]), + "extension_ui_request" | "extension_ui_response" | "extension_error" => { + Ok(vec![status_event(event_type, &raw)]) + } + other => Err(format!("unsupported Pi event type: {other}")), + }?; + + Ok(conversions + .into_iter() + .map(|conversion| attach_metadata(conversion, &native_session_id, &raw)) + .collect()) + } + + fn message_start(&mut self, raw: &Value) -> Result, String> { + let message = raw.get("message"); + if is_user_role(message) { + return Ok(Vec::new()); + } + let message_id = self.ensure_message_id(extract_message_id(raw)); + self.message_started.insert(message_id.clone()); + let content = message.and_then(parse_message_content).unwrap_or_default(); + let entry = self.message_text.entry(message_id.clone()).or_default(); + for part in &content { + if let ContentPart::Text { text } = part { + entry.push_str(text); + } + } + let item = UniversalItem { + item_id: String::new(), + native_item_id: Some(message_id), + parent_id: None, + kind: ItemKind::Message, + role: Some(ItemRole::Assistant), + content, + status: ItemStatus::InProgress, + }; + Ok(vec![EventConversion::new( + UniversalEventType::ItemStarted, + UniversalEventData::Item(ItemEventData { item }), + )]) + } + + fn message_update(&mut self, raw: &Value) -> Result, String> { + let assistant_event = raw + .get("assistantMessageEvent") + .or_else(|| raw.get("assistant_message_event")) + .ok_or_else(|| "missing assistantMessageEvent".to_string())?; + let event_type = assistant_event + .get("type") + .and_then(Value::as_str) + .unwrap_or(""); + let message_id = extract_message_id(raw) + .or_else(|| extract_message_id(assistant_event)) + .or_else(|| self.last_message_id.clone()); + + match event_type { + "start" => { + if let Some(id) = message_id { + self.last_message_id = Some(id); + } + Ok(Vec::new()) + } + "text_start" | "text_delta" | "text_end" => { + let Some(delta) = extract_delta_text(assistant_event) else { + 
return Ok(Vec::new()); + }; + let message_id = self.ensure_message_id(message_id); + let entry = self.message_text.entry(message_id.clone()).or_default(); + entry.push_str(&delta); + let mut conversions = Vec::new(); + if let Some(start) = self.ensure_message_started(&message_id) { + conversions.push(start); + } + conversions.push(item_delta(Some(message_id), delta)); + Ok(conversions) + } + "thinking_start" | "thinking_delta" | "thinking_end" => { + let Some(delta) = extract_delta_text(assistant_event) else { + return Ok(Vec::new()); + }; + let message_id = self.ensure_message_id(message_id); + let entry = self + .message_reasoning + .entry(message_id.clone()) + .or_default(); + entry.push_str(&delta); + let mut conversions = Vec::new(); + if let Some(start) = self.ensure_message_started(&message_id) { + conversions.push(start); + } + conversions.push(item_delta(Some(message_id), delta)); + Ok(conversions) + } + "toolcall_start" + | "toolcall_delta" + | "toolcall_end" + | "toolcall_args_start" + | "toolcall_args_delta" + | "toolcall_args_end" => Ok(Vec::new()), + "done" => { + let message_id = self.ensure_message_id(message_id); + if self.message_errors.remove(&message_id) { + self.message_text.remove(&message_id); + self.message_reasoning.remove(&message_id); + self.message_started.remove(&message_id); + self.clear_last_message_id(Some(&message_id)); + return Ok(Vec::new()); + } + let message = raw + .get("message") + .or_else(|| assistant_event.get("message")); + let conversion = self.complete_message(Some(message_id.clone()), message); + self.clear_last_message_id(Some(&message_id)); + Ok(vec![conversion]) + } + "error" => { + let message_id = self.ensure_message_id(message_id); + let error_text = assistant_event + .get("error") + .or_else(|| raw.get("error")) + .map(value_to_string) + .unwrap_or_else(|| "Pi message error".to_string()); + self.message_reasoning.remove(&message_id); + self.message_text.remove(&message_id); + 
self.message_errors.insert(message_id.clone()); + self.message_started.remove(&message_id); + self.clear_last_message_id(Some(&message_id)); + let item = UniversalItem { + item_id: String::new(), + native_item_id: Some(message_id), + parent_id: None, + kind: ItemKind::Message, + role: Some(ItemRole::Assistant), + content: vec![ContentPart::Text { text: error_text }], + status: ItemStatus::Failed, + }; + Ok(vec![EventConversion::new( + UniversalEventType::ItemCompleted, + UniversalEventData::Item(ItemEventData { item }), + )]) + } + other => Err(format!("unsupported assistantMessageEvent: {other}")), + } + } + + fn message_end(&mut self, raw: &Value) -> Result, String> { + let message = raw.get("message"); + if is_user_role(message) { + return Ok(Vec::new()); + } + let message_id = self + .ensure_message_id(extract_message_id(raw).or_else(|| self.last_message_id.clone())); + if self.message_errors.remove(&message_id) { + self.message_text.remove(&message_id); + self.message_reasoning.remove(&message_id); + self.message_started.remove(&message_id); + self.clear_last_message_id(Some(&message_id)); + return Ok(Vec::new()); + } + let conversion = self.complete_message(Some(message_id.clone()), message); + self.clear_last_message_id(Some(&message_id)); + Ok(vec![conversion]) + } + + fn complete_message( + &mut self, + message_id: Option, + message: Option<&Value>, + ) -> EventConversion { + let mut content = message.and_then(parse_message_content).unwrap_or_default(); + + if let Some(id) = message_id.clone() { + if content.is_empty() { + if let Some(text) = self.message_text.remove(&id) { + if !text.is_empty() { + content.push(ContentPart::Text { text }); + } + } + } else { + self.message_text.remove(&id); + } + + if let Some(reasoning) = self.message_reasoning.remove(&id) { + if !reasoning.trim().is_empty() + && !content + .iter() + .any(|part| matches!(part, ContentPart::Reasoning { .. 
})) + { + content.push(ContentPart::Reasoning { + text: reasoning, + visibility: ReasoningVisibility::Private, + }); + } + } + self.message_started.remove(&id); + } + + let item = UniversalItem { + item_id: String::new(), + native_item_id: message_id, + parent_id: None, + kind: ItemKind::Message, + role: Some(ItemRole::Assistant), + content, + status: ItemStatus::Completed, + }; + EventConversion::new( + UniversalEventType::ItemCompleted, + UniversalEventData::Item(ItemEventData { item }), + ) + } + + fn tool_execution_start(&mut self, raw: &Value) -> Result, String> { + let tool_call_id = + extract_tool_call_id(raw).ok_or_else(|| "missing toolCallId".to_string())?; + let tool_name = extract_tool_name(raw).unwrap_or_else(|| "tool".to_string()); + let arguments = raw + .get("args") + .or_else(|| raw.get("arguments")) + .map(value_to_string) + .unwrap_or_else(|| "{}".to_string()); + let item = UniversalItem { + item_id: String::new(), + native_item_id: Some(tool_call_id.clone()), + parent_id: None, + kind: ItemKind::ToolCall, + role: Some(ItemRole::Assistant), + content: vec![ContentPart::ToolCall { + name: tool_name, + arguments, + call_id: tool_call_id, + }], + status: ItemStatus::InProgress, + }; + Ok(vec![EventConversion::new( + UniversalEventType::ItemStarted, + UniversalEventData::Item(ItemEventData { item }), + )]) + } + + fn tool_execution_update(&mut self, raw: &Value) -> Result, String> { + let tool_call_id = match extract_tool_call_id(raw) { + Some(id) => id, + None => return Ok(Vec::new()), + }; + let partial = match raw + .get("partialResult") + .or_else(|| raw.get("partial_result")) + { + Some(value) => value_to_string(value), + None => return Ok(Vec::new()), + }; + let prior = self + .tool_result_buffers + .get(&tool_call_id) + .cloned() + .unwrap_or_default(); + let delta = delta_from_partial(&prior, &partial); + self.tool_result_buffers + .insert(tool_call_id.clone(), partial); + + let mut conversions = Vec::new(); + if 
self.tool_result_started.insert(tool_call_id.clone()) { + let item = UniversalItem { + item_id: String::new(), + native_item_id: Some(tool_call_id.clone()), + parent_id: None, + kind: ItemKind::ToolResult, + role: Some(ItemRole::Tool), + content: vec![ContentPart::ToolResult { + call_id: tool_call_id.clone(), + output: String::new(), + }], + status: ItemStatus::InProgress, + }; + conversions.push( + EventConversion::new( + UniversalEventType::ItemStarted, + UniversalEventData::Item(ItemEventData { item }), + ) + .synthetic(), + ); + } + + if !delta.is_empty() { + conversions.push( + EventConversion::new( + UniversalEventType::ItemDelta, + UniversalEventData::ItemDelta(ItemDeltaData { + item_id: String::new(), + native_item_id: Some(tool_call_id.clone()), + delta, + }), + ) + .synthetic(), + ); + } + + Ok(conversions) + } + + fn tool_execution_end(&mut self, raw: &Value) -> Result, String> { + let tool_call_id = + extract_tool_call_id(raw).ok_or_else(|| "missing toolCallId".to_string())?; + self.tool_result_buffers.remove(&tool_call_id); + self.tool_result_started.remove(&tool_call_id); + + let output = raw + .get("result") + .and_then(extract_result_content) + .unwrap_or_default(); + let is_error = raw.get("isError").and_then(Value::as_bool).unwrap_or(false); + let item = UniversalItem { + item_id: String::new(), + native_item_id: Some(tool_call_id.clone()), + parent_id: None, + kind: ItemKind::ToolResult, + role: Some(ItemRole::Tool), + content: vec![ContentPart::ToolResult { + call_id: tool_call_id, + output, + }], + status: if is_error { + ItemStatus::Failed + } else { + ItemStatus::Completed + }, + }; + Ok(vec![EventConversion::new( + UniversalEventType::ItemCompleted, + UniversalEventData::Item(ItemEventData { item }), + )]) + } +} + +pub fn event_to_universal(event: &schema::RpcEvent) -> Result, String> { + PiEventConverter::default().event_to_universal(event) +} + +fn attach_metadata( + conversion: EventConversion, + native_session_id: &Option, + raw: 
&Value, +) -> EventConversion { + conversion + .with_native_session(native_session_id.clone()) + .with_raw(Some(raw.clone())) +} + +fn status_event(label: &str, raw: &Value) -> EventConversion { + let detail = raw + .get("error") + .or_else(|| raw.get("message")) + .map(value_to_string); + let item = UniversalItem { + item_id: String::new(), + native_item_id: None, + parent_id: None, + kind: ItemKind::Status, + role: Some(ItemRole::System), + content: vec![ContentPart::Status { + label: format!("pi.{label}"), + detail, + }], + status: ItemStatus::Completed, + }; + EventConversion::new( + UniversalEventType::ItemCompleted, + UniversalEventData::Item(ItemEventData { item }), + ) +} + +fn item_delta(message_id: Option, delta: String) -> EventConversion { + EventConversion::new( + UniversalEventType::ItemDelta, + UniversalEventData::ItemDelta(ItemDeltaData { + item_id: String::new(), + native_item_id: message_id, + delta, + }), + ) +} + +fn is_user_role(message: Option<&Value>) -> bool { + message + .and_then(|msg| msg.get("role")) + .and_then(Value::as_str) + .is_some_and(|role| role == "user") +} + +fn extract_session_id(value: &Value) -> Option { + extract_string(value, &["sessionId"]) + .or_else(|| extract_string(value, &["session_id"])) + .or_else(|| extract_string(value, &["session", "id"])) + .or_else(|| extract_string(value, &["message", "sessionId"])) +} + +fn extract_message_id(value: &Value) -> Option { + extract_string(value, &["messageId"]) + .or_else(|| extract_string(value, &["message_id"])) + .or_else(|| extract_string(value, &["message", "id"])) + .or_else(|| extract_string(value, &["message", "messageId"])) + .or_else(|| extract_string(value, &["assistantMessageEvent", "messageId"])) +} + +fn extract_tool_call_id(value: &Value) -> Option { + extract_string(value, &["toolCallId"]).or_else(|| extract_string(value, &["tool_call_id"])) +} + +fn extract_tool_name(value: &Value) -> Option { + extract_string(value, &["toolName"]).or_else(|| 
extract_string(value, &["tool_name"])) +} + +fn extract_string(value: &Value, path: &[&str]) -> Option { + let mut current = value; + for key in path { + current = current.get(*key)?; + } + current.as_str().map(|value| value.to_string()) +} + +fn extract_delta_text(event: &Value) -> Option { + if let Some(value) = event.get("delta") { + return Some(value_to_string(value)); + } + if let Some(value) = event.get("text") { + return Some(value_to_string(value)); + } + if let Some(value) = event.get("partial") { + return extract_text_from_value(value); + } + if let Some(value) = event.get("content") { + return extract_text_from_value(value); + } + None +} + +fn extract_text_from_value(value: &Value) -> Option { + if let Some(text) = value.as_str() { + return Some(text.to_string()); + } + if let Some(text) = value.get("text").and_then(Value::as_str) { + return Some(text.to_string()); + } + if let Some(text) = value.get("content").and_then(Value::as_str) { + return Some(text.to_string()); + } + None +} + +fn extract_result_content(value: &Value) -> Option { + let content = value.get("content").and_then(Value::as_str); + let text = value.get("text").and_then(Value::as_str); + content + .or(text) + .map(|value| value.to_string()) + .or_else(|| Some(value_to_string(value))) +} + +fn parse_message_content(message: &Value) -> Option> { + if let Some(text) = message.as_str() { + return Some(vec![ContentPart::Text { + text: text.to_string(), + }]); + } + let content_value = message + .get("content") + .or_else(|| message.get("text")) + .or_else(|| message.get("value"))?; + let mut parts = Vec::new(); + match content_value { + Value::String(text) => parts.push(ContentPart::Text { text: text.clone() }), + Value::Array(items) => { + for item in items { + if let Some(part) = content_part_from_value(item) { + parts.push(part); + } + } + } + Value::Object(_) => { + if let Some(part) = content_part_from_value(content_value) { + parts.push(part); + } + } + _ => {} + } + Some(parts) +} + 
+fn content_part_from_value(value: &Value) -> Option { + if let Some(text) = value.as_str() { + return Some(ContentPart::Text { + text: text.to_string(), + }); + } + let part_type = value.get("type").and_then(Value::as_str); + match part_type { + Some("text") | Some("markdown") => { + extract_text_from_value(value).map(|text| ContentPart::Text { text }) + } + Some("thinking") | Some("reasoning") => { + extract_text_from_value(value).map(|text| ContentPart::Reasoning { + text, + visibility: ReasoningVisibility::Private, + }) + } + Some("image") => value + .get("path") + .or_else(|| value.get("url")) + .and_then(|path| { + path.as_str().map(|path| ContentPart::Image { + path: path.to_string(), + mime: value + .get("mime") + .or_else(|| value.get("mimeType")) + .and_then(Value::as_str) + .map(|mime| mime.to_string()), + }) + }), + Some("tool_call") | Some("toolcall") => { + let name = value + .get("name") + .and_then(Value::as_str) + .unwrap_or("tool") + .to_string(); + let arguments = value + .get("arguments") + .or_else(|| value.get("args")) + .map(value_to_string) + .unwrap_or_else(|| "{}".to_string()); + let call_id = value + .get("call_id") + .or_else(|| value.get("callId")) + .and_then(Value::as_str) + .unwrap_or_default() + .to_string(); + Some(ContentPart::ToolCall { + name, + arguments, + call_id, + }) + } + Some("tool_result") => { + let call_id = value + .get("call_id") + .or_else(|| value.get("callId")) + .and_then(Value::as_str) + .unwrap_or_default() + .to_string(); + let output = value + .get("output") + .or_else(|| value.get("content")) + .map(value_to_string) + .unwrap_or_default(); + Some(ContentPart::ToolResult { call_id, output }) + } + _ => Some(ContentPart::Json { + json: value.clone(), + }), + } +} + +fn value_to_string(value: &Value) -> String { + if let Some(text) = value.as_str() { + text.to_string() + } else { + value.to_string() + } +} + +fn delta_from_partial(previous: &str, next: &str) -> String { + if next.starts_with(previous) { + 
next[previous.len()..].to_string() + } else { + next.to_string() + } +} diff --git a/server/packages/universal-agent-schema/src/lib.rs b/server/packages/universal-agent-schema/src/lib.rs index f4735f0..7307eef 100644 --- a/server/packages/universal-agent-schema/src/lib.rs +++ b/server/packages/universal-agent-schema/src/lib.rs @@ -3,13 +3,13 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; use utoipa::ToSchema; -pub use sandbox_agent_extracted_agent_schemas::{amp, claude, codex, opencode}; +pub use sandbox_agent_extracted_agent_schemas::{amp, claude, codex, opencode, pi}; pub mod agents; pub use agents::{ - amp as convert_amp, claude as convert_claude, codex as convert_codex, - opencode as convert_opencode, + amp as convert_amp, claude as convert_claude, codex as convert_codex, opencode as convert_opencode, + pi as convert_pi, }; #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)] @@ -204,7 +204,7 @@ pub enum ItemKind { Unknown, } -#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema, ToSchema)] #[serde(rename_all = "snake_case")] pub enum ItemRole { User, @@ -213,7 +213,7 @@ pub enum ItemRole { Tool, } -#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema, ToSchema)] #[serde(rename_all = "snake_case")] pub enum ItemStatus { InProgress, diff --git a/server/packages/universal-agent-schema/tests/pi_conversion.rs b/server/packages/universal-agent-schema/tests/pi_conversion.rs new file mode 100644 index 0000000..a59d137 --- /dev/null +++ b/server/packages/universal-agent-schema/tests/pi_conversion.rs @@ -0,0 +1,264 @@ +use sandbox_agent_universal_agent_schema::convert_pi::PiEventConverter; +use sandbox_agent_universal_agent_schema::pi as pi_schema; +use sandbox_agent_universal_agent_schema::{ + ContentPart, ItemKind, ItemRole, ItemStatus, UniversalEventData, 
UniversalEventType, +}; +use serde_json::json; + +fn parse_event(value: serde_json::Value) -> pi_schema::RpcEvent { + serde_json::from_value(value).expect("pi event") +} + +#[test] +fn pi_message_flow_converts() { + let mut converter = PiEventConverter::default(); + + let start_event = parse_event(json!({ + "type": "message_start", + "sessionId": "session-1", + "messageId": "msg-1", + "message": { + "role": "assistant", + "content": [{ "type": "text", "text": "Hello" }] + } + })); + let start_events = converter + .event_to_universal(&start_event) + .expect("start conversions"); + assert_eq!(start_events[0].event_type, UniversalEventType::ItemStarted); + if let UniversalEventData::Item(item) = &start_events[0].data { + assert_eq!(item.item.kind, ItemKind::Message); + assert_eq!(item.item.role, Some(ItemRole::Assistant)); + assert_eq!(item.item.status, ItemStatus::InProgress); + } else { + panic!("expected item event"); + } + + let update_event = parse_event(json!({ + "type": "message_update", + "sessionId": "session-1", + "messageId": "msg-1", + "assistantMessageEvent": { "type": "text_delta", "delta": " world" } + })); + let update_events = converter + .event_to_universal(&update_event) + .expect("update conversions"); + assert_eq!(update_events[0].event_type, UniversalEventType::ItemDelta); + + let end_event = parse_event(json!({ + "type": "message_end", + "sessionId": "session-1", + "messageId": "msg-1", + "message": { + "role": "assistant", + "content": [{ "type": "text", "text": "Hello world" }] + } + })); + let end_events = converter + .event_to_universal(&end_event) + .expect("end conversions"); + assert_eq!(end_events[0].event_type, UniversalEventType::ItemCompleted); + if let UniversalEventData::Item(item) = &end_events[0].data { + assert_eq!(item.item.kind, ItemKind::Message); + assert_eq!(item.item.role, Some(ItemRole::Assistant)); + assert_eq!(item.item.status, ItemStatus::Completed); + } else { + panic!("expected item event"); + } +} + +#[test] +fn 
pi_user_message_echo_is_skipped() { + let mut converter = PiEventConverter::default(); + + // Pi may echo the user message as a message_start with role "user". + // The daemon already records synthetic user events, so the converter + // must skip these to avoid a duplicate assistant-looking bubble. + let start_event = parse_event(json!({ + "type": "message_start", + "sessionId": "session-1", + "messageId": "user-msg-1", + "message": { + "role": "user", + "content": [{ "type": "text", "text": "hello!" }] + } + })); + let events = converter + .event_to_universal(&start_event) + .expect("user message_start should not error"); + assert!( + events.is_empty(), + "user message_start should produce no events, got {}", + events.len() + ); + + let end_event = parse_event(json!({ + "type": "message_end", + "sessionId": "session-1", + "messageId": "user-msg-1", + "message": { + "role": "user", + "content": [{ "type": "text", "text": "hello!" }] + } + })); + let events = converter + .event_to_universal(&end_event) + .expect("user message_end should not error"); + assert!( + events.is_empty(), + "user message_end should produce no events, got {}", + events.len() + ); + + // A subsequent assistant message should still work normally. + let assistant_start = parse_event(json!({ + "type": "message_start", + "sessionId": "session-1", + "messageId": "msg-1", + "message": { + "role": "assistant", + "content": [{ "type": "text", "text": "Hello! How can I help?" 
}] + } + })); + let events = converter + .event_to_universal(&assistant_start) + .expect("assistant message_start"); + assert_eq!(events.len(), 1); + assert_eq!(events[0].event_type, UniversalEventType::ItemStarted); + if let UniversalEventData::Item(item) = &events[0].data { + assert_eq!(item.item.role, Some(ItemRole::Assistant)); + } else { + panic!("expected item event"); + } +} + +#[test] +fn pi_tool_execution_converts_with_partial_deltas() { + let mut converter = PiEventConverter::default(); + + let start_event = parse_event(json!({ + "type": "tool_execution_start", + "sessionId": "session-1", + "toolCallId": "call-1", + "toolName": "bash", + "args": { "command": "ls" } + })); + let start_events = converter + .event_to_universal(&start_event) + .expect("tool start"); + assert_eq!(start_events[0].event_type, UniversalEventType::ItemStarted); + if let UniversalEventData::Item(item) = &start_events[0].data { + assert_eq!(item.item.kind, ItemKind::ToolCall); + assert_eq!(item.item.role, Some(ItemRole::Assistant)); + match &item.item.content[0] { + ContentPart::ToolCall { name, .. 
} => assert_eq!(name, "bash"), + _ => panic!("expected tool call content"), + } + } + + let update_event = parse_event(json!({ + "type": "tool_execution_update", + "sessionId": "session-1", + "toolCallId": "call-1", + "partialResult": "foo" + })); + let update_events = converter + .event_to_universal(&update_event) + .expect("tool update"); + assert!(update_events + .iter() + .any(|event| event.event_type == UniversalEventType::ItemDelta)); + + let update_event2 = parse_event(json!({ + "type": "tool_execution_update", + "sessionId": "session-1", + "toolCallId": "call-1", + "partialResult": "foobar" + })); + let update_events2 = converter + .event_to_universal(&update_event2) + .expect("tool update 2"); + let delta = update_events2 + .iter() + .find_map(|event| match &event.data { + UniversalEventData::ItemDelta(data) => Some(data.delta.clone()), + _ => None, + }) + .unwrap_or_default(); + assert_eq!(delta, "bar"); + + let end_event = parse_event(json!({ + "type": "tool_execution_end", + "sessionId": "session-1", + "toolCallId": "call-1", + "result": { "type": "text", "content": "done" }, + "isError": false + })); + let end_events = converter.event_to_universal(&end_event).expect("tool end"); + assert_eq!(end_events[0].event_type, UniversalEventType::ItemCompleted); + if let UniversalEventData::Item(item) = &end_events[0].data { + assert_eq!(item.item.kind, ItemKind::ToolResult); + assert_eq!(item.item.role, Some(ItemRole::Tool)); + match &item.item.content[0] { + ContentPart::ToolResult { output, .. 
} => assert_eq!(output, "done"), + _ => panic!("expected tool result content"), + } + } +} + +#[test] +fn pi_unknown_event_returns_error() { + let mut converter = PiEventConverter::default(); + let event = parse_event(json!({ + "type": "unknown_event", + "sessionId": "session-1" + })); + assert!(converter.event_to_universal(&event).is_err()); +} + +#[test] +fn pi_message_done_completes_without_message_end() { + let mut converter = PiEventConverter::default(); + + let start_event = parse_event(json!({ + "type": "message_start", + "sessionId": "session-1", + "messageId": "msg-1", + "message": { + "role": "assistant", + "content": [{ "type": "text", "text": "Hello" }] + } + })); + let _start_events = converter + .event_to_universal(&start_event) + .expect("start conversions"); + + let update_event = parse_event(json!({ + "type": "message_update", + "sessionId": "session-1", + "messageId": "msg-1", + "assistantMessageEvent": { "type": "text_delta", "delta": " world" } + })); + let _update_events = converter + .event_to_universal(&update_event) + .expect("update conversions"); + + let done_event = parse_event(json!({ + "type": "message_update", + "sessionId": "session-1", + "messageId": "msg-1", + "assistantMessageEvent": { "type": "done" } + })); + let done_events = converter + .event_to_universal(&done_event) + .expect("done conversions"); + assert_eq!(done_events[0].event_type, UniversalEventType::ItemCompleted); + if let UniversalEventData::Item(item) = &done_events[0].data { + assert_eq!(item.item.status, ItemStatus::Completed); + assert!( + matches!(item.item.content.get(0), Some(ContentPart::Text { text }) if text == "Hello world") + ); + } else { + panic!("expected item event"); + } +}