mirror of
https://github.com/harivansh-afk/sandbox-agent.git
synced 2026-04-15 07:04:48 +00:00
wip: pi working
This commit is contained in:
parent
a6064e7027
commit
bef2e84d0c
9 changed files with 1747 additions and 39 deletions
|
|
@ -84,4 +84,9 @@ Message normalization notes
|
||||||
- If Pi message_update events omit messageId, we synthesize a stable message id and emit a synthetic item.started before the first delta so streaming text stays grouped.
|
- If Pi message_update events omit messageId, we synthesize a stable message id and emit a synthetic item.started before the first delta so streaming text stays grouped.
|
||||||
- Pi auto_compaction_start/auto_compaction_end and auto_retry_start/auto_retry_end events are mapped to status items (label `pi.*`).
|
- Pi auto_compaction_start/auto_compaction_end and auto_retry_start/auto_retry_end events are mapped to status items (label `pi.*`).
|
||||||
- Pi extension_ui_request/extension_error events are mapped to status items.
|
- Pi extension_ui_request/extension_error events are mapped to status items.
|
||||||
|
<<<<<<< Updated upstream
|
||||||
- Pi RPC from pi-coding-agent does not include sessionId in events; each daemon session owns a dedicated Pi RPC process, so events are routed by runtime ownership (parallel sessions supported).
|
- Pi RPC from pi-coding-agent does not include sessionId in events; each daemon session owns a dedicated Pi RPC process, so events are routed by runtime ownership (parallel sessions supported).
|
||||||
|
=======
|
||||||
|
- Pi runtime mode is capability-selected: default is per-session process isolation, while shared multiplexing is used only for allowlisted Pi capabilities.
|
||||||
|
- In shared mode, pi-coding-agent events without sessionId are routed using the current-session mapping.
|
||||||
|
>>>>>>> Stashed changes
|
||||||
|
|
|
||||||
|
|
@ -35,6 +35,7 @@ console.log(url);
|
||||||
- **Send messages**: Post messages to a session directly from the UI
|
- **Send messages**: Post messages to a session directly from the UI
|
||||||
- **Agent selection**: Switch between agents and modes
|
- **Agent selection**: Switch between agents and modes
|
||||||
- **Request log**: View raw HTTP requests and responses for debugging
|
- **Request log**: View raw HTTP requests and responses for debugging
|
||||||
|
- **Pi concurrent sessions**: Pi sessions run concurrently by default via per-session runtime processes
|
||||||
|
|
||||||
## When to Use
|
## When to Use
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -10,7 +10,11 @@
|
||||||
"license": {
|
"license": {
|
||||||
"name": "Apache-2.0"
|
"name": "Apache-2.0"
|
||||||
},
|
},
|
||||||
|
<<<<<<< Updated upstream
|
||||||
"version": "0.1.7"
|
"version": "0.1.7"
|
||||||
|
=======
|
||||||
|
"version": "0.1.6"
|
||||||
|
>>>>>>> Stashed changes
|
||||||
},
|
},
|
||||||
"servers": [
|
"servers": [
|
||||||
{
|
{
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,15 @@
|
||||||
# Pi Agent Support Plan (pi-mono)
|
# Pi Agent Support Plan (pi-mono)
|
||||||
|
|
||||||
|
## Implementation Status Update
|
||||||
|
|
||||||
|
- Runtime selection now supports two internal modes:
|
||||||
|
- `PerSession` (default for unknown/non-allowlisted Pi capabilities)
|
||||||
|
- `Shared` (allowlist-only compatibility path)
|
||||||
|
- Pi sessions now use per-session process isolation by default, enabling true concurrent Pi sessions in Inspector and API clients.
|
||||||
|
- Shared Pi server code remains available and is used only when capability checks allow multiplexing.
|
||||||
|
- Session termination for per-session Pi mode hard-kills the underlying Pi process and clears queued prompts/pending waiters.
|
||||||
|
- In-session concurrent sends are serialized with an unbounded daemon-side FIFO queue per session.
|
||||||
|
|
||||||
## Investigation Summary
|
## Investigation Summary
|
||||||
|
|
||||||
### Pi CLI modes and RPC protocol
|
### Pi CLI modes and RPC protocol
|
||||||
|
|
|
||||||
|
|
@ -2628,8 +2628,12 @@ pub fn build_opencode_router(state: Arc<OpenCodeAppState>) -> Router {
|
||||||
responses((status = 200)),
|
responses((status = 200)),
|
||||||
tag = "opencode"
|
tag = "opencode"
|
||||||
)]
|
)]
|
||||||
|
<<<<<<< Updated upstream
|
||||||
async fn oc_agent_list(State(state): State<Arc<OpenCodeAppState>>) -> impl IntoResponse {
|
async fn oc_agent_list(State(state): State<Arc<OpenCodeAppState>>) -> impl IntoResponse {
|
||||||
let name = state.inner.branding.product_name();
|
let name = state.inner.branding.product_name();
|
||||||
|
=======
|
||||||
|
async fn oc_agent_list(State(_state): State<Arc<OpenCodeAppState>>) -> impl IntoResponse {
|
||||||
|
>>>>>>> Stashed changes
|
||||||
let agent = json!({
|
let agent = json!({
|
||||||
"name": name,
|
"name": name,
|
||||||
"description": format!("{name} compatibility layer"),
|
"description": format!("{name} compatibility layer"),
|
||||||
|
|
@ -4274,6 +4278,7 @@ async fn oc_file_list() -> impl IntoResponse {
|
||||||
tag = "opencode"
|
tag = "opencode"
|
||||||
)]
|
)]
|
||||||
async fn oc_file_content(Query(query): Query<FileContentQuery>) -> impl IntoResponse {
|
async fn oc_file_content(Query(query): Query<FileContentQuery>) -> impl IntoResponse {
|
||||||
|
let _directory = query.directory.as_deref();
|
||||||
if query.path.is_none() {
|
if query.path.is_none() {
|
||||||
return bad_request("path is required").into_response();
|
return bad_request("path is required").into_response();
|
||||||
}
|
}
|
||||||
|
|
@ -4304,6 +4309,7 @@ async fn oc_file_status() -> impl IntoResponse {
|
||||||
tag = "opencode"
|
tag = "opencode"
|
||||||
)]
|
)]
|
||||||
async fn oc_find_text(Query(query): Query<FindTextQuery>) -> impl IntoResponse {
|
async fn oc_find_text(Query(query): Query<FindTextQuery>) -> impl IntoResponse {
|
||||||
|
let _directory = query.directory.as_deref();
|
||||||
if query.pattern.is_none() {
|
if query.pattern.is_none() {
|
||||||
return bad_request("pattern is required").into_response();
|
return bad_request("pattern is required").into_response();
|
||||||
}
|
}
|
||||||
|
|
@ -4317,6 +4323,7 @@ async fn oc_find_text(Query(query): Query<FindTextQuery>) -> impl IntoResponse {
|
||||||
tag = "opencode"
|
tag = "opencode"
|
||||||
)]
|
)]
|
||||||
async fn oc_find_files(Query(query): Query<FindFilesQuery>) -> impl IntoResponse {
|
async fn oc_find_files(Query(query): Query<FindFilesQuery>) -> impl IntoResponse {
|
||||||
|
let _directory = query.directory.as_deref();
|
||||||
if query.query.is_none() {
|
if query.query.is_none() {
|
||||||
return bad_request("query is required").into_response();
|
return bad_request("query is required").into_response();
|
||||||
}
|
}
|
||||||
|
|
@ -4330,6 +4337,7 @@ async fn oc_find_files(Query(query): Query<FindFilesQuery>) -> impl IntoResponse
|
||||||
tag = "opencode"
|
tag = "opencode"
|
||||||
)]
|
)]
|
||||||
async fn oc_find_symbols(Query(query): Query<FindSymbolsQuery>) -> impl IntoResponse {
|
async fn oc_find_symbols(Query(query): Query<FindSymbolsQuery>) -> impl IntoResponse {
|
||||||
|
let _directory = query.directory.as_deref();
|
||||||
if query.query.is_none() {
|
if query.query.is_none() {
|
||||||
return bad_request("query is required").into_response();
|
return bad_request("query is required").into_response();
|
||||||
}
|
}
|
||||||
|
|
@ -4446,6 +4454,7 @@ async fn oc_tool_ids() -> impl IntoResponse {
|
||||||
tag = "opencode"
|
tag = "opencode"
|
||||||
)]
|
)]
|
||||||
async fn oc_tool_list(Query(query): Query<ToolQuery>) -> impl IntoResponse {
|
async fn oc_tool_list(Query(query): Query<ToolQuery>) -> impl IntoResponse {
|
||||||
|
let _directory = query.directory.as_deref();
|
||||||
if query.provider.is_none() || query.model.is_none() {
|
if query.provider.is_none() || query.model.is_none() {
|
||||||
return bad_request("provider and model are required").into_response();
|
return bad_request("provider and model are required").into_response();
|
||||||
}
|
}
|
||||||
|
|
|
||||||
File diff suppressed because it is too large
Load diff
|
|
@ -1,6 +1,32 @@
|
||||||
// Pi RPC integration tests (gated via SANDBOX_TEST_PI + PATH).
|
// Pi RPC integration tests (gated via SANDBOX_TEST_PI + PATH).
|
||||||
include!("../common/http.rs");
|
include!("../common/http.rs");
|
||||||
|
|
||||||
|
<<<<<<< Updated upstream
|
||||||
|
=======
|
||||||
|
struct EnvVarGuard {
|
||||||
|
key: String,
|
||||||
|
previous: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for EnvVarGuard {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
match &self.previous {
|
||||||
|
Some(value) => std::env::set_var(&self.key, value),
|
||||||
|
None => std::env::remove_var(&self.key),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_env_var(key: &str, value: &str) -> EnvVarGuard {
|
||||||
|
let previous = std::env::var(key).ok();
|
||||||
|
std::env::set_var(key, value);
|
||||||
|
EnvVarGuard {
|
||||||
|
key: key.to_string(),
|
||||||
|
previous,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
>>>>>>> Stashed changes
|
||||||
fn pi_test_config() -> Option<TestAgentConfig> {
|
fn pi_test_config() -> Option<TestAgentConfig> {
|
||||||
let configs = match test_agents_from_env() {
|
let configs = match test_agents_from_env() {
|
||||||
Ok(configs) => configs,
|
Ok(configs) => configs,
|
||||||
|
|
@ -13,6 +39,83 @@ fn pi_test_config() -> Option<TestAgentConfig> {
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.find(|config| config.agent == AgentId::Pi)
|
.find(|config| config.agent == AgentId::Pi)
|
||||||
}
|
}
|
||||||
|
<<<<<<< Updated upstream
|
||||||
|
=======
|
||||||
|
|
||||||
|
async fn create_pi_session_checked(app: &Router, session_id: &str) -> Value {
|
||||||
|
let (status, payload) = send_json(
|
||||||
|
app,
|
||||||
|
Method::POST,
|
||||||
|
&format!("/v1/sessions/{session_id}"),
|
||||||
|
Some(json!({
|
||||||
|
"agent": "pi",
|
||||||
|
"permissionMode": test_permission_mode(AgentId::Pi),
|
||||||
|
})),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
assert_eq!(status, StatusCode::OK, "create pi session {session_id}");
|
||||||
|
payload
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn poll_events_until_assistant_count(
|
||||||
|
app: &Router,
|
||||||
|
session_id: &str,
|
||||||
|
expected_assistant_messages: usize,
|
||||||
|
timeout: Duration,
|
||||||
|
) -> Vec<Value> {
|
||||||
|
let start = Instant::now();
|
||||||
|
let mut offset = 0u64;
|
||||||
|
let mut events = Vec::new();
|
||||||
|
|
||||||
|
while start.elapsed() < timeout {
|
||||||
|
let path = format!("/v1/sessions/{session_id}/events?offset={offset}&limit=200");
|
||||||
|
let (status, payload) = send_json(app, Method::GET, &path, None).await;
|
||||||
|
assert_eq!(status, StatusCode::OK, "poll events");
|
||||||
|
let new_events = payload
|
||||||
|
.get("events")
|
||||||
|
.and_then(Value::as_array)
|
||||||
|
.cloned()
|
||||||
|
.unwrap_or_default();
|
||||||
|
|
||||||
|
if !new_events.is_empty() {
|
||||||
|
if let Some(last) = new_events
|
||||||
|
.last()
|
||||||
|
.and_then(|event| event.get("sequence"))
|
||||||
|
.and_then(Value::as_u64)
|
||||||
|
{
|
||||||
|
offset = last;
|
||||||
|
}
|
||||||
|
events.extend(new_events);
|
||||||
|
}
|
||||||
|
|
||||||
|
if events.iter().any(is_unparsed_event) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
let assistant_count = events
|
||||||
|
.iter()
|
||||||
|
.filter(|event| is_assistant_message(event))
|
||||||
|
.count();
|
||||||
|
if assistant_count >= expected_assistant_messages {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if events.iter().any(is_error_event) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
tokio::time::sleep(Duration::from_millis(800)).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
events
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||||
|
async fn pi_rpc_session_and_stream() {
|
||||||
|
let Some(config) = pi_test_config() else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
>>>>>>> Stashed changes
|
||||||
|
|
||||||
async fn create_pi_session_with_native(app: &Router, session_id: &str) -> String {
|
async fn create_pi_session_with_native(app: &Router, session_id: &str) -> String {
|
||||||
let (status, payload) = send_json(
|
let (status, payload) = send_json(
|
||||||
|
|
@ -53,6 +156,7 @@ fn assert_strictly_increasing_sequences(events: &[Value], label: &str) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
<<<<<<< Updated upstream
|
||||||
fn assert_all_events_for_session(events: &[Value], session_id: &str) {
|
fn assert_all_events_for_session(events: &[Value], session_id: &str) {
|
||||||
for event in events {
|
for event in events {
|
||||||
let event_session_id = event
|
let event_session_id = event
|
||||||
|
|
@ -93,6 +197,11 @@ fn assert_item_started_ids_unique(events: &[Value], label: &str) {
|
||||||
|
|
||||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||||
async fn pi_rpc_session_and_stream() {
|
async fn pi_rpc_session_and_stream() {
|
||||||
|
=======
|
||||||
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||||
|
async fn pi_rpc_multi_session_create_per_session_mode() {
|
||||||
|
let _mode_guard = set_env_var("SANDBOX_AGENT_PI_FORCE_RUNTIME_MODE", "per-session");
|
||||||
|
>>>>>>> Stashed changes
|
||||||
let Some(config) = pi_test_config() else {
|
let Some(config) = pi_test_config() else {
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
|
|
@ -101,6 +210,7 @@ async fn pi_rpc_session_and_stream() {
|
||||||
let _guard = apply_credentials(&config.credentials);
|
let _guard = apply_credentials(&config.credentials);
|
||||||
install_agent(&app.app, config.agent).await;
|
install_agent(&app.app, config.agent).await;
|
||||||
|
|
||||||
|
<<<<<<< Updated upstream
|
||||||
let session_id = "pi-rpc-session";
|
let session_id = "pi-rpc-session";
|
||||||
let _native_session_id = create_pi_session_with_native(&app.app, session_id).await;
|
let _native_session_id = create_pi_session_with_native(&app.app, session_id).await;
|
||||||
|
|
||||||
|
|
@ -119,6 +229,33 @@ async fn pi_rpc_session_and_stream() {
|
||||||
|
|
||||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||||
async fn pi_parallel_sessions_turns() {
|
async fn pi_parallel_sessions_turns() {
|
||||||
|
=======
|
||||||
|
let first = create_pi_session_checked(&app.app, "pi-multi-a").await;
|
||||||
|
let second = create_pi_session_checked(&app.app, "pi-multi-b").await;
|
||||||
|
|
||||||
|
let first_native = first
|
||||||
|
.get("native_session_id")
|
||||||
|
.and_then(Value::as_str)
|
||||||
|
.unwrap_or("");
|
||||||
|
let second_native = second
|
||||||
|
.get("native_session_id")
|
||||||
|
.and_then(Value::as_str)
|
||||||
|
.unwrap_or("");
|
||||||
|
assert!(!first_native.is_empty(), "first native session id missing");
|
||||||
|
assert!(
|
||||||
|
!second_native.is_empty(),
|
||||||
|
"second native session id missing"
|
||||||
|
);
|
||||||
|
assert_ne!(
|
||||||
|
first_native, second_native,
|
||||||
|
"per-session mode should allocate independent native session ids"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||||
|
async fn pi_rpc_per_session_queue_and_termination_isolation() {
|
||||||
|
let _mode_guard = set_env_var("SANDBOX_AGENT_PI_FORCE_RUNTIME_MODE", "per-session");
|
||||||
|
>>>>>>> Stashed changes
|
||||||
let Some(config) = pi_test_config() else {
|
let Some(config) = pi_test_config() else {
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
|
|
@ -127,6 +264,7 @@ async fn pi_parallel_sessions_turns() {
|
||||||
let _guard = apply_credentials(&config.credentials);
|
let _guard = apply_credentials(&config.credentials);
|
||||||
install_agent(&app.app, config.agent).await;
|
install_agent(&app.app, config.agent).await;
|
||||||
|
|
||||||
|
<<<<<<< Updated upstream
|
||||||
let session_a = "pi-parallel-a";
|
let session_a = "pi-parallel-a";
|
||||||
let session_b = "pi-parallel-b";
|
let session_b = "pi-parallel-b";
|
||||||
create_pi_session_with_native(&app.app, session_a).await;
|
create_pi_session_with_native(&app.app, session_a).await;
|
||||||
|
|
@ -285,3 +423,105 @@ async fn pi_runtime_restart_scope() {
|
||||||
);
|
);
|
||||||
assert_all_events_for_session(&events_b, session_b);
|
assert_all_events_for_session(&events_b, session_b);
|
||||||
}
|
}
|
||||||
|
=======
|
||||||
|
create_pi_session_checked(&app.app, "pi-queue-a").await;
|
||||||
|
create_pi_session_checked(&app.app, "pi-queue-b").await;
|
||||||
|
|
||||||
|
let status = send_status(
|
||||||
|
&app.app,
|
||||||
|
Method::POST,
|
||||||
|
"/v1/sessions/pi-queue-a/messages",
|
||||||
|
Some(json!({ "message": "Reply with exactly FIRST." })),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
assert_eq!(status, StatusCode::NO_CONTENT, "send first prompt");
|
||||||
|
|
||||||
|
let status = send_status(
|
||||||
|
&app.app,
|
||||||
|
Method::POST,
|
||||||
|
"/v1/sessions/pi-queue-a/messages",
|
||||||
|
Some(json!({ "message": "Reply with exactly SECOND." })),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
assert_eq!(status, StatusCode::NO_CONTENT, "enqueue second prompt");
|
||||||
|
|
||||||
|
let status = send_status(
|
||||||
|
&app.app,
|
||||||
|
Method::POST,
|
||||||
|
"/v1/sessions/pi-queue-b/messages",
|
||||||
|
Some(json!({ "message": PROMPT })),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
assert_eq!(
|
||||||
|
status,
|
||||||
|
StatusCode::NO_CONTENT,
|
||||||
|
"send prompt to sibling session"
|
||||||
|
);
|
||||||
|
|
||||||
|
let events_a =
|
||||||
|
poll_events_until_assistant_count(&app.app, "pi-queue-a", 2, Duration::from_secs(240))
|
||||||
|
.await;
|
||||||
|
let events_b =
|
||||||
|
poll_events_until_assistant_count(&app.app, "pi-queue-b", 1, Duration::from_secs(180))
|
||||||
|
.await;
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
!events_a.iter().any(is_unparsed_event),
|
||||||
|
"session a emitted agent.unparsed"
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
!events_b.iter().any(is_unparsed_event),
|
||||||
|
"session b emitted agent.unparsed"
|
||||||
|
);
|
||||||
|
let assistant_count_a = events_a
|
||||||
|
.iter()
|
||||||
|
.filter(|event| is_assistant_message(event))
|
||||||
|
.count();
|
||||||
|
let assistant_count_b = events_b
|
||||||
|
.iter()
|
||||||
|
.filter(|event| is_assistant_message(event))
|
||||||
|
.count();
|
||||||
|
assert!(
|
||||||
|
assistant_count_a >= 2,
|
||||||
|
"expected at least two assistant completions for queued session, got {assistant_count_a}"
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
assistant_count_b >= 1,
|
||||||
|
"expected assistant completion for sibling session, got {assistant_count_b}"
|
||||||
|
);
|
||||||
|
|
||||||
|
let status = send_status(
|
||||||
|
&app.app,
|
||||||
|
Method::POST,
|
||||||
|
"/v1/sessions/pi-queue-a/terminate",
|
||||||
|
Some(json!({})),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
assert_eq!(status, StatusCode::NO_CONTENT, "terminate first session");
|
||||||
|
|
||||||
|
let status = send_status(
|
||||||
|
&app.app,
|
||||||
|
Method::POST,
|
||||||
|
"/v1/sessions/pi-queue-b/messages",
|
||||||
|
Some(json!({ "message": PROMPT })),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
assert_eq!(
|
||||||
|
status,
|
||||||
|
StatusCode::NO_CONTENT,
|
||||||
|
"sibling session should continue after termination"
|
||||||
|
);
|
||||||
|
|
||||||
|
let events_b_after =
|
||||||
|
poll_events_until_assistant_count(&app.app, "pi-queue-b", 2, Duration::from_secs(180))
|
||||||
|
.await;
|
||||||
|
let assistant_count_b_after = events_b_after
|
||||||
|
.iter()
|
||||||
|
.filter(|event| is_assistant_message(event))
|
||||||
|
.count();
|
||||||
|
assert!(
|
||||||
|
assistant_count_b_after >= 2,
|
||||||
|
"expected additional assistant completion for sibling session after termination"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
>>>>>>> Stashed changes
|
||||||
|
|
|
||||||
|
|
@ -21,6 +21,46 @@ pub struct PiEventConverter {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PiEventConverter {
|
impl PiEventConverter {
|
||||||
|
pub fn event_value_to_universal(
|
||||||
|
&mut self,
|
||||||
|
raw: &Value,
|
||||||
|
) -> Result<Vec<EventConversion>, String> {
|
||||||
|
let event_type = raw
|
||||||
|
.get("type")
|
||||||
|
.and_then(Value::as_str)
|
||||||
|
.ok_or_else(|| "missing event type".to_string())?;
|
||||||
|
let native_session_id = extract_session_id(raw);
|
||||||
|
|
||||||
|
let conversions = match event_type {
|
||||||
|
"message_start" => self.message_start(raw),
|
||||||
|
"message_update" => self.message_update(raw),
|
||||||
|
"message_end" => self.message_end(raw),
|
||||||
|
"tool_execution_start" => self.tool_execution_start(raw),
|
||||||
|
"tool_execution_update" => self.tool_execution_update(raw),
|
||||||
|
"tool_execution_end" => self.tool_execution_end(raw),
|
||||||
|
"agent_start"
|
||||||
|
| "agent_end"
|
||||||
|
| "turn_start"
|
||||||
|
| "turn_end"
|
||||||
|
| "auto_compaction"
|
||||||
|
| "auto_compaction_start"
|
||||||
|
| "auto_compaction_end"
|
||||||
|
| "auto_retry"
|
||||||
|
| "auto_retry_start"
|
||||||
|
| "auto_retry_end"
|
||||||
|
| "hook_error" => Ok(vec![status_event(event_type, raw)]),
|
||||||
|
"extension_ui_request" | "extension_ui_response" | "extension_error" => {
|
||||||
|
Ok(vec![status_event(event_type, raw)])
|
||||||
|
}
|
||||||
|
other => Err(format!("unsupported Pi event type: {other}")),
|
||||||
|
}?;
|
||||||
|
|
||||||
|
Ok(conversions
|
||||||
|
.into_iter()
|
||||||
|
.map(|conversion| attach_metadata(conversion, &native_session_id, raw))
|
||||||
|
.collect())
|
||||||
|
}
|
||||||
|
|
||||||
fn next_synthetic_message_id(&mut self) -> String {
|
fn next_synthetic_message_id(&mut self) -> String {
|
||||||
self.message_counter += 1;
|
self.message_counter += 1;
|
||||||
format!("pi_msg_{}", self.message_counter)
|
format!("pi_msg_{}", self.message_counter)
|
||||||
|
|
@ -72,40 +112,7 @@ impl PiEventConverter {
|
||||||
event: &schema::RpcEvent,
|
event: &schema::RpcEvent,
|
||||||
) -> Result<Vec<EventConversion>, String> {
|
) -> Result<Vec<EventConversion>, String> {
|
||||||
let raw = serde_json::to_value(event).map_err(|err| err.to_string())?;
|
let raw = serde_json::to_value(event).map_err(|err| err.to_string())?;
|
||||||
let event_type = raw
|
self.event_value_to_universal(&raw)
|
||||||
.get("type")
|
|
||||||
.and_then(Value::as_str)
|
|
||||||
.ok_or_else(|| "missing event type".to_string())?;
|
|
||||||
let native_session_id = extract_session_id(&raw);
|
|
||||||
|
|
||||||
let conversions = match event_type {
|
|
||||||
"message_start" => self.message_start(&raw),
|
|
||||||
"message_update" => self.message_update(&raw),
|
|
||||||
"message_end" => self.message_end(&raw),
|
|
||||||
"tool_execution_start" => self.tool_execution_start(&raw),
|
|
||||||
"tool_execution_update" => self.tool_execution_update(&raw),
|
|
||||||
"tool_execution_end" => self.tool_execution_end(&raw),
|
|
||||||
"agent_start"
|
|
||||||
| "agent_end"
|
|
||||||
| "turn_start"
|
|
||||||
| "turn_end"
|
|
||||||
| "auto_compaction"
|
|
||||||
| "auto_compaction_start"
|
|
||||||
| "auto_compaction_end"
|
|
||||||
| "auto_retry"
|
|
||||||
| "auto_retry_start"
|
|
||||||
| "auto_retry_end"
|
|
||||||
| "hook_error" => Ok(vec![status_event(event_type, &raw)]),
|
|
||||||
"extension_ui_request" | "extension_ui_response" | "extension_error" => {
|
|
||||||
Ok(vec![status_event(event_type, &raw)])
|
|
||||||
}
|
|
||||||
other => Err(format!("unsupported Pi event type: {other}")),
|
|
||||||
}?;
|
|
||||||
|
|
||||||
Ok(conversions
|
|
||||||
.into_iter()
|
|
||||||
.map(|conversion| attach_metadata(conversion, &native_session_id, &raw))
|
|
||||||
.collect())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn message_start(&mut self, raw: &Value) -> Result<Vec<EventConversion>, String> {
|
fn message_start(&mut self, raw: &Value) -> Result<Vec<EventConversion>, String> {
|
||||||
|
|
@ -265,6 +272,8 @@ impl PiEventConverter {
|
||||||
message: Option<&Value>,
|
message: Option<&Value>,
|
||||||
) -> EventConversion {
|
) -> EventConversion {
|
||||||
let mut content = message.and_then(parse_message_content).unwrap_or_default();
|
let mut content = message.and_then(parse_message_content).unwrap_or_default();
|
||||||
|
let failed = message_is_failed(message);
|
||||||
|
let message_error_text = extract_message_error_text(message);
|
||||||
|
|
||||||
if let Some(id) = message_id.clone() {
|
if let Some(id) = message_id.clone() {
|
||||||
if content.is_empty() {
|
if content.is_empty() {
|
||||||
|
|
@ -292,6 +301,12 @@ impl PiEventConverter {
|
||||||
self.message_started.remove(&id);
|
self.message_started.remove(&id);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if failed && content.is_empty() {
|
||||||
|
if let Some(text) = message_error_text {
|
||||||
|
content.push(ContentPart::Text { text });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let item = UniversalItem {
|
let item = UniversalItem {
|
||||||
item_id: String::new(),
|
item_id: String::new(),
|
||||||
native_item_id: message_id,
|
native_item_id: message_id,
|
||||||
|
|
@ -299,7 +314,11 @@ impl PiEventConverter {
|
||||||
kind: ItemKind::Message,
|
kind: ItemKind::Message,
|
||||||
role: Some(ItemRole::Assistant),
|
role: Some(ItemRole::Assistant),
|
||||||
content,
|
content,
|
||||||
status: ItemStatus::Completed,
|
status: if failed {
|
||||||
|
ItemStatus::Failed
|
||||||
|
} else {
|
||||||
|
ItemStatus::Completed
|
||||||
|
},
|
||||||
};
|
};
|
||||||
EventConversion::new(
|
EventConversion::new(
|
||||||
UniversalEventType::ItemCompleted,
|
UniversalEventType::ItemCompleted,
|
||||||
|
|
@ -434,6 +453,10 @@ pub fn event_to_universal(event: &schema::RpcEvent) -> Result<Vec<EventConversio
|
||||||
PiEventConverter::default().event_to_universal(event)
|
PiEventConverter::default().event_to_universal(event)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn event_value_to_universal(raw: &Value) -> Result<Vec<EventConversion>, String> {
|
||||||
|
PiEventConverter::default().event_value_to_universal(raw)
|
||||||
|
}
|
||||||
|
|
||||||
fn attach_metadata(
|
fn attach_metadata(
|
||||||
conversion: EventConversion,
|
conversion: EventConversion,
|
||||||
native_session_id: &Option<String>,
|
native_session_id: &Option<String>,
|
||||||
|
|
@ -584,6 +607,53 @@ fn parse_message_content(message: &Value) -> Option<Vec<ContentPart>> {
|
||||||
Some(parts)
|
Some(parts)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn message_is_failed(message: Option<&Value>) -> bool {
|
||||||
|
message
|
||||||
|
.and_then(|value| {
|
||||||
|
value
|
||||||
|
.get("stopReason")
|
||||||
|
.or_else(|| value.get("stop_reason"))
|
||||||
|
.and_then(Value::as_str)
|
||||||
|
})
|
||||||
|
.is_some_and(|reason| reason == "error" || reason == "aborted")
|
||||||
|
}
|
||||||
|
|
||||||
|
fn extract_message_error_text(message: Option<&Value>) -> Option<String> {
|
||||||
|
let value = message?;
|
||||||
|
|
||||||
|
if let Some(text) = value
|
||||||
|
.get("errorMessage")
|
||||||
|
.or_else(|| value.get("error_message"))
|
||||||
|
.and_then(Value::as_str)
|
||||||
|
{
|
||||||
|
let trimmed = text.trim();
|
||||||
|
if !trimmed.is_empty() {
|
||||||
|
return Some(trimmed.to_string());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let error = value.get("error")?;
|
||||||
|
if let Some(text) = error.as_str() {
|
||||||
|
let trimmed = text.trim();
|
||||||
|
if !trimmed.is_empty() {
|
||||||
|
return Some(trimmed.to_string());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if let Some(text) = error
|
||||||
|
.get("errorMessage")
|
||||||
|
.or_else(|| error.get("error_message"))
|
||||||
|
.or_else(|| error.get("message"))
|
||||||
|
.and_then(Value::as_str)
|
||||||
|
{
|
||||||
|
let trimmed = text.trim();
|
||||||
|
if !trimmed.is_empty() {
|
||||||
|
return Some(trimmed.to_string());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
fn content_part_from_value(value: &Value) -> Option<ContentPart> {
|
fn content_part_from_value(value: &Value) -> Option<ContentPart> {
|
||||||
if let Some(text) = value.as_str() {
|
if let Some(text) = value.as_str() {
|
||||||
return Some(ContentPart::Text {
|
return Some(ContentPart::Text {
|
||||||
|
|
|
||||||
|
|
@ -262,3 +262,46 @@ fn pi_message_done_completes_without_message_end() {
|
||||||
panic!("expected item event");
|
panic!("expected item event");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn pi_message_end_error_surfaces_failed_status_and_error_text() {
|
||||||
|
let mut converter = PiEventConverter::default();
|
||||||
|
|
||||||
|
let start_event = parse_event(json!({
|
||||||
|
"type": "message_start",
|
||||||
|
"sessionId": "session-1",
|
||||||
|
"messageId": "msg-err",
|
||||||
|
"message": {
|
||||||
|
"role": "assistant",
|
||||||
|
"content": []
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
let _ = converter
|
||||||
|
.event_to_universal(&start_event)
|
||||||
|
.expect("start conversions");
|
||||||
|
|
||||||
|
let end_raw = json!({
|
||||||
|
"type": "message_end",
|
||||||
|
"sessionId": "session-1",
|
||||||
|
"messageId": "msg-err",
|
||||||
|
"message": {
|
||||||
|
"role": "assistant",
|
||||||
|
"content": [],
|
||||||
|
"stopReason": "error",
|
||||||
|
"errorMessage": "Connection error."
|
||||||
|
}
|
||||||
|
});
|
||||||
|
let end_events = converter
|
||||||
|
.event_value_to_universal(&end_raw)
|
||||||
|
.expect("end conversions");
|
||||||
|
|
||||||
|
assert_eq!(end_events[0].event_type, UniversalEventType::ItemCompleted);
|
||||||
|
if let UniversalEventData::Item(item) = &end_events[0].data {
|
||||||
|
assert_eq!(item.item.status, ItemStatus::Failed);
|
||||||
|
assert!(
|
||||||
|
matches!(item.item.content.first(), Some(ContentPart::Text { text }) if text == "Connection error.")
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
panic!("expected item event");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue