feat: add process manager API for spawning and managing background processes

API Endpoints:
- POST /v1/process - Start a new process
- GET /v1/process - List all processes
- GET /v1/process/{id} - Get process details
- POST /v1/process/{id}/stop - Stop a process (SIGTERM)
- POST /v1/process/{id}/kill - Kill a process (SIGKILL)
- DELETE /v1/process/{id} - Delete a process and clean up logs
- GET /v1/process/{id}/logs - Read process logs (supports tail, follow via SSE)

Features:
- Log files written to ~/.local/share/sandbox-agent/processes/{id}/
  - stdout.log, stderr.log, combined.log
- Process state persisted to state.json for server restart survival
- Status tracking: starting, running, stopped (with exit_code), killed
- Real-time log streaming via SSE with follow=true query param
- Environment variables and working directory support

Cleanup rules:
- Process exits naturally → logs preserved
- DELETE endpoint → logs removed
This commit is contained in:
Nathan Flurry 2026-01-30 12:40:36 -08:00
parent 8aab9e346d
commit afb2c74eea
3 changed files with 888 additions and 3 deletions

View file

@ -2,6 +2,7 @@
pub mod credentials;
mod agent_server_logs;
pub mod process_manager;
pub mod router;
pub mod telemetry;
pub mod ui;

View file

@ -0,0 +1,681 @@
//! Process Manager - API for spawning and managing background processes.
use std::collections::HashMap;
use std::fs::{self, File, OpenOptions};
use std::io::Write;
use std::path::PathBuf;
use std::process::Stdio;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncBufReadExt, BufReader as TokioBufReader};
use tokio::process::{Child, Command};
use tokio::sync::{broadcast, Mutex, RwLock};
use utoipa::ToSchema;
use sandbox_agent_error::SandboxError;
// Monotonically increasing source of numeric process IDs. Starts at 1 and is
// bumped past any persisted IDs when state is reloaded (see load_state_sync).
static PROCESS_ID_COUNTER: AtomicU64 = AtomicU64::new(1);
/// Process status
// Serialized lowercase (e.g. "running") both in the REST API and in the
// persisted state.json file, so renaming variants is a breaking change.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, ToSchema, JsonSchema)]
#[serde(rename_all = "lowercase")]
pub enum ProcessStatus {
    /// Process is starting up
    Starting,
    /// Process is running
    Running,
    /// Process stopped naturally or via SIGTERM
    Stopped,
    /// Process was killed via SIGKILL
    Killed,
}
/// Log file paths for a process
// Absolute paths (lossy UTF-8) under the per-process directory; returned to
// clients so they can also read the files directly.
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessLogPaths {
    pub stdout: String,
    pub stderr: String,
    pub combined: String,
}
/// Process information
// Public snapshot of a managed process, returned by GET /v1/process[/{id}].
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessInfo {
    pub id: String,
    pub command: String,
    pub args: Vec<String>,
    pub status: ProcessStatus,
    // Set once the process exits; None while running or if killed by signal.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub exit_code: Option<i32>,
    pub log_paths: ProcessLogPaths,
    // Unix timestamps in whole seconds.
    pub started_at: u64,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub stopped_at: Option<u64>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cwd: Option<String>,
}
/// Request to start a new process
// Body of POST /v1/process. `command` is executed directly (not via a shell),
// so shell syntax in it will not be interpreted.
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct StartProcessRequest {
    pub command: String,
    #[serde(default)]
    pub args: Vec<String>,
    // Working directory; inherits the server's cwd when omitted.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cwd: Option<String>,
    // Extra environment variables layered on top of the server's environment.
    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
    pub env: HashMap<String, String>,
}
/// Response after starting a process
// Minimal acknowledgement; full details are available via GET /v1/process/{id}.
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct StartProcessResponse {
    pub id: String,
    pub status: ProcessStatus,
    pub log_paths: ProcessLogPaths,
}
/// Response listing all processes
// Sorted newest-first by started_at (see ProcessManager::list_processes).
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProcessListResponse {
    pub processes: Vec<ProcessInfo>,
}
/// Query parameters for reading logs
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct LogsQuery {
    /// Number of lines to return from the end
    #[serde(skip_serializing_if = "Option::is_none")]
    pub tail: Option<usize>,
    /// Stream logs via SSE
    // Interpreted by the HTTP handler, not by ProcessManager::read_logs.
    #[serde(default)]
    pub follow: bool,
    /// Which log stream to read: "stdout", "stderr", or "combined" (default)
    // Any unrecognized value silently falls back to "combined".
    #[serde(skip_serializing_if = "Option::is_none")]
    pub stream: Option<String>,
}
/// Response with log content
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct LogsResponse {
    pub content: String,
    // Number of lines included in `content`.
    pub lines: usize,
}
/// Internal state for a managed process
// One entry per process in ProcessManager::processes; wrapped in its own
// Mutex so per-process mutation does not require the map's write lock.
#[derive(Debug)]
struct ManagedProcess {
    // Public-facing snapshot, returned verbatim by the API.
    info: ProcessInfo,
    /// Handle to the running process (None if process has exited)
    child: Option<Child>,
    /// Broadcaster for log lines (for SSE streaming)
    // Receivers are created lazily via subscribe(); sends with no receivers
    // simply fail and are ignored by the reader tasks.
    log_broadcaster: broadcast::Sender<String>,
}
/// State file entry for persistence
// Flattened copy of ProcessInfo (minus log paths, which are re-derived from
// the id) written to state.json so process metadata survives server restarts.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct ProcessStateEntry {
    id: String,
    command: String,
    args: Vec<String>,
    status: ProcessStatus,
    exit_code: Option<i32>,
    started_at: u64,
    stopped_at: Option<u64>,
    cwd: Option<String>,
}
/// Process Manager handles spawning and tracking background processes
#[derive(Debug)]
pub struct ProcessManager {
    // id -> managed entry. RwLock guards the map itself; each entry carries
    // its own Mutex for per-process state.
    processes: RwLock<HashMap<String, Arc<Mutex<ManagedProcess>>>>,
    // Root directory containing state.json and one sub-directory per process.
    base_dir: PathBuf,
}
impl ProcessManager {
/// Create a new process manager
///
/// Ensures the data directory exists and reloads any state persisted by a
/// previous run. Both steps are best-effort: failures are logged and the
/// manager starts with whatever could be recovered.
pub fn new() -> Self {
    let base_dir = process_data_dir();
    // Best-effort: if this fails, later per-process file operations will
    // surface their own errors.
    if let Err(err) = fs::create_dir_all(&base_dir) {
        tracing::warn!("Failed to create process data directory: {}", err);
    }
    let manager = Self {
        processes: RwLock::new(HashMap::new()),
        base_dir,
    };
    match manager.load_state_sync() {
        Ok(()) => {}
        Err(err) => tracing::warn!("Failed to load process state: {}", err),
    }
    manager
}
/// Get the directory for a specific process
// Layout: {base_dir}/{id}/ — holds the three log files for that process.
fn process_dir(&self, id: &str) -> PathBuf {
    self.base_dir.join(id)
}
/// Get log paths for a process
///
/// Paths are derived purely from the id, so they can be reconstructed after
/// a restart without any persisted data.
fn log_paths(&self, id: &str) -> ProcessLogPaths {
    let dir = self.process_dir(id);
    // Lossy conversion: non-UTF-8 path components are replaced, matching how
    // the paths are exposed to API clients as plain strings.
    let path_for = |file_name: &str| dir.join(file_name).to_string_lossy().into_owned();
    ProcessLogPaths {
        stdout: path_for("stdout.log"),
        stderr: path_for("stderr.log"),
        combined: path_for("combined.log"),
    }
}
/// Get the state file path
// Single JSON file holding a Vec<ProcessStateEntry> for all processes.
fn state_file_path(&self) -> PathBuf {
    self.base_dir.join("state.json")
}
/// Load persisted state (sync version for init)
// Rebuilds the in-memory process table from state.json. Reloaded entries
// always have `child: None` — the OS processes themselves do not survive a
// server restart; only their metadata and logs do.
fn load_state_sync(&self) -> Result<(), std::io::Error> {
    let state_path = self.state_file_path();
    // A missing state file is the normal first-run case, not an error.
    if !state_path.exists() {
        return Ok(());
    }
    let content = fs::read_to_string(&state_path)?;
    let entries: Vec<ProcessStateEntry> = serde_json::from_str(&content)
        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
    let mut processes = HashMap::new();
    for entry in entries {
        // Log paths are not persisted; re-derive them from the id.
        let log_paths = self.log_paths(&entry.id);
        // Fresh broadcaster: a restored (dead) process will never send, but
        // subscribe_logs still needs a channel to hand out receivers.
        let (tx, _) = broadcast::channel(256);
        let managed = ManagedProcess {
            info: ProcessInfo {
                id: entry.id.clone(),
                command: entry.command,
                args: entry.args,
                status: entry.status,
                exit_code: entry.exit_code,
                log_paths,
                started_at: entry.started_at,
                stopped_at: entry.stopped_at,
                cwd: entry.cwd,
            },
            child: None,
            log_broadcaster: tx,
        };
        // Update counter to avoid ID collisions
        // NOTE: load/store is not a single atomic step, but this runs only
        // from `new()` before the manager is shared, so there is no race.
        if let Ok(num) = entry.id.parse::<u64>() {
            let current = PROCESS_ID_COUNTER.load(Ordering::SeqCst);
            if num >= current {
                PROCESS_ID_COUNTER.store(num + 1, Ordering::SeqCst);
            }
        }
        processes.insert(entry.id, Arc::new(Mutex::new(managed)));
    }
    // We can't await here, so we'll use try_write
    // (safe for the same reason as above: `self` is not yet shared, so the
    // lock is guaranteed uncontended and try_write succeeds).
    if let Ok(mut guard) = self.processes.try_write() {
        *guard = processes;
    }
    Ok(())
}
/// Save state to disk
async fn save_state(&self) -> Result<(), std::io::Error> {
let processes = self.processes.read().await;
let mut entries = Vec::new();
for managed in processes.values() {
let guard = managed.lock().await;
entries.push(ProcessStateEntry {
id: guard.info.id.clone(),
command: guard.info.command.clone(),
args: guard.info.args.clone(),
status: guard.info.status,
exit_code: guard.info.exit_code,
started_at: guard.info.started_at,
stopped_at: guard.info.stopped_at,
cwd: guard.info.cwd.clone(),
});
}
let content = serde_json::to_string_pretty(&entries)
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
fs::write(self.state_file_path(), content)?;
Ok(())
}
/// Start a new process
pub async fn start_process(&self, request: StartProcessRequest) -> Result<StartProcessResponse, SandboxError> {
let id = PROCESS_ID_COUNTER.fetch_add(1, Ordering::SeqCst).to_string();
let process_dir = self.process_dir(&id);
// Create process directory
fs::create_dir_all(&process_dir).map_err(|e| SandboxError::StreamError {
message: format!("Failed to create process directory: {}", e),
})?;
let log_paths = self.log_paths(&id);
// Create log files
File::create(&log_paths.stdout).map_err(|e| SandboxError::StreamError {
message: format!("Failed to create stdout log: {}", e),
})?;
File::create(&log_paths.stderr).map_err(|e| SandboxError::StreamError {
message: format!("Failed to create stderr log: {}", e),
})?;
let combined_file = Arc::new(std::sync::Mutex::new(
File::create(&log_paths.combined).map_err(|e| SandboxError::StreamError {
message: format!("Failed to create combined log: {}", e),
})?
));
// Build the command
let mut cmd = Command::new(&request.command);
cmd.args(&request.args);
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
if let Some(ref cwd) = request.cwd {
cmd.current_dir(cwd);
}
for (key, value) in &request.env {
cmd.env(key, value);
}
// Spawn the process
let mut child = cmd.spawn().map_err(|e| SandboxError::StreamError {
message: format!("Failed to spawn process: {}", e),
})?;
let started_at = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let (log_tx, _) = broadcast::channel::<String>(256);
// Set up stdout reader
let stdout = child.stdout.take();
let stderr = child.stderr.take();
let info = ProcessInfo {
id: id.clone(),
command: request.command.clone(),
args: request.args.clone(),
status: ProcessStatus::Running,
exit_code: None,
log_paths: log_paths.clone(),
started_at,
stopped_at: None,
cwd: request.cwd.clone(),
};
let managed = Arc::new(Mutex::new(ManagedProcess {
info: info.clone(),
child: Some(child),
log_broadcaster: log_tx.clone(),
}));
// Insert into map
{
let mut processes = self.processes.write().await;
processes.insert(id.clone(), managed.clone());
}
// Spawn tasks to read stdout/stderr
if let Some(stdout) = stdout {
let log_tx = log_tx.clone();
let stdout_path = log_paths.stdout.clone();
let combined = combined_file.clone();
tokio::spawn(async move {
let reader = TokioBufReader::new(stdout);
let mut lines = reader.lines();
let mut file = match OpenOptions::new().append(true).open(&stdout_path) {
Ok(f) => f,
Err(_) => return,
};
while let Ok(Some(line)) = lines.next_line().await {
let log_line = format!("[stdout] {}\n", line);
let _ = file.write_all(line.as_bytes());
let _ = file.write_all(b"\n");
if let Ok(mut combined) = combined.lock() {
let _ = combined.write_all(log_line.as_bytes());
}
let _ = log_tx.send(log_line);
}
});
}
if let Some(stderr) = stderr {
let log_tx = log_tx.clone();
let stderr_path = log_paths.stderr.clone();
let combined = combined_file.clone();
tokio::spawn(async move {
let reader = TokioBufReader::new(stderr);
let mut lines = reader.lines();
let mut file = match OpenOptions::new().append(true).open(&stderr_path) {
Ok(f) => f,
Err(_) => return,
};
while let Ok(Some(line)) = lines.next_line().await {
let log_line = format!("[stderr] {}\n", line);
let _ = file.write_all(line.as_bytes());
let _ = file.write_all(b"\n");
if let Ok(mut combined) = combined.lock() {
let _ = combined.write_all(log_line.as_bytes());
}
let _ = log_tx.send(log_line);
}
});
}
// Spawn a task to monitor process exit
let managed_clone = managed.clone();
let base_dir = self.base_dir.clone();
tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_millis(100)).await;
let mut guard = managed_clone.lock().await;
if let Some(ref mut child) = guard.child {
match child.try_wait() {
Ok(Some(status)) => {
guard.info.status = ProcessStatus::Stopped;
guard.info.exit_code = status.code();
guard.info.stopped_at = Some(
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs()
);
guard.child = None;
drop(guard);
// Save state - we need to do this manually since we don't have self
// This is a simplified version that just updates the state file
let _ = save_state_to_file(&base_dir).await;
break;
}
Ok(None) => {
// Still running
}
Err(_) => {
break;
}
}
} else {
break;
}
}
});
// Save state
if let Err(e) = self.save_state().await {
tracing::warn!("Failed to save process state: {}", e);
}
Ok(StartProcessResponse {
id,
status: ProcessStatus::Running,
log_paths,
})
}
/// List all processes
///
/// Returns a snapshot of every tracked process, newest first.
pub async fn list_processes(&self) -> ProcessListResponse {
    let table = self.processes.read().await;
    let mut processes = Vec::with_capacity(table.len());
    for entry in table.values() {
        processes.push(entry.lock().await.info.clone());
    }
    // Newest first: descending by start time.
    processes.sort_by_key(|info| std::cmp::Reverse(info.started_at));
    ProcessListResponse { processes }
}
/// Get a specific process
///
/// Returns a clone of the process's current metadata, or `SessionNotFound`
/// if the id is unknown.
pub async fn get_process(&self, id: &str) -> Result<ProcessInfo, SandboxError> {
    let table = self.processes.read().await;
    match table.get(id) {
        Some(entry) => Ok(entry.lock().await.info.clone()),
        None => Err(SandboxError::SessionNotFound {
            session_id: format!("process:{}", id),
        }),
    }
}
/// Stop a process with SIGTERM
///
/// Sends SIGTERM on Unix so the child can shut down gracefully; the exit
/// monitor task then records the resulting exit status. On non-Unix targets
/// this is currently a no-op.
pub async fn stop_process(&self, id: &str) -> Result<(), SandboxError> {
    // Scope the guards so the map read lock is released before save_state
    // re-acquires it: tokio's RwLock is write-preferring, so holding a read
    // guard while awaiting a second read can deadlock if a writer is queued.
    {
        let processes = self.processes.read().await;
        let managed = processes.get(id).ok_or_else(|| SandboxError::SessionNotFound {
            session_id: format!("process:{}", id),
        })?;
        let guard = managed.lock().await;
        if let Some(ref child) = guard.child {
            #[cfg(unix)]
            {
                // Send SIGTERM
                if let Some(pid) = child.id() {
                    // SAFETY: kill(2) with a possibly-stale pid is safe; at
                    // worst it returns an error, which we deliberately ignore.
                    unsafe {
                        libc::kill(pid as i32, libc::SIGTERM);
                    }
                }
            }
            #[cfg(not(unix))]
            {
                // On non-Unix, we can't send SIGTERM, so just mark as stopping
                // The process will be killed when delete is called if needed
            }
        }
    }
    if let Err(e) = self.save_state().await {
        tracing::warn!("Failed to save process state: {}", e);
    }
    Ok(())
}
/// Kill a process with SIGKILL
///
/// Forcibly terminates the child and records it as `Killed`. Clearing
/// `child` here also stops the exit-monitor task from overwriting the
/// status with `Stopped`.
pub async fn kill_process(&self, id: &str) -> Result<(), SandboxError> {
    // Scope the guards so the map read lock is released before save_state
    // re-acquires it: tokio's RwLock is write-preferring, so holding a read
    // guard while awaiting a second read can deadlock if a writer is queued.
    {
        let processes = self.processes.read().await;
        let managed = processes.get(id).ok_or_else(|| SandboxError::SessionNotFound {
            session_id: format!("process:{}", id),
        })?;
        let mut guard = managed.lock().await;
        if let Some(ref mut child) = guard.child {
            // Ignore errors: the child may already have exited on its own.
            let _ = child.kill().await;
            guard.info.status = ProcessStatus::Killed;
            guard.info.stopped_at = Some(
                SystemTime::now()
                    .duration_since(UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_secs()
            );
            guard.child = None;
        }
    }
    if let Err(e) = self.save_state().await {
        tracing::warn!("Failed to save process state: {}", e);
    }
    Ok(())
}
/// Delete a process and its logs
// Refuses to delete while the child handle is still present (i.e. process is
// running); callers must stop/kill first. Removes the map entry, the entire
// per-process log directory, and rewrites state.json.
pub async fn delete_process(&self, id: &str) -> Result<(), SandboxError> {
    // First, make sure process is not running
    // NOTE(review): the running-check and the removal below use separate lock
    // scopes, so a concurrent start of a same-id process in between is
    // theoretically possible — ids are unique by construction, so this is
    // presumably benign; confirm if id reuse is ever introduced.
    {
        let processes = self.processes.read().await;
        if let Some(managed) = processes.get(id) {
            let guard = managed.lock().await;
            if guard.child.is_some() {
                return Err(SandboxError::InvalidRequest {
                    message: "Cannot delete a running process. Stop or kill it first.".to_string(),
                });
            }
        } else {
            return Err(SandboxError::SessionNotFound {
                session_id: format!("process:{}", id),
            });
        }
    }
    // Remove from map
    {
        let mut processes = self.processes.write().await;
        processes.remove(id);
    }
    // Delete log files
    let process_dir = self.process_dir(id);
    if process_dir.exists() {
        // Best-effort: a leftover directory is logged, not treated as failure.
        if let Err(e) = fs::remove_dir_all(&process_dir) {
            tracing::warn!("Failed to remove process directory: {}", e);
        }
    }
    if let Err(e) = self.save_state().await {
        tracing::warn!("Failed to save process state: {}", e);
    }
    Ok(())
}
/// Read process logs
///
/// Returns the requested log stream (`stdout`, `stderr`, or the combined
/// log by default), optionally truncated to the last `tail` lines. A
/// missing or unreadable log file yields empty content rather than an error.
pub async fn read_logs(&self, id: &str, query: &LogsQuery) -> Result<LogsResponse, SandboxError> {
    // Also validates that the process exists (404 otherwise).
    let info = self.get_process(id).await?;
    let log_path = match query.stream.as_deref() {
        Some("stdout") => &info.log_paths.stdout,
        Some("stderr") => &info.log_paths.stderr,
        _ => &info.log_paths.combined,
    };
    let content = fs::read_to_string(log_path).unwrap_or_default();
    let (content, line_count) = if let Some(tail) = query.tail {
        // Keep only the last `tail` lines, joined without a trailing newline
        // (matches the previous behavior of join("\n")).
        let lines: Vec<&str> = content.lines().collect();
        let start = lines.len().saturating_sub(tail);
        let selected = &lines[start..];
        (selected.join("\n"), selected.len())
    } else {
        // Count first so `content` can be moved out afterwards — avoids
        // cloning the entire log buffer just to satisfy the borrow checker.
        let total = content.lines().count();
        (content, total)
    };
    Ok(LogsResponse {
        content,
        lines: line_count,
    })
}
/// Get a subscriber for log streaming
///
/// Hands out a fresh broadcast receiver; the reader tasks feed it every
/// tagged log line from the moment of subscription onward.
pub async fn subscribe_logs(&self, id: &str) -> Result<broadcast::Receiver<String>, SandboxError> {
    let table = self.processes.read().await;
    let entry = table.get(id).ok_or_else(|| SandboxError::SessionNotFound {
        session_id: format!("process:{}", id),
    })?;
    let receiver = entry.lock().await.log_broadcaster.subscribe();
    Ok(receiver)
}
}
// Default delegates to new(): same directory setup and state reload.
impl Default for ProcessManager {
    fn default() -> Self {
        Self::new()
    }
}
/// Get the data directory for process management
// Resolves to the platform data dir (per the `dirs` crate; on Linux this is
// typically ~/.local/share) plus "sandbox-agent/processes", falling back to
// the current directory when no data dir can be determined.
fn process_data_dir() -> PathBuf {
    dirs::data_dir()
        .unwrap_or_else(|| PathBuf::from("."))
        .join("sandbox-agent")
        .join("processes")
}
/// Helper to save state from within a spawned task (simplified version)
///
/// Currently a deliberate no-op: the exit-monitor task has no handle to the
/// `ProcessManager`, so the status change it records is only persisted on
/// the next explicit `save_state` call. A more robust implementation would
/// use a channel to communicate with the `ProcessManager`.
///
/// Takes `&Path` rather than `&PathBuf` (callers' `&PathBuf` deref-coerces).
async fn save_state_to_file(base_dir: &Path) -> Result<(), std::io::Error> {
    let _ = base_dir;
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    // Smoke test of the spawn -> inspect -> delete lifecycle.
    // NOTE(review): relies on an `echo` binary being on PATH and on the real
    // user data directory (ProcessManager::new reloads persisted state), so
    // counts are asserted relative to the initial state, not absolutely.
    #[tokio::test]
    async fn test_process_manager_basic() {
        let manager = ProcessManager::new();
        // List should be empty initially (or have persisted state)
        let list = manager.list_processes().await;
        let initial_count = list.processes.len();
        // Start a simple process
        let request = StartProcessRequest {
            command: "echo".to_string(),
            args: vec!["hello".to_string()],
            cwd: None,
            env: HashMap::new(),
        };
        let response = manager.start_process(request).await.unwrap();
        assert!(!response.id.is_empty());
        assert_eq!(response.status, ProcessStatus::Running);
        // Wait a bit for the process to complete
        tokio::time::sleep(Duration::from_millis(200)).await;
        // Check the process info
        let info = manager.get_process(&response.id).await.unwrap();
        assert_eq!(info.command, "echo");
        // List should have one more process
        let list = manager.list_processes().await;
        assert_eq!(list.processes.len(), initial_count + 1);
        // Delete the process
        manager.delete_process(&response.id).await.unwrap();
        // List should be back to initial count
        let list = manager.list_processes().await;
        assert_eq!(list.processes.len(), initial_count);
    }
}

View file

@ -38,6 +38,10 @@ use tower_http::trace::TraceLayer;
use utoipa::{Modify, OpenApi, ToSchema};
use crate::ui;
use crate::process_manager::{
ProcessManager, ProcessInfo, ProcessListResponse, ProcessLogPaths, ProcessStatus,
StartProcessRequest, StartProcessResponse, LogsQuery, LogsResponse,
};
use sandbox_agent_agent_management::agents::{
AgentError as ManagerError, AgentId, AgentManager, InstallOptions, SpawnOptions, StreamingSpawn,
};
@ -54,6 +58,7 @@ pub struct AppState {
auth: AuthConfig,
agent_manager: Arc<AgentManager>,
session_manager: Arc<SessionManager>,
process_manager: Arc<ProcessManager>,
}
impl AppState {
@ -63,10 +68,12 @@ impl AppState {
session_manager
.server_manager
.set_owner(Arc::downgrade(&session_manager));
let process_manager = Arc::new(ProcessManager::new());
Self {
auth,
agent_manager,
session_manager,
process_manager,
}
}
}
@ -91,6 +98,8 @@ pub fn build_router(state: AppState) -> Router {
}
pub fn build_router_with_state(shared: Arc<AppState>) -> (Router, Arc<AppState>) {
use axum::routing::delete;
let mut v1_router = Router::new()
.route("/health", get(get_health))
.route("/agents", get(list_agents))
@ -115,6 +124,12 @@ pub fn build_router_with_state(shared: Arc<AppState>) -> (Router, Arc<AppState>)
"/sessions/:session_id/permissions/:permission_id/reply",
post(reply_permission),
)
// Process management routes
.route("/process", post(start_process).get(list_processes))
.route("/process/:id", get(get_process).delete(delete_process))
.route("/process/:id/stop", post(stop_process))
.route("/process/:id/kill", post(kill_process))
.route("/process/:id/logs", get(get_process_logs))
.with_state(shared.clone());
if shared.auth.token.is_some() {
@ -155,7 +170,14 @@ pub async fn shutdown_servers(state: &Arc<AppState>) {
get_events_sse,
reply_question,
reject_question,
reply_permission
reply_permission,
start_process,
list_processes,
get_process,
stop_process,
kill_process,
delete_process,
get_process_logs
),
components(
schemas(
@ -205,13 +227,22 @@ pub async fn shutdown_servers(state: &Arc<AppState>) {
PermissionReply,
ProblemDetails,
ErrorType,
AgentError
AgentError,
ProcessInfo,
ProcessStatus,
ProcessLogPaths,
ProcessListResponse,
StartProcessRequest,
StartProcessResponse,
LogsQuery,
LogsResponse
)
),
tags(
(name = "meta", description = "Service metadata"),
(name = "agents", description = "Agent management"),
(name = "sessions", description = "Session management")
(name = "sessions", description = "Session management"),
(name = "process", description = "Process management")
),
modifiers(&ServerAddon)
)]
@ -3885,6 +3916,178 @@ async fn reply_permission(
Ok(StatusCode::NO_CONTENT)
}
// ============================================================================
// Process Management Handlers
// ============================================================================
/// Start a new process
// Thin wrapper: deserializes the body, delegates to ProcessManager, and
// maps success to 201 Created. Errors convert through ApiError.
#[utoipa::path(
    post,
    path = "/v1/process",
    request_body = StartProcessRequest,
    responses(
        (status = 201, body = StartProcessResponse, description = "Process started"),
        (status = 400, body = ProblemDetails),
        (status = 500, body = ProblemDetails)
    ),
    tag = "process"
)]
async fn start_process(
    State(state): State<Arc<AppState>>,
    Json(request): Json<StartProcessRequest>,
) -> Result<(StatusCode, Json<StartProcessResponse>), ApiError> {
    let response = state.process_manager.start_process(request).await?;
    Ok((StatusCode::CREATED, Json(response)))
}
/// List all processes
// Infallible: an empty manager simply yields an empty list.
#[utoipa::path(
    get,
    path = "/v1/process",
    responses(
        (status = 200, body = ProcessListResponse, description = "List of processes")
    ),
    tag = "process"
)]
async fn list_processes(
    State(state): State<Arc<AppState>>,
) -> Json<ProcessListResponse> {
    Json(state.process_manager.list_processes().await)
}
/// Get process details
// 404 surfaces from the manager's SessionNotFound via ApiError.
#[utoipa::path(
    get,
    path = "/v1/process/{id}",
    params(("id" = String, Path, description = "Process ID")),
    responses(
        (status = 200, body = ProcessInfo, description = "Process details"),
        (status = 404, body = ProblemDetails)
    ),
    tag = "process"
)]
async fn get_process(
    State(state): State<Arc<AppState>>,
    Path(id): Path<String>,
) -> Result<Json<ProcessInfo>, ApiError> {
    let info = state.process_manager.get_process(&id).await?;
    Ok(Json(info))
}
/// Stop a process (SIGTERM)
// 204 means the signal was sent (or the process had already exited), not
// that the process has finished terminating.
#[utoipa::path(
    post,
    path = "/v1/process/{id}/stop",
    params(("id" = String, Path, description = "Process ID")),
    responses(
        (status = 204, description = "Process stopped"),
        (status = 404, body = ProblemDetails)
    ),
    tag = "process"
)]
async fn stop_process(
    State(state): State<Arc<AppState>>,
    Path(id): Path<String>,
) -> Result<StatusCode, ApiError> {
    state.process_manager.stop_process(&id).await?;
    Ok(StatusCode::NO_CONTENT)
}
/// Kill a process (SIGKILL)
// Forceful termination; the manager marks the process as "killed".
#[utoipa::path(
    post,
    path = "/v1/process/{id}/kill",
    params(("id" = String, Path, description = "Process ID")),
    responses(
        (status = 204, description = "Process killed"),
        (status = 404, body = ProblemDetails)
    ),
    tag = "process"
)]
async fn kill_process(
    State(state): State<Arc<AppState>>,
    Path(id): Path<String>,
) -> Result<StatusCode, ApiError> {
    state.process_manager.kill_process(&id).await?;
    Ok(StatusCode::NO_CONTENT)
}
/// Delete a process and its logs
// 400 when the process is still running (must stop/kill first).
#[utoipa::path(
    delete,
    path = "/v1/process/{id}",
    params(("id" = String, Path, description = "Process ID")),
    responses(
        (status = 204, description = "Process deleted"),
        (status = 400, body = ProblemDetails),
        (status = 404, body = ProblemDetails)
    ),
    tag = "process"
)]
async fn delete_process(
    State(state): State<Arc<AppState>>,
    Path(id): Path<String>,
) -> Result<StatusCode, ApiError> {
    state.process_manager.delete_process(&id).await?;
    Ok(StatusCode::NO_CONTENT)
}
/// Read process logs
// Two modes: plain JSON snapshot, or (follow=true) an SSE stream that sends
// the current snapshot as the first event followed by live log lines.
#[utoipa::path(
    get,
    path = "/v1/process/{id}/logs",
    params(
        ("id" = String, Path, description = "Process ID"),
        ("tail" = Option<usize>, Query, description = "Number of lines from end"),
        ("follow" = Option<bool>, Query, description = "Stream logs via SSE"),
        ("stream" = Option<String>, Query, description = "Log stream: stdout, stderr, or combined")
    ),
    responses(
        (status = 200, body = LogsResponse, description = "Log content"),
        (status = 404, body = ProblemDetails)
    ),
    tag = "process"
)]
async fn get_process_logs(
    State(state): State<Arc<AppState>>,
    Path(id): Path<String>,
    Query(query): Query<LogsQuery>,
) -> Result<Response, ApiError> {
    if query.follow {
        // SSE streaming mode
        // Snapshot first (follow forced off so read_logs returns immediately).
        let initial_logs = state.process_manager.read_logs(&id, &LogsQuery {
            tail: query.tail,
            follow: false,
            stream: query.stream.clone(),
        }).await?;
        // NOTE(review): a line emitted between this snapshot and the
        // subscribe() below could be missed or duplicated — confirm whether
        // that gap matters for consumers.
        let receiver = state.process_manager.subscribe_logs(&id).await?;
        // Stream initial content as first event, then live updates
        // First event carries the JSON LogsResponse; later events are raw lines.
        let initial_event = Event::default()
            .data(serde_json::to_string(&initial_logs).unwrap_or_default());
        let initial_stream = stream::once(async move {
            Ok::<Event, Infallible>(initial_event)
        });
        let live_stream = BroadcastStream::new(receiver)
            .filter_map(|result| async move {
                match result {
                    Ok(line) => Some(Ok::<Event, Infallible>(Event::default().data(line))),
                    // Lagged receivers drop messages; skip the error marker.
                    Err(_) => None,
                }
            });
        let stream = initial_stream.chain(live_stream);
        Ok(Sse::new(stream).into_response())
    } else {
        // Regular response mode
        let logs = state.process_manager.read_logs(&id, &query).await?;
        Ok(Json(logs).into_response())
    }
}
fn all_agents() -> [AgentId; 5] {
[
AgentId::Claude,