feat: support Pi harness (#121)

Nathan Flurry 2026-02-10 22:27:07 -08:00
commit a33b1323ff
48 changed files with 4460 additions and 118 deletions


@@ -8,7 +8,7 @@ edition = "2021"
 authors = [ "Rivet Gaming, LLC <developer@rivet.gg>" ]
 license = "Apache-2.0"
 repository = "https://github.com/rivet-dev/sandbox-agent"
-description = "Universal API for automatic coding agents in sandboxes. Supports Claude Code, Codex, OpenCode, and Amp."
+description = "Universal API for automatic coding agents in sandboxes. Supports Claude Code, Codex, OpenCode, Cursor, Amp, and Pi."
 [workspace.dependencies]
 # Internal crates


@@ -5,7 +5,7 @@
 <h3 align="center">Run Coding Agents in Sandboxes. Control Them Over HTTP.</h3>
 <p align="center">
-A server that runs inside your sandbox. Your app connects remotely to control Claude Code, Codex, OpenCode, Cursor, or Amp — streaming events, handling permissions, managing sessions.
+A server that runs inside your sandbox. Your app connects remotely to control Claude Code, Codex, OpenCode, Cursor, Amp, or Pi — streaming events, handling permissions, managing sessions.
 </p>
 <p align="center">
@@ -24,13 +24,13 @@ Sandbox Agent solves three problems:
 1. **Coding agents need sandboxes** — You can't let AI execute arbitrary code on your production servers. Coding agents need isolated environments, but existing SDKs assume local execution. Sandbox Agent is a server that runs inside the sandbox and exposes HTTP/SSE.
-2. **Every coding agent is different** — Claude Code, Codex, OpenCode, Cursor, and Amp each have proprietary APIs, event formats, and behaviors. Swapping agents means rewriting your integration. Sandbox Agent provides one HTTP API — write your code once, swap agents with a config change.
+2. **Every coding agent is different** — Claude Code, Codex, OpenCode, Cursor, Amp, and Pi each have proprietary APIs, event formats, and behaviors. Swapping agents means rewriting your integration. Sandbox Agent provides one HTTP API — write your code once, swap agents with a config change.
 3. **Sessions are ephemeral** — Agent transcripts live in the sandbox. When the process ends, you lose everything. Sandbox Agent streams events in a universal schema to your storage. Persist to Postgres, ClickHouse, or [Rivet](https://rivet.dev). Replay later, audit everything.
 ## Features
-- **Universal Agent API**: Single interface to control Claude Code, Codex, OpenCode, Cursor, and Amp with full feature coverage
+- **Universal Agent API**: Single interface to control Claude Code, Codex, OpenCode, Cursor, Amp, and Pi with full feature coverage
 - **Streaming Events**: Real-time SSE stream of everything the agent does — tool calls, permission requests, file edits, and more
 - **Universal Session Schema**: [Standardized schema](https://sandboxagent.dev/docs/session-transcript-schema) that normalizes all agent event formats for storage and replay
 - **Human-in-the-Loop**: Approve or deny tool executions and answer agent questions remotely over HTTP
@@ -234,7 +234,7 @@ No, they're complementary. AI SDK is for building chat interfaces and calling LL
 <details>
 <summary><strong>Which coding agents are supported?</strong></summary>
-Claude Code, Codex, OpenCode, Cursor, and Amp. The SDK normalizes their APIs so you can swap between them without changing your code.
+Claude Code, Codex, OpenCode, Cursor, Amp, and Pi. The SDK normalizes their APIs so you can swap between them without changing your code.
 </details>
 <details>
@@ -258,7 +258,7 @@ The server is a single Rust binary that runs anywhere with a curl install. If yo
 <details>
 <summary><strong>Can I use this with my personal API keys?</strong></summary>
-Yes. Use `sandbox-agent credentials extract-env` to extract API keys from your local agent configs (Claude Code, Codex, OpenCode, Amp) and pass them to the sandbox environment.
+Yes. Use `sandbox-agent credentials extract-env` to extract API keys from your local agent configs (Claude Code, Codex, OpenCode, Amp, Pi) and pass them to the sandbox environment.
 </details>
 <details>


@@ -138,7 +138,7 @@ sandbox-agent credentials extract [OPTIONS]
 | Option | Description |
 |--------|-------------|
-| `-a, --agent <AGENT>` | Filter by agent (`claude`, `codex`, `opencode`, `amp`) |
+| `-a, --agent <AGENT>` | Filter by agent (`claude`, `codex`, `opencode`, `amp`, `pi`) |
 | `-p, --provider <PROVIDER>` | Filter by provider (`anthropic`, `openai`) |
 | `-d, --home-dir <DIR>` | Custom home directory for credential search |
 | `-r, --reveal` | Show full credential values (default: redacted) |


@@ -4,14 +4,14 @@ Source of truth: generated agent schemas in `resources/agent-schemas/artifacts/j
 Identifiers
-+----------------------+------------------------+------------------------------------------+-----------------------------+------------------------+
-| Universal term | Claude | Codex (app-server) | OpenCode | Amp |
-+----------------------+------------------------+------------------------------------------+-----------------------------+------------------------+
-| session_id | n/a (daemon-only) | n/a (daemon-only) | n/a (daemon-only) | n/a (daemon-only) |
-| native_session_id | none | threadId | sessionID | none |
-| item_id | synthetic | ThreadItem.id | Message.id | StreamJSONMessage.id |
-| native_item_id | none | ThreadItem.id | Message.id | StreamJSONMessage.id |
-+----------------------+------------------------+------------------------------------------+-----------------------------+------------------------+
++----------------------+------------------------+------------------------------------------+-----------------------------+------------------------+----------------------+
+| Universal term | Claude | Codex (app-server) | OpenCode | Amp | Pi (RPC) |
++----------------------+------------------------+------------------------------------------+-----------------------------+------------------------+----------------------+
+| session_id | n/a (daemon-only) | n/a (daemon-only) | n/a (daemon-only) | n/a (daemon-only) | n/a (daemon-only) |
+| native_session_id | none | threadId | sessionID | none | sessionId |
+| item_id | synthetic | ThreadItem.id | Message.id | StreamJSONMessage.id | messageId/toolCallId |
+| native_item_id | none | ThreadItem.id | Message.id | StreamJSONMessage.id | messageId/toolCallId |
++----------------------+------------------------+------------------------------------------+-----------------------------+------------------------+----------------------+
 Notes:
 - When a provider does not supply IDs (Claude), we synthesize item_id values and keep native_item_id null.
@@ -22,26 +22,41 @@ Notes:
- opt-in via `include_raw=true` on events endpoints (HTTP + SSE).
- If parsing fails, emit agent.unparsed (source=daemon, synthetic=true). Tests must assert zero unparsed events.
Runtime model by agent
| Agent | Runtime model | Notes |
|---|---|---|
| Claude | Per-message subprocess streaming | Routed through `AgentManager::spawn_streaming` with Claude stream-json stdin. |
| Amp | Per-message subprocess streaming | Routed through `AgentManager::spawn_streaming` with parsed JSONL output. |
| Codex | Shared app-server (stdio JSON-RPC) | One shared server process, daemon sessions map to Codex thread IDs. |
| OpenCode | Shared HTTP server + SSE | One shared HTTP server, daemon sessions map to OpenCode session IDs. |
| Pi | Dedicated per-session RPC process | Canonical path is router-managed Pi runtime (`pi --mode rpc`), one process per daemon session. |
Pi runtime contract:
- Session/message lifecycle for Pi must stay on router-managed per-session RPC runtime.
- `AgentManager::spawn(Pi)` is kept for one-shot utility/testing flows.
- `AgentManager::spawn_streaming(Pi)` is intentionally unsupported.
Events / Message Flow
-+------------------------+------------------------------+--------------------------------------------+-----------------------------------------+----------------------------------+
-| Universal term | Claude | Codex (app-server) | OpenCode | Amp |
-+------------------------+------------------------------+--------------------------------------------+-----------------------------------------+----------------------------------+
-| session.started | none | method=thread/started | type=session.created | none |
-| session.ended | SDKMessage.type=result | no explicit session end (turn/completed) | no explicit session end (session.deleted)| type=done |
-| turn.started | synthetic on message send | method=turn/started | type=session.status (busy) | synthetic on message send |
-| turn.ended | synthetic after result | method=turn/completed | type=session.idle | synthetic on done |
-| message (user) | SDKMessage.type=user | item/completed (ThreadItem.type=userMessage)| message.updated (Message.role=user) | type=message |
-| message (assistant) | SDKMessage.type=assistant | item/completed (ThreadItem.type=agentMessage)| message.updated (Message.role=assistant)| type=message |
-| message.delta | stream_event (partial) or synthetic | method=item/agentMessage/delta | type=message.part.updated (text-part delta) | synthetic |
-| tool call | type=tool_use | method=item/mcpToolCall/progress | message.part.updated (part.type=tool) | type=tool_call |
-| tool result | user.message.content.tool_result | item/completed (tool result ThreadItem variants) | message.part.updated (part.type=tool, state=completed) | type=tool_result |
-| permission.requested | control_request.can_use_tool | none | type=permission.asked | none |
-| permission.resolved | daemon reply to can_use_tool | none | type=permission.replied | none |
-| question.requested | tool_use (AskUserQuestion) | experimental request_user_input (payload) | type=question.asked | none |
-| question.resolved | tool_result (AskUserQuestion) | experimental request_user_input (payload) | type=question.replied / question.rejected | none |
-| error | SDKResultMessage.error | method=error | type=session.error (or message error) | type=error |
-+------------------------+------------------------------+--------------------------------------------+-----------------------------------------+----------------------------------+
++------------------------+------------------------------+--------------------------------------------+-----------------------------------------+----------------------------------+----------------------------+
+| Universal term | Claude | Codex (app-server) | OpenCode | Amp | Pi (RPC) |
++------------------------+------------------------------+--------------------------------------------+-----------------------------------------+----------------------------------+----------------------------+
+| session.started | none | method=thread/started | type=session.created | none | none |
+| session.ended | SDKMessage.type=result | no explicit session end (turn/completed) | no explicit session end (session.deleted)| type=done | none (daemon synthetic) |
+| turn.started | synthetic on message send | method=turn/started | type=session.status (busy) | synthetic on message send | none (daemon synthetic) |
+| turn.ended | synthetic after result | method=turn/completed | type=session.idle | synthetic on done | none (daemon synthetic) |
+| message (user) | SDKMessage.type=user | item/completed (ThreadItem.type=userMessage)| message.updated (Message.role=user) | type=message | none (daemon synthetic) |
+| message (assistant) | SDKMessage.type=assistant | item/completed (ThreadItem.type=agentMessage)| message.updated (Message.role=assistant)| type=message | message_start/message_end |
+| message.delta | stream_event (partial) or synthetic | method=item/agentMessage/delta | type=message.part.updated (text-part delta) | synthetic | message_update (text_delta/thinking_delta) |
+| tool call | type=tool_use | method=item/mcpToolCall/progress | message.part.updated (part.type=tool) | type=tool_call | tool_execution_start |
+| tool result | user.message.content.tool_result | item/completed (tool result ThreadItem variants) | message.part.updated (part.type=tool, state=completed) | type=tool_result | tool_execution_end |
+| permission.requested | control_request.can_use_tool | none | type=permission.asked | none | none |
+| permission.resolved | daemon reply to can_use_tool | none | type=permission.replied | none | none |
+| question.requested | tool_use (AskUserQuestion) | experimental request_user_input (payload) | type=question.asked | none | none |
+| question.resolved | tool_result (AskUserQuestion) | experimental request_user_input (payload) | type=question.replied / question.rejected | none | none |
+| error | SDKResultMessage.error | method=error | type=session.error (or message error) | type=error | hook_error (status item) |
++------------------------+------------------------------+--------------------------------------------+-----------------------------------------+----------------------------------+----------------------------+
Permission status normalization:
- `permission.requested` uses `status=requested`.
@@ -72,6 +87,7 @@ Delta handling
 - Codex emits agent message and other deltas (e.g., item/agentMessage/delta).
 - OpenCode emits part deltas via message.part.updated with a delta string.
 - Claude can emit stream_event deltas when partial streaming is enabled; Amp does not emit deltas.
+- Pi emits message_update deltas and cumulative tool_execution_update partialResult values (we diff to produce deltas).
Policy:
- Emit item.delta for streamable text content across providers.
@@ -89,3 +105,9 @@ Message normalization notes
 - OpenCode unrolling: message.updated creates/updates the parent message item; tool-related parts emit separate tool item events (item.started/ item.completed) with parent_id pointing to the message item.
 - If a message.part.updated arrives before message.updated, we create a stub item.started (source=daemon) so deltas have a parent.
 - Tool calls/results are always emitted as separate tool items to keep behavior consistent across agents.
+- If Pi message_update events omit messageId, we synthesize a stable message id and emit a synthetic item.started before the first delta so streaming text stays grouped.
+- Pi auto_compaction_start/auto_compaction_end and auto_retry_start/auto_retry_end events are mapped to status items (label `pi.*`).
+- Pi extension_ui_request/extension_error events are mapped to status items.
+- Pi RPC from pi-coding-agent does not include sessionId in events; each daemon session owns a dedicated Pi RPC process, so events are routed by runtime ownership (parallel sessions supported).
+- Pi `variant` maps directly to the Pi RPC `set_thinking_level.level` before prompts are sent.
+- Pi remains the source of truth for thinking-level constraints: unsupported levels (including non-reasoning models and model-specific limits such as `xhigh`) are clamped or rejected by Pi itself.


@@ -34,6 +34,7 @@ console.log(url);
 - **Send messages**: Post messages to a session directly from the UI
 - **Agent selection**: Switch between agents and modes
 - **Request log**: View raw HTTP requests and responses for debugging
+- **Pi concurrent sessions**: Pi sessions run concurrently by default via per-session runtime processes
 ## When to Use


@@ -2,7 +2,7 @@
 "openapi": "3.0.3",
 "info": {
 "title": "sandbox-agent",
-"description": "Universal API for automatic coding agents in sandboxes. Supports Claude Code, Codex, OpenCode, and Amp.",
+"description": "Universal API for automatic coding agents in sandboxes. Supports Claude Code, Codex, OpenCode, Cursor, Amp, and Pi.",
 "contact": {
 "name": "Rivet Gaming, LLC",
 "email": "developer@rivet.gg"

docs/pi-support-plan.md (new file, 210 lines)

@@ -0,0 +1,210 @@
# Pi Agent Support Plan (pi-mono)
## Implementation Status Update
- Runtime selection now supports two internal modes:
- `PerSession` (default for unknown/non-allowlisted Pi capabilities)
- `Shared` (allowlist-only compatibility path)
- Pi sessions now use per-session process isolation by default, enabling true concurrent Pi sessions in Inspector and API clients.
- Shared Pi server code remains available and is used only when capability checks allow multiplexing.
- Session termination for per-session Pi mode hard-kills the underlying Pi process and clears queued prompts/pending waiters.
- In-session concurrent sends are serialized with an unbounded daemon-side FIFO queue per session.
## Investigation Summary
### Pi CLI modes and RPC protocol
- Pi supports multiple modes including interactive, print/JSON output, RPC, and SDK usage. JSON mode outputs a stream of JSON events suitable for parsing, and RPC mode is intended for programmatic control over stdin/stdout.
- RPC mode is started with `pi --mode rpc` and supports options like `--provider`, `--model`, `--no-session`, and `--session-dir`.
- The RPC protocol is newline-delimited JSON over stdin/stdout:
- Commands are JSON objects written to stdin.
- Responses are JSON objects with `type: "response"` and optional `id`.
- Events are JSON objects without `id`.
- `prompt` can include images using `ImageContent` (base64 or URL) alongside text.
- JSON/print mode (`pi -p` or `pi --print --mode json`) produces JSONL for non-interactive parsing and can resume sessions with a token.
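The newline-delimited framing described above can be sketched as follows. This is a minimal illustration, not the real client: `RpcMessage`, `splitFrames`, and `isResponse` are assumed names, and the authoritative protocol types live in `packages/coding-agent/src/modes/rpc/rpc-types.ts`.

```typescript
// Assumed message shape for illustration; not the real type from rpc-types.ts.
type RpcMessage = { type: string; id?: string; [key: string]: unknown };

// Split a stdout chunk into parsed JSON frames, one per non-empty line.
function splitFrames(chunk: string): RpcMessage[] {
  return chunk
    .split("\n")
    .filter((line) => line.trim().length > 0)
    .map((line) => JSON.parse(line) as RpcMessage);
}

// Per the protocol notes: responses have `type: "response"` and echo the
// command `id`; events arrive without an `id`.
function isResponse(msg: RpcMessage): boolean {
  return msg.type === "response" && msg.id !== undefined;
}
```

A real client would additionally buffer partial lines across stdout chunks before splitting.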
### RPC commands
RPC commands listed in `rpc.md` include:
- `new_session`, `get_state`, `list_sessions`, `delete_session`, `rename_session`, `clear_session`
- `prompt`, `queue_message`, `abort`, `get_queued_messages`
### RPC event types
RPC events listed in `rpc.md` include:
- `agent_start`, `agent_end`
- `turn_start`, `turn_end`
- `message_start`, `message_update`, `message_end`
- `tool_execution_start`, `tool_execution_update`, `tool_execution_end`
- `auto_compaction`, `auto_retry`, `hook_error`
`message_update` uses `assistantMessageEvent` deltas such as:
- `start`, `text_start`, `text_delta`, `text_end`
- `thinking_start`, `thinking_delta`, `thinking_end`
- `toolcall_start`, `toolcall_delta`, `toolcall_end`
- `toolcall_args_start`, `toolcall_args_delta`, `toolcall_args_end`
- `done`, `error`
`tool_execution_update` includes `partialResult`, which is described as accumulated output so far.
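Because `partialResult` is cumulative, a consumer must diff successive values itself. A sketch, assuming append-only accumulation keyed by `toolCallId` (the `partialResultDelta` helper and its buffer map are illustrative, not part of the Pi API):

```typescript
// Per-toolCallId buffer of the last cumulative partialResult seen.
const buffers = new Map<string, string>();

function partialResultDelta(toolCallId: string, partialResult: string): string {
  const prev = buffers.get(toolCallId) ?? "";
  // If the new value extends the old one, emit only the new suffix;
  // otherwise fall back to re-emitting the full value.
  const delta = partialResult.startsWith(prev)
    ? partialResult.slice(prev.length)
    : partialResult;
  buffers.set(toolCallId, partialResult);
  return delta;
}
```

The fallback branch matters if a tool ever rewrites earlier output rather than appending.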
### Schema source locations (pi-mono)
RPC types are documented as living in:
- `packages/ai/src/types.ts` (Model types)
- `packages/agent/src/types.ts` (AgentResponse types)
- `packages/coding-agent/src/core/messages.ts` (message types)
- `packages/coding-agent/src/modes/rpc/rpc-types.ts` (RPC protocol types)
### Distribution assets
Pi releases provide platform-specific binaries such as:
- `pi-darwin-arm64`, `pi-darwin-x64`
- `pi-linux-arm64`, `pi-linux-x64`
- `pi-win-x64.zip`
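Installer logic along these lines could map a platform onto the asset names above. The Node-style `os`/`arch` inputs and the `piAssetName` helper are assumptions for illustration; the real resolution lives in `install_pi()` in Rust.

```typescript
// Pick the Pi release asset for a platform, following the naming above:
// raw binaries everywhere except Windows, which ships a .zip.
function piAssetName(os: string, arch: string): string {
  const platform = `${os === "win32" ? "win" : os}-${arch}`;
  return os === "win32" ? `pi-${platform}.zip` : `pi-${platform}`;
}
```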
## Integration Decisions
- Follow the OpenCode pattern: a shared long-running process (stdio RPC) with session multiplexing.
- Primary integration path is RPC streaming (`pi --mode rpc`).
- JSON/print mode is a fallback only (diagnostics or non-interactive runs).
- Create sessions via `new_session`; store the returned `sessionId` as `native_session_id`.
- Use `get_state` as a re-sync path after server restarts.
- Use `prompt` for send-message, with optional image content.
- Convert Pi events into universal events; emit daemon synthetic `session.started` on session creation and `session.ended` only on errors/termination.
## Implementation Plan
### 1) Agent Identity + Capabilities
Files:
- `server/packages/agent-management/src/agents.rs`
- `server/packages/sandbox-agent/src/router.rs`
- `docs/cli.mdx`, `docs/conversion.mdx`, `docs/session-transcript-schema.mdx`
- `README.md`, `frontend/packages/website/src/components/FAQ.tsx`
Tasks:
- Add `AgentId::Pi` with string/binary name `"pi"` and parsing rules.
- Add Pi to `all_agents()` and agent lists.
- Define `AgentCapabilities` for Pi:
- `tool_calls=true`, `tool_results=true`
- `text_messages=true`, `streaming_deltas=true`, `item_started=true`
- `reasoning=true` (from `thinking_*` deltas)
- `images=true` (ImageContent in `prompt`)
- `permissions=false`, `questions=false`, `mcp_tools=false`
- `shared_process=true`, `session_lifecycle=false` (no native session events)
- `error_events=true` (hook_error)
- `command_execution=false`, `file_changes=false`, `file_attachments=false`
### 2) Installer and Binary Resolution
Files:
- `server/packages/agent-management/src/agents.rs`
Tasks:
- Add `install_pi()` that:
- Downloads the correct release asset per platform (`pi-<platform>`).
- Handles `.zip` on Windows and raw binaries elsewhere.
- Marks binary executable.
- Add Pi to `AgentManager::install`, `is_installed`, `version`.
- Version detection: try `--version`, `version`, `-V`.
### 3) Schema Extraction for Pi
Files:
- `resources/agent-schemas/src/pi.ts` (new)
- `resources/agent-schemas/src/index.ts`
- `resources/agent-schemas/artifacts/json-schema/pi.json`
- `server/packages/extracted-agent-schemas/build.rs`
- `server/packages/extracted-agent-schemas/src/lib.rs`
Tasks:
- Implement `extractPiSchema()`:
- Download pi-mono sources (zip/tarball) into a temp dir.
- Use `ts-json-schema-generator` against `packages/coding-agent/src/modes/rpc/rpc-types.ts`.
- Include dependent files per `rpc.md` (ai/types, agent/types, core/messages).
- Extract `RpcEvent`, `RpcResponse`, `RpcCommand` unions (exact type names from source).
- Add fallback schema if remote fetch fails (minimal union with event/response fields).
- Wire pi into extractor index and artifact generation.
### 4) Universal Schema Conversion (Pi -> Universal)
Files:
- `server/packages/universal-agent-schema/src/agents/pi.rs` (new)
- `server/packages/universal-agent-schema/src/agents/mod.rs`
- `server/packages/universal-agent-schema/src/lib.rs`
- `server/packages/sandbox-agent/src/router.rs`
Mapping rules:
- `message_start` -> `item.started` (kind=message, role=assistant, native_item_id=messageId)
- `message_update`:
- `text_*` -> `item.delta` (assistant text delta)
- `thinking_*` -> `item.delta` with `ContentPart::Reasoning` (visibility=Private)
- `toolcall_*` and `toolcall_args_*` -> ignore for now (tool_execution_* is authoritative)
- `error` -> `item.completed` with `ItemStatus::Failed` (if no later message_end)
- `message_end` -> `item.completed` (finalize assistant message)
- `tool_execution_start` -> `item.started` (kind=tool_call, ContentPart::ToolCall)
- `tool_execution_update` -> `item.delta` for a synthetic tool_result item:
- Maintain a per-toolCallId buffer to compute delta from accumulated `partialResult`.
- `tool_execution_end` -> `item.completed` (kind=tool_result, output from `result.content`)
- If `isError=true`, set item status to failed.
- `agent_start`, `turn_start`, `turn_end`, `agent_end`, `auto_compaction`, `auto_retry`, `hook_error`:
- Map to `ItemKind::Status` with a label like `pi.agent_start`, `pi.auto_retry`, etc.
- Do not emit `session.ended` for these events.
- If event parsing fails, emit `agent.unparsed` (source=daemon, synthetic=true) and fail tests.
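The routing implied by these rules can be sketched as a simple dispatch. The string labels are this document's universal terms; the real converter in `pi.rs` emits structured events with item kinds and IDs, not strings.

```typescript
// Route a Pi RPC event type to the universal event it produces, per the
// mapping rules above. Lifecycle and maintenance events become status items.
function mapPiEvent(type: string): string {
  switch (type) {
    case "message_start": return "item.started";
    case "message_update": return "item.delta";
    case "message_end": return "item.completed";
    case "tool_execution_start": return "item.started";
    case "tool_execution_update": return "item.delta";
    case "tool_execution_end": return "item.completed";
    // agent_start/agent_end, turn_start/turn_end, auto_compaction,
    // auto_retry, hook_error -> status items labeled `pi.<event>`.
    default: return "status";
  }
}
```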
### 5) Shared RPC Server Integration
Files:
- `server/packages/sandbox-agent/src/router.rs`
Tasks:
- Add a new managed stdio server type for Pi, similar to Codex:
- Create `PiServer` struct with:
- stdin sender
- pending request map keyed by request id
- per-session native session id mapping
- Extend `ManagedServerKind` to include Pi.
- Add `ensure_pi_server()` and `spawn_pi_server()` using `pi --mode rpc`.
- Add a `handle_pi_server_output()` loop to parse stdout lines into events/responses.
- Session creation:
- On `create_session`, ensure Pi server is running, send `new_session`, store sessionId.
- Register session with `server_manager.register_session` for native mapping.
- Sending messages:
- Use `prompt` command; include sessionId and optional images.
- Emit synthetic `item.started` only if Pi does not emit `message_start`.
### 6) Router + Streaming Path Changes
Files:
- `server/packages/sandbox-agent/src/router.rs`
Tasks:
- Add Pi handling to:
- `create_session` (new_session)
- `send_message` (prompt)
- `parse_agent_line` (Pi event conversion)
- `agent_modes` (default to `default` unless Pi exposes a mode list)
- `agent_supports_resume` (true if Pi supports session resume)
### 7) Tests
Files:
- `server/packages/sandbox-agent/tests/...`
- `server/packages/universal-agent-schema/tests/...` (if present)
Tasks:
- Unit tests for conversion:
- `message_start/update/end` -> item.started/delta/completed
- `tool_execution_*` -> tool call/result mapping with partialResult delta
- failure -> agent.unparsed
- Integration tests:
- Start Pi RPC server, create session, send prompt, stream events.
- Validate `native_session_id` mapping and event ordering.
- Update HTTP/SSE test coverage to include Pi agent if relevant.
## Risk Areas / Edge Cases
- `tool_execution_update.partialResult` is cumulative; must compute deltas.
- `message_update` may emit `done`/`error` without `message_end`; handle both paths.
- No native session lifecycle events; rely on daemon synthetic events.
- Session recovery after RPC server restart requires `get_state` + re-register sessions.
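The `done`/`error`-without-`message_end` edge case above can be guarded roughly like this. Event shapes, the `open` set, and the return labels are all illustrative, not the real converter types:

```typescript
// Track which assistant messages are open so a message is finalized exactly
// once, whether the terminal signal is message_end or a done/error delta.
const open = new Set<string>();

function onPiEvent(type: string, delta: string | undefined, messageId: string): string | null {
  if (type === "message_start") {
    open.add(messageId);
    return null;
  }
  const terminal =
    type === "message_end" ||
    (type === "message_update" && (delta === "done" || delta === "error"));
  if (terminal && open.has(messageId)) {
    open.delete(messageId);
    return delta === "error" ? "item.completed(failed)" : "item.completed";
  }
  return null; // already finalized, or not a terminal event
}
```

Deleting from `open` on the first terminal event is what prevents a double `item.completed` when both signals arrive.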
## Acceptance Criteria
- Pi appears in `/v1/agents`, CLI list, and docs.
- `create_session` returns `native_session_id` from Pi `new_session`.
- Streaming prompt yields universal events with proper ordering:
- message -> item.started/delta/completed
- tool execution -> tool call + tool result
- Tests pass and no synthetic data is used in test fixtures.
## Sources
- https://upd.dev/badlogic/pi-mono/src/commit/d36e0ea07303d8a76d51b4a7bd5f0d6d3c490860/packages/coding-agent/docs/rpc.md
- https://buildwithpi.ai/pi-cli
- https://takopi.dev/docs/pi-cli/
- https://upd.dev/badlogic/pi-mono/releases


@@ -11,30 +11,31 @@ The schema is defined in [OpenAPI format](https://github.com/rivet-dev/sandbox-a
 This table shows which agent feature coverage appears in the universal event stream. All agents retain their full native feature coverage—this only reflects what's normalized into the schema.
-| Feature | Claude | Codex | OpenCode | Amp |
-|--------------------|:------:|:-----:|:------------:|:------------:|
-| Stability | Stable | Stable| Experimental | Experimental |
-| Text Messages | ✓ | ✓ | ✓ | ✓ |
-| Tool Calls | ✓ | ✓ | ✓ | ✓ |
-| Tool Results | ✓ | ✓ | ✓ | ✓ |
-| Questions (HITL) | ✓ | | ✓ | |
-| Permissions (HITL) | ✓ | ✓ | ✓ | - |
-| Images | - | ✓ | ✓ | - |
-| File Attachments | - | ✓ | ✓ | - |
-| Session Lifecycle | - | ✓ | ✓ | - |
-| Error Events | - | ✓ | ✓ | ✓ |
-| Reasoning/Thinking | - | ✓ | - | - |
-| Command Execution | - | ✓ | - | - |
-| File Changes | - | ✓ | - | - |
-| MCP Tools | ✓ | ✓ | ✓ | ✓ |
-| Streaming Deltas | ✓ | ✓ | ✓ | - |
-| Variants | | ✓ | ✓ | ✓ |
+| Feature | Claude | Codex | OpenCode | Amp | Pi (RPC) |
+|--------------------|:------:|:-----:|:------------:|:------------:|:------------:|
+| Stability | Stable | Stable| Experimental | Experimental | Experimental |
+| Text Messages | ✓ | ✓ | ✓ | ✓ | ✓ |
+| Tool Calls | ✓ | ✓ | ✓ | ✓ | ✓ |
+| Tool Results | ✓ | ✓ | ✓ | ✓ | ✓ |
+| Questions (HITL) | ✓ | | ✓ | | |
+| Permissions (HITL) | ✓ | ✓ | ✓ | - | |
+| Images | - | ✓ | ✓ | - | ✓ |
+| File Attachments | - | ✓ | ✓ | - | |
+| Session Lifecycle | - | ✓ | ✓ | - | |
+| Error Events | - | ✓ | ✓ | ✓ | ✓ |
+| Reasoning/Thinking | - | ✓ | - | - | ✓ |
+| Command Execution | - | ✓ | - | - | |
+| File Changes | - | ✓ | - | - | |
+| MCP Tools | ✓ | ✓ | ✓ | ✓ | |
+| Streaming Deltas | ✓ | ✓ | ✓ | - | ✓ |
+| Variants | | ✓ | ✓ | ✓ | ✓ |
-Agents: [Claude Code](https://docs.anthropic.com/en/docs/agents-and-tools/claude-code/overview) · [Codex](https://github.com/openai/codex) · [OpenCode](https://github.com/opencode-ai/opencode) · [Amp](https://ampcode.com)
+Agents: [Claude Code](https://docs.anthropic.com/en/docs/agents-and-tools/claude-code/overview) · [Codex](https://github.com/openai/codex) · [OpenCode](https://github.com/opencode-ai/opencode) · [Amp](https://ampcode.com) · [Pi](https://buildwithpi.ai/pi-cli)
 - ✓ = Appears in session events
 - \- = Agent supports natively, schema conversion coming soon
 - (blank) = Not supported by agent
+- Pi runtime model is router-managed per-session RPC (`pi --mode rpc`); it does not use generic subprocess streaming.
<AccordionGroup>
<Accordion title="Text Messages">


@@ -22,7 +22,7 @@ import type { RequestLog } from "./types/requestLog";
 import { buildCurl } from "./utils/http";
 const logoUrl = `${import.meta.env.BASE_URL}logos/sandboxagent.svg`;
-const defaultAgents = ["claude", "codex", "opencode", "amp", "mock"];
+const defaultAgents = ["claude", "codex", "opencode", "amp", "pi", "mock"];
 type ItemEventData = {
 item: UniversalItem;
@@ -939,6 +939,7 @@ export default function App() {
 codex: "Codex",
 opencode: "OpenCode",
 amp: "Amp",
+pi: "Pi",
 mock: "Mock"
 };
 const agentLabel = agentDisplayNames[agentId] ?? agentId;


@@ -9,6 +9,7 @@ const agentLabels: Record<string, string> = {
 codex: "Codex",
 opencode: "OpenCode",
 amp: "Amp",
+pi: "Pi",
 mock: "Mock"
 };


@@ -0,0 +1,22 @@
<?xml version="1.0" encoding="UTF-8"?>
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 800 800">
<!-- P shape: outer boundary clockwise, inner hole counter-clockwise -->
<path fill="#fff" fill-rule="evenodd" d="
M165.29 165.29
H517.36
V400
H400
V517.36
H282.65
V634.72
H165.29
Z
M282.65 282.65
V400
H400
V282.65
Z
"/>
<!-- i dot -->
<path fill="#fff" d="M517.36 400 H634.72 V634.72 H517.36 Z"/>
</svg>


@@ -7,7 +7,7 @@ import { ArrowRight, Terminal, Check } from 'lucide-react';
 const CTA_TITLES = [
 'Run coding agents in sandboxes. Control them over HTTP.',
 'A server inside your sandbox. An API for your app.',
-'Claude Code, Codex, OpenCode, Amp — one HTTP API.',
+'Claude Code, Codex, OpenCode, Amp, Pi — one HTTP API.',
 'Your app connects remotely. The coding agent runs isolated.',
 'Streaming events. Handling permissions. Managing sessions.',
 'Install with curl. Connect over HTTP. Control any coding agent.',


@@ -13,7 +13,7 @@ const faqs = [
 {
 question: 'Which coding agents are supported?',
 answer:
-'Claude Code, Codex, OpenCode, and Amp. The SDK normalizes their APIs so you can swap between them without changing your code.',
+'Claude Code, Codex, OpenCode, Amp, and Pi. The SDK normalizes their APIs so you can swap between them without changing your code.',
 },
 {
 question: 'How is session data persisted?',
@@ -33,7 +33,7 @@ const faqs = [
 {
 question: 'Can I use this with my personal API keys?',
 answer:
-"Yes. Use <code>sandbox-agent credentials extract-env</code> to extract API keys from your local agent configs (Claude Code, Codex, OpenCode, Amp) and pass them to the sandbox environment.",
+"Yes. Use <code>sandbox-agent credentials extract-env</code> to extract API keys from your local agent configs (Claude Code, Codex, OpenCode, Amp, Pi) and pass them to the sandbox environment.",
 },
 {
 question: 'Why Rust and not [language]?',


@@ -38,7 +38,7 @@ export function FeatureGrid() {
 <h4 className="text-sm font-medium uppercase tracking-wider text-white">Universal Agent API</h4>
 </div>
 <p className="text-zinc-400 leading-relaxed text-lg max-w-2xl">
-Claude Code, Codex, OpenCode, and Amp each have different APIs. We provide a single,
+Claude Code, Codex, OpenCode, Amp, and Pi each have different APIs. We provide a single,
 unified interface to control them all.
 </p>
 </div>


@@ -4,10 +4,11 @@ import { useState, useEffect } from 'react';
 import { Terminal, Check, ArrowRight } from 'lucide-react';
 const ADAPTERS = [
-{ label: 'Claude Code', color: '#D97757', x: 35, y: 70, logo: '/logos/claude.svg' },
-{ label: 'Codex', color: '#10A37F', x: 185, y: 70, logo: 'openai' },
-{ label: 'Amp', color: '#F59E0B', x: 35, y: 155, logo: '/logos/amp.svg' },
-{ label: 'OpenCode', color: '#8B5CF6', x: 185, y: 155, logo: 'opencode' },
+{ label: 'Claude Code', color: '#D97757', x: 35, y: 30, logo: '/logos/claude.svg' },
+{ label: 'Codex', color: '#10A37F', x: 185, y: 30, logo: 'openai' },
+{ label: 'Amp', color: '#F59E0B', x: 35, y: 115, logo: '/logos/amp.svg' },
+{ label: 'OpenCode', color: '#8B5CF6', x: 185, y: 115, logo: 'opencode' },
+{ label: 'Pi', color: '#38BDF8', x: 110, y: 200, logo: '/logos/pi.svg' },
 ];
function UniversalAPIDiagram() {
@ -187,7 +188,7 @@ export function Hero() {
<span className="text-zinc-400">Control Them Over HTTP.</span>
</h1>
<p className="mt-6 text-lg text-zinc-500 leading-relaxed max-w-xl mx-auto lg:mx-0">
The Sandbox Agent SDK is a server that runs inside your sandbox. Your app connects remotely to control Claude Code, Codex, OpenCode, or Amp — streaming events, handling permissions, managing sessions.
The Sandbox Agent SDK is a server that runs inside your sandbox. Your app connects remotely to control Claude Code, Codex, OpenCode, Amp, or Pi — streaming events, handling permissions, managing sessions.
</p>
<div className="mt-10 flex flex-col items-center gap-4 sm:flex-row sm:justify-center lg:justify-start">

View file

@ -16,7 +16,7 @@ const frictions = [
number: '02',
title: 'Every Coding Agent is Different',
problem:
'Claude Code, Codex, OpenCode, and Amp each have proprietary APIs, event formats, and behaviors. Swapping coding agents means rewriting your entire integration.',
'Claude Code, Codex, OpenCode, Amp, and Pi each have proprietary APIs, event formats, and behaviors. Swapping coding agents means rewriting your entire integration.',
solution: 'One HTTP API. Write your code once, swap coding agents with a config change.',
accentColor: 'purple',
},

View file

@ -6,7 +6,7 @@ import { FeatureIcon } from './ui/FeatureIcon';
const problems = [
{
title: 'Universal Agent API',
desc: 'Claude Code, Codex, OpenCode, and Amp each have different APIs. We provide a single interface to control them all.',
desc: 'Claude Code, Codex, OpenCode, Amp, and Pi each have different APIs. We provide a single interface to control them all.',
icon: Workflow,
color: 'text-accent',
},

View file

@ -2,7 +2,7 @@
<img src="../.github/media/gigacode-header.jpeg" alt="Gigacode. Use OpenCode's UI with any coding agent." />
</p>
<h3 align="center">Supports Claude Code, Codex, and Amp.</h3>
<h3 align="center">Supports Claude Code, Codex, Pi, and Amp.</h3>
<p align="center">
<i>This is <u>not</u> a fork (and never will be).<br/>It's powered by <a href="https://sandboxagent.dev">Sandbox Agent SDK</a>'s wizardry.<br/>Experimental & just for fun.</i>
@ -19,23 +19,23 @@
┌─ Gigacode ────────────────────────────────────────────────────────┐
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ OpenCode TUI │───▶│ Sandbox Agent │───▶│ Claude Code / │ │
│ │ │ │ │ │ Codex / Amp │ │
│ │ │ │ │ │ Codex / Pi / Amp │ │
│ └─────────────────┘ └─────────────────┘ └─────────────────┘ │
└───────────────────────────────────────────────────────────────────┘
```
- [Sandbox Agent SDK](https://sandboxagent.dev) provides a universal HTTP API for controlling Claude Code, Codex, and Amp
- [Sandbox Agent SDK](https://sandboxagent.dev) provides a universal HTTP API for controlling Claude Code, Codex, Pi, and Amp
- Sandbox Agent SDK exposes an [OpenCode-compatible endpoint](https://sandboxagent.dev/docs/opencode-compatibility) so OpenCode can talk to any agent
- OpenCode connects to Sandbox Agent SDK via [`attach`](https://opencode.ai/docs/cli/#attach)
## OpenCode Models vs Gigacode Agents
- **OpenCode** supports **switching between inference providers** (Anthropic, OpenAI, etc.). This is OpenCode talking directly to the models with its own tools, system prompts, and agentic loop.
- **Gigacode** automates other coding agent harnesses, so it's using the **exact same logic that you would use if you ran Claude Code**, Codex, or Amp natively.
- **Gigacode** automates other coding agent harnesses, so it's using the **exact same logic that you would use if you ran Claude Code**, Codex, Pi, or Amp natively.
```
OpenCode (native): Model → OpenCode's tool loop → result
Gigacode: Model → Claude Code / Codex / Amp CLI → result
Gigacode: Model → Claude Code / Codex / Pi / Amp CLI → result
```
This means you get each agent's specialized capabilities (such as Claude Code's `Read`/`Write`/`Bash` tools, Codex's sandboxed execution, and Amp's permission rules) rather than a single tool loop with different models behind it.

View file

@ -0,0 +1,124 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"$id": "https://sandbox-agent/schemas/pi.json",
"title": "Pi RPC Schema",
"definitions": {
"RpcEvent": {
"type": "object",
"properties": {
"type": {
"type": "string"
},
"sessionId": {
"type": "string"
},
"messageId": {
"type": "string"
},
"message": {
"$ref": "#/definitions/RpcMessage"
},
"assistantMessageEvent": {
"$ref": "#/definitions/AssistantMessageEvent"
},
"toolCallId": {
"type": "string"
},
"toolName": {
"type": "string"
},
"args": {},
"partialResult": {},
"result": {
"$ref": "#/definitions/ToolResult"
},
"isError": {
"type": "boolean"
},
"error": {}
},
"required": [
"type"
]
},
"RpcMessage": {
"type": "object",
"properties": {
"role": {
"type": "string"
},
"content": {}
}
},
"AssistantMessageEvent": {
"type": "object",
"properties": {
"type": {
"type": "string"
},
"delta": {
"type": "string"
},
"content": {},
"partial": {},
"messageId": {
"type": "string"
}
}
},
"ToolResult": {
"type": "object",
"properties": {
"type": {
"type": "string"
},
"content": {
"type": "string"
},
"text": {
"type": "string"
}
}
},
"RpcResponse": {
"type": "object",
"properties": {
"type": {
"type": "string",
"const": "response"
},
"id": {
"type": "integer"
},
"success": {
"type": "boolean"
},
"data": {},
"error": {}
},
"required": [
"type"
]
},
"RpcCommand": {
"type": "object",
"properties": {
"type": {
"type": "string",
"const": "command"
},
"id": {
"type": "integer"
},
"command": {
"type": "string"
},
"params": {}
},
"required": [
"type",
"command"
]
}
}
}
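For orientation, a sketch of what a command/response/event exchange conforming to the schema above might look like. The field values (ids, the `"prompt"` command name) are illustrative, not taken from a real Pi session:

```typescript
// Illustrative messages shaped after the Pi RPC schema above.
// "prompt" is a hypothetical command name used only for this example.
const command = {
  type: "command",
  id: 1,
  command: "prompt",
  params: { text: "hello" },
};

const response = {
  type: "response",
  id: 1,
  success: true,
  data: { sessionId: "pi-session-123" },
};

const event = {
  type: "message_update",
  sessionId: "pi-session-123",
  assistantMessageEvent: { type: "text_delta", delta: "Hi" },
};

// Minimal structural checks mirroring the schema's `required` arrays:
// RpcCommand requires `type` and `command`; RpcResponse requires `type`.
function isRpcCommand(v: any): boolean {
  return v?.type === "command" && typeof v.command === "string";
}
function isRpcResponse(v: any): boolean {
  return v?.type === "response";
}
```

Note that most payload fields (`args`, `params`, `data`, `error`) are intentionally untyped in the schema, so structural checks can only cover the discriminant and required keys.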

View file

@ -9,6 +9,7 @@
"extract:claude": "tsx src/index.ts --agent=claude",
"extract:codex": "tsx src/index.ts --agent=codex",
"extract:amp": "tsx src/index.ts --agent=amp",
"extract:pi": "tsx src/index.ts --agent=pi",
"extract:claude-events": "tsx src/claude-event-types.ts",
"extract:claude-events:sdk": "tsx src/claude-event-types-sdk.ts",
"extract:claude-events:cli": "tsx src/claude-event-types-cli.ts",

View file

@ -4,18 +4,20 @@ import { extractOpenCodeSchema } from "./opencode.js";
import { extractClaudeSchema } from "./claude.js";
import { extractCodexSchema } from "./codex.js";
import { extractAmpSchema } from "./amp.js";
import { extractPiSchema } from "./pi.js";
import { validateSchema, type NormalizedSchema } from "./normalize.js";
const RESOURCE_DIR = join(import.meta.dirname, "..");
const DIST_DIR = join(RESOURCE_DIR, "artifacts", "json-schema");
type AgentName = "opencode" | "claude" | "codex" | "amp";
type AgentName = "opencode" | "claude" | "codex" | "amp" | "pi";
const EXTRACTORS: Record<AgentName, () => Promise<NormalizedSchema>> = {
opencode: extractOpenCodeSchema,
claude: extractClaudeSchema,
codex: extractCodexSchema,
amp: extractAmpSchema,
pi: extractPiSchema,
};
function parseArgs(): { agents: AgentName[] } {

View file

@ -0,0 +1,191 @@
import { execSync } from "child_process";
import { existsSync, mkdtempSync, readdirSync, rmSync, writeFileSync } from "fs";
import { join } from "path";
import { tmpdir } from "os";
import { createGenerator, type Config } from "ts-json-schema-generator";
import { createNormalizedSchema, type NormalizedSchema } from "./normalize.js";
import type { JSONSchema7 } from "json-schema";
const PI_SOURCE_URL = "https://codeload.github.com/badlogic/pi-mono/tar.gz/refs/heads/main";
const RPC_TYPES_PATH = "packages/coding-agent/src/modes/rpc/rpc-types.ts";
const TARGET_TYPES = ["RpcEvent", "RpcResponse", "RpcCommand"] as const;
export async function extractPiSchema(): Promise<NormalizedSchema> {
console.log("Extracting Pi schema from pi-mono sources...");
const tempDir = mkdtempSync(join(tmpdir(), "pi-schema-"));
try {
const archivePath = join(tempDir, "pi-mono.tar.gz");
await downloadToFile(PI_SOURCE_URL, archivePath);
execSync(`tar -xzf "${archivePath}" -C "${tempDir}"`, {
stdio: ["ignore", "ignore", "ignore"],
});
const repoRoot = findRepoRoot(tempDir);
const rpcTypesPath = join(repoRoot, RPC_TYPES_PATH);
if (!existsSync(rpcTypesPath)) {
throw new Error(`rpc-types.ts not found at ${rpcTypesPath}`);
}
const tsconfig = resolveTsconfig(repoRoot);
const definitions = generateDefinitions(rpcTypesPath, tsconfig);
if (Object.keys(definitions).length === 0) {
console.log(" [warn] No schemas extracted from source, using fallback");
return createFallbackSchema();
}
console.log(` [ok] Extracted ${Object.keys(definitions).length} types from source`);
return createNormalizedSchema("pi", "Pi RPC Schema", definitions);
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
console.log(` [warn] Pi schema extraction failed: ${errorMessage}`);
console.log(" [fallback] Using embedded schema definitions");
return createFallbackSchema();
} finally {
rmSync(tempDir, { recursive: true, force: true });
}
}
async function downloadToFile(url: string, filePath: string): Promise<void> {
const response = await fetch(url);
if (!response.ok) {
throw new Error(`HTTP ${response.status}: ${response.statusText}`);
}
const buffer = Buffer.from(await response.arrayBuffer());
writeFileSync(filePath, buffer);
}
function findRepoRoot(root: string): string {
const entries = readdirSync(root, { withFileTypes: true }).filter((entry) => entry.isDirectory());
const repoDir = entries.find((entry) => entry.name.startsWith("pi-mono"));
if (!repoDir) {
throw new Error("pi-mono source directory not found after extraction");
}
return join(root, repoDir.name);
}
function resolveTsconfig(root: string): string | undefined {
const candidates = [
join(root, "tsconfig.json"),
join(root, "tsconfig.base.json"),
join(root, "packages", "coding-agent", "tsconfig.json"),
];
return candidates.find((path) => existsSync(path));
}
function generateDefinitions(
rpcTypesPath: string,
tsconfigPath?: string
): Record<string, JSONSchema7> {
const definitions: Record<string, JSONSchema7> = {};
for (const typeName of TARGET_TYPES) {
const config: Config = {
path: rpcTypesPath,
type: typeName,
expose: "all",
skipTypeCheck: false,
topRef: false,
...(tsconfigPath ? { tsconfig: tsconfigPath } : {}),
};
const schema = createGenerator(config).createSchema(typeName) as JSONSchema7;
mergeDefinitions(definitions, schema, typeName);
}
return definitions;
}
function mergeDefinitions(
target: Record<string, JSONSchema7>,
schema: JSONSchema7,
typeName: string
): void {
if (schema.definitions) {
for (const [name, def] of Object.entries(schema.definitions)) {
target[name] = def as JSONSchema7;
}
} else if (schema.$defs) {
for (const [name, def] of Object.entries(schema.$defs)) {
target[name] = def as JSONSchema7;
}
} else {
target[typeName] = schema;
}
if (!target[typeName]) {
target[typeName] = schema;
}
}
function createFallbackSchema(): NormalizedSchema {
const definitions: Record<string, JSONSchema7> = {
RpcEvent: {
type: "object",
properties: {
type: { type: "string" },
sessionId: { type: "string" },
messageId: { type: "string" },
message: { $ref: "#/definitions/RpcMessage" },
assistantMessageEvent: { $ref: "#/definitions/AssistantMessageEvent" },
toolCallId: { type: "string" },
toolName: { type: "string" },
args: {},
partialResult: {},
result: { $ref: "#/definitions/ToolResult" },
isError: { type: "boolean" },
error: {},
},
required: ["type"],
},
RpcMessage: {
type: "object",
properties: {
role: { type: "string" },
content: {},
},
},
AssistantMessageEvent: {
type: "object",
properties: {
type: { type: "string" },
delta: { type: "string" },
content: {},
partial: {},
messageId: { type: "string" },
},
},
ToolResult: {
type: "object",
properties: {
type: { type: "string" },
content: { type: "string" },
text: { type: "string" },
},
},
RpcResponse: {
type: "object",
properties: {
type: { type: "string", const: "response" },
id: { type: "integer" },
success: { type: "boolean" },
data: {},
error: {},
},
required: ["type"],
},
RpcCommand: {
type: "object",
properties: {
type: { type: "string", const: "command" },
id: { type: "integer" },
command: { type: "string" },
params: {},
},
required: ["type", "command"],
},
};
console.log(` [ok] Using fallback schema with ${Object.keys(definitions).length} definitions`);
return createNormalizedSchema("pi", "Pi RPC Schema", definitions);
}
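The definition-merging precedence implemented by `mergeDefinitions` above (copy `definitions`, else `$defs`, then backfill the named type itself if it is still absent) can be restated as a standalone sketch; the schema objects here are plain illustrative literals, not real generator output:

```typescript
type SchemaMap = Record<string, object>;

// Standalone re-statement of the precedence above: copy `definitions`
// (or `$defs`), then backfill the named type if it was not among them.
function mergeDefs(target: SchemaMap, schema: any, typeName: string): void {
  const defs = schema.definitions ?? schema.$defs;
  if (defs) {
    for (const [name, def] of Object.entries(defs)) {
      target[name] = def as object;
    }
  } else {
    target[typeName] = schema;
  }
  if (!target[typeName]) {
    target[typeName] = schema;
  }
}

const target: SchemaMap = {};
mergeDefs(target, { definitions: { RpcMessage: { type: "object" } } }, "RpcEvent");
// RpcEvent was not among the definitions, so the whole schema is backfilled
// under that name; both RpcEvent and RpcMessage are now present.
```

The backfill step matters with `topRef: false`, where the generator emits the requested type at the schema root rather than inside `definitions`.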

View file

@ -7,7 +7,6 @@ use std::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command, ExitSta
use std::time::{Duration, Instant};
use flate2::read::GzDecoder;
use reqwest::blocking::Client;
use sandbox_agent_extracted_agent_schemas::codex as codex_schema;
use serde::{Deserialize, Serialize};
use serde_json::Value;
@ -21,6 +20,7 @@ pub enum AgentId {
Codex,
Opencode,
Amp,
Pi,
Cursor,
Mock,
}
@ -32,6 +32,7 @@ impl AgentId {
AgentId::Codex => "codex",
AgentId::Opencode => "opencode",
AgentId::Amp => "amp",
AgentId::Pi => "pi",
AgentId::Cursor => "cursor",
AgentId::Mock => "mock",
}
@ -43,6 +44,7 @@ impl AgentId {
AgentId::Codex => "codex",
AgentId::Opencode => "opencode",
AgentId::Amp => "amp",
AgentId::Pi => "pi",
AgentId::Cursor => "cursor-agent",
AgentId::Mock => "mock",
}
@ -54,6 +56,7 @@ impl AgentId {
"codex" => Some(AgentId::Codex),
"opencode" => Some(AgentId::Opencode),
"amp" => Some(AgentId::Amp),
"pi" => Some(AgentId::Pi),
"cursor" => Some(AgentId::Cursor),
"mock" => Some(AgentId::Mock),
_ => None,
@ -155,6 +158,7 @@ impl AgentManager {
install_opencode(&install_path, self.platform, options.version.as_deref())?
}
AgentId::Amp => install_amp(&install_path, self.platform, options.version.as_deref())?,
AgentId::Pi => install_pi(&install_path, self.platform, options.version.as_deref())?,
AgentId::Cursor => install_cursor(&install_path, self.platform, options.version.as_deref())?,
AgentId::Mock => {
if !install_path.exists() {
@ -302,6 +306,20 @@ impl AgentManager {
events,
});
}
AgentId::Pi => {
let output = spawn_pi(&path, &working_dir, &options)?;
let stdout = String::from_utf8_lossy(&output.stdout).to_string();
let stderr = String::from_utf8_lossy(&output.stderr).to_string();
let events = parse_jsonl_from_outputs(&stdout, &stderr);
return Ok(SpawnResult {
status: output.status,
stdout,
stderr,
session_id: extract_session_id(agent, &events),
result: extract_result_text(agent, &events),
events,
});
}
AgentId::Mock => {
return Err(AgentError::UnsupportedAgent {
agent: agent.as_str().to_string(),
@ -332,6 +350,15 @@ impl AgentManager {
agent: AgentId,
mut options: SpawnOptions,
) -> Result<StreamingSpawn, AgentError> {
// Pi sessions are intentionally handled by the router's dedicated RPC runtime
// (one process per daemon session), not by generic subprocess streaming.
if agent == AgentId::Pi {
return Err(AgentError::UnsupportedRuntimePath {
agent,
operation: "spawn_streaming",
recommended_path: "router-managed per-session RPC runtime",
});
}
let codex_options = if agent == AgentId::Codex {
Some(options.clone())
} else {
@ -575,6 +602,19 @@ impl AgentManager {
}
fn build_command(&self, agent: AgentId, options: &SpawnOptions) -> Result<Command, AgentError> {
if agent == AgentId::Pi {
return Err(AgentError::UnsupportedRuntimePath {
agent,
operation: "build_command",
recommended_path: "router-managed per-session RPC runtime",
});
}
if agent == AgentId::Mock {
return Err(AgentError::UnsupportedAgent {
agent: agent.as_str().to_string(),
});
}
let path = self.resolve_binary(agent)?;
let working_dir = options
.working_dir
@ -646,10 +686,21 @@ impl AgentManager {
AgentId::Amp => {
return Ok(build_amp_command(&path, &working_dir, options));
}
AgentId::Cursor => {
command.arg("run").arg("--format").arg("json");
if let Some(model) = options.model.as_deref() {
command.arg("-m").arg(model);
}
if let Some(session_id) = options.session_id.as_deref() {
command.arg("-s").arg(session_id);
}
command.arg(&options.prompt);
}
AgentId::Pi => {
unreachable!("Pi is handled by router RPC runtime");
}
AgentId::Mock => {
return Err(AgentError::UnsupportedAgent {
agent: agent.as_str().to_string(),
});
unreachable!("Mock is handled above");
}
}
@ -762,6 +813,12 @@ pub enum AgentError {
ExtractFailed(String),
#[error("resume unsupported for {agent}")]
ResumeUnsupported { agent: AgentId },
#[error("unsupported runtime path for {agent}: {operation}; use {recommended_path}")]
UnsupportedRuntimePath {
agent: AgentId,
operation: &'static str,
recommended_path: &'static str,
},
}
fn parse_version_output(output: &std::process::Output) -> Option<String> {
@ -969,6 +1026,33 @@ fn extract_session_id(agent: AgentId, events: &[Value]) -> Option<String> {
return Some(id);
}
}
AgentId::Pi => {
if event.get("type").and_then(Value::as_str) == Some("session") {
if let Some(id) = event.get("id").and_then(Value::as_str) {
return Some(id.to_string());
}
}
if let Some(id) = event.get("session_id").and_then(Value::as_str) {
return Some(id.to_string());
}
if let Some(id) = event.get("sessionId").and_then(Value::as_str) {
return Some(id.to_string());
}
if let Some(id) = extract_nested_string(event, &["data", "sessionId"]) {
return Some(id);
}
if let Some(id) = extract_nested_string(event, &["session", "id"]) {
return Some(id);
}
}
AgentId::Cursor => {
if let Some(id) = event.get("session_id").and_then(Value::as_str) {
return Some(id.to_string());
}
if let Some(id) = event.get("sessionId").and_then(Value::as_str) {
return Some(id.to_string());
}
}
AgentId::Mock => {}
}
}
@ -1051,10 +1135,125 @@ fn extract_result_text(agent: AgentId, events: &[Value]) -> Option<String> {
Some(buffer)
}
}
AgentId::Pi => extract_pi_result_text(events),
AgentId::Cursor => None,
AgentId::Mock => None,
}
}
fn extract_text_from_content_parts(content: &Value) -> Option<String> {
let parts = content.as_array()?;
let mut text = String::new();
for part in parts {
if part.get("type").and_then(Value::as_str) != Some("text") {
continue;
}
if let Some(part_text) = part.get("text").and_then(Value::as_str) {
text.push_str(part_text);
}
}
if text.is_empty() {
None
} else {
Some(text)
}
}
fn extract_assistant_message_text(message: &Value) -> Option<String> {
if message.get("role").and_then(Value::as_str) != Some("assistant") {
return None;
}
if let Some(content) = message.get("content") {
return extract_text_from_content_parts(content);
}
None
}
fn extract_pi_result_text(events: &[Value]) -> Option<String> {
let mut delta_buffer = String::new();
let mut last_full = None;
for event in events {
if event.get("type").and_then(Value::as_str) == Some("message_update") {
if let Some(delta_kind) =
extract_nested_string(event, &["assistantMessageEvent", "type"])
{
if delta_kind == "text_delta" {
if let Some(delta) =
extract_nested_string(event, &["assistantMessageEvent", "delta"])
{
delta_buffer.push_str(&delta);
}
if let Some(delta) =
extract_nested_string(event, &["assistantMessageEvent", "text"])
{
delta_buffer.push_str(&delta);
}
}
}
}
if let Some(message) = event.get("message") {
if let Some(text) = extract_assistant_message_text(message) {
last_full = Some(text);
}
}
if event.get("type").and_then(Value::as_str) == Some("agent_end") {
if let Some(messages) = event.get("messages").and_then(Value::as_array) {
for message in messages {
if let Some(text) = extract_assistant_message_text(message) {
last_full = Some(text);
}
}
}
}
}
if delta_buffer.is_empty() {
last_full
} else {
Some(delta_buffer)
}
}
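The result-extraction policy of `extract_pi_result_text` above (accumulate `text_delta` fragments, falling back to the last complete assistant message only when no deltas arrived) can be sketched standalone in TypeScript. Event shapes follow the Rust code; this sketch is simplified in that it takes `delta` or `text`, where the Rust version appends both when both are present:

```typescript
interface PiEvent {
  type?: string;
  assistantMessageEvent?: { type?: string; delta?: string; text?: string };
  message?: { role?: string; content?: { type: string; text?: string }[] };
}

function extractResultText(events: PiEvent[]): string | undefined {
  let deltas = "";
  let lastFull: string | undefined;
  for (const event of events) {
    const ame = event.assistantMessageEvent;
    if (event.type === "message_update" && ame?.type === "text_delta") {
      // Simplification: prefer `delta`, else `text`.
      deltas += ame.delta ?? ame.text ?? "";
    }
    const msg = event.message;
    if (msg?.role === "assistant" && Array.isArray(msg.content)) {
      const text = msg.content
        .filter((p) => p.type === "text")
        .map((p) => p.text ?? "")
        .join("");
      if (text) lastFull = text;
    }
  }
  // Streamed deltas win; a full message is only a fallback.
  return deltas.length > 0 ? deltas : lastFull;
}
```

Preferring deltas keeps streamed and non-streamed runs consistent: if any incremental text arrived, the final answer is reconstructed from it rather than from a possibly partial full message.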
fn apply_pi_model_args(command: &mut Command, model: Option<&str>) {
let Some(model) = model else {
return;
};
if let Some((provider, model_id)) = model.split_once('/') {
command
.arg("--provider")
.arg(provider)
.arg("--model")
.arg(model_id);
return;
}
command.arg("--model").arg(model);
}
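The model-argument convention handled by `apply_pi_model_args` above (a `provider/model` string splits at the first slash into `--provider` and `--model`; a bare name maps to `--model` alone) can be restated as a small pure function, with flag names taken from the Rust code:

```typescript
// Mirrors apply_pi_model_args: split on the FIRST slash, like
// Rust's split_once('/'); no model means no flags at all.
function piModelArgs(model?: string): string[] {
  if (!model) return [];
  const slash = model.indexOf("/");
  if (slash > -1) {
    return ["--provider", model.slice(0, slash), "--model", model.slice(slash + 1)];
  }
  return ["--model", model];
}
```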
fn spawn_pi(
path: &Path,
working_dir: &Path,
options: &SpawnOptions,
) -> Result<std::process::Output, AgentError> {
if options.session_id.is_some() {
return Err(AgentError::ResumeUnsupported { agent: AgentId::Pi });
}
let mut command = Command::new(path);
command
.current_dir(working_dir)
.arg("--mode")
.arg("json")
.arg("--print");
apply_pi_model_args(&mut command, options.model.as_deref());
if let Some(variant) = options.variant.as_deref() {
command.arg("--thinking").arg(variant);
}
command.arg(&options.prompt);
for (key, value) in &options.env {
command.env(key, value);
}
command.output().map_err(AgentError::Io)
}
fn spawn_amp(
path: &Path,
working_dir: &Path,
@ -1228,7 +1427,7 @@ fn install_cursor(path: &Path, platform: Platform, _version: Option<&str>) -> Re
}
fn download_bytes(url: &Url) -> Result<Vec<u8>, AgentError> {
let client = Client::builder().build()?;
let client = crate::http_client::blocking_client_builder().build()?;
let mut response = client.get(url.clone()).send()?;
if !response.status().is_success() {
return Err(AgentError::DownloadFailed { url: url.clone() });
@ -1238,6 +1437,28 @@ fn download_bytes(url: &Url) -> Result<Vec<u8>, AgentError> {
Ok(bytes)
}
fn install_pi(path: &Path, platform: Platform, version: Option<&str>) -> Result<(), AgentError> {
let asset = match platform {
Platform::LinuxX64 | Platform::LinuxX64Musl => "pi-linux-x64",
Platform::LinuxArm64 => "pi-linux-arm64",
Platform::MacosArm64 => "pi-darwin-arm64",
Platform::MacosX64 => "pi-darwin-x64",
}
.to_string();
let url = match version {
Some(version) => Url::parse(&format!(
"https://upd.dev/badlogic/pi-mono/releases/download/{version}/{asset}"
))?,
None => Url::parse(&format!(
"https://upd.dev/badlogic/pi-mono/releases/latest/download/{asset}"
))?,
};
let bytes = download_bytes(&url)?;
write_executable(path, &bytes)?;
Ok(())
}
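The release-URL selection in `install_pi` above (pinned version vs. `latest`) reduces to string formatting; a sketch using the same URL pattern as the Rust code, where `asset` is a platform asset name such as `pi-linux-x64`:

```typescript
// URL pattern from install_pi: a pinned version uses
// releases/download/{version}/{asset}, otherwise releases/latest/download/{asset}.
function piDownloadUrl(asset: string, version?: string): string {
  const base = "https://upd.dev/badlogic/pi-mono/releases";
  return version
    ? `${base}/download/${version}/${asset}`
    : `${base}/latest/download/${asset}`;
}
```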
fn install_claude(
path: &Path,
platform: Platform,
@ -1357,7 +1578,7 @@ fn install_opencode(
};
install_zip_binary(path, &url, "opencode")
}
_ => {
Platform::LinuxX64 | Platform::LinuxX64Musl | Platform::LinuxArm64 => {
let platform_segment = match platform {
Platform::LinuxX64 => "linux-x64",
Platform::LinuxX64Musl => "linux-x64-musl",
@ -1459,3 +1680,87 @@ fn find_file_recursive(dir: &Path, filename: &str) -> Result<Option<PathBuf>, Ag
}
Ok(None)
}
#[cfg(test)]
mod tests {
use serde_json::json;
use super::{
extract_result_text, extract_session_id, AgentError, AgentId, AgentManager, SpawnOptions,
};
#[test]
fn pi_spawn_streaming_fails_fast_with_runtime_contract_error() {
let temp_dir = tempfile::tempdir().expect("temp dir");
let manager = AgentManager::new(temp_dir.path().join("bin")).expect("agent manager");
let err = manager
.spawn_streaming(AgentId::Pi, SpawnOptions::new("hello"))
.expect_err("expected Pi spawn_streaming to be rejected");
assert!(matches!(
err,
AgentError::UnsupportedRuntimePath {
agent: AgentId::Pi,
operation: "spawn_streaming",
..
}
));
}
#[test]
fn extract_pi_session_id_from_session_event() {
let events = vec![json!({
"type": "session",
"id": "pi-session-123"
})];
assert_eq!(
extract_session_id(AgentId::Pi, &events).as_deref(),
Some("pi-session-123")
);
}
#[test]
fn extract_pi_result_text_from_agent_end_message() {
let events = vec![json!({
"type": "agent_end",
"messages": [
{
"role": "assistant",
"content": [
{
"type": "text",
"text": "OK"
}
]
}
]
})];
assert_eq!(
extract_result_text(AgentId::Pi, &events).as_deref(),
Some("OK")
);
}
#[test]
fn extract_pi_result_text_from_message_update_deltas() {
let events = vec![
json!({
"type": "message_update",
"assistantMessageEvent": {
"type": "text_delta",
"delta": "O"
}
}),
json!({
"type": "message_update",
"assistantMessageEvent": {
"type": "text_delta",
"delta": "K"
}
}),
];
assert_eq!(
extract_result_text(AgentId::Pi, &events).as_deref(),
Some("OK")
);
}
}

View file

@ -0,0 +1,20 @@
use std::env;
use reqwest::blocking::ClientBuilder;
const NO_SYSTEM_PROXY_ENV: &str = "SANDBOX_AGENT_NO_SYSTEM_PROXY";
fn disable_system_proxy() -> bool {
env::var(NO_SYSTEM_PROXY_ENV)
.map(|value| matches!(value.as_str(), "1" | "true" | "TRUE" | "yes" | "YES"))
.unwrap_or(false)
}
pub(crate) fn blocking_client_builder() -> ClientBuilder {
let builder = reqwest::blocking::Client::builder();
if disable_system_proxy() {
builder.no_proxy()
} else {
builder
}
}

View file

@ -1,3 +1,4 @@
pub mod agents;
pub mod credentials;
mod http_client;
pub mod testing;

View file

@ -2,7 +2,6 @@ use std::env;
use std::path::PathBuf;
use std::time::Duration;
use reqwest::blocking::Client;
use reqwest::header::{HeaderMap, HeaderValue, AUTHORIZATION, CONTENT_TYPE};
use reqwest::StatusCode;
use thiserror::Error;
@ -36,6 +35,7 @@ pub enum TestAgentConfigError {
const AGENTS_ENV: &str = "SANDBOX_TEST_AGENTS";
const ANTHROPIC_ENV: &str = "SANDBOX_TEST_ANTHROPIC_API_KEY";
const OPENAI_ENV: &str = "SANDBOX_TEST_OPENAI_API_KEY";
const PI_ENV: &str = "SANDBOX_TEST_PI";
const ANTHROPIC_MODELS_URL: &str = "https://api.anthropic.com/v1/models";
const OPENAI_MODELS_URL: &str = "https://api.openai.com/v1/models";
const ANTHROPIC_VERSION: &str = "2023-06-01";
@ -63,6 +63,7 @@ pub fn test_agents_from_env() -> Result<Vec<TestAgentConfig>, TestAgentConfigErr
AgentId::Codex,
AgentId::Opencode,
AgentId::Amp,
AgentId::Pi,
]);
continue;
}
@ -73,6 +74,12 @@ pub fn test_agents_from_env() -> Result<Vec<TestAgentConfig>, TestAgentConfigErr
agents
};
let include_pi = pi_tests_enabled() && find_in_path(AgentId::Pi.binary_name());
if !include_pi && agents.iter().any(|agent| *agent == AgentId::Pi) {
eprintln!("Skipping Pi tests: set {PI_ENV}=1 and ensure pi is on PATH.");
}
agents.retain(|agent| *agent != AgentId::Pi || include_pi);
agents.sort_by(|a, b| a.as_str().cmp(b.as_str()));
agents.dedup();
@ -137,6 +144,22 @@ pub fn test_agents_from_env() -> Result<Vec<TestAgentConfig>, TestAgentConfigErr
}
credentials_with(anthropic_cred.clone(), openai_cred.clone())
}
AgentId::Pi => {
if anthropic_cred.is_none() && openai_cred.is_none() {
return Err(TestAgentConfigError::MissingCredentials {
agent,
missing: format!("{ANTHROPIC_ENV} or {OPENAI_ENV}"),
});
}
if let Some(cred) = anthropic_cred.as_ref() {
ensure_anthropic_ok(&mut health_cache, cred)?;
}
if let Some(cred) = openai_cred.as_ref() {
ensure_openai_ok(&mut health_cache, cred)?;
}
credentials_with(anthropic_cred.clone(), openai_cred.clone())
}
AgentId::Cursor => credentials_with(None, None),
AgentId::Mock => credentials_with(None, None),
};
configs.push(TestAgentConfig { agent, credentials });
@ -172,7 +195,7 @@ fn ensure_openai_ok(
fn health_check_anthropic(credentials: &ProviderCredentials) -> Result<(), TestAgentConfigError> {
let credentials = credentials.clone();
run_blocking_check("anthropic", move || {
let client = Client::builder()
let client = crate::http_client::blocking_client_builder()
.timeout(Duration::from_secs(10))
.build()
.map_err(|err| TestAgentConfigError::HealthCheckFailed {
@ -226,7 +249,7 @@ fn health_check_anthropic(credentials: &ProviderCredentials) -> Result<(), TestA
fn health_check_openai(credentials: &ProviderCredentials) -> Result<(), TestAgentConfigError> {
let credentials = credentials.clone();
run_blocking_check("openai", move || {
let client = Client::builder()
let client = crate::http_client::blocking_client_builder()
.timeout(Duration::from_secs(10))
.build()
.map_err(|err| TestAgentConfigError::HealthCheckFailed {
@ -298,12 +321,15 @@ where
}
fn detect_system_agents() -> Vec<AgentId> {
let candidates = [
let mut candidates = vec![
AgentId::Claude,
AgentId::Codex,
AgentId::Opencode,
AgentId::Amp,
];
if pi_tests_enabled() && find_in_path(AgentId::Pi.binary_name()) {
candidates.push(AgentId::Pi);
}
let install_dir = default_install_dir();
candidates
.into_iter()
@ -345,6 +371,15 @@ fn read_env_key(name: &str) -> Option<String> {
})
}
fn pi_tests_enabled() -> bool {
env::var(PI_ENV)
.map(|value| {
let value = value.trim().to_ascii_lowercase();
value == "1" || value == "true" || value == "yes"
})
.unwrap_or(false)
}
fn credentials_with(
anthropic_cred: Option<ProviderCredentials>,
openai_cred: Option<ProviderCredentials>,

View file

@ -11,6 +11,7 @@ fn main() {
("claude", "claude.json"),
("codex", "codex.json"),
("amp", "amp.json"),
("pi", "pi.json"),
];
for (name, file) in schemas {

View file

@ -5,6 +5,7 @@
//! - Claude Code SDK
//! - Codex SDK
//! - AMP Code SDK
//! - Pi RPC
pub mod opencode {
//! OpenCode SDK types extracted from OpenAPI 3.1.1 spec.
@ -25,3 +26,8 @@ pub mod amp {
//! AMP Code SDK types.
include!(concat!(env!("OUT_DIR"), "/amp.rs"));
}
pub mod pi {
//! Pi RPC types.
include!(concat!(env!("OUT_DIR"), "/pi.rs"));
}

View file

@ -1265,6 +1265,7 @@ enum CredentialAgent {
Codex,
Opencode,
Amp,
Pi,
}
fn credentials_to_output(credentials: ExtractedCredentials, reveal: bool) -> CredentialsOutput {
@ -1384,6 +1385,31 @@ fn select_token_for_agent(
)))
}
}
CredentialAgent::Pi => {
if let Some(provider) = provider {
return select_token_for_provider(credentials, provider);
}
if let Some(openai) = credentials.openai.as_ref() {
return Ok(openai.api_key.clone());
}
if let Some(anthropic) = credentials.anthropic.as_ref() {
return Ok(anthropic.api_key.clone());
}
if credentials.other.len() == 1 {
if let Some((_, cred)) = credentials.other.iter().next() {
return Ok(cred.api_key.clone());
}
}
let available = available_providers(credentials);
if available.is_empty() {
Err(CliError::Server("no credentials found for pi".to_string()))
} else {
Err(CliError::Server(format!(
"multiple providers available for pi: {} (use --provider)",
available.join(", ")
)))
}
}
}
}

View file

@ -0,0 +1,30 @@
use std::env;
use reqwest::blocking::ClientBuilder as BlockingClientBuilder;
use reqwest::ClientBuilder;
const NO_SYSTEM_PROXY_ENV: &str = "SANDBOX_AGENT_NO_SYSTEM_PROXY";
fn disable_system_proxy() -> bool {
env::var(NO_SYSTEM_PROXY_ENV)
.map(|value| matches!(value.as_str(), "1" | "true" | "TRUE" | "yes" | "YES"))
.unwrap_or(false)
}
pub fn client_builder() -> ClientBuilder {
let builder = reqwest::Client::builder();
if disable_system_proxy() {
builder.no_proxy()
} else {
builder
}
}
pub fn blocking_client_builder() -> BlockingClientBuilder {
let builder = reqwest::blocking::Client::builder();
if disable_system_proxy() {
builder.no_proxy()
} else {
builder
}
}
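The `SANDBOX_AGENT_NO_SYSTEM_PROXY` toggle above accepts an exact-match set of truthy values (the Rust `matches!` is case-sensitive apart from the variants it lists). A sketch of the same check:

```typescript
// Mirrors disable_system_proxy(): only these exact values opt out of
// the system proxy; unset or anything else leaves it enabled.
const TRUTHY = new Set(["1", "true", "TRUE", "yes", "YES"]);

function disableSystemProxy(value: string | undefined): boolean {
  return value !== undefined && TRUTHY.has(value);
}
```

Note the sibling helper in the testing module parses its own `SANDBOX_TEST_PI` flag more loosely (trimmed and lowercased); this sketch follows the proxy helper's strict matching.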

View file

@ -4,6 +4,7 @@ mod agent_server_logs;
pub mod cli;
pub mod credentials;
pub mod daemon;
pub mod http_client;
pub mod opencode_compat;
pub mod router;
pub mod server_logs;

View file

@ -10,6 +10,7 @@ use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use std::time::{Duration, Instant};
use axum::body::Body;
use axum::extract::{Path, Query, State};
@ -52,6 +53,7 @@ const OPENCODE_EVENT_LOG_SIZE: usize = 4096;
const OPENCODE_DEFAULT_MODEL_ID: &str = "mock";
const OPENCODE_DEFAULT_PROVIDER_ID: &str = "mock";
const OPENCODE_DEFAULT_AGENT_MODE: &str = "build";
const OPENCODE_MODEL_CACHE_TTL: Duration = Duration::from_secs(30);
const OPENCODE_MODEL_CHANGE_AFTER_SESSION_CREATE_ERROR: &str = "OpenCode compatibility currently does not support changing the model after creating a session. Export with /export and load into a new session.";
#[derive(Clone, Debug)]
@ -301,6 +303,8 @@ struct OpenCodeModelCache {
group_names: HashMap<String, String>,
default_group: String,
default_model: String,
cached_at: Instant,
had_discovery_errors: bool,
/// Group IDs that have valid credentials available
connected: Vec<String>,
}
@ -814,6 +818,8 @@ fn available_agent_ids() -> Vec<AgentId> {
AgentId::Codex,
AgentId::Opencode,
AgentId::Amp,
AgentId::Pi,
AgentId::Cursor,
AgentId::Mock,
]
}
@ -832,18 +838,30 @@ async fn opencode_model_cache(state: &OpenCodeAppState) -> OpenCodeModelCache {
// spawning duplicate provider/model fetches.
let mut slot = state.opencode.model_cache.lock().await;
if let Some(cache) = slot.as_ref() {
info!(
entries = cache.entries.len(),
groups = cache.group_names.len(),
connected = cache.connected.len(),
"opencode model cache hit"
);
return cache.clone();
if cache.cached_at.elapsed() < OPENCODE_MODEL_CACHE_TTL {
info!(
entries = cache.entries.len(),
groups = cache.group_names.len(),
connected = cache.connected.len(),
"opencode model cache hit"
);
return cache.clone();
}
}
let previous_cache = slot.clone();
let started = std::time::Instant::now();
info!("opencode model cache miss; building cache");
let cache = build_opencode_model_cache(state).await;
let mut cache = build_opencode_model_cache(state).await;
if let Some(previous_cache) = previous_cache {
if cache.had_discovery_errors
&& cache.entries.is_empty()
&& !previous_cache.entries.is_empty()
{
cache = previous_cache;
cache.cached_at = Instant::now();
}
}
info!(
elapsed_ms = started.elapsed().as_millis() as u64,
entries = cache.entries.len(),
@ -884,6 +902,7 @@ async fn build_opencode_model_cache(state: &OpenCodeAppState) -> OpenCodeModelCa
let mut group_agents: HashMap<String, AgentId> = HashMap::new();
let mut group_names: HashMap<String, String> = HashMap::new();
let mut default_model: Option<String> = None;
let mut had_discovery_errors = false;
let agents = available_agent_ids();
let manager = state.inner.session_manager();
@ -901,6 +920,10 @@ async fn build_opencode_model_cache(state: &OpenCodeAppState) -> OpenCodeModelCa
let response = match response {
Ok(response) => response,
Err(err) => {
had_discovery_errors = true;
let (group_id, group_name) = fallback_group_for_agent(agent);
group_agents.entry(group_id.clone()).or_insert(agent);
group_names.entry(group_id).or_insert(group_name);
warn!(
agent = agent.as_str(),
elapsed_ms = elapsed.as_millis() as u64,
@ -918,6 +941,12 @@ async fn build_opencode_model_cache(state: &OpenCodeAppState) -> OpenCodeModelCa
"opencode model cache fetched agent models"
);
if response.models.is_empty() {
let (group_id, group_name) = fallback_group_for_agent(agent);
group_agents.entry(group_id.clone()).or_insert(agent);
group_names.entry(group_id).or_insert(group_name);
}
let first_model_id = response.models.first().map(|model| model.id.clone());
for model in response.models {
let model_id = model.id.clone();
@ -1016,6 +1045,8 @@ async fn build_opencode_model_cache(state: &OpenCodeAppState) -> OpenCodeModelCa
_ => has_anthropic || has_openai,
}
}
Some(AgentId::Pi) => true,
Some(AgentId::Cursor) => true,
Some(AgentId::Mock) => true,
None => false,
};
@ -1032,6 +1063,8 @@ async fn build_opencode_model_cache(state: &OpenCodeAppState) -> OpenCodeModelCa
group_names,
default_group,
default_model,
cached_at: Instant::now(),
had_discovery_errors,
connected,
};
info!(
@ -1046,6 +1079,19 @@ async fn build_opencode_model_cache(state: &OpenCodeAppState) -> OpenCodeModelCa
cache
}
fn fallback_group_for_agent(agent: AgentId) -> (String, String) {
if agent == AgentId::Opencode {
return (
"opencode".to_string(),
agent_display_name(agent).to_string(),
);
}
(
agent.as_str().to_string(),
agent_display_name(agent).to_string(),
)
}
fn resolve_agent_from_model(
cache: &OpenCodeModelCache,
provider_id: &str,
@ -1159,6 +1205,8 @@ fn agent_display_name(agent: AgentId) -> &'static str {
AgentId::Codex => "Codex",
AgentId::Opencode => "OpenCode",
AgentId::Amp => "Amp",
AgentId::Pi => "Pi",
AgentId::Cursor => "Cursor",
AgentId::Mock => "Mock",
}
}
@ -3247,6 +3295,9 @@ async fn oc_config_providers(State(state): State<Arc<OpenCodeAppState>>) -> impl
.or_default()
.push(entry);
}
for group_id in cache.group_names.keys() {
grouped.entry(group_id.clone()).or_default();
}
let mut providers = Vec::new();
let mut defaults = serde_json::Map::new();
for (group_id, entries) in grouped {
@ -4835,6 +4886,9 @@ async fn oc_provider_list(State(state): State<Arc<OpenCodeAppState>>) -> impl In
.or_default()
.push(entry);
}
for group_id in cache.group_names.keys() {
grouped.entry(group_id.clone()).or_default();
}
let mut providers = Vec::new();
let mut defaults = serde_json::Map::new();
for (group_id, entries) in grouped {

File diff suppressed because it is too large


@ -11,6 +11,7 @@ use serde::Serialize;
use time::OffsetDateTime;
use tokio::time::Instant;
use crate::http_client;
static TELEMETRY_ENABLED: AtomicBool = AtomicBool::new(false);
const TELEMETRY_URL: &str = "https://tc.rivet.dev";
@ -82,7 +83,7 @@ pub fn log_enabled_message() {
pub fn spawn_telemetry_task() {
tokio::spawn(async move {
let client = match Client::builder()
let client = match http_client::client_builder()
.timeout(Duration::from_millis(TELEMETRY_TIMEOUT_MS))
.build()
{


@ -5,3 +5,4 @@ mod agent_permission_flow;
mod agent_question_flow;
mod agent_termination;
mod agent_tool_flow;
mod pi_rpc_integration;


@ -0,0 +1,410 @@
// Pi RPC integration tests (gated via SANDBOX_TEST_PI + PATH).
include!("../common/http.rs");
fn pi_test_config() -> Option<TestAgentConfig> {
let configs = match test_agents_from_env() {
Ok(configs) => configs,
Err(err) => {
eprintln!("Skipping Pi RPC integration test: {err}");
return None;
}
};
configs
.into_iter()
.find(|config| config.agent == AgentId::Pi)
}
async fn create_pi_session_with_native(app: &Router, session_id: &str) -> String {
let payload = create_pi_session(app, session_id, None, None).await;
let native_session_id = payload
.get("native_session_id")
.and_then(Value::as_str)
.unwrap_or("")
.to_string();
assert!(
!native_session_id.is_empty(),
"expected native_session_id for pi session"
);
native_session_id
}
async fn create_pi_session(
app: &Router,
session_id: &str,
model: Option<&str>,
variant: Option<&str>,
) -> Value {
let mut body = Map::new();
body.insert("agent".to_string(), json!("pi"));
body.insert(
"permissionMode".to_string(),
json!(test_permission_mode(AgentId::Pi)),
);
if let Some(model) = model {
body.insert("model".to_string(), json!(model));
}
if let Some(variant) = variant {
body.insert("variant".to_string(), json!(variant));
}
let (status, payload) = send_json(
app,
Method::POST,
&format!("/v1/sessions/{session_id}"),
Some(Value::Object(body)),
)
.await;
assert_eq!(status, StatusCode::OK, "create pi session");
payload
}
async fn fetch_pi_models(app: &Router) -> Vec<Value> {
let (status, payload) = send_json(app, Method::GET, "/v1/agents/pi/models", None).await;
assert_eq!(status, StatusCode::OK, "pi models endpoint");
payload
.get("models")
.and_then(Value::as_array)
.cloned()
.unwrap_or_default()
}
fn model_variant_ids(model: &Value) -> Vec<&str> {
model
.get("variants")
.and_then(Value::as_array)
.map(|values| values.iter().filter_map(Value::as_str).collect::<Vec<_>>())
.unwrap_or_default()
}
fn assert_strictly_increasing_sequences(events: &[Value], label: &str) {
let mut last_sequence = 0u64;
for event in events {
let sequence = event
.get("sequence")
.and_then(Value::as_u64)
.expect("missing sequence");
assert!(
sequence > last_sequence,
"{label}: sequence did not increase (prev {last_sequence}, next {sequence})"
);
last_sequence = sequence;
}
}
fn assert_all_events_for_session(events: &[Value], session_id: &str) {
for event in events {
let event_session_id = event
.get("session_id")
.and_then(Value::as_str)
.unwrap_or_default();
assert_eq!(
event_session_id, session_id,
"cross-session event detected in {session_id}: {event}"
);
}
}
fn assert_item_started_ids_unique(events: &[Value], label: &str) {
let mut ids = std::collections::HashSet::new();
for event in events {
let event_type = event
.get("type")
.and_then(Value::as_str)
.unwrap_or_default();
if event_type != "item.started" {
continue;
}
let Some(item_id) = event
.get("data")
.and_then(|data| data.get("item"))
.and_then(|item| item.get("item_id"))
.and_then(Value::as_str)
else {
continue;
};
assert!(
ids.insert(item_id.to_string()),
"{label}: duplicate item.started id {item_id}"
);
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn pi_rpc_session_and_stream() {
let Some(config) = pi_test_config() else {
return;
};
let app = TestApp::new();
let _guard = apply_credentials(&config.credentials);
install_agent(&app.app, config.agent).await;
let session_id = "pi-rpc-session";
let _native_session_id = create_pi_session_with_native(&app.app, session_id).await;
let events = read_turn_stream_events(&app.app, session_id, Duration::from_secs(120)).await;
assert!(!events.is_empty(), "no events from pi stream");
assert!(
!events.iter().any(is_unparsed_event),
"agent.unparsed event encountered"
);
assert!(
should_stop(&events),
"turn stream did not reach a terminal event"
);
assert_strictly_increasing_sequences(&events, "pi_rpc_session_and_stream");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn pi_variant_high_applies_for_thinking_model() {
let Some(config) = pi_test_config() else {
return;
};
let app = TestApp::new();
let _guard = apply_credentials(&config.credentials);
install_agent(&app.app, config.agent).await;
let models = fetch_pi_models(&app.app).await;
let thinking_model = models.iter().find_map(|model| {
let model_id = model.get("id").and_then(Value::as_str)?;
let variants = model_variant_ids(model);
if variants.contains(&"high") {
Some(model_id.to_string())
} else {
None
}
});
let Some(model_id) = thinking_model else {
        eprintln!("Skipping Pi variant thinking-model test: no model advertises the \"high\" variant");
return;
};
let session_id = "pi-variant-thinking-high";
create_pi_session(&app.app, session_id, Some(&model_id), Some("high")).await;
let events = read_turn_stream_events(&app.app, session_id, Duration::from_secs(120)).await;
assert!(
!events.is_empty(),
"no events from pi thinking-variant stream"
);
assert!(
!events.iter().any(is_unparsed_event),
"agent.unparsed event encountered for thinking-variant session"
);
assert!(
should_stop(&events),
"thinking-variant turn stream did not reach a terminal event"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn pi_variant_high_on_non_thinking_model_uses_pi_native_clamping() {
let Some(config) = pi_test_config() else {
return;
};
let app = TestApp::new();
let _guard = apply_credentials(&config.credentials);
install_agent(&app.app, config.agent).await;
let models = fetch_pi_models(&app.app).await;
let non_thinking_model = models.iter().find_map(|model| {
let model_id = model.get("id").and_then(Value::as_str)?;
let variants = model_variant_ids(model);
if variants == vec!["off"] {
Some(model_id.to_string())
} else {
None
}
});
let Some(model_id) = non_thinking_model else {
        eprintln!("Skipping Pi non-thinking variant test: no off-only model reported");
return;
};
let session_id = "pi-variant-nonthinking-high";
create_pi_session(&app.app, session_id, Some(&model_id), Some("high")).await;
let events = read_turn_stream_events(&app.app, session_id, Duration::from_secs(120)).await;
assert!(
!events.is_empty(),
"no events from pi non-thinking variant stream"
);
assert!(
!events.iter().any(is_unparsed_event),
"agent.unparsed event encountered for non-thinking variant session"
);
assert!(
should_stop(&events),
"non-thinking variant turn stream did not reach a terminal event"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn pi_parallel_sessions_turns() {
let Some(config) = pi_test_config() else {
return;
};
let app = TestApp::new();
let _guard = apply_credentials(&config.credentials);
install_agent(&app.app, config.agent).await;
let session_a = "pi-parallel-a";
let session_b = "pi-parallel-b";
create_pi_session_with_native(&app.app, session_a).await;
create_pi_session_with_native(&app.app, session_b).await;
let app_a = app.app.clone();
let app_b = app.app.clone();
let send_a = send_message(&app_a, session_a);
let send_b = send_message(&app_b, session_b);
tokio::join!(send_a, send_b);
let app_a = app.app.clone();
let app_b = app.app.clone();
let poll_a = poll_events_until(&app_a, session_a, Duration::from_secs(120));
let poll_b = poll_events_until(&app_b, session_b, Duration::from_secs(120));
let (events_a, events_b) = tokio::join!(poll_a, poll_b);
assert!(!events_a.is_empty(), "no events for session A");
assert!(!events_b.is_empty(), "no events for session B");
assert!(
should_stop(&events_a),
"session A did not reach a terminal event"
);
assert!(
should_stop(&events_b),
"session B did not reach a terminal event"
);
assert!(
!events_a.iter().any(is_unparsed_event),
"session A encountered agent.unparsed"
);
assert!(
!events_b.iter().any(is_unparsed_event),
"session B encountered agent.unparsed"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn pi_event_isolation() {
let Some(config) = pi_test_config() else {
return;
};
let app = TestApp::new();
let _guard = apply_credentials(&config.credentials);
install_agent(&app.app, config.agent).await;
let session_a = "pi-isolation-a";
let session_b = "pi-isolation-b";
create_pi_session_with_native(&app.app, session_a).await;
create_pi_session_with_native(&app.app, session_b).await;
let app_a = app.app.clone();
let app_b = app.app.clone();
let send_a = send_message(&app_a, session_a);
let send_b = send_message(&app_b, session_b);
tokio::join!(send_a, send_b);
let app_a = app.app.clone();
let app_b = app.app.clone();
let poll_a = poll_events_until(&app_a, session_a, Duration::from_secs(120));
let poll_b = poll_events_until(&app_b, session_b, Duration::from_secs(120));
let (events_a, events_b) = tokio::join!(poll_a, poll_b);
assert!(should_stop(&events_a), "session A did not complete");
assert!(should_stop(&events_b), "session B did not complete");
assert_all_events_for_session(&events_a, session_a);
assert_all_events_for_session(&events_b, session_b);
assert_strictly_increasing_sequences(&events_a, "session A");
assert_strictly_increasing_sequences(&events_b, "session B");
assert_item_started_ids_unique(&events_a, "session A");
assert_item_started_ids_unique(&events_b, "session B");
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn pi_terminate_one_session_does_not_affect_other() {
let Some(config) = pi_test_config() else {
return;
};
let app = TestApp::new();
let _guard = apply_credentials(&config.credentials);
install_agent(&app.app, config.agent).await;
let session_a = "pi-terminate-a";
let session_b = "pi-terminate-b";
create_pi_session_with_native(&app.app, session_a).await;
create_pi_session_with_native(&app.app, session_b).await;
let terminate_status = send_status(
&app.app,
Method::POST,
&format!("/v1/sessions/{session_a}/terminate"),
None,
)
.await;
assert_eq!(
terminate_status,
StatusCode::NO_CONTENT,
"terminate session A"
);
send_message(&app.app, session_b).await;
let events_b = poll_events_until(&app.app, session_b, Duration::from_secs(120)).await;
assert!(!events_b.is_empty(), "no events for session B");
assert!(
should_stop(&events_b),
"session B did not complete after A terminated"
);
let events_a = poll_events_until(&app.app, session_a, Duration::from_secs(10)).await;
assert!(
events_a.iter().any(|event| {
event
.get("type")
.and_then(Value::as_str)
.is_some_and(|ty| ty == "session.ended")
}),
"session A missing session.ended after terminate"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn pi_runtime_restart_scope() {
let Some(config) = pi_test_config() else {
return;
};
let app = TestApp::new();
let _guard = apply_credentials(&config.credentials);
install_agent(&app.app, config.agent).await;
let session_a = "pi-restart-scope-a";
let session_b = "pi-restart-scope-b";
create_pi_session_with_native(&app.app, session_a).await;
create_pi_session_with_native(&app.app, session_b).await;
let terminate_status = send_status(
&app.app,
Method::POST,
&format!("/v1/sessions/{session_a}/terminate"),
None,
)
.await;
assert_eq!(
terminate_status,
StatusCode::NO_CONTENT,
"terminate session A to stop only its runtime"
);
send_message(&app.app, session_b).await;
let events_b = poll_events_until(&app.app, session_b, Duration::from_secs(120)).await;
assert!(
should_stop(&events_b),
"session B did not continue after A stopped"
);
assert_all_events_for_session(&events_b, session_b);
}


@ -1,4 +1,5 @@
use std::collections::HashMap;
use std::env;
use sandbox_agent_agent_management::agents::{
AgentError, AgentId, AgentManager, InstallOptions, SpawnOptions,
@ -29,6 +30,48 @@ fn prompt_ok(label: &str) -> String {
format!("Respond with exactly the text {label} and nothing else.")
}
fn pi_tests_enabled() -> bool {
env::var("SANDBOX_TEST_PI")
.map(|value| {
let value = value.trim().to_ascii_lowercase();
value == "1" || value == "true" || value == "yes"
})
.unwrap_or(false)
}
fn pi_on_path() -> bool {
let binary = AgentId::Pi.binary_name();
let path_var = match env::var_os("PATH") {
Some(path) => path,
None => return false,
};
for path in env::split_paths(&path_var) {
if path.join(binary).exists() {
return true;
}
}
false
}
#[test]
fn pi_spawn_streaming_is_rejected_with_runtime_contract_error(
) -> Result<(), Box<dyn std::error::Error>> {
let temp_dir = tempfile::tempdir()?;
let manager = AgentManager::new(temp_dir.path().join("bin"))?;
let err = manager
.spawn_streaming(AgentId::Pi, SpawnOptions::new(prompt_ok("IGNORED")))
.expect_err("expected Pi spawn_streaming to be rejected");
assert!(matches!(
err,
AgentError::UnsupportedRuntimePath {
agent: AgentId::Pi,
operation: "spawn_streaming",
..
}
));
Ok(())
}
#[test]
fn test_agents_install_version_spawn() -> Result<(), Box<dyn std::error::Error>> {
let temp_dir = tempfile::tempdir()?;
@ -36,12 +79,15 @@ fn test_agents_install_version_spawn() -> Result<(), Box<dyn std::error::Error>>
let env = build_env();
assert!(!env.is_empty(), "expected credentials to be available");
let agents = [
let mut agents = vec![
AgentId::Claude,
AgentId::Codex,
AgentId::Opencode,
AgentId::Amp,
];
if pi_tests_enabled() && pi_on_path() {
agents.push(AgentId::Pi);
}
for agent in agents {
let install = manager.install(agent, InstallOptions::default())?;
assert!(install.path.exists(), "expected install for {agent}");


@ -178,7 +178,7 @@ async fn install_agent(app: &Router, agent: AgentId) {
/// while other agents support "bypass" which skips tool approval.
fn test_permission_mode(agent: AgentId) -> &'static str {
match agent {
AgentId::Opencode => "default",
AgentId::Opencode | AgentId::Pi => "default",
_ => "bypass",
}
}


@ -182,7 +182,7 @@ pub async fn create_session_with_mode(
pub fn test_permission_mode(agent: AgentId) -> &'static str {
match agent {
AgentId::Opencode => "default",
AgentId::Opencode | AgentId::Pi => "default",
_ => "bypass",
}
}


@ -187,6 +187,82 @@ async fn agent_endpoints_snapshots() {
}
}
fn pi_test_config() -> Option<TestAgentConfig> {
let configs = match test_agents_from_env() {
Ok(configs) => configs,
Err(err) => {
            eprintln!("Skipping Pi endpoint variant test: {err}");
return None;
}
};
configs
.into_iter()
.find(|config| config.agent == AgentId::Pi)
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn pi_capabilities_and_models_expose_variants() {
let Some(config) = pi_test_config() else {
return;
};
let app = TestApp::new();
let _guard = apply_credentials(&config.credentials);
install_agent(&app.app, AgentId::Pi).await;
let capabilities = fetch_capabilities(&app.app).await;
let pi_caps = capabilities.get("pi").expect("pi capabilities");
assert!(pi_caps.variants, "pi capabilities should enable variants");
let (status, payload) = send_json(&app.app, Method::GET, "/v1/agents/pi/models", None).await;
assert_eq!(status, StatusCode::OK, "pi models endpoint");
let models = payload
.get("models")
.and_then(Value::as_array)
.cloned()
.unwrap_or_default();
assert!(!models.is_empty(), "pi models should not be empty");
let full_levels = vec!["off", "minimal", "low", "medium", "high", "xhigh"];
for model in models {
let model_id = model
.get("id")
.and_then(Value::as_str)
.unwrap_or("<unknown>");
let variants = model
.get("variants")
.and_then(Value::as_array)
.expect("pi model variants");
let default_variant = model
.get("defaultVariant")
.and_then(Value::as_str)
.expect("pi model defaultVariant");
let variant_ids = variants
.iter()
.filter_map(Value::as_str)
.collect::<Vec<_>>();
assert!(
!variant_ids.is_empty(),
"pi model {model_id} has no variants"
);
if variant_ids == vec!["off"] {
assert_eq!(
default_variant, "off",
"pi model {model_id} expected default off for non-thinking model"
);
} else {
assert_eq!(
variant_ids, full_levels,
"pi model {model_id} expected full thinking levels"
);
assert_eq!(
default_variant, "medium",
"pi model {model_id} expected medium default for thinking model"
);
}
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn create_session_with_skill_sources() {
let app = TestApp::new();


@ -0,0 +1,6 @@
---
source: server/packages/sandbox-agent/tests/http/agent_endpoints.rs
assertion_line: 145
expression: snapshot_status(status)
---
status: 204


@ -1,5 +1,6 @@
---
source: server/packages/sandbox-agent/tests/http/agent_endpoints.rs
assertion_line: 129
expression: normalize_agent_list(&agents)
---
agents:
@ -8,3 +9,4 @@ agents:
- id: codex
- id: mock
- id: opencode
- id: pi


@ -1,5 +1,6 @@
---
source: server/packages/sandbox-agent/tests/http/agent_endpoints.rs
assertion_line: 59
expression: "json!({\n \"status\": status.as_u16(), \"payload\": normalize_agent_list(&payload),\n})"
---
payload:
@ -9,4 +10,5 @@ payload:
- id: codex
- id: mock
- id: opencode
- id: pi
status: 200


@ -31,10 +31,12 @@ describe("OpenCode-compatible Model API", () => {
const providers = response.data?.all ?? [];
const mockProvider = providers.find((entry) => entry.id === "mock");
const ampProvider = providers.find((entry) => entry.id === "amp");
const piProvider = providers.find((entry) => entry.id === "pi");
const sandboxProvider = providers.find((entry) => entry.id === "sandbox-agent");
expect(sandboxProvider).toBeUndefined();
expect(mockProvider).toBeDefined();
expect(ampProvider).toBeDefined();
expect(piProvider).toBeDefined();
const mockModels = mockProvider?.models ?? {};
expect(mockModels["mock"]).toBeDefined();
@ -49,4 +51,17 @@ describe("OpenCode-compatible Model API", () => {
expect(response.data?.default?.["mock"]).toBe("mock");
expect(response.data?.default?.["amp"]).toBe("smart");
});
it("should keep provider backends visible when discovery is degraded", async () => {
const response = await client.provider.list();
const providers = response.data?.all ?? [];
const providerIds = new Set(providers.map((provider) => provider.id));
expect(providerIds.has("claude")).toBe(true);
expect(providerIds.has("codex")).toBe(true);
expect(providerIds.has("pi")).toBe(true);
expect(
providerIds.has("opencode") || Array.from(providerIds).some((id) => id.startsWith("opencode:"))
).toBe(true);
});
});


@ -2,3 +2,4 @@ pub mod amp;
pub mod claude;
pub mod codex;
pub mod opencode;
pub mod pi;


@ -0,0 +1,769 @@
use std::collections::{HashMap, HashSet};
use serde_json::Value;
use crate::pi as schema;
use crate::{
ContentPart, EventConversion, ItemDeltaData, ItemEventData, ItemKind, ItemRole, ItemStatus,
ReasoningVisibility, UniversalEventData, UniversalEventType, UniversalItem,
};
#[derive(Default)]
pub struct PiEventConverter {
tool_result_buffers: HashMap<String, String>,
tool_result_started: HashSet<String>,
message_completed: HashSet<String>,
message_errors: HashSet<String>,
message_reasoning: HashMap<String, String>,
message_text: HashMap<String, String>,
last_message_id: Option<String>,
message_started: HashSet<String>,
message_counter: u64,
}
impl PiEventConverter {
pub fn event_value_to_universal(
&mut self,
raw: &Value,
) -> Result<Vec<EventConversion>, String> {
let event_type = raw
.get("type")
.and_then(Value::as_str)
.ok_or_else(|| "missing event type".to_string())?;
let native_session_id = extract_session_id(raw);
let conversions = match event_type {
"message_start" => self.message_start(raw),
"message_update" => self.message_update(raw),
"message_end" => self.message_end(raw),
"tool_execution_start" => self.tool_execution_start(raw),
"tool_execution_update" => self.tool_execution_update(raw),
"tool_execution_end" => self.tool_execution_end(raw),
"agent_start"
| "agent_end"
| "turn_start"
| "turn_end"
| "auto_compaction"
| "auto_compaction_start"
| "auto_compaction_end"
| "auto_retry"
| "auto_retry_start"
| "auto_retry_end"
| "hook_error" => Ok(vec![status_event(event_type, raw)]),
"extension_ui_request" | "extension_ui_response" | "extension_error" => {
Ok(vec![status_event(event_type, raw)])
}
other => Err(format!("unsupported Pi event type: {other}")),
}?;
Ok(conversions
.into_iter()
.map(|conversion| attach_metadata(conversion, &native_session_id, raw))
.collect())
}
fn next_synthetic_message_id(&mut self) -> String {
self.message_counter += 1;
format!("pi_msg_{}", self.message_counter)
}
fn ensure_message_id(&mut self, message_id: Option<String>) -> String {
if let Some(id) = message_id {
self.last_message_id = Some(id.clone());
return id;
}
if let Some(id) = self.last_message_id.clone() {
return id;
}
let id = self.next_synthetic_message_id();
self.last_message_id = Some(id.clone());
id
}
fn ensure_message_started(&mut self, message_id: &str) -> Option<EventConversion> {
if !self.message_started.insert(message_id.to_string()) {
return None;
}
let item = UniversalItem {
item_id: String::new(),
native_item_id: Some(message_id.to_string()),
parent_id: None,
kind: ItemKind::Message,
role: Some(ItemRole::Assistant),
content: Vec::new(),
status: ItemStatus::InProgress,
};
Some(
EventConversion::new(
UniversalEventType::ItemStarted,
UniversalEventData::Item(ItemEventData { item }),
)
.synthetic(),
)
}
fn clear_last_message_id(&mut self, message_id: Option<&str>) {
if message_id.is_none() || self.last_message_id.as_deref() == message_id {
self.last_message_id = None;
}
}
pub fn event_to_universal(
&mut self,
event: &schema::RpcEvent,
) -> Result<Vec<EventConversion>, String> {
let raw = serde_json::to_value(event).map_err(|err| err.to_string())?;
self.event_value_to_universal(&raw)
}
fn message_start(&mut self, raw: &Value) -> Result<Vec<EventConversion>, String> {
let message = raw.get("message");
if is_user_role(message) {
return Ok(Vec::new());
}
let message_id = self.ensure_message_id(extract_message_id(raw));
self.message_completed.remove(&message_id);
self.message_started.insert(message_id.clone());
let content = message.and_then(parse_message_content).unwrap_or_default();
let entry = self.message_text.entry(message_id.clone()).or_default();
for part in &content {
if let ContentPart::Text { text } = part {
entry.push_str(text);
}
}
let item = UniversalItem {
item_id: String::new(),
native_item_id: Some(message_id),
parent_id: None,
kind: ItemKind::Message,
role: Some(ItemRole::Assistant),
content,
status: ItemStatus::InProgress,
};
Ok(vec![EventConversion::new(
UniversalEventType::ItemStarted,
UniversalEventData::Item(ItemEventData { item }),
)])
}
fn message_update(&mut self, raw: &Value) -> Result<Vec<EventConversion>, String> {
let assistant_event = raw
.get("assistantMessageEvent")
.or_else(|| raw.get("assistant_message_event"))
.ok_or_else(|| "missing assistantMessageEvent".to_string())?;
let event_type = assistant_event
.get("type")
.and_then(Value::as_str)
.unwrap_or("");
let message_id = extract_message_id(raw)
.or_else(|| extract_message_id(assistant_event))
.or_else(|| self.last_message_id.clone());
match event_type {
"start" => {
if let Some(id) = message_id {
self.last_message_id = Some(id);
}
Ok(Vec::new())
}
"text_start" | "text_delta" | "text_end" => {
let Some(delta) = extract_delta_text(assistant_event) else {
return Ok(Vec::new());
};
let message_id = self.ensure_message_id(message_id);
let entry = self.message_text.entry(message_id.clone()).or_default();
entry.push_str(&delta);
let mut conversions = Vec::new();
if let Some(start) = self.ensure_message_started(&message_id) {
conversions.push(start);
}
conversions.push(item_delta(Some(message_id), delta));
Ok(conversions)
}
"thinking_start" | "thinking_delta" | "thinking_end" => {
let Some(delta) = extract_delta_text(assistant_event) else {
return Ok(Vec::new());
};
let message_id = self.ensure_message_id(message_id);
let entry = self
.message_reasoning
.entry(message_id.clone())
.or_default();
entry.push_str(&delta);
let mut conversions = Vec::new();
if let Some(start) = self.ensure_message_started(&message_id) {
conversions.push(start);
}
conversions.push(item_delta(Some(message_id), delta));
Ok(conversions)
}
"toolcall_start"
| "toolcall_delta"
| "toolcall_end"
| "toolcall_args_start"
| "toolcall_args_delta"
| "toolcall_args_end" => Ok(Vec::new()),
"done" => {
let message_id = self.ensure_message_id(message_id);
if self.message_errors.remove(&message_id) {
self.message_text.remove(&message_id);
self.message_reasoning.remove(&message_id);
self.message_started.remove(&message_id);
self.clear_last_message_id(Some(&message_id));
return Ok(Vec::new());
}
if self.message_completed.contains(&message_id) {
self.clear_last_message_id(Some(&message_id));
return Ok(Vec::new());
}
let message = raw
.get("message")
.or_else(|| assistant_event.get("message"));
let conversion = self.complete_message(Some(message_id.clone()), message);
self.message_completed.insert(message_id.clone());
self.clear_last_message_id(Some(&message_id));
Ok(vec![conversion])
}
"error" => {
let message_id = self.ensure_message_id(message_id);
if self.message_completed.contains(&message_id) {
self.clear_last_message_id(Some(&message_id));
return Ok(Vec::new());
}
let error_text = assistant_event
.get("error")
.or_else(|| raw.get("error"))
.map(value_to_string)
.unwrap_or_else(|| "Pi message error".to_string());
self.message_reasoning.remove(&message_id);
self.message_text.remove(&message_id);
self.message_errors.insert(message_id.clone());
self.message_started.remove(&message_id);
self.message_completed.insert(message_id.clone());
self.clear_last_message_id(Some(&message_id));
let item = UniversalItem {
item_id: String::new(),
native_item_id: Some(message_id),
parent_id: None,
kind: ItemKind::Message,
role: Some(ItemRole::Assistant),
content: vec![ContentPart::Text { text: error_text }],
status: ItemStatus::Failed,
};
Ok(vec![EventConversion::new(
UniversalEventType::ItemCompleted,
UniversalEventData::Item(ItemEventData { item }),
)])
}
other => Err(format!("unsupported assistantMessageEvent: {other}")),
}
}
fn message_end(&mut self, raw: &Value) -> Result<Vec<EventConversion>, String> {
let message = raw.get("message");
if is_user_role(message) {
return Ok(Vec::new());
}
let message_id = self
.ensure_message_id(extract_message_id(raw).or_else(|| self.last_message_id.clone()));
if self.message_errors.remove(&message_id) {
self.message_text.remove(&message_id);
self.message_reasoning.remove(&message_id);
self.message_started.remove(&message_id);
self.clear_last_message_id(Some(&message_id));
return Ok(Vec::new());
}
if self.message_completed.contains(&message_id) {
self.clear_last_message_id(Some(&message_id));
return Ok(Vec::new());
}
let conversion = self.complete_message(Some(message_id.clone()), message);
self.message_completed.insert(message_id.clone());
self.clear_last_message_id(Some(&message_id));
Ok(vec![conversion])
}
fn complete_message(
&mut self,
message_id: Option<String>,
message: Option<&Value>,
) -> EventConversion {
let mut content = message.and_then(parse_message_content).unwrap_or_default();
let failed = message_is_failed(message);
let message_error_text = extract_message_error_text(message);
if let Some(id) = message_id.clone() {
if content.is_empty() {
if let Some(text) = self.message_text.remove(&id) {
if !text.is_empty() {
content.push(ContentPart::Text { text });
}
}
} else {
self.message_text.remove(&id);
}
if let Some(reasoning) = self.message_reasoning.remove(&id) {
if !reasoning.trim().is_empty()
&& !content
.iter()
.any(|part| matches!(part, ContentPart::Reasoning { .. }))
{
content.push(ContentPart::Reasoning {
text: reasoning,
visibility: ReasoningVisibility::Private,
});
}
}
self.message_started.remove(&id);
}
if failed && content.is_empty() {
if let Some(text) = message_error_text {
content.push(ContentPart::Text { text });
}
}
let item = UniversalItem {
item_id: String::new(),
native_item_id: message_id,
parent_id: None,
kind: ItemKind::Message,
role: Some(ItemRole::Assistant),
content,
status: if failed {
ItemStatus::Failed
} else {
ItemStatus::Completed
},
};
EventConversion::new(
UniversalEventType::ItemCompleted,
UniversalEventData::Item(ItemEventData { item }),
)
}
fn tool_execution_start(&mut self, raw: &Value) -> Result<Vec<EventConversion>, String> {
let tool_call_id =
extract_tool_call_id(raw).ok_or_else(|| "missing toolCallId".to_string())?;
let tool_name = extract_tool_name(raw).unwrap_or_else(|| "tool".to_string());
let arguments = raw
.get("args")
.or_else(|| raw.get("arguments"))
.map(value_to_string)
.unwrap_or_else(|| "{}".to_string());
let item = UniversalItem {
item_id: String::new(),
native_item_id: Some(tool_call_id.clone()),
parent_id: None,
kind: ItemKind::ToolCall,
role: Some(ItemRole::Assistant),
content: vec![ContentPart::ToolCall {
name: tool_name,
arguments,
call_id: tool_call_id,
}],
status: ItemStatus::InProgress,
};
Ok(vec![EventConversion::new(
UniversalEventType::ItemStarted,
UniversalEventData::Item(ItemEventData { item }),
)])
}
fn tool_execution_update(&mut self, raw: &Value) -> Result<Vec<EventConversion>, String> {
let tool_call_id = match extract_tool_call_id(raw) {
Some(id) => id,
None => return Ok(Vec::new()),
};
let partial = match raw
.get("partialResult")
.or_else(|| raw.get("partial_result"))
{
Some(value) => value_to_string(value),
None => return Ok(Vec::new()),
};
let prior = self
.tool_result_buffers
.get(&tool_call_id)
.cloned()
.unwrap_or_default();
let delta = delta_from_partial(&prior, &partial);
self.tool_result_buffers
.insert(tool_call_id.clone(), partial);
let mut conversions = Vec::new();
if self.tool_result_started.insert(tool_call_id.clone()) {
let item = UniversalItem {
item_id: String::new(),
native_item_id: Some(tool_call_id.clone()),
parent_id: None,
kind: ItemKind::ToolResult,
role: Some(ItemRole::Tool),
content: vec![ContentPart::ToolResult {
call_id: tool_call_id.clone(),
output: String::new(),
}],
status: ItemStatus::InProgress,
};
conversions.push(
EventConversion::new(
UniversalEventType::ItemStarted,
UniversalEventData::Item(ItemEventData { item }),
)
.synthetic(),
);
}
if !delta.is_empty() {
conversions.push(
EventConversion::new(
UniversalEventType::ItemDelta,
UniversalEventData::ItemDelta(ItemDeltaData {
item_id: String::new(),
native_item_id: Some(tool_call_id.clone()),
delta,
}),
)
.synthetic(),
);
}
Ok(conversions)
}
fn tool_execution_end(&mut self, raw: &Value) -> Result<Vec<EventConversion>, String> {
let tool_call_id =
extract_tool_call_id(raw).ok_or_else(|| "missing toolCallId".to_string())?;
self.tool_result_buffers.remove(&tool_call_id);
self.tool_result_started.remove(&tool_call_id);
let output = raw
.get("result")
.and_then(extract_result_content)
.unwrap_or_default();
let is_error = raw.get("isError").and_then(Value::as_bool).unwrap_or(false);
let item = UniversalItem {
item_id: String::new(),
native_item_id: Some(tool_call_id.clone()),
parent_id: None,
kind: ItemKind::ToolResult,
role: Some(ItemRole::Tool),
content: vec![ContentPart::ToolResult {
call_id: tool_call_id,
output,
}],
status: if is_error {
ItemStatus::Failed
} else {
ItemStatus::Completed
},
};
Ok(vec![EventConversion::new(
UniversalEventType::ItemCompleted,
UniversalEventData::Item(ItemEventData { item }),
)])
}
}
pub fn event_to_universal(event: &schema::RpcEvent) -> Result<Vec<EventConversion>, String> {
PiEventConverter::default().event_to_universal(event)
}
pub fn event_value_to_universal(raw: &Value) -> Result<Vec<EventConversion>, String> {
PiEventConverter::default().event_value_to_universal(raw)
}
fn attach_metadata(
conversion: EventConversion,
native_session_id: &Option<String>,
raw: &Value,
) -> EventConversion {
conversion
.with_native_session(native_session_id.clone())
.with_raw(Some(raw.clone()))
}
fn status_event(label: &str, raw: &Value) -> EventConversion {
let detail = raw
.get("error")
.or_else(|| raw.get("message"))
.map(value_to_string);
let item = UniversalItem {
item_id: String::new(),
native_item_id: None,
parent_id: None,
kind: ItemKind::Status,
role: Some(ItemRole::System),
content: vec![ContentPart::Status {
label: pi_status_label(label),
detail,
}],
status: ItemStatus::Completed,
};
EventConversion::new(
UniversalEventType::ItemCompleted,
UniversalEventData::Item(ItemEventData { item }),
)
}
/// Maps Pi lifecycle event names onto the universal status label vocabulary;
/// unrecognized labels are passed through with a `pi.` prefix.
fn pi_status_label(label: &str) -> String {
match label {
"turn_end" => "turn.completed".to_string(),
"agent_end" => "session.idle".to_string(),
_ => format!("pi.{label}"),
}
}
fn item_delta(message_id: Option<String>, delta: String) -> EventConversion {
EventConversion::new(
UniversalEventType::ItemDelta,
UniversalEventData::ItemDelta(ItemDeltaData {
item_id: String::new(),
native_item_id: message_id,
delta,
}),
)
}
fn is_user_role(message: Option<&Value>) -> bool {
message
.and_then(|msg| msg.get("role"))
.and_then(Value::as_str)
.is_some_and(|role| role == "user")
}
fn extract_session_id(value: &Value) -> Option<String> {
extract_string(value, &["sessionId"])
.or_else(|| extract_string(value, &["session_id"]))
.or_else(|| extract_string(value, &["session", "id"]))
.or_else(|| extract_string(value, &["message", "sessionId"]))
}
fn extract_message_id(value: &Value) -> Option<String> {
extract_string(value, &["messageId"])
.or_else(|| extract_string(value, &["message_id"]))
.or_else(|| extract_string(value, &["message", "id"]))
.or_else(|| extract_string(value, &["message", "messageId"]))
.or_else(|| extract_string(value, &["assistantMessageEvent", "messageId"]))
}
fn extract_tool_call_id(value: &Value) -> Option<String> {
extract_string(value, &["toolCallId"]).or_else(|| extract_string(value, &["tool_call_id"]))
}
fn extract_tool_name(value: &Value) -> Option<String> {
extract_string(value, &["toolName"]).or_else(|| extract_string(value, &["tool_name"]))
}
fn extract_string(value: &Value, path: &[&str]) -> Option<String> {
let mut current = value;
for key in path {
current = current.get(*key)?;
}
current.as_str().map(|value| value.to_string())
}
fn extract_delta_text(event: &Value) -> Option<String> {
if let Some(value) = event.get("delta") {
return Some(value_to_string(value));
}
if let Some(value) = event.get("text") {
return Some(value_to_string(value));
}
if let Some(value) = event.get("partial") {
return extract_text_from_value(value);
}
if let Some(value) = event.get("content") {
return extract_text_from_value(value);
}
None
}
fn extract_text_from_value(value: &Value) -> Option<String> {
if let Some(text) = value.as_str() {
return Some(text.to_string());
}
if let Some(text) = value.get("text").and_then(Value::as_str) {
return Some(text.to_string());
}
if let Some(text) = value.get("content").and_then(Value::as_str) {
return Some(text.to_string());
}
None
}
fn extract_result_content(value: &Value) -> Option<String> {
let content = value.get("content").and_then(Value::as_str);
let text = value.get("text").and_then(Value::as_str);
content
.or(text)
.map(|value| value.to_string())
.or_else(|| Some(value_to_string(value)))
}
fn parse_message_content(message: &Value) -> Option<Vec<ContentPart>> {
if let Some(text) = message.as_str() {
return Some(vec![ContentPart::Text {
text: text.to_string(),
}]);
}
let content_value = message
.get("content")
.or_else(|| message.get("text"))
.or_else(|| message.get("value"))?;
let mut parts = Vec::new();
match content_value {
Value::String(text) => parts.push(ContentPart::Text { text: text.clone() }),
Value::Array(items) => {
for item in items {
if let Some(part) = content_part_from_value(item) {
parts.push(part);
}
}
}
Value::Object(_) => {
if let Some(part) = content_part_from_value(content_value) {
parts.push(part);
}
}
_ => {}
}
Some(parts)
}
/// A message is considered failed when Pi reports a stop reason of
/// "error" or "aborted".
fn message_is_failed(message: Option<&Value>) -> bool {
message
.and_then(|value| {
value
.get("stopReason")
.or_else(|| value.get("stop_reason"))
.and_then(Value::as_str)
})
.is_some_and(|reason| reason == "error" || reason == "aborted")
}
fn extract_message_error_text(message: Option<&Value>) -> Option<String> {
let value = message?;
if let Some(text) = value
.get("errorMessage")
.or_else(|| value.get("error_message"))
.and_then(Value::as_str)
{
let trimmed = text.trim();
if !trimmed.is_empty() {
return Some(trimmed.to_string());
}
}
let error = value.get("error")?;
if let Some(text) = error.as_str() {
let trimmed = text.trim();
if !trimmed.is_empty() {
return Some(trimmed.to_string());
}
}
if let Some(text) = error
.get("errorMessage")
.or_else(|| error.get("error_message"))
.or_else(|| error.get("message"))
.and_then(Value::as_str)
{
let trimmed = text.trim();
if !trimmed.is_empty() {
return Some(trimmed.to_string());
}
}
None
}
fn content_part_from_value(value: &Value) -> Option<ContentPart> {
if let Some(text) = value.as_str() {
return Some(ContentPart::Text {
text: text.to_string(),
});
}
let part_type = value.get("type").and_then(Value::as_str);
match part_type {
Some("text") | Some("markdown") => {
extract_text_from_value(value).map(|text| ContentPart::Text { text })
}
Some("thinking") | Some("reasoning") => {
extract_text_from_value(value).map(|text| ContentPart::Reasoning {
text,
visibility: ReasoningVisibility::Private,
})
}
Some("image") => value
.get("path")
.or_else(|| value.get("url"))
.and_then(|path| {
path.as_str().map(|path| ContentPart::Image {
path: path.to_string(),
mime: value
.get("mime")
.or_else(|| value.get("mimeType"))
.and_then(Value::as_str)
.map(|mime| mime.to_string()),
})
}),
Some("tool_call") | Some("toolcall") => {
let name = value
.get("name")
.and_then(Value::as_str)
.unwrap_or("tool")
.to_string();
let arguments = value
.get("arguments")
.or_else(|| value.get("args"))
.map(value_to_string)
.unwrap_or_else(|| "{}".to_string());
let call_id = value
.get("call_id")
.or_else(|| value.get("callId"))
.and_then(Value::as_str)
.unwrap_or_default()
.to_string();
Some(ContentPart::ToolCall {
name,
arguments,
call_id,
})
}
Some("tool_result") => {
let call_id = value
.get("call_id")
.or_else(|| value.get("callId"))
.and_then(Value::as_str)
.unwrap_or_default()
.to_string();
let output = value
.get("output")
.or_else(|| value.get("content"))
.map(value_to_string)
.unwrap_or_default();
Some(ContentPart::ToolResult { call_id, output })
}
_ => Some(ContentPart::Json {
json: value.clone(),
}),
}
}
fn value_to_string(value: &Value) -> String {
if let Some(text) = value.as_str() {
text.to_string()
} else {
value.to_string()
}
}
/// Returns the suffix of `next` that extends `previous`. If `next` does not
/// start with `previous` (i.e. the tool rewrote its partial output), the
/// whole of `next` is treated as the delta.
fn delta_from_partial(previous: &str, next: &str) -> String {
if next.starts_with(previous) {
next[previous.len()..].to_string()
} else {
next.to_string()
}
}


@@ -3,13 +3,13 @@ use serde::{Deserialize, Serialize};
use serde_json::Value;
use utoipa::ToSchema;
-pub use sandbox_agent_extracted_agent_schemas::{amp, claude, codex, opencode};
+pub use sandbox_agent_extracted_agent_schemas::{amp, claude, codex, opencode, pi};
pub mod agents;
pub use agents::{
amp as convert_amp, claude as convert_claude, codex as convert_codex,
-opencode as convert_opencode,
+opencode as convert_opencode, pi as convert_pi,
};
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
@@ -226,7 +226,7 @@ pub enum ItemKind {
Unknown,
}
-#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum ItemRole {
User,
@@ -235,7 +235,7 @@ pub enum ItemRole {
Tool,
}
-#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, ToSchema)]
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema, ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum ItemStatus {
InProgress,


@@ -0,0 +1,414 @@
use sandbox_agent_universal_agent_schema::convert_pi::PiEventConverter;
use sandbox_agent_universal_agent_schema::pi as pi_schema;
use sandbox_agent_universal_agent_schema::{
ContentPart, ItemKind, ItemRole, ItemStatus, UniversalEventData, UniversalEventType,
};
use serde_json::json;
fn parse_event(value: serde_json::Value) -> pi_schema::RpcEvent {
serde_json::from_value(value).expect("pi event")
}
#[test]
fn pi_message_flow_converts() {
let mut converter = PiEventConverter::default();
let start_event = parse_event(json!({
"type": "message_start",
"sessionId": "session-1",
"messageId": "msg-1",
"message": {
"role": "assistant",
"content": [{ "type": "text", "text": "Hello" }]
}
}));
let start_events = converter
.event_to_universal(&start_event)
.expect("start conversions");
assert_eq!(start_events[0].event_type, UniversalEventType::ItemStarted);
if let UniversalEventData::Item(item) = &start_events[0].data {
assert_eq!(item.item.kind, ItemKind::Message);
assert_eq!(item.item.role, Some(ItemRole::Assistant));
assert_eq!(item.item.status, ItemStatus::InProgress);
} else {
panic!("expected item event");
}
let update_event = parse_event(json!({
"type": "message_update",
"sessionId": "session-1",
"messageId": "msg-1",
"assistantMessageEvent": { "type": "text_delta", "delta": " world" }
}));
let update_events = converter
.event_to_universal(&update_event)
.expect("update conversions");
assert_eq!(update_events[0].event_type, UniversalEventType::ItemDelta);
let end_event = parse_event(json!({
"type": "message_end",
"sessionId": "session-1",
"messageId": "msg-1",
"message": {
"role": "assistant",
"content": [{ "type": "text", "text": "Hello world" }]
}
}));
let end_events = converter
.event_to_universal(&end_event)
.expect("end conversions");
assert_eq!(end_events[0].event_type, UniversalEventType::ItemCompleted);
if let UniversalEventData::Item(item) = &end_events[0].data {
assert_eq!(item.item.kind, ItemKind::Message);
assert_eq!(item.item.role, Some(ItemRole::Assistant));
assert_eq!(item.item.status, ItemStatus::Completed);
} else {
panic!("expected item event");
}
}
#[test]
fn pi_user_message_echo_is_skipped() {
let mut converter = PiEventConverter::default();
// Pi may echo the user message as a message_start with role "user".
// The daemon already records synthetic user events, so the converter
// must skip these to avoid a duplicate assistant-looking bubble.
let start_event = parse_event(json!({
"type": "message_start",
"sessionId": "session-1",
"messageId": "user-msg-1",
"message": {
"role": "user",
"content": [{ "type": "text", "text": "hello!" }]
}
}));
let events = converter
.event_to_universal(&start_event)
.expect("user message_start should not error");
assert!(
events.is_empty(),
"user message_start should produce no events, got {}",
events.len()
);
let end_event = parse_event(json!({
"type": "message_end",
"sessionId": "session-1",
"messageId": "user-msg-1",
"message": {
"role": "user",
"content": [{ "type": "text", "text": "hello!" }]
}
}));
let events = converter
.event_to_universal(&end_event)
.expect("user message_end should not error");
assert!(
events.is_empty(),
"user message_end should produce no events, got {}",
events.len()
);
// A subsequent assistant message should still work normally.
let assistant_start = parse_event(json!({
"type": "message_start",
"sessionId": "session-1",
"messageId": "msg-1",
"message": {
"role": "assistant",
"content": [{ "type": "text", "text": "Hello! How can I help?" }]
}
}));
let events = converter
.event_to_universal(&assistant_start)
.expect("assistant message_start");
assert_eq!(events.len(), 1);
assert_eq!(events[0].event_type, UniversalEventType::ItemStarted);
if let UniversalEventData::Item(item) = &events[0].data {
assert_eq!(item.item.role, Some(ItemRole::Assistant));
} else {
panic!("expected item event");
}
}
#[test]
fn pi_tool_execution_converts_with_partial_deltas() {
let mut converter = PiEventConverter::default();
let start_event = parse_event(json!({
"type": "tool_execution_start",
"sessionId": "session-1",
"toolCallId": "call-1",
"toolName": "bash",
"args": { "command": "ls" }
}));
let start_events = converter
.event_to_universal(&start_event)
.expect("tool start");
assert_eq!(start_events[0].event_type, UniversalEventType::ItemStarted);
if let UniversalEventData::Item(item) = &start_events[0].data {
assert_eq!(item.item.kind, ItemKind::ToolCall);
assert_eq!(item.item.role, Some(ItemRole::Assistant));
match &item.item.content[0] {
ContentPart::ToolCall { name, .. } => assert_eq!(name, "bash"),
_ => panic!("expected tool call content"),
}
}
let update_event = parse_event(json!({
"type": "tool_execution_update",
"sessionId": "session-1",
"toolCallId": "call-1",
"partialResult": "foo"
}));
let update_events = converter
.event_to_universal(&update_event)
.expect("tool update");
assert!(update_events
.iter()
.any(|event| event.event_type == UniversalEventType::ItemDelta));
let update_event2 = parse_event(json!({
"type": "tool_execution_update",
"sessionId": "session-1",
"toolCallId": "call-1",
"partialResult": "foobar"
}));
let update_events2 = converter
.event_to_universal(&update_event2)
.expect("tool update 2");
let delta = update_events2
.iter()
.find_map(|event| match &event.data {
UniversalEventData::ItemDelta(data) => Some(data.delta.clone()),
_ => None,
})
.unwrap_or_default();
assert_eq!(delta, "bar");
let end_event = parse_event(json!({
"type": "tool_execution_end",
"sessionId": "session-1",
"toolCallId": "call-1",
"result": { "type": "text", "content": "done" },
"isError": false
}));
let end_events = converter.event_to_universal(&end_event).expect("tool end");
assert_eq!(end_events[0].event_type, UniversalEventType::ItemCompleted);
if let UniversalEventData::Item(item) = &end_events[0].data {
assert_eq!(item.item.kind, ItemKind::ToolResult);
assert_eq!(item.item.role, Some(ItemRole::Tool));
match &item.item.content[0] {
ContentPart::ToolResult { output, .. } => assert_eq!(output, "done"),
_ => panic!("expected tool result content"),
}
}
}
#[test]
fn pi_unknown_event_returns_error() {
let mut converter = PiEventConverter::default();
let event = parse_event(json!({
"type": "unknown_event",
"sessionId": "session-1"
}));
assert!(converter.event_to_universal(&event).is_err());
}
#[test]
fn pi_turn_and_agent_end_emit_terminal_status_labels() {
let mut converter = PiEventConverter::default();
let turn_end = parse_event(json!({
"type": "turn_end",
"sessionId": "session-1"
}));
let turn_events = converter
.event_to_universal(&turn_end)
.expect("turn_end conversions");
assert_eq!(turn_events[0].event_type, UniversalEventType::ItemCompleted);
if let UniversalEventData::Item(item) = &turn_events[0].data {
assert_eq!(item.item.kind, ItemKind::Status);
assert!(
matches!(
item.item.content.first(),
Some(ContentPart::Status { label, .. }) if label == "turn.completed"
),
"turn_end should map to turn.completed status"
);
} else {
panic!("expected item event");
}
let agent_end = parse_event(json!({
"type": "agent_end",
"sessionId": "session-1"
}));
let agent_events = converter
.event_to_universal(&agent_end)
.expect("agent_end conversions");
assert_eq!(
agent_events[0].event_type,
UniversalEventType::ItemCompleted
);
if let UniversalEventData::Item(item) = &agent_events[0].data {
assert_eq!(item.item.kind, ItemKind::Status);
assert!(
matches!(
item.item.content.first(),
Some(ContentPart::Status { label, .. }) if label == "session.idle"
),
"agent_end should map to session.idle status"
);
} else {
panic!("expected item event");
}
}
#[test]
fn pi_message_done_completes_without_message_end() {
let mut converter = PiEventConverter::default();
let start_event = parse_event(json!({
"type": "message_start",
"sessionId": "session-1",
"messageId": "msg-1",
"message": {
"role": "assistant",
"content": [{ "type": "text", "text": "Hello" }]
}
}));
let _start_events = converter
.event_to_universal(&start_event)
.expect("start conversions");
let update_event = parse_event(json!({
"type": "message_update",
"sessionId": "session-1",
"messageId": "msg-1",
"assistantMessageEvent": { "type": "text_delta", "delta": " world" }
}));
let _update_events = converter
.event_to_universal(&update_event)
.expect("update conversions");
let done_event = parse_event(json!({
"type": "message_update",
"sessionId": "session-1",
"messageId": "msg-1",
"assistantMessageEvent": { "type": "done" }
}));
let done_events = converter
.event_to_universal(&done_event)
.expect("done conversions");
assert_eq!(done_events[0].event_type, UniversalEventType::ItemCompleted);
if let UniversalEventData::Item(item) = &done_events[0].data {
assert_eq!(item.item.status, ItemStatus::Completed);
assert!(
matches!(item.item.content.first(), Some(ContentPart::Text { text }) if text == "Hello world")
);
} else {
panic!("expected item event");
}
}
#[test]
fn pi_message_done_then_message_end_does_not_double_complete() {
let mut converter = PiEventConverter::default();
let start_event = parse_event(json!({
"type": "message_start",
"sessionId": "session-1",
"messageId": "msg-1",
"message": {
"role": "assistant",
"content": [{ "type": "text", "text": "Hello" }]
}
}));
let _ = converter
.event_to_universal(&start_event)
.expect("start conversions");
let update_event = parse_event(json!({
"type": "message_update",
"sessionId": "session-1",
"messageId": "msg-1",
"assistantMessageEvent": { "type": "text_delta", "delta": " world" }
}));
let _ = converter
.event_to_universal(&update_event)
.expect("update conversions");
let done_event = parse_event(json!({
"type": "message_update",
"sessionId": "session-1",
"messageId": "msg-1",
"assistantMessageEvent": { "type": "done" }
}));
let done_events = converter
.event_to_universal(&done_event)
.expect("done conversions");
assert_eq!(done_events.len(), 1);
assert_eq!(done_events[0].event_type, UniversalEventType::ItemCompleted);
let end_event = parse_event(json!({
"type": "message_end",
"sessionId": "session-1",
"messageId": "msg-1",
"message": {
"role": "assistant",
"content": [{ "type": "text", "text": "Hello world" }]
}
}));
let end_events = converter
.event_to_universal(&end_event)
.expect("end conversions");
assert!(
end_events.is_empty(),
"message_end after done should not emit a second completion"
);
}
#[test]
fn pi_message_end_error_surfaces_failed_status_and_error_text() {
let mut converter = PiEventConverter::default();
let start_event = parse_event(json!({
"type": "message_start",
"sessionId": "session-1",
"messageId": "msg-err",
"message": {
"role": "assistant",
"content": []
}
}));
let _ = converter
.event_to_universal(&start_event)
.expect("start conversions");
let end_raw = json!({
"type": "message_end",
"sessionId": "session-1",
"messageId": "msg-err",
"message": {
"role": "assistant",
"content": [],
"stopReason": "error",
"errorMessage": "Connection error."
}
});
let end_events = converter
.event_value_to_universal(&end_raw)
.expect("end conversions");
assert_eq!(end_events[0].event_type, UniversalEventType::ItemCompleted);
if let UniversalEventData::Item(item) = &end_events[0].data {
assert_eq!(item.item.status, ItemStatus::Failed);
assert!(
matches!(item.item.content.first(), Some(ContentPart::Text { text }) if text == "Connection error.")
);
} else {
panic!("expected item event");
}
}