mirror of
https://github.com/harivansh-afk/sandbox-agent.git
synced 2026-04-20 06:04:23 +00:00
feat: acp http adapter
This commit is contained in:
parent
2ba630c180
commit
b4c8564cb2
217 changed files with 18785 additions and 17400 deletions
24
server/packages/acp-http-adapter/Cargo.toml
Normal file
24
server/packages/acp-http-adapter/Cargo.toml
Normal file
|
|
@ -0,0 +1,24 @@
|
|||
[package]
|
||||
name = "acp-http-adapter"
|
||||
version.workspace = true
|
||||
edition.workspace = true
|
||||
authors.workspace = true
|
||||
license.workspace = true
|
||||
repository.workspace = true
|
||||
description = "Minimal ACP HTTP-to-stdio adapter"
|
||||
|
||||
[dependencies]
|
||||
axum.workspace = true
|
||||
tokio = { workspace = true, features = ["process", "io-util"] }
|
||||
tokio-stream.workspace = true
|
||||
futures.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
clap.workspace = true
|
||||
thiserror.workspace = true
|
||||
tracing.workspace = true
|
||||
tracing-subscriber.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
reqwest.workspace = true
|
||||
bytes = "1.10"
|
||||
47
server/packages/acp-http-adapter/README.md
Normal file
47
server/packages/acp-http-adapter/README.md
Normal file
|
|
@ -0,0 +1,47 @@
|
|||
# acp-http-adapter
|
||||
|
||||
Minimal ACP HTTP to stdio proxy.
|
||||
|
||||
## Endpoints
|
||||
|
||||
- `GET /v1/health`
|
||||
- `POST /v1/rpc`
|
||||
- `GET /v1/rpc` (SSE)
|
||||
- `DELETE /v1/rpc`
|
||||
|
||||
## Stdio framing
|
||||
|
||||
Uses ACP stdio framing from ACP docs:
|
||||
- UTF-8 JSON-RPC messages
|
||||
- one message per line
|
||||
- newline-delimited (`\n`)
|
||||
- no embedded newlines in messages
|
||||
|
||||
## Run
|
||||
|
||||
```bash
|
||||
cargo run -p acp-http-adapter -- \
|
||||
--host 127.0.0.1 \
|
||||
--port 7591 \
|
||||
--registry-json '{"distribution":{"npx":{"package":"@zed-industries/codex-acp"}}}'
|
||||
```
|
||||
|
||||
`--registry-json` accepts:
|
||||
- full registry document (`{"agents":[...]}`) with `--registry-agent-id`
|
||||
- single registry entry (`{"id":"...","distribution":...}`)
|
||||
- direct distribution object (`{"npx":...}` or `{"binary":...}`)
|
||||
|
||||
## Library
|
||||
|
||||
```rust
|
||||
use std::time::Duration;
|
||||
use acp_http_adapter::{run_server, ServerConfig};
|
||||
|
||||
run_server(ServerConfig {
|
||||
host: "127.0.0.1".to_string(),
|
||||
port: 7591,
|
||||
registry_json: r#"{"distribution":{"npx":{"package":"@zed-industries/codex-acp"}}}"#.to_string(),
|
||||
registry_agent_id: None,
|
||||
rpc_timeout: Duration::from_secs(120),
|
||||
}).await?;
|
||||
```
|
||||
132
server/packages/acp-http-adapter/src/app.rs
Normal file
132
server/packages/acp-http-adapter/src/app.rs
Normal file
|
|
@ -0,0 +1,132 @@
|
|||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use axum::extract::State;
|
||||
use axum::http::{header, HeaderMap, StatusCode};
|
||||
use axum::response::sse::KeepAlive;
|
||||
use axum::response::{IntoResponse, Response, Sse};
|
||||
use axum::routing::{get, post};
|
||||
use axum::{Json, Router};
|
||||
use serde::Serialize;
|
||||
use serde_json::{json, Value};
|
||||
|
||||
use crate::process::{AdapterError, AdapterRuntime, PostOutcome};
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct HealthResponse {
|
||||
ok: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct Problem {
|
||||
r#type: &'static str,
|
||||
title: &'static str,
|
||||
status: u16,
|
||||
detail: String,
|
||||
}
|
||||
|
||||
pub fn build_router(runtime: Arc<AdapterRuntime>) -> Router {
|
||||
Router::new()
|
||||
.route("/v1/health", get(get_health))
|
||||
.route("/v1/rpc", post(post_rpc).get(get_rpc).delete(delete_rpc))
|
||||
.with_state(runtime)
|
||||
}
|
||||
|
||||
async fn get_health() -> Json<HealthResponse> {
|
||||
Json(HealthResponse { ok: true })
|
||||
}
|
||||
|
||||
async fn post_rpc(
|
||||
State(runtime): State<Arc<AdapterRuntime>>,
|
||||
headers: HeaderMap,
|
||||
Json(payload): Json<Value>,
|
||||
) -> Response {
|
||||
if !is_json_content_type(&headers) {
|
||||
return problem(
|
||||
StatusCode::UNSUPPORTED_MEDIA_TYPE,
|
||||
"unsupported_media_type",
|
||||
"content-type must be application/json",
|
||||
);
|
||||
}
|
||||
|
||||
match runtime.post(payload).await {
|
||||
Ok(PostOutcome::Response(value)) => (StatusCode::OK, Json(value)).into_response(),
|
||||
Ok(PostOutcome::Accepted) => StatusCode::ACCEPTED.into_response(),
|
||||
Err(err) => map_error(err),
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_rpc(
|
||||
State(runtime): State<Arc<AdapterRuntime>>,
|
||||
headers: HeaderMap,
|
||||
) -> impl IntoResponse {
|
||||
let last_event_id = headers
|
||||
.get("last-event-id")
|
||||
.and_then(|value| value.to_str().ok())
|
||||
.and_then(|value| value.parse::<u64>().ok());
|
||||
|
||||
let stream = runtime.clone().sse_stream(last_event_id).await;
|
||||
Sse::new(stream).keep_alive(KeepAlive::new().interval(Duration::from_secs(15)))
|
||||
}
|
||||
|
||||
async fn delete_rpc() -> StatusCode {
|
||||
StatusCode::NO_CONTENT
|
||||
}
|
||||
|
||||
fn is_json_content_type(headers: &HeaderMap) -> bool {
|
||||
headers
|
||||
.get(header::CONTENT_TYPE)
|
||||
.and_then(|value| value.to_str().ok())
|
||||
.map(|value| value.starts_with("application/json"))
|
||||
.unwrap_or(false)
|
||||
}
|
||||
|
||||
fn map_error(err: AdapterError) -> Response {
|
||||
match err {
|
||||
AdapterError::InvalidEnvelope => problem(
|
||||
StatusCode::BAD_REQUEST,
|
||||
"invalid_envelope",
|
||||
"request body must be a JSON-RPC object",
|
||||
),
|
||||
AdapterError::Timeout => problem(
|
||||
StatusCode::GATEWAY_TIMEOUT,
|
||||
"timeout",
|
||||
"timed out waiting for agent response",
|
||||
),
|
||||
AdapterError::Write(write) => problem(
|
||||
StatusCode::BAD_GATEWAY,
|
||||
"write_failed",
|
||||
&format!("failed writing to agent stdin: {write}"),
|
||||
),
|
||||
AdapterError::Serialize(ser) => problem(
|
||||
StatusCode::BAD_REQUEST,
|
||||
"serialize_failed",
|
||||
&format!("failed to serialize JSON payload: {ser}"),
|
||||
),
|
||||
AdapterError::Spawn(spawn) => problem(
|
||||
StatusCode::BAD_GATEWAY,
|
||||
"spawn_failed",
|
||||
&format!("failed to start agent process: {spawn}"),
|
||||
),
|
||||
AdapterError::MissingStdin | AdapterError::MissingStdout | AdapterError::MissingStderr => {
|
||||
problem(
|
||||
StatusCode::BAD_GATEWAY,
|
||||
"io_setup_failed",
|
||||
"agent subprocess pipes were not available",
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn problem(status: StatusCode, title: &'static str, detail: &str) -> Response {
|
||||
(
|
||||
status,
|
||||
Json(json!(Problem {
|
||||
r#type: "about:blank",
|
||||
title,
|
||||
status: status.as_u16(),
|
||||
detail: detail.to_string(),
|
||||
})),
|
||||
)
|
||||
.into_response()
|
||||
}
|
||||
50
server/packages/acp-http-adapter/src/lib.rs
Normal file
50
server/packages/acp-http-adapter/src/lib.rs
Normal file
|
|
@ -0,0 +1,50 @@
|
|||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use app::build_router;
|
||||
use process::AdapterRuntime;
|
||||
use registry::LaunchSpec;
|
||||
|
||||
pub mod app;
|
||||
pub mod process;
|
||||
pub mod registry;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ServerConfig {
|
||||
pub host: String,
|
||||
pub port: u16,
|
||||
pub registry_json: String,
|
||||
pub registry_agent_id: Option<String>,
|
||||
pub rpc_timeout: Duration,
|
||||
}
|
||||
|
||||
pub async fn run_server(
|
||||
config: ServerConfig,
|
||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
let launch =
|
||||
LaunchSpec::from_registry_blob(&config.registry_json, config.registry_agent_id.as_deref())?;
|
||||
let runtime = Arc::new(AdapterRuntime::start(launch, config.rpc_timeout).await?);
|
||||
run_server_with_runtime(config.host, config.port, runtime).await
|
||||
}
|
||||
|
||||
pub async fn run_server_with_runtime(
|
||||
host: String,
|
||||
port: u16,
|
||||
runtime: Arc<AdapterRuntime>,
|
||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
let app = build_router(runtime.clone());
|
||||
let addr: SocketAddr = format!("{host}:{port}").parse()?;
|
||||
let listener = tokio::net::TcpListener::bind(addr).await?;
|
||||
tracing::info!(addr = %addr, "acp-http-adapter listening");
|
||||
|
||||
axum::serve(listener, app)
|
||||
.with_graceful_shutdown(shutdown_signal(runtime))
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn shutdown_signal(runtime: Arc<AdapterRuntime>) {
|
||||
let _ = tokio::signal::ctrl_c().await;
|
||||
runtime.shutdown().await;
|
||||
}
|
||||
55
server/packages/acp-http-adapter/src/main.rs
Normal file
55
server/packages/acp-http-adapter/src/main.rs
Normal file
|
|
@ -0,0 +1,55 @@
|
|||
use std::time::Duration;
|
||||
|
||||
use acp_http_adapter::{run_server, ServerConfig};
|
||||
use clap::Parser;
|
||||
|
||||
#[derive(Debug, Parser)]
|
||||
#[command(name = "acp-http-adapter")]
|
||||
#[command(about = "Minimal ACP HTTP->stdio adapter", version)]
|
||||
struct Cli {
|
||||
#[arg(long, default_value = "127.0.0.1")]
|
||||
host: String,
|
||||
|
||||
#[arg(long, default_value_t = 7591)]
|
||||
port: u16,
|
||||
|
||||
#[arg(long)]
|
||||
registry_json: String,
|
||||
|
||||
#[arg(long)]
|
||||
registry_agent_id: Option<String>,
|
||||
|
||||
#[arg(long)]
|
||||
rpc_timeout_ms: Option<u64>,
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
if let Err(err) = run().await {
|
||||
tracing::error!(error = %err, "acp-http-adapter failed");
|
||||
std::process::exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
async fn run() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
tracing_subscriber::fmt()
|
||||
.with_env_filter(
|
||||
tracing_subscriber::EnvFilter::try_from_default_env()
|
||||
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
|
||||
)
|
||||
.compact()
|
||||
.init();
|
||||
|
||||
let cli = Cli::parse();
|
||||
run_server(ServerConfig {
|
||||
host: cli.host,
|
||||
port: cli.port,
|
||||
registry_json: cli.registry_json,
|
||||
registry_agent_id: cli.registry_agent_id,
|
||||
rpc_timeout: cli
|
||||
.rpc_timeout_ms
|
||||
.map(Duration::from_millis)
|
||||
.unwrap_or_else(|| Duration::from_secs(120)),
|
||||
})
|
||||
.await
|
||||
}
|
||||
571
server/packages/acp-http-adapter/src/process.rs
Normal file
571
server/packages/acp-http-adapter/src/process.rs
Normal file
|
|
@ -0,0 +1,571 @@
|
|||
use std::collections::{HashMap, VecDeque};
|
||||
use std::convert::Infallible;
|
||||
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use axum::response::sse::Event;
|
||||
use futures::{stream, Stream, StreamExt};
|
||||
use serde_json::{json, Value};
|
||||
use thiserror::Error;
|
||||
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
|
||||
use tokio::process::{Child, ChildStdin, Command};
|
||||
use tokio::sync::{broadcast, oneshot, Mutex};
|
||||
use tokio_stream::wrappers::BroadcastStream;
|
||||
|
||||
use crate::registry::LaunchSpec;
|
||||
|
||||
const RING_BUFFER_SIZE: usize = 1024;
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum AdapterError {
|
||||
#[error("failed to spawn subprocess: {0}")]
|
||||
Spawn(std::io::Error),
|
||||
#[error("failed to capture subprocess stdin")]
|
||||
MissingStdin,
|
||||
#[error("failed to capture subprocess stdout")]
|
||||
MissingStdout,
|
||||
#[error("failed to capture subprocess stderr")]
|
||||
MissingStderr,
|
||||
#[error("invalid json-rpc envelope")]
|
||||
InvalidEnvelope,
|
||||
#[error("failed to serialize json-rpc message: {0}")]
|
||||
Serialize(serde_json::Error),
|
||||
#[error("failed to write subprocess stdin: {0}")]
|
||||
Write(std::io::Error),
|
||||
#[error("timeout waiting for response")]
|
||||
Timeout,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum PostOutcome {
|
||||
Response(Value),
|
||||
Accepted,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct StreamMessage {
|
||||
sequence: u64,
|
||||
payload: Value,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct AdapterRuntime {
|
||||
stdin: Arc<Mutex<ChildStdin>>,
|
||||
child: Arc<Mutex<Child>>,
|
||||
pending: Arc<Mutex<HashMap<String, oneshot::Sender<Value>>>>,
|
||||
sender: broadcast::Sender<StreamMessage>,
|
||||
ring: Arc<Mutex<VecDeque<StreamMessage>>>,
|
||||
sequence: Arc<AtomicU64>,
|
||||
request_timeout: Duration,
|
||||
shutting_down: AtomicBool,
|
||||
spawned_at: Instant,
|
||||
first_stdout: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl AdapterRuntime {
|
||||
pub async fn start(
|
||||
launch: LaunchSpec,
|
||||
request_timeout: Duration,
|
||||
) -> Result<Self, AdapterError> {
|
||||
let spawn_start = Instant::now();
|
||||
|
||||
let mut command = Command::new(&launch.program);
|
||||
command
|
||||
.args(&launch.args)
|
||||
.stdin(std::process::Stdio::piped())
|
||||
.stdout(std::process::Stdio::piped())
|
||||
.stderr(std::process::Stdio::piped());
|
||||
|
||||
for (key, value) in &launch.env {
|
||||
command.env(key, value);
|
||||
}
|
||||
|
||||
tracing::info!(
|
||||
program = ?launch.program,
|
||||
args = ?launch.args,
|
||||
"spawning agent process"
|
||||
);
|
||||
|
||||
let mut child = command.spawn().map_err(|err| {
|
||||
tracing::error!(
|
||||
program = ?launch.program,
|
||||
error = %err,
|
||||
"failed to spawn agent process"
|
||||
);
|
||||
AdapterError::Spawn(err)
|
||||
})?;
|
||||
|
||||
let pid = child.id().unwrap_or(0);
|
||||
let spawn_elapsed = spawn_start.elapsed();
|
||||
tracing::info!(
|
||||
pid = pid,
|
||||
elapsed_ms = spawn_elapsed.as_millis() as u64,
|
||||
"agent process spawned"
|
||||
);
|
||||
|
||||
let stdin = child.stdin.take().ok_or(AdapterError::MissingStdin)?;
|
||||
let stdout = child.stdout.take().ok_or(AdapterError::MissingStdout)?;
|
||||
let stderr = child.stderr.take().ok_or(AdapterError::MissingStderr)?;
|
||||
|
||||
let (sender, _rx) = broadcast::channel(512);
|
||||
let runtime = Self {
|
||||
stdin: Arc::new(Mutex::new(stdin)),
|
||||
child: Arc::new(Mutex::new(child)),
|
||||
pending: Arc::new(Mutex::new(HashMap::new())),
|
||||
sender,
|
||||
ring: Arc::new(Mutex::new(VecDeque::with_capacity(RING_BUFFER_SIZE))),
|
||||
sequence: Arc::new(AtomicU64::new(0)),
|
||||
request_timeout,
|
||||
shutting_down: AtomicBool::new(false),
|
||||
spawned_at: spawn_start,
|
||||
first_stdout: Arc::new(AtomicBool::new(false)),
|
||||
};
|
||||
|
||||
runtime.spawn_stdout_loop(stdout);
|
||||
runtime.spawn_stderr_loop(stderr);
|
||||
runtime.spawn_exit_watcher();
|
||||
|
||||
Ok(runtime)
|
||||
}
|
||||
|
||||
pub async fn post(&self, payload: Value) -> Result<PostOutcome, AdapterError> {
|
||||
if !payload.is_object() {
|
||||
return Err(AdapterError::InvalidEnvelope);
|
||||
}
|
||||
|
||||
let method: String = payload
|
||||
.get("method")
|
||||
.and_then(|v| v.as_str())
|
||||
.unwrap_or("<none>")
|
||||
.to_string();
|
||||
let has_method = payload.get("method").is_some();
|
||||
let id = payload.get("id");
|
||||
|
||||
if has_method && id.is_some() {
|
||||
let id_value = id.expect("checked");
|
||||
let key = id_key(id_value);
|
||||
let (tx, rx) = oneshot::channel();
|
||||
|
||||
let pending_count = self.pending.lock().await.len();
|
||||
tracing::info!(
|
||||
method = %method,
|
||||
id = %key,
|
||||
pending_count = pending_count,
|
||||
"post: request → agent (awaiting response)"
|
||||
);
|
||||
|
||||
self.pending.lock().await.insert(key.clone(), tx);
|
||||
|
||||
let write_start = Instant::now();
|
||||
if let Err(err) = self.send_to_subprocess(&payload).await {
|
||||
tracing::error!(
|
||||
method = %method,
|
||||
id = %key,
|
||||
error = %err,
|
||||
"post: failed to write to agent stdin"
|
||||
);
|
||||
self.pending.lock().await.remove(&key);
|
||||
return Err(err);
|
||||
}
|
||||
let write_ms = write_start.elapsed().as_millis() as u64;
|
||||
tracing::debug!(
|
||||
method = %method,
|
||||
id = %key,
|
||||
write_ms = write_ms,
|
||||
"post: stdin write complete, waiting for response"
|
||||
);
|
||||
|
||||
let wait_start = Instant::now();
|
||||
match tokio::time::timeout(self.request_timeout, rx).await {
|
||||
Ok(Ok(response)) => {
|
||||
let wait_ms = wait_start.elapsed().as_millis() as u64;
|
||||
tracing::info!(
|
||||
method = %method,
|
||||
id = %key,
|
||||
response_ms = wait_ms,
|
||||
total_ms = write_ms + wait_ms,
|
||||
"post: got response from agent"
|
||||
);
|
||||
Ok(PostOutcome::Response(response))
|
||||
}
|
||||
Ok(Err(_)) => {
|
||||
let wait_ms = wait_start.elapsed().as_millis() as u64;
|
||||
tracing::error!(
|
||||
method = %method,
|
||||
id = %key,
|
||||
wait_ms = wait_ms,
|
||||
"post: response channel dropped (agent process may have exited)"
|
||||
);
|
||||
self.pending.lock().await.remove(&key);
|
||||
Err(AdapterError::Timeout)
|
||||
}
|
||||
Err(_) => {
|
||||
let pending_keys: Vec<String> =
|
||||
self.pending.lock().await.keys().cloned().collect();
|
||||
tracing::error!(
|
||||
method = %method,
|
||||
id = %key,
|
||||
timeout_ms = self.request_timeout.as_millis() as u64,
|
||||
age_ms = self.spawned_at.elapsed().as_millis() as u64,
|
||||
pending_keys = ?pending_keys,
|
||||
first_stdout_seen = self.first_stdout.load(Ordering::Relaxed),
|
||||
"post: TIMEOUT waiting for agent response"
|
||||
);
|
||||
self.pending.lock().await.remove(&key);
|
||||
Err(AdapterError::Timeout)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
tracing::debug!(
|
||||
method = %method,
|
||||
"post: notification → agent (fire-and-forget)"
|
||||
);
|
||||
self.send_to_subprocess(&payload).await?;
|
||||
Ok(PostOutcome::Accepted)
|
||||
}
|
||||
}
|
||||
|
||||
async fn subscribe(
|
||||
&self,
|
||||
last_event_id: Option<u64>,
|
||||
) -> (Vec<(u64, Value)>, broadcast::Receiver<StreamMessage>) {
|
||||
let replay = {
|
||||
let ring = self.ring.lock().await;
|
||||
ring.iter()
|
||||
.filter(|message| {
|
||||
if let Some(last_event_id) = last_event_id {
|
||||
message.sequence > last_event_id
|
||||
} else {
|
||||
true
|
||||
}
|
||||
})
|
||||
.map(|message| (message.sequence, message.payload.clone()))
|
||||
.collect::<Vec<_>>()
|
||||
};
|
||||
(replay, self.sender.subscribe())
|
||||
}
|
||||
|
||||
pub async fn sse_stream(
|
||||
self: Arc<Self>,
|
||||
last_event_id: Option<u64>,
|
||||
) -> impl Stream<Item = Result<Event, Infallible>> + Send + 'static {
|
||||
let (replay, rx) = self.subscribe(last_event_id).await;
|
||||
let replay_stream = stream::iter(replay.into_iter().map(|(sequence, payload)| {
|
||||
let event = Event::default()
|
||||
.event("message")
|
||||
.id(sequence.to_string())
|
||||
.data(payload.to_string());
|
||||
Ok(event)
|
||||
}));
|
||||
|
||||
let live_stream = BroadcastStream::new(rx).filter_map(|item| async move {
|
||||
match item {
|
||||
Ok(message) => {
|
||||
let event = Event::default()
|
||||
.event("message")
|
||||
.id(message.sequence.to_string())
|
||||
.data(message.payload.to_string());
|
||||
Some(Ok(event))
|
||||
}
|
||||
Err(_) => None,
|
||||
}
|
||||
});
|
||||
|
||||
replay_stream.chain(live_stream)
|
||||
}
|
||||
|
||||
/// Stream of raw JSON-RPC `Value` payloads (without SSE framing).
|
||||
/// Useful for consumers that need to inspect the payload contents
|
||||
/// rather than forward them as SSE events.
|
||||
pub async fn value_stream(
|
||||
self: Arc<Self>,
|
||||
last_event_id: Option<u64>,
|
||||
) -> impl Stream<Item = Value> + Send + 'static {
|
||||
let (replay, rx) = self.subscribe(last_event_id).await;
|
||||
let replay_stream =
|
||||
stream::iter(replay.into_iter().map(|(_sequence, payload)| payload));
|
||||
let live_stream = BroadcastStream::new(rx).filter_map(|item| async move {
|
||||
match item {
|
||||
Ok(message) => Some(message.payload),
|
||||
Err(_) => None,
|
||||
}
|
||||
});
|
||||
replay_stream.chain(live_stream)
|
||||
}
|
||||
|
||||
pub async fn shutdown(&self) {
|
||||
if self.shutting_down.swap(true, Ordering::SeqCst) {
|
||||
return;
|
||||
}
|
||||
|
||||
tracing::info!(
|
||||
age_ms = self.spawned_at.elapsed().as_millis() as u64,
|
||||
"shutting down agent process"
|
||||
);
|
||||
|
||||
self.pending.lock().await.clear();
|
||||
let mut child = self.child.lock().await;
|
||||
match child.try_wait() {
|
||||
Ok(Some(_)) => {}
|
||||
Ok(None) => {
|
||||
let _ = child.kill().await;
|
||||
let _ = child.wait().await;
|
||||
}
|
||||
Err(_) => {
|
||||
let _ = child.kill().await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn spawn_stdout_loop(&self, stdout: tokio::process::ChildStdout) {
|
||||
let pending = self.pending.clone();
|
||||
let sender = self.sender.clone();
|
||||
let ring = self.ring.clone();
|
||||
let sequence = self.sequence.clone();
|
||||
let spawned_at = self.spawned_at;
|
||||
let first_stdout = self.first_stdout.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
let mut lines = BufReader::new(stdout).lines();
|
||||
let mut line_count: u64 = 0;
|
||||
|
||||
while let Ok(Some(line)) = lines.next_line().await {
|
||||
let trimmed = line.trim();
|
||||
if trimmed.is_empty() {
|
||||
continue;
|
||||
}
|
||||
|
||||
line_count += 1;
|
||||
|
||||
if !first_stdout.swap(true, Ordering::Relaxed) {
|
||||
tracing::info!(
|
||||
first_stdout_ms = spawned_at.elapsed().as_millis() as u64,
|
||||
line_bytes = trimmed.len(),
|
||||
"agent process: first stdout line received"
|
||||
);
|
||||
}
|
||||
|
||||
let payload = match serde_json::from_str::<Value>(trimmed) {
|
||||
Ok(payload) => payload,
|
||||
Err(err) => {
|
||||
tracing::warn!(
|
||||
error = %err,
|
||||
line_number = line_count,
|
||||
raw = %if trimmed.len() > 200 {
|
||||
format!("{}...", &trimmed[..200])
|
||||
} else {
|
||||
trimmed.to_string()
|
||||
},
|
||||
"agent stdout: invalid JSON"
|
||||
);
|
||||
json!({
|
||||
"jsonrpc": "2.0",
|
||||
"method": "_adapter/invalid_stdout",
|
||||
"params": {
|
||||
"error": err.to_string(),
|
||||
"raw": trimmed,
|
||||
}
|
||||
})
|
||||
}
|
||||
};
|
||||
|
||||
let is_response = payload.get("id").is_some() && payload.get("method").is_none();
|
||||
if is_response {
|
||||
let key = id_key(payload.get("id").expect("checked"));
|
||||
let has_error = payload.get("error").is_some();
|
||||
if let Some(tx) = pending.lock().await.remove(&key) {
|
||||
tracing::debug!(
|
||||
id = %key,
|
||||
has_error = has_error,
|
||||
age_ms = spawned_at.elapsed().as_millis() as u64,
|
||||
"agent stdout: response matched to pending request"
|
||||
);
|
||||
let _ = tx.send(payload.clone());
|
||||
// Also broadcast the response so SSE/notification subscribers
|
||||
// see it in order after preceding notifications. This lets the
|
||||
// SSE translation task detect turn completion after all
|
||||
// session/update events have been processed.
|
||||
let seq = sequence.fetch_add(1, Ordering::SeqCst) + 1;
|
||||
let message = StreamMessage {
|
||||
sequence: seq,
|
||||
payload,
|
||||
};
|
||||
{
|
||||
let mut guard = ring.lock().await;
|
||||
guard.push_back(message.clone());
|
||||
while guard.len() > RING_BUFFER_SIZE {
|
||||
guard.pop_front();
|
||||
}
|
||||
}
|
||||
let _ = sender.send(message);
|
||||
continue;
|
||||
} else {
|
||||
tracing::warn!(
|
||||
id = %key,
|
||||
has_error = has_error,
|
||||
"agent stdout: response has no matching pending request (orphan)"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
let method = payload
|
||||
.get("method")
|
||||
.and_then(|v| v.as_str())
|
||||
.unwrap_or("<none>");
|
||||
tracing::debug!(
|
||||
method = method,
|
||||
line_number = line_count,
|
||||
"agent stdout: notification/event → SSE broadcast"
|
||||
);
|
||||
|
||||
let seq = sequence.fetch_add(1, Ordering::SeqCst) + 1;
|
||||
let message = StreamMessage {
|
||||
sequence: seq,
|
||||
payload,
|
||||
};
|
||||
|
||||
{
|
||||
let mut guard = ring.lock().await;
|
||||
guard.push_back(message.clone());
|
||||
while guard.len() > RING_BUFFER_SIZE {
|
||||
guard.pop_front();
|
||||
}
|
||||
}
|
||||
|
||||
let _ = sender.send(message);
|
||||
}
|
||||
|
||||
tracing::info!(
|
||||
total_lines = line_count,
|
||||
age_ms = spawned_at.elapsed().as_millis() as u64,
|
||||
"agent stdout: stream ended"
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
fn spawn_stderr_loop(&self, stderr: tokio::process::ChildStderr) {
|
||||
let spawned_at = self.spawned_at;
|
||||
|
||||
tokio::spawn(async move {
|
||||
let mut lines = BufReader::new(stderr).lines();
|
||||
let mut line_count: u64 = 0;
|
||||
|
||||
while let Ok(Some(line)) = lines.next_line().await {
|
||||
line_count += 1;
|
||||
tracing::info!(
|
||||
line_number = line_count,
|
||||
age_ms = spawned_at.elapsed().as_millis() as u64,
|
||||
"agent stderr: {}",
|
||||
line
|
||||
);
|
||||
}
|
||||
|
||||
tracing::debug!(
|
||||
total_lines = line_count,
|
||||
age_ms = spawned_at.elapsed().as_millis() as u64,
|
||||
"agent stderr: stream ended"
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
fn spawn_exit_watcher(&self) {
|
||||
let child = self.child.clone();
|
||||
let sender = self.sender.clone();
|
||||
let ring = self.ring.clone();
|
||||
let sequence = self.sequence.clone();
|
||||
let spawned_at = self.spawned_at;
|
||||
let pending = self.pending.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
let status = {
|
||||
let mut guard = child.lock().await;
|
||||
guard.wait().await.ok()
|
||||
};
|
||||
|
||||
let age_ms = spawned_at.elapsed().as_millis() as u64;
|
||||
let pending_count = pending.lock().await.len();
|
||||
|
||||
if let Some(status) = status {
|
||||
tracing::warn!(
|
||||
success = status.success(),
|
||||
code = status.code(),
|
||||
age_ms = age_ms,
|
||||
pending_requests = pending_count,
|
||||
"agent process exited"
|
||||
);
|
||||
|
||||
let payload = json!({
|
||||
"jsonrpc": "2.0",
|
||||
"method": "_adapter/agent_exited",
|
||||
"params": {
|
||||
"success": status.success(),
|
||||
"code": status.code(),
|
||||
}
|
||||
});
|
||||
|
||||
let seq = sequence.fetch_add(1, Ordering::SeqCst) + 1;
|
||||
let message = StreamMessage {
|
||||
sequence: seq,
|
||||
payload,
|
||||
};
|
||||
|
||||
{
|
||||
let mut guard = ring.lock().await;
|
||||
guard.push_back(message.clone());
|
||||
while guard.len() > RING_BUFFER_SIZE {
|
||||
guard.pop_front();
|
||||
}
|
||||
}
|
||||
|
||||
let _ = sender.send(message);
|
||||
} else {
|
||||
tracing::error!(
|
||||
age_ms = age_ms,
|
||||
pending_requests = pending_count,
|
||||
"agent process: failed to get exit status"
|
||||
);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async fn send_to_subprocess(&self, payload: &Value) -> Result<(), AdapterError> {
|
||||
let method = payload
|
||||
.get("method")
|
||||
.and_then(|v| v.as_str())
|
||||
.unwrap_or("<none>");
|
||||
let id = payload
|
||||
.get("id")
|
||||
.map(|v| v.to_string())
|
||||
.unwrap_or_default();
|
||||
|
||||
tracing::debug!(
|
||||
method = method,
|
||||
id = %id,
|
||||
bytes = serde_json::to_vec(payload).map(|b| b.len()).unwrap_or(0),
|
||||
"stdin: writing message to agent"
|
||||
);
|
||||
|
||||
let mut stdin = self.stdin.lock().await;
|
||||
let bytes = serde_json::to_vec(payload).map_err(AdapterError::Serialize)?;
|
||||
stdin.write_all(&bytes).await.map_err(|err| {
|
||||
tracing::error!(method = method, id = %id, error = %err, "stdin: write_all failed");
|
||||
AdapterError::Write(err)
|
||||
})?;
|
||||
stdin.write_all(b"\n").await.map_err(|err| {
|
||||
tracing::error!(method = method, id = %id, error = %err, "stdin: newline write failed");
|
||||
AdapterError::Write(err)
|
||||
})?;
|
||||
stdin.flush().await.map_err(|err| {
|
||||
tracing::error!(method = method, id = %id, error = %err, "stdin: flush failed");
|
||||
AdapterError::Write(err)
|
||||
})?;
|
||||
|
||||
tracing::debug!(method = method, id = %id, "stdin: write+flush complete");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn id_key(value: &Value) -> String {
|
||||
serde_json::to_string(value).unwrap_or_else(|_| "null".to_string())
|
||||
}
|
||||
143
server/packages/acp-http-adapter/src/registry.rs
Normal file
143
server/packages/acp-http-adapter/src/registry.rs
Normal file
|
|
@ -0,0 +1,143 @@
|
|||
use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use serde::Deserialize;
|
||||
use serde_json::Value;
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct LaunchSpec {
|
||||
pub program: PathBuf,
|
||||
pub args: Vec<String>,
|
||||
pub env: HashMap<String, String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Error)]
|
||||
pub enum RegistryError {
|
||||
#[error("invalid registry json: {0}")]
|
||||
InvalidJson(#[from] serde_json::Error),
|
||||
#[error("unable to resolve registry entry from blob")]
|
||||
UnsupportedBlob,
|
||||
#[error("registry blob has agents[] but no --registry-agent-id was provided")]
|
||||
MissingAgentId,
|
||||
#[error("agent '{0}' was not found in registry blob")]
|
||||
AgentNotFound(String),
|
||||
#[error("registry entry has no supported launch target")]
|
||||
MissingLaunchTarget,
|
||||
#[error("platform '{0}' is not present in distribution.binary")]
|
||||
UnsupportedPlatform(String),
|
||||
}
|
||||
|
||||
impl LaunchSpec {
|
||||
pub fn from_registry_blob(blob: &str, agent_id: Option<&str>) -> Result<Self, RegistryError> {
|
||||
let value: Value = serde_json::from_str(blob)?;
|
||||
Self::from_registry_value(value, agent_id)
|
||||
}
|
||||
|
||||
fn from_registry_value(value: Value, agent_id: Option<&str>) -> Result<Self, RegistryError> {
|
||||
if value.get("agents").is_some() {
|
||||
let doc: RegistryDocument = serde_json::from_value(value)?;
|
||||
let wanted = agent_id.ok_or(RegistryError::MissingAgentId)?;
|
||||
let agent = doc
|
||||
.agents
|
||||
.into_iter()
|
||||
.find(|a| a.id == wanted)
|
||||
.ok_or_else(|| RegistryError::AgentNotFound(wanted.to_string()))?;
|
||||
return Self::from_distribution(agent.distribution);
|
||||
}
|
||||
|
||||
if value.get("distribution").is_some() {
|
||||
let entry: RegistryAgent = serde_json::from_value(value)?;
|
||||
return Self::from_distribution(entry.distribution);
|
||||
}
|
||||
|
||||
if value.get("npx").is_some() || value.get("binary").is_some() {
|
||||
let distribution: RegistryDistribution = serde_json::from_value(value)?;
|
||||
return Self::from_distribution(distribution);
|
||||
}
|
||||
|
||||
Err(RegistryError::UnsupportedBlob)
|
||||
}
|
||||
|
||||
fn from_distribution(distribution: RegistryDistribution) -> Result<Self, RegistryError> {
|
||||
if let Some(npx) = distribution.npx {
|
||||
let mut args = vec!["-y".to_string(), npx.package];
|
||||
args.extend(npx.args);
|
||||
return Ok(Self {
|
||||
program: PathBuf::from("npx"),
|
||||
args,
|
||||
env: npx.env,
|
||||
});
|
||||
}
|
||||
|
||||
if let Some(binary) = distribution.binary {
|
||||
let platform = platform_key().ok_or(RegistryError::UnsupportedPlatform(format!(
|
||||
"{}/{}",
|
||||
std::env::consts::OS,
|
||||
std::env::consts::ARCH
|
||||
)))?;
|
||||
let target = binary
|
||||
.get(platform)
|
||||
.ok_or_else(|| RegistryError::UnsupportedPlatform(platform.to_string()))?;
|
||||
return Ok(Self {
|
||||
program: PathBuf::from(&target.cmd),
|
||||
args: target.args.clone(),
|
||||
env: target.env.clone(),
|
||||
});
|
||||
}
|
||||
|
||||
Err(RegistryError::MissingLaunchTarget)
|
||||
}
|
||||
}
|
||||
|
||||
fn platform_key() -> Option<&'static str> {
|
||||
match (std::env::consts::OS, std::env::consts::ARCH) {
|
||||
("linux", "x86_64") => Some("linux-x86_64"),
|
||||
("linux", "aarch64") => Some("linux-aarch64"),
|
||||
("macos", "x86_64") => Some("darwin-x86_64"),
|
||||
("macos", "aarch64") => Some("darwin-aarch64"),
|
||||
("windows", "x86_64") => Some("windows-x86_64"),
|
||||
("windows", "aarch64") => Some("windows-aarch64"),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct RegistryDocument {
|
||||
agents: Vec<RegistryAgent>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct RegistryAgent {
|
||||
#[allow(dead_code)]
|
||||
id: String,
|
||||
distribution: RegistryDistribution,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct RegistryDistribution {
|
||||
#[serde(default)]
|
||||
npx: Option<RegistryNpx>,
|
||||
#[serde(default)]
|
||||
binary: Option<HashMap<String, RegistryBinaryTarget>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct RegistryNpx {
|
||||
package: String,
|
||||
#[serde(default)]
|
||||
args: Vec<String>,
|
||||
#[serde(default)]
|
||||
env: HashMap<String, String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct RegistryBinaryTarget {
|
||||
#[allow(dead_code)]
|
||||
archive: Option<String>,
|
||||
cmd: String,
|
||||
#[serde(default)]
|
||||
args: Vec<String>,
|
||||
#[serde(default)]
|
||||
env: HashMap<String, String>,
|
||||
}
|
||||
338
server/packages/acp-http-adapter/tests/e2e.rs
Normal file
338
server/packages/acp-http-adapter/tests/e2e.rs
Normal file
|
|
@ -0,0 +1,338 @@
|
|||
use std::io;
|
||||
use std::net::TcpListener;
|
||||
use std::path::PathBuf;
|
||||
use std::process::{Child, Command, Stdio};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use futures::StreamExt;
|
||||
use reqwest::{Client, StatusCode};
|
||||
use serde_json::{json, Value};
|
||||
|
||||
struct AdapterHandle {
|
||||
child: Child,
|
||||
base_url: String,
|
||||
}
|
||||
|
||||
impl Drop for AdapterHandle {
|
||||
fn drop(&mut self) {
|
||||
let _ = self.child.kill();
|
||||
let _ = self.child.wait();
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn health_and_request_response_round_trip() {
|
||||
let adapter = spawn_adapter().expect("spawn adapter");
|
||||
wait_for_health(&adapter.base_url)
|
||||
.await
|
||||
.expect("wait for health");
|
||||
|
||||
let client = Client::new();
|
||||
let health = client
|
||||
.get(format!("{}/v1/health", adapter.base_url))
|
||||
.send()
|
||||
.await
|
||||
.expect("health request");
|
||||
assert_eq!(health.status(), StatusCode::OK);
|
||||
let health_json: Value = health.json().await.expect("health json");
|
||||
assert_eq!(health_json["ok"], true);
|
||||
|
||||
let payload = json!({
|
||||
"jsonrpc": "2.0",
|
||||
"id": 1,
|
||||
"method": "mock/ping",
|
||||
"params": {
|
||||
"text": "hello"
|
||||
}
|
||||
});
|
||||
|
||||
let response = client
|
||||
.post(format!("{}/v1/rpc", adapter.base_url))
|
||||
.json(&payload)
|
||||
.send()
|
||||
.await
|
||||
.expect("post rpc");
|
||||
assert_eq!(response.status(), StatusCode::OK);
|
||||
|
||||
let body: Value = response.json().await.expect("response json");
|
||||
assert_eq!(body["jsonrpc"], "2.0");
|
||||
assert_eq!(body["id"], 1);
|
||||
assert_eq!(body["result"]["echoed"]["method"], "mock/ping");
|
||||
assert_eq!(body["result"]["echoed"]["params"]["text"], "hello");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn sse_request_and_client_response_flow() {
|
||||
let adapter = spawn_adapter().expect("spawn adapter");
|
||||
wait_for_health(&adapter.base_url)
|
||||
.await
|
||||
.expect("wait for health");
|
||||
|
||||
let client = Client::new();
|
||||
|
||||
let sse_response = client
|
||||
.get(format!("{}/v1/rpc", adapter.base_url))
|
||||
.header("accept", "text/event-stream")
|
||||
.send()
|
||||
.await
|
||||
.expect("open sse");
|
||||
assert_eq!(sse_response.status(), StatusCode::OK);
|
||||
|
||||
let mut sse = SseReader::new(sse_response);
|
||||
|
||||
let request = json!({
|
||||
"jsonrpc": "2.0",
|
||||
"id": 42,
|
||||
"method": "mock/ask_client",
|
||||
"params": {
|
||||
"need": "input"
|
||||
}
|
||||
});
|
||||
|
||||
let initial = client
|
||||
.post(format!("{}/v1/rpc", adapter.base_url))
|
||||
.json(&request)
|
||||
.send()
|
||||
.await
|
||||
.expect("post ask_client");
|
||||
assert_eq!(initial.status(), StatusCode::OK);
|
||||
|
||||
let initial_body: Value = initial.json().await.expect("initial body");
|
||||
assert_eq!(initial_body["id"], 42);
|
||||
|
||||
let mut agent_request_id = None;
|
||||
for _ in 0..10 {
|
||||
let event = sse
|
||||
.next_json(Duration::from_secs(3))
|
||||
.await
|
||||
.expect("sse event");
|
||||
if event["method"] == "mock/request" {
|
||||
agent_request_id = event.get("id").cloned();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
let agent_request_id = agent_request_id.expect("agent request id");
|
||||
|
||||
let client_response = json!({
|
||||
"jsonrpc": "2.0",
|
||||
"id": agent_request_id,
|
||||
"result": {
|
||||
"approved": true
|
||||
}
|
||||
});
|
||||
|
||||
let response = client
|
||||
.post(format!("{}/v1/rpc", adapter.base_url))
|
||||
.json(&client_response)
|
||||
.send()
|
||||
.await
|
||||
.expect("post client response");
|
||||
assert_eq!(response.status(), StatusCode::ACCEPTED);
|
||||
|
||||
let mut saw_client_response = false;
|
||||
for _ in 0..10 {
|
||||
let event = sse
|
||||
.next_json(Duration::from_secs(3))
|
||||
.await
|
||||
.expect("sse follow-up");
|
||||
if event["method"] == "mock/client_response" {
|
||||
assert_eq!(event["params"]["result"]["approved"], true);
|
||||
saw_client_response = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
assert!(
|
||||
saw_client_response,
|
||||
"expected mock/client_response over SSE"
|
||||
);
|
||||
}
|
||||
|
||||
struct SseReader {
|
||||
stream: futures::stream::BoxStream<'static, Result<bytes::Bytes, reqwest::Error>>,
|
||||
buffer: Vec<u8>,
|
||||
}
|
||||
|
||||
impl SseReader {
|
||||
fn new(response: reqwest::Response) -> Self {
|
||||
Self {
|
||||
stream: response.bytes_stream().boxed(),
|
||||
buffer: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn next_json(&mut self, timeout: Duration) -> io::Result<Value> {
|
||||
let deadline = Instant::now() + timeout;
|
||||
|
||||
loop {
|
||||
if let Some(event) = self.try_parse_event()? {
|
||||
return Ok(event);
|
||||
}
|
||||
|
||||
if Instant::now() >= deadline {
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::TimedOut,
|
||||
"timed out waiting for sse event",
|
||||
));
|
||||
}
|
||||
|
||||
let remaining = deadline.saturating_duration_since(Instant::now());
|
||||
let chunk = tokio::time::timeout(remaining, self.stream.next())
|
||||
.await
|
||||
.map_err(|_| io::Error::new(io::ErrorKind::TimedOut, "timed out reading sse"))?;
|
||||
|
||||
match chunk {
|
||||
Some(Ok(bytes)) => self.buffer.extend_from_slice(&bytes),
|
||||
Some(Err(err)) => {
|
||||
return Err(io::Error::other(format!("sse stream error: {err}")));
|
||||
}
|
||||
None => {
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::UnexpectedEof,
|
||||
"sse stream ended",
|
||||
));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn try_parse_event(&mut self) -> io::Result<Option<Value>> {
|
||||
let split = self
|
||||
.buffer
|
||||
.windows(2)
|
||||
.position(|window| window == b"\n\n")
|
||||
.or_else(|| {
|
||||
self.buffer
|
||||
.windows(4)
|
||||
.position(|window| window == b"\r\n\r\n")
|
||||
});
|
||||
|
||||
let Some(idx) = split else {
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let delimiter_len = if self.buffer.get(idx..idx + 2) == Some(b"\n\n") {
|
||||
2
|
||||
} else {
|
||||
4
|
||||
};
|
||||
|
||||
let block = self.buffer.drain(..idx + delimiter_len).collect::<Vec<_>>();
|
||||
let text = String::from_utf8_lossy(&block);
|
||||
|
||||
let data = text
|
||||
.lines()
|
||||
.filter_map(|line| line.strip_prefix("data: "))
|
||||
.collect::<Vec<_>>()
|
||||
.join("\n");
|
||||
|
||||
if data.is_empty() {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
let value: Value = serde_json::from_str(&data).map_err(|err| {
|
||||
io::Error::new(
|
||||
io::ErrorKind::InvalidData,
|
||||
format!("invalid sse json payload: {err}"),
|
||||
)
|
||||
})?;
|
||||
|
||||
Ok(Some(value))
|
||||
}
|
||||
}
|
||||
|
||||
fn spawn_adapter() -> io::Result<AdapterHandle> {
|
||||
let port = pick_port()?;
|
||||
let base_url = format!("http://127.0.0.1:{port}");
|
||||
|
||||
let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
|
||||
let workspace_root = manifest_dir
|
||||
.ancestors()
|
||||
.nth(3)
|
||||
.expect("workspace root")
|
||||
.to_path_buf();
|
||||
let mock_agent_js = workspace_root.join("examples/mock-acp-agent/dist/index.js");
|
||||
|
||||
let registry_blob = json!({
|
||||
"id": "mock-acp-agent",
|
||||
"distribution": {
|
||||
"binary": {
|
||||
"linux-x86_64": {
|
||||
"cmd": "node",
|
||||
"args": [mock_agent_js.to_string_lossy()],
|
||||
"env": {}
|
||||
},
|
||||
"linux-aarch64": {
|
||||
"cmd": "node",
|
||||
"args": [mock_agent_js.to_string_lossy()],
|
||||
"env": {}
|
||||
},
|
||||
"darwin-x86_64": {
|
||||
"cmd": "node",
|
||||
"args": [mock_agent_js.to_string_lossy()],
|
||||
"env": {}
|
||||
},
|
||||
"darwin-aarch64": {
|
||||
"cmd": "node",
|
||||
"args": [mock_agent_js.to_string_lossy()],
|
||||
"env": {}
|
||||
},
|
||||
"windows-x86_64": {
|
||||
"cmd": "node",
|
||||
"args": [mock_agent_js.to_string_lossy()],
|
||||
"env": {}
|
||||
},
|
||||
"windows-aarch64": {
|
||||
"cmd": "node",
|
||||
"args": [mock_agent_js.to_string_lossy()],
|
||||
"env": {}
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
.to_string();
|
||||
|
||||
let child = Command::new(env!("CARGO_BIN_EXE_acp-http-adapter"))
|
||||
.arg("--host")
|
||||
.arg("127.0.0.1")
|
||||
.arg("--port")
|
||||
.arg(port.to_string())
|
||||
.arg("--registry-json")
|
||||
.arg(registry_blob)
|
||||
.stdin(Stdio::null())
|
||||
.stdout(Stdio::null())
|
||||
.stderr(Stdio::piped())
|
||||
.spawn()?;
|
||||
|
||||
Ok(AdapterHandle { child, base_url })
|
||||
}
|
||||
|
||||
fn pick_port() -> io::Result<u16> {
|
||||
let listener = TcpListener::bind("127.0.0.1:0")?;
|
||||
let port = listener.local_addr()?.port();
|
||||
drop(listener);
|
||||
Ok(port)
|
||||
}
|
||||
|
||||
async fn wait_for_health(base_url: &str) -> io::Result<()> {
|
||||
let client = Client::new();
|
||||
let deadline = Instant::now() + Duration::from_secs(10);
|
||||
|
||||
loop {
|
||||
if Instant::now() > deadline {
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::TimedOut,
|
||||
"adapter did not become healthy",
|
||||
));
|
||||
}
|
||||
|
||||
if let Ok(response) = client.get(format!("{base_url}/v1/health")).send().await {
|
||||
if response.status() == StatusCode::OK {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue