mirror of
https://github.com/harivansh-afk/sandbox-agent.git
synced 2026-04-17 10:02:26 +00:00
wip: pi working with variatns
This commit is contained in:
parent
bef2e84d0c
commit
9a26604001
6 changed files with 530 additions and 1637 deletions
|
|
@ -84,9 +84,6 @@ Message normalization notes
|
||||||
- If Pi message_update events omit messageId, we synthesize a stable message id and emit a synthetic item.started before the first delta so streaming text stays grouped.
|
- If Pi message_update events omit messageId, we synthesize a stable message id and emit a synthetic item.started before the first delta so streaming text stays grouped.
|
||||||
- Pi auto_compaction_start/auto_compaction_end and auto_retry_start/auto_retry_end events are mapped to status items (label `pi.*`).
|
- Pi auto_compaction_start/auto_compaction_end and auto_retry_start/auto_retry_end events are mapped to status items (label `pi.*`).
|
||||||
- Pi extension_ui_request/extension_error events are mapped to status items.
|
- Pi extension_ui_request/extension_error events are mapped to status items.
|
||||||
<<<<<<< Updated upstream
|
|
||||||
- Pi RPC from pi-coding-agent does not include sessionId in events; each daemon session owns a dedicated Pi RPC process, so events are routed by runtime ownership (parallel sessions supported).
|
- Pi RPC from pi-coding-agent does not include sessionId in events; each daemon session owns a dedicated Pi RPC process, so events are routed by runtime ownership (parallel sessions supported).
|
||||||
=======
|
- PI `variant` maps directly to PI RPC `set_thinking_level.level` before prompts are sent.
|
||||||
- Pi runtime mode is capability-selected: default is per-session process isolation, while shared multiplexing is used only for allowlisted Pi capabilities.
|
- PI remains source of truth for thinking-level constraints: unsupported levels (including non-reasoning models and model-specific limits such as `xhigh`) are PI-native clamped or rejected.
|
||||||
- In shared mode, pi-coding-agent events without sessionId are routed using the current-session mapping.
|
|
||||||
>>>>>>> Stashed changes
|
|
||||||
|
|
|
||||||
|
|
@ -10,11 +10,7 @@
|
||||||
"license": {
|
"license": {
|
||||||
"name": "Apache-2.0"
|
"name": "Apache-2.0"
|
||||||
},
|
},
|
||||||
<<<<<<< Updated upstream
|
|
||||||
"version": "0.1.7"
|
"version": "0.1.7"
|
||||||
=======
|
|
||||||
"version": "0.1.6"
|
|
||||||
>>>>>>> Stashed changes
|
|
||||||
},
|
},
|
||||||
"servers": [
|
"servers": [
|
||||||
{
|
{
|
||||||
|
|
|
||||||
|
|
@ -2628,12 +2628,8 @@ pub fn build_opencode_router(state: Arc<OpenCodeAppState>) -> Router {
|
||||||
responses((status = 200)),
|
responses((status = 200)),
|
||||||
tag = "opencode"
|
tag = "opencode"
|
||||||
)]
|
)]
|
||||||
<<<<<<< Updated upstream
|
|
||||||
async fn oc_agent_list(State(state): State<Arc<OpenCodeAppState>>) -> impl IntoResponse {
|
async fn oc_agent_list(State(state): State<Arc<OpenCodeAppState>>) -> impl IntoResponse {
|
||||||
let name = state.inner.branding.product_name();
|
let name = state.inner.branding.product_name();
|
||||||
=======
|
|
||||||
async fn oc_agent_list(State(_state): State<Arc<OpenCodeAppState>>) -> impl IntoResponse {
|
|
||||||
>>>>>>> Stashed changes
|
|
||||||
let agent = json!({
|
let agent = json!({
|
||||||
"name": name,
|
"name": name,
|
||||||
"description": format!("{name} compatibility layer"),
|
"description": format!("{name} compatibility layer"),
|
||||||
|
|
@ -4278,7 +4274,6 @@ async fn oc_file_list() -> impl IntoResponse {
|
||||||
tag = "opencode"
|
tag = "opencode"
|
||||||
)]
|
)]
|
||||||
async fn oc_file_content(Query(query): Query<FileContentQuery>) -> impl IntoResponse {
|
async fn oc_file_content(Query(query): Query<FileContentQuery>) -> impl IntoResponse {
|
||||||
let _directory = query.directory.as_deref();
|
|
||||||
if query.path.is_none() {
|
if query.path.is_none() {
|
||||||
return bad_request("path is required").into_response();
|
return bad_request("path is required").into_response();
|
||||||
}
|
}
|
||||||
|
|
@ -4309,7 +4304,6 @@ async fn oc_file_status() -> impl IntoResponse {
|
||||||
tag = "opencode"
|
tag = "opencode"
|
||||||
)]
|
)]
|
||||||
async fn oc_find_text(Query(query): Query<FindTextQuery>) -> impl IntoResponse {
|
async fn oc_find_text(Query(query): Query<FindTextQuery>) -> impl IntoResponse {
|
||||||
let _directory = query.directory.as_deref();
|
|
||||||
if query.pattern.is_none() {
|
if query.pattern.is_none() {
|
||||||
return bad_request("pattern is required").into_response();
|
return bad_request("pattern is required").into_response();
|
||||||
}
|
}
|
||||||
|
|
@ -4323,7 +4317,6 @@ async fn oc_find_text(Query(query): Query<FindTextQuery>) -> impl IntoResponse {
|
||||||
tag = "opencode"
|
tag = "opencode"
|
||||||
)]
|
)]
|
||||||
async fn oc_find_files(Query(query): Query<FindFilesQuery>) -> impl IntoResponse {
|
async fn oc_find_files(Query(query): Query<FindFilesQuery>) -> impl IntoResponse {
|
||||||
let _directory = query.directory.as_deref();
|
|
||||||
if query.query.is_none() {
|
if query.query.is_none() {
|
||||||
return bad_request("query is required").into_response();
|
return bad_request("query is required").into_response();
|
||||||
}
|
}
|
||||||
|
|
@ -4337,7 +4330,6 @@ async fn oc_find_files(Query(query): Query<FindFilesQuery>) -> impl IntoResponse
|
||||||
tag = "opencode"
|
tag = "opencode"
|
||||||
)]
|
)]
|
||||||
async fn oc_find_symbols(Query(query): Query<FindSymbolsQuery>) -> impl IntoResponse {
|
async fn oc_find_symbols(Query(query): Query<FindSymbolsQuery>) -> impl IntoResponse {
|
||||||
let _directory = query.directory.as_deref();
|
|
||||||
if query.query.is_none() {
|
if query.query.is_none() {
|
||||||
return bad_request("query is required").into_response();
|
return bad_request("query is required").into_response();
|
||||||
}
|
}
|
||||||
|
|
@ -4454,7 +4446,6 @@ async fn oc_tool_ids() -> impl IntoResponse {
|
||||||
tag = "opencode"
|
tag = "opencode"
|
||||||
)]
|
)]
|
||||||
async fn oc_tool_list(Query(query): Query<ToolQuery>) -> impl IntoResponse {
|
async fn oc_tool_list(Query(query): Query<ToolQuery>) -> impl IntoResponse {
|
||||||
let _directory = query.directory.as_deref();
|
|
||||||
if query.provider.is_none() || query.model.is_none() {
|
if query.provider.is_none() || query.model.is_none() {
|
||||||
return bad_request("provider and model are required").into_response();
|
return bad_request("provider and model are required").into_response();
|
||||||
}
|
}
|
||||||
|
|
|
||||||
File diff suppressed because it is too large
Load diff
|
|
@ -1,32 +1,6 @@
|
||||||
// Pi RPC integration tests (gated via SANDBOX_TEST_PI + PATH).
|
// Pi RPC integration tests (gated via SANDBOX_TEST_PI + PATH).
|
||||||
include!("../common/http.rs");
|
include!("../common/http.rs");
|
||||||
|
|
||||||
<<<<<<< Updated upstream
|
|
||||||
=======
|
|
||||||
struct EnvVarGuard {
|
|
||||||
key: String,
|
|
||||||
previous: Option<String>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Drop for EnvVarGuard {
|
|
||||||
fn drop(&mut self) {
|
|
||||||
match &self.previous {
|
|
||||||
Some(value) => std::env::set_var(&self.key, value),
|
|
||||||
None => std::env::remove_var(&self.key),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn set_env_var(key: &str, value: &str) -> EnvVarGuard {
|
|
||||||
let previous = std::env::var(key).ok();
|
|
||||||
std::env::set_var(key, value);
|
|
||||||
EnvVarGuard {
|
|
||||||
key: key.to_string(),
|
|
||||||
previous,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
>>>>>>> Stashed changes
|
|
||||||
fn pi_test_config() -> Option<TestAgentConfig> {
|
fn pi_test_config() -> Option<TestAgentConfig> {
|
||||||
let configs = match test_agents_from_env() {
|
let configs = match test_agents_from_env() {
|
||||||
Ok(configs) => configs,
|
Ok(configs) => configs,
|
||||||
|
|
@ -39,96 +13,9 @@ fn pi_test_config() -> Option<TestAgentConfig> {
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.find(|config| config.agent == AgentId::Pi)
|
.find(|config| config.agent == AgentId::Pi)
|
||||||
}
|
}
|
||||||
<<<<<<< Updated upstream
|
|
||||||
=======
|
|
||||||
|
|
||||||
async fn create_pi_session_checked(app: &Router, session_id: &str) -> Value {
|
|
||||||
let (status, payload) = send_json(
|
|
||||||
app,
|
|
||||||
Method::POST,
|
|
||||||
&format!("/v1/sessions/{session_id}"),
|
|
||||||
Some(json!({
|
|
||||||
"agent": "pi",
|
|
||||||
"permissionMode": test_permission_mode(AgentId::Pi),
|
|
||||||
})),
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
assert_eq!(status, StatusCode::OK, "create pi session {session_id}");
|
|
||||||
payload
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn poll_events_until_assistant_count(
|
|
||||||
app: &Router,
|
|
||||||
session_id: &str,
|
|
||||||
expected_assistant_messages: usize,
|
|
||||||
timeout: Duration,
|
|
||||||
) -> Vec<Value> {
|
|
||||||
let start = Instant::now();
|
|
||||||
let mut offset = 0u64;
|
|
||||||
let mut events = Vec::new();
|
|
||||||
|
|
||||||
while start.elapsed() < timeout {
|
|
||||||
let path = format!("/v1/sessions/{session_id}/events?offset={offset}&limit=200");
|
|
||||||
let (status, payload) = send_json(app, Method::GET, &path, None).await;
|
|
||||||
assert_eq!(status, StatusCode::OK, "poll events");
|
|
||||||
let new_events = payload
|
|
||||||
.get("events")
|
|
||||||
.and_then(Value::as_array)
|
|
||||||
.cloned()
|
|
||||||
.unwrap_or_default();
|
|
||||||
|
|
||||||
if !new_events.is_empty() {
|
|
||||||
if let Some(last) = new_events
|
|
||||||
.last()
|
|
||||||
.and_then(|event| event.get("sequence"))
|
|
||||||
.and_then(Value::as_u64)
|
|
||||||
{
|
|
||||||
offset = last;
|
|
||||||
}
|
|
||||||
events.extend(new_events);
|
|
||||||
}
|
|
||||||
|
|
||||||
if events.iter().any(is_unparsed_event) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
let assistant_count = events
|
|
||||||
.iter()
|
|
||||||
.filter(|event| is_assistant_message(event))
|
|
||||||
.count();
|
|
||||||
if assistant_count >= expected_assistant_messages {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
if events.iter().any(is_error_event) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
tokio::time::sleep(Duration::from_millis(800)).await;
|
|
||||||
}
|
|
||||||
|
|
||||||
events
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
||||||
async fn pi_rpc_session_and_stream() {
|
|
||||||
let Some(config) = pi_test_config() else {
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
>>>>>>> Stashed changes
|
|
||||||
|
|
||||||
async fn create_pi_session_with_native(app: &Router, session_id: &str) -> String {
|
async fn create_pi_session_with_native(app: &Router, session_id: &str) -> String {
|
||||||
let (status, payload) = send_json(
|
let payload = create_pi_session(app, session_id, None, None).await;
|
||||||
app,
|
|
||||||
Method::POST,
|
|
||||||
&format!("/v1/sessions/{session_id}"),
|
|
||||||
Some(json!({
|
|
||||||
"agent": "pi",
|
|
||||||
"permissionMode": test_permission_mode(AgentId::Pi),
|
|
||||||
})),
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
assert_eq!(status, StatusCode::OK, "create pi session");
|
|
||||||
let native_session_id = payload
|
let native_session_id = payload
|
||||||
.get("native_session_id")
|
.get("native_session_id")
|
||||||
.and_then(Value::as_str)
|
.and_then(Value::as_str)
|
||||||
|
|
@ -141,6 +28,53 @@ async fn create_pi_session_with_native(app: &Router, session_id: &str) -> String
|
||||||
native_session_id
|
native_session_id
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn create_pi_session(
|
||||||
|
app: &Router,
|
||||||
|
session_id: &str,
|
||||||
|
model: Option<&str>,
|
||||||
|
variant: Option<&str>,
|
||||||
|
) -> Value {
|
||||||
|
let mut body = Map::new();
|
||||||
|
body.insert("agent".to_string(), json!("pi"));
|
||||||
|
body.insert(
|
||||||
|
"permissionMode".to_string(),
|
||||||
|
json!(test_permission_mode(AgentId::Pi)),
|
||||||
|
);
|
||||||
|
if let Some(model) = model {
|
||||||
|
body.insert("model".to_string(), json!(model));
|
||||||
|
}
|
||||||
|
if let Some(variant) = variant {
|
||||||
|
body.insert("variant".to_string(), json!(variant));
|
||||||
|
}
|
||||||
|
let (status, payload) = send_json(
|
||||||
|
app,
|
||||||
|
Method::POST,
|
||||||
|
&format!("/v1/sessions/{session_id}"),
|
||||||
|
Some(Value::Object(body)),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
assert_eq!(status, StatusCode::OK, "create pi session");
|
||||||
|
payload
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn fetch_pi_models(app: &Router) -> Vec<Value> {
|
||||||
|
let (status, payload) = send_json(app, Method::GET, "/v1/agents/pi/models", None).await;
|
||||||
|
assert_eq!(status, StatusCode::OK, "pi models endpoint");
|
||||||
|
payload
|
||||||
|
.get("models")
|
||||||
|
.and_then(Value::as_array)
|
||||||
|
.cloned()
|
||||||
|
.unwrap_or_default()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn model_variant_ids(model: &Value) -> Vec<&str> {
|
||||||
|
model
|
||||||
|
.get("variants")
|
||||||
|
.and_then(Value::as_array)
|
||||||
|
.map(|values| values.iter().filter_map(Value::as_str).collect::<Vec<_>>())
|
||||||
|
.unwrap_or_default()
|
||||||
|
}
|
||||||
|
|
||||||
fn assert_strictly_increasing_sequences(events: &[Value], label: &str) {
|
fn assert_strictly_increasing_sequences(events: &[Value], label: &str) {
|
||||||
let mut last_sequence = 0u64;
|
let mut last_sequence = 0u64;
|
||||||
for event in events {
|
for event in events {
|
||||||
|
|
@ -156,7 +90,6 @@ fn assert_strictly_increasing_sequences(events: &[Value], label: &str) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
<<<<<<< Updated upstream
|
|
||||||
fn assert_all_events_for_session(events: &[Value], session_id: &str) {
|
fn assert_all_events_for_session(events: &[Value], session_id: &str) {
|
||||||
for event in events {
|
for event in events {
|
||||||
let event_session_id = event
|
let event_session_id = event
|
||||||
|
|
@ -197,11 +130,6 @@ fn assert_item_started_ids_unique(events: &[Value], label: &str) {
|
||||||
|
|
||||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||||
async fn pi_rpc_session_and_stream() {
|
async fn pi_rpc_session_and_stream() {
|
||||||
=======
|
|
||||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
|
||||||
async fn pi_rpc_multi_session_create_per_session_mode() {
|
|
||||||
let _mode_guard = set_env_var("SANDBOX_AGENT_PI_FORCE_RUNTIME_MODE", "per-session");
|
|
||||||
>>>>>>> Stashed changes
|
|
||||||
let Some(config) = pi_test_config() else {
|
let Some(config) = pi_test_config() else {
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
|
|
@ -210,7 +138,6 @@ async fn pi_rpc_multi_session_create_per_session_mode() {
|
||||||
let _guard = apply_credentials(&config.credentials);
|
let _guard = apply_credentials(&config.credentials);
|
||||||
install_agent(&app.app, config.agent).await;
|
install_agent(&app.app, config.agent).await;
|
||||||
|
|
||||||
<<<<<<< Updated upstream
|
|
||||||
let session_id = "pi-rpc-session";
|
let session_id = "pi-rpc-session";
|
||||||
let _native_session_id = create_pi_session_with_native(&app.app, session_id).await;
|
let _native_session_id = create_pi_session_with_native(&app.app, session_id).await;
|
||||||
|
|
||||||
|
|
@ -228,34 +155,90 @@ async fn pi_rpc_multi_session_create_per_session_mode() {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||||
async fn pi_parallel_sessions_turns() {
|
async fn pi_variant_high_applies_for_thinking_model() {
|
||||||
=======
|
let Some(config) = pi_test_config() else {
|
||||||
let first = create_pi_session_checked(&app.app, "pi-multi-a").await;
|
return;
|
||||||
let second = create_pi_session_checked(&app.app, "pi-multi-b").await;
|
};
|
||||||
|
|
||||||
let first_native = first
|
let app = TestApp::new();
|
||||||
.get("native_session_id")
|
let _guard = apply_credentials(&config.credentials);
|
||||||
.and_then(Value::as_str)
|
install_agent(&app.app, config.agent).await;
|
||||||
.unwrap_or("");
|
|
||||||
let second_native = second
|
let models = fetch_pi_models(&app.app).await;
|
||||||
.get("native_session_id")
|
let thinking_model = models.iter().find_map(|model| {
|
||||||
.and_then(Value::as_str)
|
let model_id = model.get("id").and_then(Value::as_str)?;
|
||||||
.unwrap_or("");
|
let variants = model_variant_ids(model);
|
||||||
assert!(!first_native.is_empty(), "first native session id missing");
|
if variants.contains(&"high") {
|
||||||
assert!(
|
Some(model_id.to_string())
|
||||||
!second_native.is_empty(),
|
} else {
|
||||||
"second native session id missing"
|
None
|
||||||
);
|
}
|
||||||
assert_ne!(
|
});
|
||||||
first_native, second_native,
|
let Some(model_id) = thinking_model else {
|
||||||
"per-session mode should allocate independent native session ids"
|
eprintln!("Skipping PI variant thinking-model test: no model advertises high");
|
||||||
);
|
return;
|
||||||
}
|
};
|
||||||
|
|
||||||
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
let session_id = "pi-variant-thinking-high";
|
||||||
async fn pi_rpc_per_session_queue_and_termination_isolation() {
|
create_pi_session(&app.app, session_id, Some(&model_id), Some("high")).await;
|
||||||
let _mode_guard = set_env_var("SANDBOX_AGENT_PI_FORCE_RUNTIME_MODE", "per-session");
|
|
||||||
>>>>>>> Stashed changes
|
let events = read_turn_stream_events(&app.app, session_id, Duration::from_secs(120)).await;
|
||||||
|
assert!(!events.is_empty(), "no events from pi thinking-variant stream");
|
||||||
|
assert!(
|
||||||
|
!events.iter().any(is_unparsed_event),
|
||||||
|
"agent.unparsed event encountered for thinking-variant session"
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
should_stop(&events),
|
||||||
|
"thinking-variant turn stream did not reach a terminal event"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||||
|
async fn pi_variant_high_on_non_thinking_model_uses_pi_native_clamping() {
|
||||||
|
let Some(config) = pi_test_config() else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
let app = TestApp::new();
|
||||||
|
let _guard = apply_credentials(&config.credentials);
|
||||||
|
install_agent(&app.app, config.agent).await;
|
||||||
|
|
||||||
|
let models = fetch_pi_models(&app.app).await;
|
||||||
|
let non_thinking_model = models.iter().find_map(|model| {
|
||||||
|
let model_id = model.get("id").and_then(Value::as_str)?;
|
||||||
|
let variants = model_variant_ids(model);
|
||||||
|
if variants == vec!["off"] {
|
||||||
|
Some(model_id.to_string())
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
});
|
||||||
|
let Some(model_id) = non_thinking_model else {
|
||||||
|
eprintln!("Skipping PI non-thinking variant test: no off-only model reported");
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
let session_id = "pi-variant-nonthinking-high";
|
||||||
|
create_pi_session(&app.app, session_id, Some(&model_id), Some("high")).await;
|
||||||
|
|
||||||
|
let events = read_turn_stream_events(&app.app, session_id, Duration::from_secs(120)).await;
|
||||||
|
assert!(
|
||||||
|
!events.is_empty(),
|
||||||
|
"no events from pi non-thinking variant stream"
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
!events.iter().any(is_unparsed_event),
|
||||||
|
"agent.unparsed event encountered for non-thinking variant session"
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
should_stop(&events),
|
||||||
|
"non-thinking variant turn stream did not reach a terminal event"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||||
|
async fn pi_parallel_sessions_turns() {
|
||||||
let Some(config) = pi_test_config() else {
|
let Some(config) = pi_test_config() else {
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
|
|
@ -264,7 +247,6 @@ async fn pi_rpc_per_session_queue_and_termination_isolation() {
|
||||||
let _guard = apply_credentials(&config.credentials);
|
let _guard = apply_credentials(&config.credentials);
|
||||||
install_agent(&app.app, config.agent).await;
|
install_agent(&app.app, config.agent).await;
|
||||||
|
|
||||||
<<<<<<< Updated upstream
|
|
||||||
let session_a = "pi-parallel-a";
|
let session_a = "pi-parallel-a";
|
||||||
let session_b = "pi-parallel-b";
|
let session_b = "pi-parallel-b";
|
||||||
create_pi_session_with_native(&app.app, session_a).await;
|
create_pi_session_with_native(&app.app, session_a).await;
|
||||||
|
|
@ -423,105 +405,3 @@ async fn pi_runtime_restart_scope() {
|
||||||
);
|
);
|
||||||
assert_all_events_for_session(&events_b, session_b);
|
assert_all_events_for_session(&events_b, session_b);
|
||||||
}
|
}
|
||||||
=======
|
|
||||||
create_pi_session_checked(&app.app, "pi-queue-a").await;
|
|
||||||
create_pi_session_checked(&app.app, "pi-queue-b").await;
|
|
||||||
|
|
||||||
let status = send_status(
|
|
||||||
&app.app,
|
|
||||||
Method::POST,
|
|
||||||
"/v1/sessions/pi-queue-a/messages",
|
|
||||||
Some(json!({ "message": "Reply with exactly FIRST." })),
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
assert_eq!(status, StatusCode::NO_CONTENT, "send first prompt");
|
|
||||||
|
|
||||||
let status = send_status(
|
|
||||||
&app.app,
|
|
||||||
Method::POST,
|
|
||||||
"/v1/sessions/pi-queue-a/messages",
|
|
||||||
Some(json!({ "message": "Reply with exactly SECOND." })),
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
assert_eq!(status, StatusCode::NO_CONTENT, "enqueue second prompt");
|
|
||||||
|
|
||||||
let status = send_status(
|
|
||||||
&app.app,
|
|
||||||
Method::POST,
|
|
||||||
"/v1/sessions/pi-queue-b/messages",
|
|
||||||
Some(json!({ "message": PROMPT })),
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
assert_eq!(
|
|
||||||
status,
|
|
||||||
StatusCode::NO_CONTENT,
|
|
||||||
"send prompt to sibling session"
|
|
||||||
);
|
|
||||||
|
|
||||||
let events_a =
|
|
||||||
poll_events_until_assistant_count(&app.app, "pi-queue-a", 2, Duration::from_secs(240))
|
|
||||||
.await;
|
|
||||||
let events_b =
|
|
||||||
poll_events_until_assistant_count(&app.app, "pi-queue-b", 1, Duration::from_secs(180))
|
|
||||||
.await;
|
|
||||||
|
|
||||||
assert!(
|
|
||||||
!events_a.iter().any(is_unparsed_event),
|
|
||||||
"session a emitted agent.unparsed"
|
|
||||||
);
|
|
||||||
assert!(
|
|
||||||
!events_b.iter().any(is_unparsed_event),
|
|
||||||
"session b emitted agent.unparsed"
|
|
||||||
);
|
|
||||||
let assistant_count_a = events_a
|
|
||||||
.iter()
|
|
||||||
.filter(|event| is_assistant_message(event))
|
|
||||||
.count();
|
|
||||||
let assistant_count_b = events_b
|
|
||||||
.iter()
|
|
||||||
.filter(|event| is_assistant_message(event))
|
|
||||||
.count();
|
|
||||||
assert!(
|
|
||||||
assistant_count_a >= 2,
|
|
||||||
"expected at least two assistant completions for queued session, got {assistant_count_a}"
|
|
||||||
);
|
|
||||||
assert!(
|
|
||||||
assistant_count_b >= 1,
|
|
||||||
"expected assistant completion for sibling session, got {assistant_count_b}"
|
|
||||||
);
|
|
||||||
|
|
||||||
let status = send_status(
|
|
||||||
&app.app,
|
|
||||||
Method::POST,
|
|
||||||
"/v1/sessions/pi-queue-a/terminate",
|
|
||||||
Some(json!({})),
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
assert_eq!(status, StatusCode::NO_CONTENT, "terminate first session");
|
|
||||||
|
|
||||||
let status = send_status(
|
|
||||||
&app.app,
|
|
||||||
Method::POST,
|
|
||||||
"/v1/sessions/pi-queue-b/messages",
|
|
||||||
Some(json!({ "message": PROMPT })),
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
assert_eq!(
|
|
||||||
status,
|
|
||||||
StatusCode::NO_CONTENT,
|
|
||||||
"sibling session should continue after termination"
|
|
||||||
);
|
|
||||||
|
|
||||||
let events_b_after =
|
|
||||||
poll_events_until_assistant_count(&app.app, "pi-queue-b", 2, Duration::from_secs(180))
|
|
||||||
.await;
|
|
||||||
let assistant_count_b_after = events_b_after
|
|
||||||
.iter()
|
|
||||||
.filter(|event| is_assistant_message(event))
|
|
||||||
.count();
|
|
||||||
assert!(
|
|
||||||
assistant_count_b_after >= 2,
|
|
||||||
"expected additional assistant completion for sibling session after termination"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
>>>>>>> Stashed changes
|
|
||||||
|
|
|
||||||
|
|
@ -186,3 +186,76 @@ async fn agent_endpoints_snapshots() {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn pi_test_config() -> Option<TestAgentConfig> {
|
||||||
|
let configs = match test_agents_from_env() {
|
||||||
|
Ok(configs) => configs,
|
||||||
|
Err(err) => {
|
||||||
|
eprintln!("Skipping PI endpoint variant test: {err}");
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
configs
|
||||||
|
.into_iter()
|
||||||
|
.find(|config| config.agent == AgentId::Pi)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
|
||||||
|
async fn pi_capabilities_and_models_expose_variants() {
|
||||||
|
let Some(config) = pi_test_config() else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
let app = TestApp::new();
|
||||||
|
let _guard = apply_credentials(&config.credentials);
|
||||||
|
install_agent(&app.app, AgentId::Pi).await;
|
||||||
|
|
||||||
|
let capabilities = fetch_capabilities(&app.app).await;
|
||||||
|
let pi_caps = capabilities.get("pi").expect("pi capabilities");
|
||||||
|
assert!(pi_caps.variants, "pi capabilities should enable variants");
|
||||||
|
|
||||||
|
let (status, payload) = send_json(&app.app, Method::GET, "/v1/agents/pi/models", None).await;
|
||||||
|
assert_eq!(status, StatusCode::OK, "pi models endpoint");
|
||||||
|
let models = payload
|
||||||
|
.get("models")
|
||||||
|
.and_then(Value::as_array)
|
||||||
|
.cloned()
|
||||||
|
.unwrap_or_default();
|
||||||
|
assert!(!models.is_empty(), "pi models should not be empty");
|
||||||
|
|
||||||
|
let full_levels = vec!["off", "minimal", "low", "medium", "high", "xhigh"];
|
||||||
|
for model in models {
|
||||||
|
let model_id = model
|
||||||
|
.get("id")
|
||||||
|
.and_then(Value::as_str)
|
||||||
|
.unwrap_or("<unknown>");
|
||||||
|
let variants = model
|
||||||
|
.get("variants")
|
||||||
|
.and_then(Value::as_array)
|
||||||
|
.expect("pi model variants");
|
||||||
|
let default_variant = model
|
||||||
|
.get("defaultVariant")
|
||||||
|
.and_then(Value::as_str)
|
||||||
|
.expect("pi model defaultVariant");
|
||||||
|
let variant_ids = variants
|
||||||
|
.iter()
|
||||||
|
.filter_map(Value::as_str)
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
assert!(!variant_ids.is_empty(), "pi model {model_id} has no variants");
|
||||||
|
if variant_ids == vec!["off"] {
|
||||||
|
assert_eq!(
|
||||||
|
default_variant, "off",
|
||||||
|
"pi model {model_id} expected default off for non-thinking model"
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
assert_eq!(
|
||||||
|
variant_ids, full_levels,
|
||||||
|
"pi model {model_id} expected full thinking levels"
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
default_variant, "medium",
|
||||||
|
"pi model {model_id} expected medium default for thinking model"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue