diff --git a/docs/conversion.mdx b/docs/conversion.mdx
index d155ab4..4363e06 100644
--- a/docs/conversion.mdx
+++ b/docs/conversion.mdx
@@ -74,7 +74,7 @@ Policy:
Message normalization notes
- user vs assistant: normalized via role in the universal item; provider role fields or item types determine role.
-- file artifacts: always represented as content parts (type=file_ref) inside message/tool_result items, not a separate item kind.
+- file artifacts: always represented as content parts (type=file_ref) inside message/tool_result items, not a separate item kind. Rename actions use `target_path`.
- reasoning: represented as content parts (type=reasoning) inside message items, with visibility when available.
- subagents: OpenCode subtask parts and Claude Task tool usage are currently normalized into standard message/tool flow (no dedicated subagent fields).
- OpenCode unrolling: message.updated creates/updates the parent message item; tool-related parts emit separate tool item events (item.started/ item.completed) with parent_id pointing to the message item.
diff --git a/docs/opencode-compatibility.mdx b/docs/opencode-compatibility.mdx
index 563d8d6..c0ac2eb 100644
--- a/docs/opencode-compatibility.mdx
+++ b/docs/opencode-compatibility.mdx
@@ -17,8 +17,8 @@ Authentication matches `/v1`: if a token is configured, requests must include `A
| GET | /config/providers | Stubbed | Returns/echoes config payloads. | E2E: openapi-coverage |
| GET | /event | SSE stub | Emits compat events for session/message/pty updates only. | E2E: openapi-coverage, events |
| GET | /experimental/resource | Stubbed | Experimental endpoints return empty stubs. | E2E: openapi-coverage |
-| GET | /experimental/tool | Stubbed | Experimental endpoints return empty stubs. | E2E: openapi-coverage |
-| GET | /experimental/tool/ids | Stubbed | Experimental endpoints return empty stubs. | E2E: openapi-coverage |
+| GET | /experimental/tool | Derived | Returns MCP tool metadata for connected servers. | E2E: openapi-coverage, opencode-mcp |
+| GET | /experimental/tool/ids | Derived | Returns MCP tool IDs for connected servers. | E2E: openapi-coverage, opencode-mcp |
| DELETE | /experimental/worktree | Stubbed | Experimental endpoints return empty stubs. | E2E: openapi-coverage |
| GET | /experimental/worktree | Stubbed | Experimental endpoints return empty stubs. | E2E: openapi-coverage |
| POST | /experimental/worktree | Stubbed | Experimental endpoints return empty stubs. | E2E: openapi-coverage |
@@ -29,7 +29,7 @@ Authentication matches `/v1`: if a token is configured, requests must include `A
| GET | /find | Stubbed | Returns empty results. | E2E: openapi-coverage |
| GET | /find/file | Stubbed | Returns empty results. | E2E: openapi-coverage |
| GET | /find/symbol | Stubbed | Returns empty results. | E2E: openapi-coverage |
-| GET | /formatter | Stubbed | | E2E: openapi-coverage |
+| GET | /formatter | Derived | Scans workspace files to report formatter availability. | E2E: openapi-coverage, formatter-lsp |
| GET | /global/config | Stubbed | | E2E: openapi-coverage |
| PATCH | /global/config | Stubbed | | E2E: openapi-coverage |
| POST | /global/dispose | Stubbed | | E2E: openapi-coverage |
@@ -37,15 +37,15 @@ Authentication matches `/v1`: if a token is configured, requests must include `A
| GET | /global/health | Stubbed | | E2E: openapi-coverage |
| POST | /instance/dispose | Stubbed | | E2E: openapi-coverage |
| POST | /log | Stubbed | | E2E: openapi-coverage |
-| GET | /lsp | Stubbed | | E2E: openapi-coverage |
-| GET | /mcp | Stubbed | Returns disabled/needs_auth stubs. | E2E: openapi-coverage |
-| POST | /mcp | Stubbed | Returns disabled/needs_auth stubs. | E2E: openapi-coverage |
-| DELETE | /mcp/{name}/auth | Stubbed | Returns disabled/needs_auth stubs. | E2E: openapi-coverage |
-| POST | /mcp/{name}/auth | Stubbed | Returns disabled/needs_auth stubs. | E2E: openapi-coverage |
-| POST | /mcp/{name}/auth/authenticate | Stubbed | Returns disabled/needs_auth stubs. | E2E: openapi-coverage |
-| POST | /mcp/{name}/auth/callback | Stubbed | Returns disabled/needs_auth stubs. | E2E: openapi-coverage |
-| POST | /mcp/{name}/connect | Stubbed | Returns disabled/needs_auth stubs. | E2E: openapi-coverage |
-| POST | /mcp/{name}/disconnect | Stubbed | Returns disabled/needs_auth stubs. | E2E: openapi-coverage |
+| GET | /lsp | Derived | Reports LSP status per language based on workspace scan and PATH. | E2E: openapi-coverage, formatter-lsp |
+| GET | /mcp | Stateful | Lists MCP registry status. | E2E: openapi-coverage, opencode-mcp |
+| POST | /mcp | Stateful | Registers MCP servers and stores config. | E2E: openapi-coverage, opencode-mcp |
+| DELETE | /mcp/{name}/auth | Stateful | Clears MCP auth credentials. | E2E: openapi-coverage, opencode-mcp |
+| POST | /mcp/{name}/auth | Stateful | Starts MCP auth flow. | E2E: openapi-coverage, opencode-mcp |
+| POST | /mcp/{name}/auth/authenticate | Stateful | Returns MCP auth status. | E2E: openapi-coverage, opencode-mcp |
+| POST | /mcp/{name}/auth/callback | Stateful | Completes MCP auth and connects. | E2E: openapi-coverage, opencode-mcp |
+| POST | /mcp/{name}/connect | Stateful | Connects MCP server and loads tools. | E2E: openapi-coverage, opencode-mcp |
+| POST | /mcp/{name}/disconnect | Stateful | Disconnects MCP server and clears tools. | E2E: openapi-coverage, opencode-mcp |
| GET | /path | Derived stub | | E2E: openapi-coverage |
| GET | /permission | Stubbed | | E2E: openapi-coverage, permissions |
| POST | /permission/{requestID}/reply | Stubbed | | E2E: openapi-coverage, permissions |
@@ -105,4 +105,4 @@ Authentication matches `/v1`: if a token is configured, requests must include `A
| POST | /tui/select-session | Stubbed | Returns true/empty control payloads. | E2E: openapi-coverage |
| POST | /tui/show-toast | Stubbed | Returns true/empty control payloads. | E2E: openapi-coverage |
| POST | /tui/submit-prompt | Stubbed | Returns true/empty control payloads. | E2E: openapi-coverage |
-| GET | /vcs | Derived stub | | E2E: openapi-coverage |
\ No newline at end of file
+| GET | /vcs | Derived stub | | E2E: openapi-coverage |
diff --git a/docs/session-transcript-schema.mdx b/docs/session-transcript-schema.mdx
index a5a0158..fc39606 100644
--- a/docs/session-transcript-schema.mdx
+++ b/docs/session-transcript-schema.mdx
@@ -291,8 +291,9 @@ File reference with optional diff.
| Field | Type | Description |
|-------|------|-------------|
| `path` | string | File path |
-| `action` | string | `read`, `write`, `patch` |
+| `action` | string | `read`, `write`, `patch`, `rename`, `delete` |
| `diff` | string? | Unified diff (for patches) |
+| `target_path` | string? | Destination path for renames |
```json
{
diff --git a/frontend/packages/inspector/src/components/chat/renderContentPart.tsx b/frontend/packages/inspector/src/components/chat/renderContentPart.tsx
index ce61f9f..440de5c 100644
--- a/frontend/packages/inspector/src/components/chat/renderContentPart.tsx
+++ b/frontend/packages/inspector/src/components/chat/renderContentPart.tsx
@@ -44,11 +44,17 @@ const renderContentPart = (part: ContentPart, index: number) => {
);
}
case "file_ref": {
- const { path, action, diff } = part as { path: string; action: string; diff?: string | null };
+ const { path, action, diff, target_path } = part as {
+ path: string;
+ action: string;
+ diff?: string | null;
+ target_path?: string | null;
+ };
+ const displayPath = target_path && action === "rename" ? `${path} -> ${target_path}` : path;
return (
file - {action}
-
{path}
+
{displayPath}
{diff &&
{diff}}
);
diff --git a/sdks/typescript/src/generated/openapi.ts b/sdks/typescript/src/generated/openapi.ts
index 52816ad..c882705 100644
--- a/sdks/typescript/src/generated/openapi.ts
+++ b/sdks/typescript/src/generated/openapi.ts
@@ -131,6 +131,7 @@ export interface components {
action: components["schemas"]["FileAction"];
diff?: string | null;
path: string;
+ target_path?: string | null;
/** @enum {string} */
type: "file_ref";
}) | {
@@ -183,7 +184,7 @@ export interface components {
hasMore: boolean;
};
/** @enum {string} */
- FileAction: "read" | "write" | "patch";
+ FileAction: "read" | "write" | "patch" | "rename" | "delete";
HealthResponse: {
status: string;
};
diff --git a/server/packages/sandbox-agent/Cargo.toml b/server/packages/sandbox-agent/Cargo.toml
index 5f45ad0..32d6047 100644
--- a/server/packages/sandbox-agent/Cargo.toml
+++ b/server/packages/sandbox-agent/Cargo.toml
@@ -23,6 +23,7 @@ axum.workspace = true
clap.workspace = true
futures.workspace = true
reqwest.workspace = true
+regress.workspace = true
dirs.workspace = true
time.workspace = true
chrono.workspace = true
@@ -36,6 +37,7 @@ tracing-logfmt.workspace = true
tracing-subscriber.workspace = true
include_dir.workspace = true
base64.workspace = true
+portable-pty = "0.8"
tempfile = { workspace = true, optional = true }
[target.'cfg(unix)'.dependencies]
diff --git a/server/packages/sandbox-agent/src/formatter_lsp.rs b/server/packages/sandbox-agent/src/formatter_lsp.rs
new file mode 100644
index 0000000..99997fb
--- /dev/null
+++ b/server/packages/sandbox-agent/src/formatter_lsp.rs
@@ -0,0 +1,337 @@
+use std::collections::HashSet;
+use std::env;
+use std::ffi::OsStr;
+use std::path::{Path, PathBuf};
+
+use serde::Serialize;
+
+const MAX_SCAN_FILES: usize = 10_000;
+const MAX_SCAN_DEPTH: usize = 6;
+
+const IGNORE_DIRS: &[&str] = &[
+ ".git",
+ ".idea",
+ ".sandbox-agent",
+ ".venv",
+ ".vscode",
+ "build",
+ "dist",
+ "node_modules",
+ "target",
+ "venv",
+];
+
+#[derive(Debug, Clone, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct FormatterStatus {
+ pub name: String,
+ pub extensions: Vec,
+ pub enabled: bool,
+}
+
+#[derive(Debug, Clone, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct LspStatus {
+ pub id: String,
+ pub name: String,
+ pub root: String,
+ pub status: String,
+}
+
+#[derive(Debug, Clone)]
+pub struct FormatterService {
+ formatters: Vec,
+}
+
+#[derive(Debug, Clone)]
+pub struct LspRegistry {
+ servers: Vec,
+}
+
+#[derive(Debug, Clone)]
+struct FormatterDefinition {
+ name: &'static str,
+ extensions: &'static [&'static str],
+ config_files: &'static [&'static str],
+ binaries: &'static [&'static str],
+}
+
+#[derive(Debug, Clone)]
+struct LspDefinition {
+ id: &'static str,
+ name: &'static str,
+ extensions: &'static [&'static str],
+ binaries: &'static [&'static str],
+ #[allow(dead_code)]
+ capabilities: &'static [&'static str],
+}
+
+impl FormatterService {
+ pub fn new() -> Self {
+ Self {
+ formatters: vec![
+ FormatterDefinition {
+ name: "prettier",
+ extensions: &[
+ ".js", ".jsx", ".ts", ".tsx", ".json", ".css", ".scss", ".md", ".mdx",
+ ".yaml", ".yml", ".html",
+ ],
+ config_files: &[
+ ".prettierrc",
+ ".prettierrc.json",
+ ".prettierrc.yaml",
+ ".prettierrc.yml",
+ ".prettierrc.js",
+ ".prettierrc.cjs",
+ ".prettierrc.mjs",
+ "prettier.config.js",
+ "prettier.config.cjs",
+ "prettier.config.mjs",
+ ],
+ binaries: &["prettier"],
+ },
+ FormatterDefinition {
+ name: "rustfmt",
+ extensions: &[".rs"],
+ config_files: &["rustfmt.toml"],
+ binaries: &["rustfmt"],
+ },
+ FormatterDefinition {
+ name: "gofmt",
+ extensions: &[".go"],
+ config_files: &[],
+ binaries: &["gofmt"],
+ },
+ FormatterDefinition {
+ name: "black",
+ extensions: &[".py"],
+ config_files: &["pyproject.toml", "black.toml", "setup.cfg"],
+ binaries: &["black"],
+ },
+ FormatterDefinition {
+ name: "shfmt",
+ extensions: &[".sh", ".bash"],
+ config_files: &[],
+ binaries: &["shfmt"],
+ },
+ FormatterDefinition {
+ name: "stylua",
+ extensions: &[".lua"],
+ config_files: &["stylua.toml"],
+ binaries: &["stylua"],
+ },
+ ],
+ }
+ }
+
+ pub fn status_for_directory(&self, directory: &str) -> Vec {
+ let root = resolve_root(directory);
+ let scan = scan_workspace(root);
+ let mut entries = Vec::new();
+
+ for formatter in &self.formatters {
+ let has_extension = formatter
+ .extensions
+ .iter()
+ .any(|ext| scan.extensions.contains(&ext.to_ascii_lowercase()));
+ let has_config = formatter
+ .config_files
+ .iter()
+ .any(|name| scan.file_names.contains(&name.to_ascii_lowercase()));
+ if !has_extension && !has_config {
+ continue;
+ }
+ let enabled = has_binary_in_workspace(root, formatter.binaries);
+ entries.push(FormatterStatus {
+ name: formatter.name.to_string(),
+ extensions: formatter
+ .extensions
+ .iter()
+ .map(|ext| ext.to_string())
+ .collect(),
+ enabled,
+ });
+ }
+
+ entries
+ }
+}
+
+impl LspRegistry {
+ pub fn new() -> Self {
+ Self {
+ servers: vec![
+ LspDefinition {
+ id: "rust-analyzer",
+ name: "Rust Analyzer",
+ extensions: &[".rs"],
+ binaries: &["rust-analyzer"],
+ capabilities: &["completion", "diagnostics", "formatting"],
+ },
+ LspDefinition {
+ id: "typescript-language-server",
+ name: "TypeScript Language Server",
+ extensions: &[".ts", ".tsx", ".js", ".jsx"],
+ binaries: &["typescript-language-server", "tsserver"],
+ capabilities: &["completion", "diagnostics", "formatting"],
+ },
+ LspDefinition {
+ id: "pyright",
+ name: "Pyright",
+ extensions: &[".py"],
+ binaries: &["pyright-langserver", "pyright"],
+ capabilities: &["completion", "diagnostics"],
+ },
+ LspDefinition {
+ id: "gopls",
+ name: "gopls",
+ extensions: &[".go"],
+ binaries: &["gopls"],
+ capabilities: &["completion", "diagnostics", "formatting"],
+ },
+ ],
+ }
+ }
+
+ pub fn status_for_directory(&self, directory: &str) -> Vec {
+ let root = resolve_root(directory);
+ let scan = scan_workspace(root);
+ let mut entries = Vec::new();
+
+ for server in &self.servers {
+ let has_extension = server
+ .extensions
+ .iter()
+ .any(|ext| scan.extensions.contains(&ext.to_ascii_lowercase()));
+ if !has_extension {
+ continue;
+ }
+ let status = if has_binary_in_workspace(root, server.binaries) {
+ "connected"
+ } else {
+ "error"
+ };
+ entries.push(LspStatus {
+ id: server.id.to_string(),
+ name: server.name.to_string(),
+ root: root.to_string_lossy().to_string(),
+ status: status.to_string(),
+ });
+ }
+
+ entries
+ }
+}
+
+#[derive(Default)]
+struct WorkspaceScan {
+ extensions: HashSet,
+ file_names: HashSet,
+}
+
+fn resolve_root(directory: &str) -> PathBuf {
+ let root = PathBuf::from(directory);
+ if root.is_dir() {
+ return root;
+ }
+ env::current_dir().unwrap_or_else(|_| PathBuf::from("."))
+}
+
+fn scan_workspace(root: &Path) -> WorkspaceScan {
+ let mut scan = WorkspaceScan::default();
+ let mut stack = Vec::new();
+ let mut files_seen = 0usize;
+ stack.push((root.to_path_buf(), 0usize));
+
+ while let Some((dir, depth)) = stack.pop() {
+ if depth > MAX_SCAN_DEPTH {
+ continue;
+ }
+ let entries = match std::fs::read_dir(&dir) {
+ Ok(entries) => entries,
+ Err(_) => continue,
+ };
+ for entry in entries.flatten() {
+ let path = entry.path();
+ let file_type = match entry.file_type() {
+ Ok(file_type) => file_type,
+ Err(_) => continue,
+ };
+ let name = entry.file_name();
+ if file_type.is_dir() {
+ if should_skip_dir(&name) {
+ continue;
+ }
+ stack.push((path, depth + 1));
+ } else if file_type.is_file() {
+ files_seen += 1;
+ if files_seen > MAX_SCAN_FILES {
+ return scan;
+ }
+ if let Some(extension) = path.extension().and_then(|ext| ext.to_str()) {
+ scan.extensions
+ .insert(format!(".{}", extension.to_ascii_lowercase()));
+ }
+ if let Some(name) = name.to_str() {
+ scan.file_names.insert(name.to_ascii_lowercase());
+ }
+ }
+ }
+ }
+
+ scan
+}
+
+fn should_skip_dir(name: &OsStr) -> bool {
+ let Some(name) = name.to_str() else {
+ return false;
+ };
+ let name = name.to_ascii_lowercase();
+ IGNORE_DIRS.iter().any(|dir| dir == &name)
+}
+
+fn has_binary_in_workspace(root: &Path, binaries: &[&str]) -> bool {
+ binaries
+ .iter()
+ .any(|binary| binary_exists_in_workspace(root, binary) || binary_exists_in_path(binary))
+}
+
+fn binary_exists_in_workspace(root: &Path, binary: &str) -> bool {
+ let bin_dir = root.join("node_modules").join(".bin");
+ if !bin_dir.is_dir() {
+ return false;
+ }
+ path_has_binary(&bin_dir, binary)
+}
+
+fn binary_exists_in_path(binary: &str) -> bool {
+ let Some(paths) = env::var_os("PATH") else {
+ return false;
+ };
+ for path in env::split_paths(&paths) {
+ if path_has_binary(&path, binary) {
+ return true;
+ }
+ }
+ false
+}
+
+fn path_has_binary(path: &Path, binary: &str) -> bool {
+ let candidate = path.join(binary);
+ if candidate.is_file() {
+ return true;
+ }
+ if cfg!(windows) {
+ if let Some(exts) = std::env::var_os("PATHEXT") {
+ for ext in std::env::split_paths(&exts) {
+ if let Some(ext_str) = ext.to_str() {
+ let candidate = path.join(format!("{}{}", binary, ext_str));
+ if candidate.is_file() {
+ return true;
+ }
+ }
+ }
+ }
+ }
+ false
+}
diff --git a/server/packages/sandbox-agent/src/lib.rs b/server/packages/sandbox-agent/src/lib.rs
index 8c11343..8853f49 100644
--- a/server/packages/sandbox-agent/src/lib.rs
+++ b/server/packages/sandbox-agent/src/lib.rs
@@ -1,9 +1,12 @@
//! Sandbox agent core utilities.
mod agent_server_logs;
+pub mod formatter_lsp;
pub mod credentials;
pub mod opencode_compat;
+pub mod pty;
pub mod router;
+pub mod search;
pub mod server_logs;
pub mod telemetry;
pub mod ui;
diff --git a/server/packages/sandbox-agent/src/opencode_compat.rs b/server/packages/sandbox-agent/src/opencode_compat.rs
index 55b7050..eb86615 100644
--- a/server/packages/sandbox-agent/src/opencode_compat.rs
+++ b/server/packages/sandbox-agent/src/opencode_compat.rs
@@ -2,28 +2,38 @@
//!
//! These endpoints implement the full OpenCode OpenAPI surface. Most routes are
//! stubbed responses with deterministic helpers for snapshot testing. A minimal
-//! in-memory state tracks sessions/messages/ptys to keep behavior coherent.
+//! in-memory state tracks sessions/messages to keep behavior coherent.
-use std::collections::HashMap;
+use std::collections::{HashMap, VecDeque};
use std::convert::Infallible;
+use std::fs;
+use std::path::Path as FsPath;
+use std::path::PathBuf;
use std::sync::atomic::{AtomicU64, Ordering};
-use std::sync::Arc;
+use std::sync::{Arc, Mutex as StdMutex};
use std::str::FromStr;
use axum::extract::{Path, Query, State};
+use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade};
use axum::http::{HeaderMap, StatusCode};
use axum::response::sse::{Event, KeepAlive};
use axum::response::{IntoResponse, Sse};
use axum::routing::{get, patch, post, put};
use axum::{Json, Router};
-use futures::stream;
+use futures::{stream, SinkExt, StreamExt};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
+use tokio::process::Command;
use tokio::sync::{broadcast, Mutex};
use tokio::time::interval;
+use tokio_stream::wrappers::ReceiverStream;
use utoipa::{IntoParams, OpenApi, ToSchema};
-use crate::router::{AppState, CreateSessionRequest, PermissionReply};
+use crate::pty::{PtyCreateOptions, PtyEvent, PtyIo, PtyRecord, PtySizeSpec, PtyUpdateOptions};
+use crate::router::{
+ AppState, CreateSessionRequest, FileActionSnapshot, McpRegistryError, McpServerConfig,
+ PermissionReply, ToolCallSnapshot, ToolCallStatus,
+};
use sandbox_agent_error::SandboxError;
use sandbox_agent_agent_management::agents::AgentId;
use sandbox_agent_universal_agent_schema::{
@@ -41,6 +51,19 @@ const OPENCODE_PROVIDER_ID: &str = "sandbox-agent";
const OPENCODE_PROVIDER_NAME: &str = "Sandbox Agent";
const OPENCODE_DEFAULT_MODEL_ID: &str = "mock";
const OPENCODE_DEFAULT_AGENT_MODE: &str = "build";
+const FIND_MAX_RESULTS: usize = 200;
+const FIND_IGNORE_DIRS: &[&str] = &[
+ ".git",
+ ".idea",
+ ".sandbox-agent",
+ ".venv",
+ ".vscode",
+ "build",
+ "dist",
+ "node_modules",
+ "target",
+ "venv",
+];
#[derive(Clone, Debug)]
struct OpenCodeCompatConfig {
@@ -77,9 +100,22 @@ impl OpenCodeCompatConfig {
.map(|d| d.as_millis() as i64)
.unwrap_or(0)
}
+
+ fn home_dir(&self) -> String {
+ self.fixed_home
+ .clone()
+ .or_else(|| std::env::var("HOME").ok())
+ .unwrap_or_else(|| "/".to_string())
+ }
+
+ fn state_dir(&self) -> String {
+ self.fixed_state
+ .clone()
+ .unwrap_or_else(|| format!("{}/.local/state/opencode", self.home_dir()))
+ }
}
-#[derive(Clone, Debug)]
+#[derive(Clone, Debug, Serialize, Deserialize)]
struct OpenCodeSessionRecord {
id: String,
slug: String,
@@ -91,6 +127,8 @@ struct OpenCodeSessionRecord {
created_at: i64,
updated_at: i64,
share_url: Option,
+ #[serde(default)]
+ status: OpenCodeSessionStatus,
}
impl OpenCodeSessionRecord {
@@ -115,40 +153,46 @@ impl OpenCodeSessionRecord {
if let Some(url) = &self.share_url {
map.insert("share".to_string(), json!({"url": url}));
}
+ map.insert(
+ "status".to_string(),
+ json!({"type": self.status.status, "updated": self.status.updated_at}),
+ );
Value::Object(map)
}
}
+#[derive(Clone, Debug, Serialize, Deserialize, Default)]
+struct OpenCodeSessionStatus {
+ status: String,
+ updated_at: i64,
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize, Default)]
+struct OpenCodePersistedState {
+ #[serde(default)]
+ default_project_id: String,
+ #[serde(default)]
+ next_session_id: u64,
+ #[serde(default)]
+ sessions: HashMap,
+}
+
+impl OpenCodePersistedState {
+ fn empty(default_project_id: String) -> Self {
+ Self {
+ default_project_id,
+ next_session_id: 1,
+ sessions: HashMap::new(),
+ }
+ }
+}
+
#[derive(Clone, Debug)]
struct OpenCodeMessageRecord {
info: Value,
parts: Vec,
}
-#[derive(Clone, Debug)]
-struct OpenCodePtyRecord {
- id: String,
- title: String,
- command: String,
- args: Vec,
- cwd: String,
- status: String,
- pid: i64,
-}
-
-impl OpenCodePtyRecord {
- fn to_value(&self) -> Value {
- json!({
- "id": self.id,
- "title": self.title,
- "command": self.command,
- "args": self.args,
- "cwd": self.cwd,
- "status": self.status,
- "pid": self.pid,
- })
- }
-}
#[derive(Clone, Debug)]
struct OpenCodePermissionRecord {
@@ -198,6 +242,12 @@ impl OpenCodeQuestionRecord {
}
}
+#[derive(Clone, Debug)]
+struct OpenCodeEventRecord {
+ sequence: u64,
+ payload: Value,
+}
+
#[derive(Default, Clone)]
struct OpenCodeSessionRuntime {
last_user_message_id: Option,
@@ -217,40 +267,84 @@ struct OpenCodeSessionRuntime {
pub struct OpenCodeState {
config: OpenCodeCompatConfig,
default_project_id: String,
+ session_store_path: PathBuf,
sessions: Mutex>,
messages: Mutex>>,
- ptys: Mutex>,
permissions: Mutex>,
questions: Mutex>,
session_runtime: Mutex>,
session_streams: Mutex>,
- event_broadcaster: broadcast::Sender,
+ event_log: StdMutex>,
+ event_sequence: AtomicU64,
+ event_broadcaster: broadcast::Sender,
}
impl OpenCodeState {
pub fn new() -> Self {
let (event_broadcaster, _) = broadcast::channel(256);
- let project_id = format!("proj_{}", PROJECT_COUNTER.fetch_add(1, Ordering::Relaxed));
+ let config = OpenCodeCompatConfig::from_env();
+ let state_dir = config.state_dir();
+ let session_store_path = PathBuf::from(state_dir).join("sessions.json");
+ let mut persisted = load_persisted_state(&session_store_path).unwrap_or_else(|| {
+ let project_id = format!("proj_{}", PROJECT_COUNTER.fetch_add(1, Ordering::Relaxed));
+ OpenCodePersistedState::empty(project_id)
+ });
+ if persisted.default_project_id.is_empty() {
+ persisted.default_project_id =
+ format!("proj_{}", PROJECT_COUNTER.fetch_add(1, Ordering::Relaxed));
+ }
+ for session in persisted.sessions.values_mut() {
+ if session.status.status.is_empty() {
+ session.status.status = "idle".to_string();
+ session.status.updated_at = session.updated_at;
+ }
+ }
+ let derived_next = next_session_id_from(&persisted.sessions);
+ if persisted.next_session_id < derived_next {
+ persisted.next_session_id = derived_next;
+ }
+ SESSION_COUNTER.store(persisted.next_session_id, Ordering::Relaxed);
Self {
- config: OpenCodeCompatConfig::from_env(),
- default_project_id: project_id,
- sessions: Mutex::new(HashMap::new()),
+ config,
+ default_project_id: persisted.default_project_id,
+ session_store_path,
+ sessions: Mutex::new(persisted.sessions),
messages: Mutex::new(HashMap::new()),
- ptys: Mutex::new(HashMap::new()),
permissions: Mutex::new(HashMap::new()),
questions: Mutex::new(HashMap::new()),
session_runtime: Mutex::new(HashMap::new()),
session_streams: Mutex::new(HashMap::new()),
+ event_log: StdMutex::new(Vec::new()),
+ event_sequence: AtomicU64::new(0),
event_broadcaster,
}
}
- pub fn subscribe(&self) -> broadcast::Receiver {
+ pub fn subscribe(&self) -> broadcast::Receiver {
self.event_broadcaster.subscribe()
}
pub fn emit_event(&self, event: Value) {
- let _ = self.event_broadcaster.send(event);
+ let sequence = self.event_sequence.fetch_add(1, Ordering::Relaxed) + 1;
+ let record = OpenCodeEventRecord {
+ sequence,
+ payload: event,
+ };
+ if let Ok(mut events) = self.event_log.lock() {
+ events.push(record.clone());
+ }
+ let _ = self.event_broadcaster.send(record);
+ }
+
+ pub fn events_since(&self, offset: u64) -> Vec {
+ let Ok(events) = self.event_log.lock() else {
+ return Vec::new();
+ };
+ events
+ .iter()
+ .filter(|record| record.sequence > offset)
+ .cloned()
+ .collect()
}
fn now_ms(&self) -> i64 {
@@ -289,18 +383,87 @@ impl OpenCodeState {
}
fn home_dir(&self) -> String {
- self.config
- .fixed_home
- .clone()
- .or_else(|| std::env::var("HOME").ok())
- .unwrap_or_else(|| "/".to_string())
+ self.config.home_dir()
}
fn state_dir(&self) -> String {
- self.config
- .fixed_state
- .clone()
- .unwrap_or_else(|| format!("{}/.local/state/opencode", self.home_dir()))
+ self.config.state_dir()
+ }
+
+ async fn persist_sessions(&self) {
+ let sessions = self.sessions.lock().await;
+ let persisted = OpenCodePersistedState {
+ default_project_id: self.default_project_id.clone(),
+ next_session_id: SESSION_COUNTER.load(Ordering::Relaxed),
+ sessions: sessions.clone(),
+ };
+ drop(sessions);
+ let path = self.session_store_path.clone();
+ let payload = match serde_json::to_vec_pretty(&persisted) {
+ Ok(value) => value,
+ Err(err) => {
+ tracing::warn!(
+ target = "sandbox_agent::opencode",
+ ?err,
+ "failed to serialize session store"
+ );
+ return;
+ }
+ };
+ let result = tokio::task::spawn_blocking(move || {
+ if let Some(parent) = path.parent() {
+ fs::create_dir_all(parent)?;
+ }
+ fs::write(&path, payload)
+ })
+ .await;
+ match result {
+ Ok(Ok(())) => {}
+ Ok(Err(err)) => {
+ tracing::warn!(
+ target = "sandbox_agent::opencode",
+ ?err,
+ "failed to persist session store"
+ );
+ }
+ Err(err) => {
+ tracing::warn!(
+ target = "sandbox_agent::opencode",
+ ?err,
+ "failed to persist session store"
+ );
+ }
+ }
+ }
+
+ async fn mutate_session(
+ &self,
+ session_id: &str,
+ update: impl FnOnce(&mut OpenCodeSessionRecord),
+ ) -> Option {
+ let mut sessions = self.sessions.lock().await;
+ let session = sessions.get_mut(session_id)?;
+ update(session);
+ Some(session.clone())
+ }
+
+ async fn update_session_status(
+ &self,
+ session_id: &str,
+ status: &str,
+ ) -> Option {
+ let now = self.now_ms();
+ let updated = self
+ .mutate_session(session_id, |session| {
+ session.status.status = status.to_string();
+ session.status.updated_at = now;
+ session.updated_at = now;
+ })
+ .await;
+ if updated.is_some() {
+ self.persist_sessions().await;
+ }
+ updated
}
async fn ensure_session(&self, session_id: &str, directory: String) -> Value {
@@ -321,12 +484,17 @@ impl OpenCodeState {
created_at: now,
updated_at: now,
share_url: None,
+ status: OpenCodeSessionStatus {
+ status: "idle".to_string(),
+ updated_at: now,
+ },
};
let value = record.to_value();
sessions.insert(session_id.to_string(), record);
drop(sessions);
self.emit_event(session_event("session.created", &value));
+ self.persist_sessions().await;
value
}
@@ -366,10 +534,12 @@ pub struct OpenCodeAppState {
impl OpenCodeAppState {
pub fn new(inner: Arc) -> Arc {
- Arc::new(Self {
+ let state = Arc::new(Self {
inner,
opencode: Arc::new(OpenCodeState::new()),
- })
+ });
+ spawn_pty_event_forwarder(state.clone());
+ state
}
}
@@ -466,6 +636,12 @@ struct DirectoryQuery {
directory: Option,
}
+#[derive(Debug, Deserialize, IntoParams)]
+struct EventStreamQuery {
+ directory: Option,
+ offset: Option,
+}
+
#[derive(Debug, Deserialize, IntoParams)]
struct ToolQuery {
directory: Option,
@@ -473,22 +649,43 @@ struct ToolQuery {
model: Option,
}
+#[derive(Debug, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+struct OpenCodeMcpRegisterRequest {
+ name: String,
+ config: McpServerConfig,
+}
+
+#[derive(Debug, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+struct OpenCodeMcpAuthCallbackRequest {
+ code: String,
+}
+
#[derive(Debug, Deserialize, IntoParams)]
struct FindTextQuery {
directory: Option,
pattern: Option,
+ #[serde(rename = "caseSensitive")]
+ case_sensitive: Option,
+ limit: Option,
}
#[derive(Debug, Deserialize, IntoParams)]
struct FindFilesQuery {
directory: Option,
query: Option,
+ dirs: Option,
+ #[serde(rename = "type")]
+ entry_type: Option,
+ limit: Option,
}
#[derive(Debug, Deserialize, IntoParams)]
struct FindSymbolsQuery {
directory: Option,
query: Option,
+ limit: Option,
}
#[derive(Debug, Deserialize, IntoParams)]
@@ -568,6 +765,21 @@ struct PtyCreateRequest {
args: Option>,
cwd: Option,
title: Option,
+ env: Option>,
+}
+
+#[derive(Debug, Serialize, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+struct PtySizeRequest {
+ rows: u16,
+ cols: u16,
+}
+
+#[derive(Debug, Serialize, Deserialize, ToSchema)]
+#[serde(rename_all = "camelCase")]
+struct PtyUpdateRequest {
+ title: Option,
+ size: Option,
}
fn next_id(prefix: &str, counter: &AtomicU64) -> String {
@@ -575,6 +787,44 @@ fn next_id(prefix: &str, counter: &AtomicU64) -> String {
format!("{}{}", prefix, id)
}
+fn next_session_id_from(sessions: &HashMap) -> u64 {
+ let mut max_id = 0;
+ for session_id in sessions.keys() {
+ if let Some(raw) = session_id.strip_prefix("ses_") {
+ if let Ok(value) = raw.parse::() {
+ if value > max_id {
+ max_id = value;
+ }
+ }
+ }
+ }
+ if max_id == 0 {
+ 1
+ } else {
+ max_id + 1
+ }
+}
+
+fn bump_version(version: &str) -> String {
+ let parsed = version.parse::().unwrap_or(0);
+ (parsed + 1).to_string()
+}
+
+fn load_persisted_state(path: &FsPath) -> Option {
+ let contents = fs::read_to_string(path).ok()?;
+ match serde_json::from_str::(&contents) {
+ Ok(state) => Some(state),
+ Err(err) => {
+ tracing::warn!(
+ target = "sandbox_agent::opencode",
+ ?err,
+ "failed to parse session store"
+ );
+ None
+ }
+ }
+}
+
fn available_agent_ids() -> Vec {
vec![
AgentId::Claude,
@@ -769,6 +1019,14 @@ fn sandbox_error_response(err: SandboxError) -> (StatusCode, Json) {
}
}
+fn mcp_error_response(err: McpRegistryError) -> (StatusCode, Json) {
+ match err {
+ McpRegistryError::NotFound => not_found("MCP server not found"),
+ McpRegistryError::Invalid(message) => bad_request(&message),
+ McpRegistryError::Transport(message) => internal_error(&message),
+ }
+}
+
fn parse_permission_reply_value(value: Option<&str>) -> Result {
let value = value.unwrap_or("once").to_ascii_lowercase();
match value.as_str() {
@@ -783,6 +1041,40 @@ fn bool_ok(value: bool) -> (StatusCode, Json) {
(StatusCode::OK, Json(json!(value)))
}
+fn pty_to_value(pty: &PtyRecord) -> Value {
+ json!({
+ "id": pty.id,
+ "title": pty.title,
+ "command": pty.command,
+ "args": pty.args,
+ "cwd": pty.cwd,
+ "status": pty.status.as_str(),
+ "pid": pty.pid,
+ })
+}
+
+fn spawn_pty_event_forwarder(state: Arc) {
+ let mut receiver = state
+ .inner
+ .session_manager()
+ .pty_manager()
+ .subscribe();
+ tokio::spawn(async move {
+ loop {
+ match receiver.recv().await {
+ Ok(PtyEvent::Exited { id, exit_code }) => {
+ state.opencode.emit_event(json!({
+ "type": "pty.exited",
+ "properties": {"id": id, "exitCode": exit_code}
+ }));
+ }
+ Err(broadcast::error::RecvError::Lagged(_)) => continue,
+ Err(broadcast::error::RecvError::Closed) => break,
+ }
+ }
+ });
+}
+
fn build_user_message(
session_id: &str,
message_id: &str,
@@ -1233,7 +1525,7 @@ struct ToolContentInfo {
tool_name: Option,
arguments: Option,
output: Option,
- file_refs: Vec<(String, FileAction, Option)>,
+ file_refs: Vec<(String, FileAction, Option, Option)>,
}
fn extract_tool_content(parts: &[ContentPart]) -> ToolContentInfo {
@@ -1253,9 +1545,18 @@ fn extract_tool_content(parts: &[ContentPart]) -> ToolContentInfo {
info.call_id = Some(call_id.clone());
info.output = Some(output.clone());
}
- ContentPart::FileRef { path, action, diff } => {
- info.file_refs
- .push((path.clone(), action.clone(), diff.clone()));
+ ContentPart::FileRef {
+ path,
+ action,
+ diff,
+ target_path,
+ } => {
+ info.file_refs.push((
+ path.clone(),
+ action.clone(),
+ diff.clone(),
+ target_path.clone(),
+ ));
}
_ => {}
}
@@ -1275,6 +1576,105 @@ fn tool_input_from_arguments(arguments: Option<&str>) -> Value {
json!({ "arguments": arguments })
}
+async fn tool_call_snapshot(
+ state: &OpenCodeAppState,
+ session_id: &str,
+ call_id: &str,
+) -> Option {
+ state
+ .inner
+ .session_manager()
+ .tool_call_snapshot(session_id, call_id)
+ .await
+}
+
+async fn file_actions_for_event(
+ state: &OpenCodeAppState,
+ session_id: &str,
+ sequence: u64,
+) -> Vec {
+ state
+ .inner
+ .session_manager()
+ .file_actions_for_event(session_id, sequence)
+ .await
+}
+
+fn tool_state_from_snapshot(
+ snapshot: Option<&ToolCallSnapshot>,
+ fallback_name: &str,
+ fallback_arguments: Option<&str>,
+ fallback_output: Option<&str>,
+ attachments: Vec,
+ now: i64,
+) -> (String, Value) {
+ let tool_name = snapshot
+ .and_then(|state| state.name.clone())
+ .unwrap_or_else(|| fallback_name.to_string());
+ let arguments = snapshot
+ .and_then(|state| state.arguments.clone())
+ .or_else(|| fallback_arguments.map(|value| value.to_string()));
+ let raw_args = arguments.clone().unwrap_or_default();
+ let input_value = tool_input_from_arguments(arguments.as_deref());
+ let output = snapshot
+ .and_then(|state| state.output.clone())
+ .or_else(|| fallback_output.map(|value| value.to_string()));
+ let status = snapshot.map(|state| &state.status);
+
+ let state_value = match status {
+ Some(ToolCallStatus::Result) => json!({
+ "status": "completed",
+ "input": input_value,
+ "output": output.unwrap_or_default(),
+ "title": "Tool result",
+ "metadata": {},
+ "time": {"start": now, "end": now},
+ "attachments": attachments,
+ }),
+ Some(ToolCallStatus::Failed) => json!({
+ "status": "error",
+ "input": input_value,
+ "error": output.unwrap_or_else(|| "Tool failed".to_string()),
+ "metadata": {},
+ "time": {"start": now, "end": now},
+ }),
+ Some(ToolCallStatus::Completed)
+ | Some(ToolCallStatus::Running)
+ | Some(ToolCallStatus::Delta) => json!({
+ "status": "running",
+ "input": input_value,
+ "time": {"start": now},
+ }),
+ Some(ToolCallStatus::Started) | None => json!({
+ "status": "pending",
+ "input": input_value,
+ "raw": raw_args,
+ }),
+ };
+
+ (tool_name, state_value)
+}
+
+fn file_action_applied(
+ file_actions: &[FileActionSnapshot],
+ path: &str,
+ action: &FileAction,
+ target_path: Option<&str>,
+ diff: Option<&str>,
+) -> bool {
+ file_actions.iter().any(|record| {
+ let diff_matches = match diff {
+ Some(value) => record.diff.as_deref() == Some(value),
+ None => true,
+ };
+ record.action == *action
+ && record.path == path
+ && record.target_path.as_deref() == target_path
+ && diff_matches
+ && record.applied
+ })
+}
+
fn patterns_from_metadata(metadata: &Option) -> Vec {
let mut patterns = Vec::new();
let Some(metadata) = metadata else {
@@ -1334,6 +1734,10 @@ async fn apply_universal_event(state: Arc, event: UniversalEve
"type": "session.idle",
"properties": {"sessionID": event.session_id}
}));
+ let _ = state
+ .opencode
+ .update_session_status(&event.session_id, "idle")
+ .await;
}
UniversalEventType::PermissionRequested | UniversalEventType::PermissionResolved => {
if let UniversalEventData::Permission(permission) = &event.data {
@@ -1347,6 +1751,13 @@ async fn apply_universal_event(state: Arc, event: UniversalEve
}
UniversalEventType::Error => {
if let UniversalEventData::Error(error) = &event.data {
+ state.opencode.emit_event(json!({
+ "type": "session.status",
+ "properties": {
+ "sessionID": event.session_id,
+ "status": {"type": "error"}
+ }
+ }));
state.opencode.emit_event(json!({
"type": "session.error",
"properties": {
@@ -1358,6 +1769,10 @@ async fn apply_universal_event(state: Arc, event: UniversalEve
}
}
}));
+ let _ = state
+ .opencode
+ .update_session_status(&event.session_id, "error")
+ .await;
}
}
_ => {}
@@ -1578,6 +1993,20 @@ async fn apply_item_event(
state
.opencode
.emit_event(message_event("message.updated", &info));
+ if event.event_type == UniversalEventType::ItemCompleted {
+ state.opencode.emit_event(json!({
+ "type": "session.status",
+ "properties": {"sessionID": session_id, "status": {"type": "idle"}}
+ }));
+ state.opencode.emit_event(json!({
+ "type": "session.idle",
+ "properties": {"sessionID": session_id}
+ }));
+ let _ = state
+ .opencode
+ .update_session_status(&session_id, "idle")
+ .await;
+ }
let mut runtime = state
.opencode
@@ -1613,6 +2042,8 @@ async fn apply_item_event(
.await;
}
+ let file_actions = file_actions_for_event(&state, &session_id, event.sequence).await;
+
for part in item.content.iter() {
match part {
ContentPart::Reasoning { text, .. } => {
@@ -1635,13 +2066,23 @@ async fn apply_item_event(
.entry(call_id.clone())
.or_insert_with(|| next_id("part_", &PART_COUNTER))
.clone();
- let state_value = json!({
- "status": "pending",
- "input": {"arguments": arguments},
- "raw": arguments,
- });
- let tool_part =
- build_tool_part(&session_id, &message_id, &part_id, call_id, name, state_value);
+ let snapshot = tool_call_snapshot(&state, &session_id, call_id).await;
+ let (tool_name, state_value) = tool_state_from_snapshot(
+ snapshot.as_ref(),
+ name,
+ Some(arguments),
+ None,
+ Vec::new(),
+ now,
+ );
+ let tool_part = build_tool_part(
+ &session_id,
+ &message_id,
+ &part_id,
+ call_id,
+ &tool_name,
+ state_value,
+ );
upsert_message_part(&state.opencode, &session_id, &message_id, tool_part.clone())
.await;
state
@@ -1665,21 +2106,21 @@ async fn apply_item_event(
.entry(call_id.clone())
.or_insert_with(|| next_id("part_", &PART_COUNTER))
.clone();
- let state_value = json!({
- "status": "completed",
- "input": {},
- "output": output,
- "title": "Tool result",
- "metadata": {},
- "time": {"start": now, "end": now},
- "attachments": [],
- });
+ let snapshot = tool_call_snapshot(&state, &session_id, call_id).await;
+ let (tool_name, state_value) = tool_state_from_snapshot(
+ snapshot.as_ref(),
+ "tool",
+ None,
+ Some(output),
+ Vec::new(),
+ now,
+ );
let tool_part = build_tool_part(
&session_id,
&message_id,
&part_id,
call_id,
- "tool",
+ &tool_name,
state_value,
);
upsert_message_part(&state.opencode, &session_id, &message_id, tool_part.clone())
@@ -1699,19 +2140,39 @@ async fn apply_item_event(
})
.await;
}
- ContentPart::FileRef { path, action, diff } => {
+ ContentPart::FileRef {
+ path,
+ action,
+ diff,
+ target_path,
+ } => {
let mime = match action {
FileAction::Patch => "text/x-diff",
_ => "text/plain",
};
- let part =
- build_file_part_from_path(&session_id, &message_id, path, mime, diff.as_deref());
+ let display_path = target_path.as_deref().unwrap_or(path);
+ let part = build_file_part_from_path(
+ &session_id,
+ &message_id,
+ display_path,
+ mime,
+ diff.as_deref(),
+ );
upsert_message_part(&state.opencode, &session_id, &message_id, part.clone()).await;
state
.opencode
.emit_event(part_event("message.part.updated", &part));
- if matches!(action, FileAction::Write | FileAction::Patch) {
- emit_file_edited(&state.opencode, path);
+ if matches!(
+ action,
+ FileAction::Write | FileAction::Patch | FileAction::Rename | FileAction::Delete
+ ) && file_action_applied(
+ &file_actions,
+ path,
+ action,
+ target_path.as_deref(),
+ diff.as_deref(),
+ ) {
+ emit_file_edited(&state.opencode, display_path);
}
}
ContentPart::Image { path, mime } => {
@@ -1845,22 +2306,38 @@ async fn apply_tool_item_event(
.opencode
.emit_event(message_event("message.updated", &info));
+ let file_actions = file_actions_for_event(&state, &session_id, event.sequence).await;
let mut attachments = Vec::new();
if item.kind == ItemKind::ToolResult && event.event_type == UniversalEventType::ItemCompleted {
- for (path, action, diff) in tool_info.file_refs.iter() {
+ for (path, action, diff, target_path) in tool_info.file_refs.iter() {
let mime = match action {
FileAction::Patch => "text/x-diff",
_ => "text/plain",
};
- let part =
- build_file_part_from_path(&session_id, &message_id, path, mime, diff.as_deref());
+ let display_path = target_path.as_deref().unwrap_or(path);
+ let part = build_file_part_from_path(
+ &session_id,
+ &message_id,
+ display_path,
+ mime,
+ diff.as_deref(),
+ );
upsert_message_part(&state.opencode, &session_id, &message_id, part.clone()).await;
state
.opencode
.emit_event(part_event("message.part.updated", &part));
attachments.push(part.clone());
- if matches!(action, FileAction::Write | FileAction::Patch) {
- emit_file_edited(&state.opencode, path);
+ if matches!(
+ action,
+ FileAction::Write | FileAction::Patch | FileAction::Rename | FileAction::Delete
+ ) && file_action_applied(
+ &file_actions,
+ path,
+ action,
+ target_path.as_deref(),
+ diff.as_deref(),
+ ) {
+ emit_file_edited(&state.opencode, display_path);
}
}
}
@@ -1870,68 +2347,22 @@ async fn apply_tool_item_event(
.get(&call_id)
.cloned()
.unwrap_or_else(|| next_id("part_", &PART_COUNTER));
- let tool_name = tool_info
- .tool_name
- .clone()
- .unwrap_or_else(|| "tool".to_string());
- let input_value = tool_input_from_arguments(tool_info.arguments.as_deref());
- let raw_args = tool_info.arguments.clone().unwrap_or_default();
- let output_value = tool_info
+ let snapshot = tool_call_snapshot(&state, &session_id, &call_id).await;
+ let output_fallback = tool_info
.output
.clone()
.or_else(|| extract_text_from_content(&item.content));
-
- let state_value = match event.event_type {
- UniversalEventType::ItemStarted => {
- if item.kind == ItemKind::ToolResult {
- json!({
- "status": "running",
- "input": input_value,
- "time": {"start": now}
- })
- } else {
- json!({
- "status": "pending",
- "input": input_value,
- "raw": raw_args,
- })
- }
- }
- UniversalEventType::ItemCompleted => {
- if item.kind == ItemKind::ToolResult {
- if matches!(item.status, ItemStatus::Failed) {
- json!({
- "status": "error",
- "input": input_value,
- "error": output_value.unwrap_or_else(|| "Tool failed".to_string()),
- "metadata": {},
- "time": {"start": now, "end": now},
- })
- } else {
- json!({
- "status": "completed",
- "input": input_value,
- "output": output_value.unwrap_or_default(),
- "title": "Tool result",
- "metadata": {},
- "time": {"start": now, "end": now},
- "attachments": attachments,
- })
- }
- } else {
- json!({
- "status": "running",
- "input": input_value,
- "time": {"start": now},
- })
- }
- }
- _ => json!({
- "status": "pending",
- "input": input_value,
- "raw": raw_args,
- }),
- };
+ let (tool_name, state_value) = tool_state_from_snapshot(
+ snapshot.as_ref(),
+ tool_info
+ .tool_name
+ .as_deref()
+ .unwrap_or("tool"),
+ tool_info.arguments.as_deref(),
+ output_fallback.as_deref(),
+ attachments,
+ now,
+ );
let tool_part = build_tool_part(
&session_id,
@@ -2297,9 +2728,20 @@ async fn oc_config_providers() -> impl IntoResponse {
async fn oc_event_subscribe(
State(state): State>,
headers: HeaderMap,
- Query(query): Query,
+ Query(query): Query,
) -> Sse>> {
let receiver = state.opencode.subscribe();
+ let offset = query
+ .offset
+ .or_else(|| {
+ headers
+ .get("last-event-id")
+ .and_then(|value| value.to_str().ok())
+ .and_then(|value| value.parse::().ok())
+ })
+ .unwrap_or(0);
+ let mut pending_events: VecDeque =
+ state.opencode.events_since(offset).into();
let directory = state.opencode.directory_for(&headers, query.directory.as_ref());
let branch = state.opencode.branch_name();
state.opencode.emit_event(json!({
@@ -2318,33 +2760,59 @@ async fn oc_event_subscribe(
"type": "server.heartbeat",
"properties": {}
});
- let stream = stream::unfold((receiver, interval(std::time::Duration::from_secs(30))), move |(mut rx, mut ticker)| {
- let heartbeat = heartbeat_payload.clone();
- async move {
- tokio::select! {
- _ = ticker.tick() => {
- let sse_event = Event::default()
- .json_data(&heartbeat)
- .unwrap_or_else(|_| Event::default().data("{}"));
- Some((Ok(sse_event), (rx, ticker)))
- }
- event = rx.recv() => {
- match event {
- Ok(event) => {
+ let opencode = state.opencode.clone();
+ let stream = stream::unfold(
+ (
+ receiver,
+ pending_events,
+ offset,
+ interval(std::time::Duration::from_secs(30)),
+ ),
+ move |(mut rx, mut pending, mut last_sequence, mut ticker)| {
+ let heartbeat = heartbeat_payload.clone();
+ let opencode = opencode.clone();
+ async move {
+ loop {
+ if let Some(record) = pending.pop_front() {
+ last_sequence = record.sequence;
+ let sse_event = Event::default()
+ .id(record.sequence.to_string())
+ .json_data(&record.payload)
+ .unwrap_or_else(|_| Event::default().data("{}"));
+ return Some((Ok(sse_event), (rx, pending, last_sequence, ticker)));
+ }
+ tokio::select! {
+ _ = ticker.tick() => {
let sse_event = Event::default()
- .json_data(&event)
+ .json_data(&heartbeat)
.unwrap_or_else(|_| Event::default().data("{}"));
- Some((Ok(sse_event), (rx, ticker)))
+ return Some((Ok(sse_event), (rx, pending, last_sequence, ticker)));
}
- Err(broadcast::error::RecvError::Lagged(_)) => {
- Some((Ok(Event::default().comment("lagged")), (rx, ticker)))
+ event = rx.recv() => {
+ match event {
+ Ok(record) => {
+ if record.sequence <= last_sequence {
+ continue;
+ }
+ last_sequence = record.sequence;
+ let sse_event = Event::default()
+ .id(record.sequence.to_string())
+ .json_data(&record.payload)
+ .unwrap_or_else(|_| Event::default().data("{}"));
+ return Some((Ok(sse_event), (rx, pending, last_sequence, ticker)));
+ }
+ Err(broadcast::error::RecvError::Lagged(_)) => {
+ pending = opencode.events_since(last_sequence).into();
+ continue;
+ }
+ Err(broadcast::error::RecvError::Closed) => return None,
+ }
}
- Err(broadcast::error::RecvError::Closed) => None,
}
}
}
- }
- });
+ },
+ );
Sse::new(stream).keep_alive(KeepAlive::new().interval(std::time::Duration::from_secs(15)))
}
@@ -2358,9 +2826,20 @@ async fn oc_event_subscribe(
async fn oc_global_event(
State(state): State>,
headers: HeaderMap,
- Query(query): Query,
+ Query(query): Query,
) -> Sse>> {
let receiver = state.opencode.subscribe();
+ let offset = query
+ .offset
+ .or_else(|| {
+ headers
+ .get("last-event-id")
+ .and_then(|value| value.to_str().ok())
+ .and_then(|value| value.parse::().ok())
+ })
+ .unwrap_or(0);
+ let mut pending_events: VecDeque =
+ state.opencode.events_since(offset).into();
let directory = state.opencode.directory_for(&headers, query.directory.as_ref());
let branch = state.opencode.branch_name();
state.opencode.emit_event(json!({
@@ -2381,35 +2860,62 @@ async fn oc_global_event(
"properties": {}
}
});
- let stream = stream::unfold((receiver, interval(std::time::Duration::from_secs(30))), move |(mut rx, mut ticker)| {
- let directory = directory.clone();
- let heartbeat = heartbeat_payload.clone();
- async move {
- tokio::select! {
- _ = ticker.tick() => {
- let sse_event = Event::default()
- .json_data(&heartbeat)
- .unwrap_or_else(|_| Event::default().data("{}"));
- Some((Ok(sse_event), (rx, ticker)))
- }
- event = rx.recv() => {
- match event {
- Ok(event) => {
- let payload = json!({"directory": directory, "payload": event});
+ let opencode = state.opencode.clone();
+ let stream = stream::unfold(
+ (
+ receiver,
+ pending_events,
+ offset,
+ interval(std::time::Duration::from_secs(30)),
+ ),
+ move |(mut rx, mut pending, mut last_sequence, mut ticker)| {
+ let directory = directory.clone();
+ let heartbeat = heartbeat_payload.clone();
+ let opencode = opencode.clone();
+ async move {
+ loop {
+ if let Some(record) = pending.pop_front() {
+ last_sequence = record.sequence;
+ let payload = json!({"directory": directory, "payload": record.payload});
+ let sse_event = Event::default()
+ .id(record.sequence.to_string())
+ .json_data(&payload)
+ .unwrap_or_else(|_| Event::default().data("{}"));
+ return Some((Ok(sse_event), (rx, pending, last_sequence, ticker)));
+ }
+ tokio::select! {
+ _ = ticker.tick() => {
let sse_event = Event::default()
- .json_data(&payload)
+ .json_data(&heartbeat)
.unwrap_or_else(|_| Event::default().data("{}"));
- Some((Ok(sse_event), (rx, ticker)))
+ return Some((Ok(sse_event), (rx, pending, last_sequence, ticker)));
}
- Err(broadcast::error::RecvError::Lagged(_)) => {
- Some((Ok(Event::default().comment("lagged")), (rx, ticker)))
+ event = rx.recv() => {
+ match event {
+ Ok(record) => {
+ if record.sequence <= last_sequence {
+ continue;
+ }
+ last_sequence = record.sequence;
+ let payload = json!({"directory": directory, "payload": record.payload});
+ let sse_event = Event::default()
+ .id(record.sequence.to_string())
+ .json_data(&payload)
+ .unwrap_or_else(|_| Event::default().data("{}"));
+ return Some((Ok(sse_event), (rx, pending, last_sequence, ticker)));
+ }
+ Err(broadcast::error::RecvError::Lagged(_)) => {
+ pending = opencode.events_since(last_sequence).into();
+ continue;
+ }
+ Err(broadcast::error::RecvError::Closed) => return None,
+ }
}
- Err(broadcast::error::RecvError::Closed) => None,
}
}
}
- }
- });
+ },
+ );
Sse::new(stream).keep_alive(KeepAlive::new().interval(std::time::Duration::from_secs(15)))
}
@@ -2487,8 +2993,14 @@ async fn oc_log() -> impl IntoResponse {
responses((status = 200)),
tag = "opencode"
)]
-async fn oc_lsp_status() -> impl IntoResponse {
- (StatusCode::OK, Json(json!([])))
+async fn oc_lsp_status(
+ State(state): State>,
+ headers: HeaderMap,
+ Query(query): Query,
+) -> impl IntoResponse {
+ let directory = state.opencode.directory_for(&headers, query.directory.as_ref());
+ let status = state.inner.session_manager().lsp_status(&directory);
+ (StatusCode::OK, Json(status))
}
#[utoipa::path(
@@ -2497,8 +3009,14 @@ async fn oc_lsp_status() -> impl IntoResponse {
responses((status = 200)),
tag = "opencode"
)]
-async fn oc_formatter_status() -> impl IntoResponse {
- (StatusCode::OK, Json(json!([])))
+async fn oc_formatter_status(
+ State(state): State>,
+ headers: HeaderMap,
+ Query(query): Query,
+) -> impl IntoResponse {
+ let directory = state.opencode.directory_for(&headers, query.directory.as_ref());
+ let status = state.inner.session_manager().formatter_status(&directory);
+ (StatusCode::OK, Json(status))
}
#[utoipa::path(
@@ -2631,6 +3149,10 @@ async fn oc_session_create(
created_at: now,
updated_at: now,
share_url: None,
+ status: OpenCodeSessionStatus {
+ status: "idle".to_string(),
+ updated_at: now,
+ },
};
let session_value = record.to_value();
@@ -2642,6 +3164,7 @@ async fn oc_session_create(
state
.opencode
.emit_event(session_event("session.created", &session_value));
+ state.opencode.persist_sessions().await;
(StatusCode::OK, Json(session_value))
}
@@ -2654,8 +3177,15 @@ async fn oc_session_create(
)]
async fn oc_session_list(State(state): State>) -> impl IntoResponse {
let sessions = state.opencode.sessions.lock().await;
- let values: Vec = sessions.values().map(|s| s.to_value()).collect();
- (StatusCode::OK, Json(json!(values)))
+ let mut values: Vec = sessions.values().cloned().collect();
+ drop(sessions);
+ values.sort_by(|a, b| {
+ a.created_at
+ .cmp(&b.created_at)
+ .then_with(|| a.id.cmp(&b.id))
+ });
+ let response: Vec = values.into_iter().map(|s| s.to_value()).collect();
+ (StatusCode::OK, Json(json!(response)))
}
#[utoipa::path(
@@ -2691,16 +3221,22 @@ async fn oc_session_update(
Path(session_id): Path,
Json(body): Json,
) -> impl IntoResponse {
- let mut sessions = state.opencode.sessions.lock().await;
- if let Some(session) = sessions.get_mut(&session_id) {
- if let Some(title) = body.title {
- session.title = title;
- session.updated_at = state.opencode.now_ms();
- }
+ let now = state.opencode.now_ms();
+ let updated = state
+ .opencode
+ .mutate_session(&session_id, |session| {
+ if let Some(title) = body.title {
+ session.title = title;
+ session.updated_at = now;
+ }
+ })
+ .await;
+ if let Some(session) = updated {
let value = session.to_value();
state
.opencode
.emit_event(session_event("session.updated", &value));
+ state.opencode.persist_sessions().await;
return (StatusCode::OK, Json(value)).into_response();
}
not_found("Session not found").into_response()
@@ -2719,9 +3255,20 @@ async fn oc_session_delete(
) -> impl IntoResponse {
let mut sessions = state.opencode.sessions.lock().await;
if let Some(session) = sessions.remove(&session_id) {
+ drop(sessions);
+ let mut messages = state.opencode.messages.lock().await;
+ messages.remove(&session_id);
+ drop(messages);
+ let mut runtimes = state.opencode.session_runtime.lock().await;
+ runtimes.remove(&session_id);
+ drop(runtimes);
+ let mut streams = state.opencode.session_streams.lock().await;
+ streams.remove(&session_id);
+ drop(streams);
state
.opencode
.emit_event(session_event("session.deleted", &session.to_value()));
+ state.opencode.persist_sessions().await;
return bool_ok(true).into_response();
}
not_found("Session not found").into_response()
@@ -2736,8 +3283,14 @@ async fn oc_session_delete(
async fn oc_session_status(State(state): State>) -> impl IntoResponse {
let sessions = state.opencode.sessions.lock().await;
let mut status_map = serde_json::Map::new();
- for id in sessions.keys() {
- status_map.insert(id.clone(), json!({"type": "idle"}));
+ for (id, session) in sessions.iter() {
+ status_map.insert(
+ id.clone(),
+ json!({
+ "type": session.status.status,
+ "updated": session.status.updated_at,
+ }),
+ );
}
(StatusCode::OK, Json(Value::Object(status_map)))
}
@@ -2750,10 +3303,25 @@ async fn oc_session_status(State(state): State>) -> impl I
tag = "opencode"
)]
async fn oc_session_abort(
- State(_state): State>,
- Path(_session_id): Path,
+ State(state): State>,
+ Path(session_id): Path,
) -> impl IntoResponse {
- bool_ok(true)
+ let updated = state
+ .opencode
+ .update_session_status(&session_id, "idle")
+ .await;
+ if updated.is_none() {
+ return not_found("Session not found").into_response();
+ }
+ state.opencode.emit_event(json!({
+ "type": "session.status",
+ "properties": {"sessionID": session_id, "status": {"type": "idle"}}
+ }));
+ state.opencode.emit_event(json!({
+ "type": "session.idle",
+ "properties": {"sessionID": session_id}
+ }));
+ bool_ok(true).into_response()
}
#[utoipa::path(
@@ -2763,8 +3331,27 @@ async fn oc_session_abort(
responses((status = 200)),
tag = "opencode"
)]
-async fn oc_session_children() -> impl IntoResponse {
- (StatusCode::OK, Json(json!([])))
+async fn oc_session_children(
+ State(state): State>,
+ Path(session_id): Path,
+) -> impl IntoResponse {
+ let sessions = state.opencode.sessions.lock().await;
+ if !sessions.contains_key(&session_id) {
+ return not_found("Session not found").into_response();
+ }
+ let mut children: Vec = sessions
+ .values()
+ .filter(|session| session.parent_id.as_deref() == Some(&session_id))
+ .cloned()
+ .collect();
+ drop(sessions);
+ children.sort_by(|a, b| {
+ a.created_at
+ .cmp(&b.created_at)
+ .then_with(|| a.id.cmp(&b.id))
+ });
+ let response: Vec = children.into_iter().map(|s| s.to_value()).collect();
+ (StatusCode::OK, Json(json!(response))).into_response()
}
#[utoipa::path(
@@ -2774,8 +3361,19 @@ async fn oc_session_children() -> impl IntoResponse {
responses((status = 200)),
tag = "opencode"
)]
-async fn oc_session_init() -> impl IntoResponse {
- bool_ok(true)
+async fn oc_session_init(
+ State(state): State>,
+ Path(session_id): Path,
+ headers: HeaderMap,
+ Query(query): Query,
+) -> impl IntoResponse {
+ let directory = state.opencode.directory_for(&headers, query.directory.as_ref());
+ let _ = state.opencode.ensure_session(&session_id, directory).await;
+ let _ = state
+ .opencode
+ .update_session_status(&session_id, "idle")
+ .await;
+ bool_ok(true).into_response()
}
#[utoipa::path(
@@ -2789,25 +3387,38 @@ async fn oc_session_init() -> impl IntoResponse {
async fn oc_session_fork(
State(state): State>,
Path(session_id): Path,
- headers: HeaderMap,
- Query(query): Query,
) -> impl IntoResponse {
- let directory = state.opencode.directory_for(&headers, query.directory.as_ref());
let now = state.opencode.now_ms();
+ let parent = {
+ let sessions = state.opencode.sessions.lock().await;
+ sessions.get(&session_id).cloned()
+ };
+ let Some(parent) = parent else {
+ return not_found("Session not found").into_response();
+ };
+ let (directory, project_id, title, version) = (
+ parent.directory,
+ parent.project_id,
+ format!("Fork of {}", parent.title),
+ parent.version,
+ );
let id = next_id("ses_", &SESSION_COUNTER);
let slug = format!("session-{}", id);
- let title = format!("Fork of {}", session_id);
let record = OpenCodeSessionRecord {
id: id.clone(),
slug,
- project_id: state.opencode.default_project_id.clone(),
+ project_id,
directory,
parent_id: Some(session_id),
title,
- version: "0".to_string(),
+ version,
created_at: now,
updated_at: now,
share_url: None,
+ status: OpenCodeSessionStatus {
+ status: "idle".to_string(),
+ updated_at: now,
+ },
};
let value = record.to_value();
@@ -2818,6 +3429,7 @@ async fn oc_session_fork(
state
.opencode
.emit_event(session_event("session.created", &value));
+ state.opencode.persist_sessions().await;
(StatusCode::OK, Json(value))
}
@@ -2829,8 +3441,15 @@ async fn oc_session_fork(
responses((status = 200)),
tag = "opencode"
)]
-async fn oc_session_diff() -> impl IntoResponse {
- (StatusCode::OK, Json(json!([])))
+async fn oc_session_diff(
+ State(state): State>,
+ Path(session_id): Path,
+) -> impl IntoResponse {
+ let sessions = state.opencode.sessions.lock().await;
+ if !sessions.contains_key(&session_id) {
+ return not_found("Session not found").into_response();
+ }
+ (StatusCode::OK, Json(json!([]))).into_response()
}
#[utoipa::path(
@@ -2926,6 +3545,10 @@ async fn oc_session_message_create(
"status": {"type": "busy"}
}
}));
+ let _ = state
+ .opencode
+ .update_session_status(&session_id, "busy")
+ .await;
let mut user_message = build_user_message(
&session_id,
@@ -3246,6 +3869,22 @@ async fn oc_session_revert(
headers: HeaderMap,
Query(query): Query,
) -> impl IntoResponse {
+ let now = state.opencode.now_ms();
+ let updated = state
+ .opencode
+ .mutate_session(&session_id, |session| {
+ session.version = bump_version(&session.version);
+ session.updated_at = now;
+ })
+ .await;
+ if let Some(session) = updated {
+ let value = session.to_value();
+ state
+ .opencode
+ .emit_event(session_event("session.updated", &value));
+ state.opencode.persist_sessions().await;
+ return (StatusCode::OK, Json(value)).into_response();
+ }
oc_session_get(State(state), Path(session_id), headers, Query(query)).await
}
@@ -3263,6 +3902,22 @@ async fn oc_session_unrevert(
headers: HeaderMap,
Query(query): Query,
) -> impl IntoResponse {
+ let now = state.opencode.now_ms();
+ let updated = state
+ .opencode
+ .mutate_session(&session_id, |session| {
+ session.version = bump_version(&session.version);
+ session.updated_at = now;
+ })
+ .await;
+ if let Some(session) = updated {
+ let value = session.to_value();
+ state
+ .opencode
+ .emit_event(session_event("session.updated", &value));
+ state.opencode.persist_sessions().await;
+ return (StatusCode::OK, Json(value)).into_response();
+ }
oc_session_get(State(state), Path(session_id), headers, Query(query)).await
}
@@ -3308,10 +3963,20 @@ async fn oc_session_share(
State(state): State>,
Path(session_id): Path,
) -> impl IntoResponse {
- let mut sessions = state.opencode.sessions.lock().await;
- if let Some(session) = sessions.get_mut(&session_id) {
- session.share_url = Some(format!("https://share.local/{}", session_id));
+ let now = state.opencode.now_ms();
+ let updated = state
+ .opencode
+ .mutate_session(&session_id, |session| {
+ session.share_url = Some(format!("https://share.local/{}", session_id));
+ session.updated_at = now;
+ })
+ .await;
+ if let Some(session) = updated {
let value = session.to_value();
+ state
+ .opencode
+ .emit_event(session_event("session.updated", &value));
+ state.opencode.persist_sessions().await;
return (StatusCode::OK, Json(value)).into_response();
}
not_found("Session not found").into_response()
@@ -3328,10 +3993,20 @@ async fn oc_session_unshare(
State(state): State>,
Path(session_id): Path,
) -> impl IntoResponse {
- let mut sessions = state.opencode.sessions.lock().await;
- if let Some(session) = sessions.get_mut(&session_id) {
- session.share_url = None;
+ let now = state.opencode.now_ms();
+ let updated = state
+ .opencode
+ .mutate_session(&session_id, |session| {
+ session.share_url = None;
+ session.updated_at = now;
+ })
+ .await;
+ if let Some(session) = updated {
let value = session.to_value();
+ state
+ .opencode
+ .emit_event(session_event("session.updated", &value));
+ state.opencode.persist_sessions().await;
return (StatusCode::OK, Json(value)).into_response();
}
not_found("Session not found").into_response()
@@ -3344,8 +4019,15 @@ async fn oc_session_unshare(
responses((status = 200)),
tag = "opencode"
)]
-async fn oc_session_todo() -> impl IntoResponse {
- (StatusCode::OK, Json(json!([])))
+async fn oc_session_todo(
+ State(state): State>,
+ Path(session_id): Path,
+) -> impl IntoResponse {
+ let sessions = state.opencode.sessions.lock().await;
+ if !sessions.contains_key(&session_id) {
+ return not_found("Session not found").into_response();
+ }
+ (StatusCode::OK, Json(json!([]))).into_response()
}
#[utoipa::path(
@@ -3621,8 +4303,8 @@ async fn oc_auth_remove(Path(_provider_id): Path) -> impl IntoResponse {
tag = "opencode"
)]
async fn oc_pty_list(State(state): State>) -> impl IntoResponse {
- let ptys = state.opencode.ptys.lock().await;
- let values: Vec = ptys.values().map(|p| p.to_value()).collect();
+ let ptys = state.inner.session_manager().pty_manager().list().await;
+ let values: Vec = ptys.iter().map(pty_to_value).collect();
(StatusCode::OK, Json(json!(values)))
}
@@ -3641,23 +4323,34 @@ async fn oc_pty_create(
) -> impl IntoResponse {
let directory = state.opencode.directory_for(&headers, query.directory.as_ref());
let id = next_id("pty_", &PTY_COUNTER);
- let record = OpenCodePtyRecord {
+ let owner_session_id = headers
+ .get("x-opencode-session")
+ .and_then(|value| value.to_str().ok())
+ .map(|value| value.to_string());
+ let options = PtyCreateOptions {
id: id.clone(),
title: body.title.unwrap_or_else(|| "PTY".to_string()),
command: body.command.unwrap_or_else(|| "bash".to_string()),
args: body.args.unwrap_or_default(),
cwd: body.cwd.unwrap_or_else(|| directory),
- status: "running".to_string(),
- pid: 0,
+ env: body.env.unwrap_or_default(),
+ owner_session_id,
};
- let value = record.to_value();
- let mut ptys = state.opencode.ptys.lock().await;
- ptys.insert(id, record);
- drop(ptys);
+ let record = match state
+ .inner
+ .session_manager()
+ .pty_manager()
+ .create(options)
+ .await
+ {
+ Ok(record) => record,
+ Err(err) => return internal_error(&err.to_string()).into_response(),
+ };
+ let value = pty_to_value(&record);
state
.opencode
- .emit_event(json!({"type": "pty.created", "properties": {"pty": value}}));
+ .emit_event(json!({"type": "pty.created", "properties": {"info": value}}));
(StatusCode::OK, Json(value))
}
@@ -3673,9 +4366,14 @@ async fn oc_pty_get(
State(state): State>,
Path(pty_id): Path,
) -> impl IntoResponse {
- let ptys = state.opencode.ptys.lock().await;
- if let Some(pty) = ptys.get(&pty_id) {
- return (StatusCode::OK, Json(pty.to_value())).into_response();
+ if let Some(pty) = state
+ .inner
+ .session_manager()
+ .pty_manager()
+ .get(&pty_id)
+ .await
+ {
+ return (StatusCode::OK, Json(pty_to_value(&pty))).into_response();
}
not_found("PTY not found").into_response()
}
@@ -3684,33 +4382,37 @@ async fn oc_pty_get(
put,
path = "/pty/{ptyID}",
params(("ptyID" = String, Path, description = "Pty ID")),
- request_body = String,
+ request_body = PtyUpdateRequest,
responses((status = 200), (status = 404)),
tag = "opencode"
)]
async fn oc_pty_update(
State(state): State>,
Path(pty_id): Path,
- Json(body): Json,
+ Json(body): Json,
) -> impl IntoResponse {
- let mut ptys = state.opencode.ptys.lock().await;
- if let Some(pty) = ptys.get_mut(&pty_id) {
- if let Some(title) = body.title {
- pty.title = title;
- }
- if let Some(command) = body.command {
- pty.command = command;
- }
- if let Some(args) = body.args {
- pty.args = args;
- }
- if let Some(cwd) = body.cwd {
- pty.cwd = cwd;
- }
- let value = pty.to_value();
+ let options = PtyUpdateOptions {
+ title: body.title,
+ size: body.size.map(|size| PtySizeSpec {
+ rows: size.rows,
+ cols: size.cols,
+ }),
+ };
+ let updated = match state
+ .inner
+ .session_manager()
+ .pty_manager()
+ .update(&pty_id, options)
+ .await
+ {
+ Ok(updated) => updated,
+ Err(err) => return internal_error(&err.to_string()).into_response(),
+ };
+ if let Some(pty) = updated {
+ let value = pty_to_value(&pty);
state
.opencode
- .emit_event(json!({"type": "pty.updated", "properties": {"pty": value}}));
+ .emit_event(json!({"type": "pty.updated", "properties": {"info": value}}));
return (StatusCode::OK, Json(value)).into_response();
}
not_found("PTY not found").into_response()
@@ -3727,11 +4429,17 @@ async fn oc_pty_delete(
State(state): State>,
Path(pty_id): Path,
) -> impl IntoResponse {
- let mut ptys = state.opencode.ptys.lock().await;
- if let Some(pty) = ptys.remove(&pty_id) {
+ if state
+ .inner
+ .session_manager()
+ .pty_manager()
+ .remove(&pty_id)
+ .await
+ .is_some()
+ {
state
.opencode
- .emit_event(json!({"type": "pty.deleted", "properties": {"pty": pty.to_value()}}));
+ .emit_event(json!({"type": "pty.deleted", "properties": {"id": pty_id}}));
return bool_ok(true).into_response();
}
not_found("PTY not found").into_response()
@@ -3744,8 +4452,70 @@ async fn oc_pty_delete(
responses((status = 200)),
tag = "opencode"
)]
-async fn oc_pty_connect(Path(_pty_id): Path) -> impl IntoResponse {
- bool_ok(true)
+async fn oc_pty_connect(
+ State(state): State>,
+ Path(pty_id): Path,
+ ws: Option,
+) -> impl IntoResponse {
+ let io = match state
+ .inner
+ .session_manager()
+ .pty_manager()
+ .connect(&pty_id)
+ .await
+ {
+ Some(io) => io,
+ None => return not_found("PTY not found").into_response(),
+ };
+
+ if let Some(ws) = ws {
+ return ws.on_upgrade(move |socket| handle_pty_socket(socket, io)).into_response();
+ }
+
+ let stream = ReceiverStream::new(io.output).map(|chunk| {
+ let text = String::from_utf8_lossy(&chunk).to_string();
+ Ok::(Event::default().data(text))
+ });
+ Sse::new(stream)
+ .keep_alive(KeepAlive::new().interval(std::time::Duration::from_secs(15)))
+ .into_response()
+}
+
+async fn handle_pty_socket(socket: WebSocket, io: PtyIo) {
+ let (mut sender, mut receiver) = socket.split();
+ let mut output_rx = io.output;
+ let input_tx = io.input;
+
+ let output_task = tokio::spawn(async move {
+ while let Some(chunk) = output_rx.recv().await {
+ let text = String::from_utf8_lossy(&chunk).to_string();
+ if sender.send(Message::Text(text)).await.is_err() {
+ break;
+ }
+ }
+ });
+
+ let input_task = tokio::spawn(async move {
+ while let Some(message) = receiver.next().await {
+ match message {
+ Ok(Message::Text(text)) => {
+ if input_tx.send(text.into_bytes()).await.is_err() {
+ break;
+ }
+ }
+ Ok(Message::Binary(bytes)) => {
+ if input_tx.send(bytes).await.is_err() {
+ break;
+ }
+ }
+ Ok(Message::Close(_)) => break,
+ Ok(Message::Ping(_)) | Ok(Message::Pong(_)) => {}
+ Err(_) => break,
+ }
+ }
+ });
+
+ let _ = tokio::join!(output_task, input_task);
}
#[utoipa::path(
@@ -3788,17 +4558,308 @@ async fn oc_file_status() -> impl IntoResponse {
(StatusCode::OK, Json(json!([]))).into_response()
}
+fn parse_find_limit(limit: Option) -> Result)> {
+ let limit = limit.unwrap_or(FIND_MAX_RESULTS);
+ if limit == 0 || limit > FIND_MAX_RESULTS {
+ return Err(bad_request("limit must be between 1 and 200"));
+ }
+ Ok(limit)
+}
+
+fn resolve_find_root(
+ state: &OpenCodeAppState,
+ headers: &HeaderMap,
+ directory: Option<&String>,
+) -> Result)> {
+ let directory = state.opencode.directory_for(headers, directory);
+ let root = PathBuf::from(directory);
+ let root = root
+ .canonicalize()
+ .map_err(|_| bad_request("directory not found"))?;
+ if !root.is_dir() {
+ return Err(bad_request("directory not found"));
+ }
+ Ok(root)
+}
+
+fn normalize_path(path: &FsPath) -> String {
+ path.to_string_lossy().replace('\\', "/")
+}
+
+fn opencode_symbol_kind(kind: &str) -> u32 {
+ match kind {
+ "class" => 5,
+ "interface" | "trait" => 11,
+ "struct" => 23,
+ "enum" => 10,
+ "function" => 12,
+ _ => 12,
+ }
+}
+
+fn has_wildcards(query: &str) -> bool {
+ query.contains('*') || query.contains('?')
+}
+
+fn wildcard_match(pattern: &str, text: &str) -> bool {
+ let pattern = pattern.as_bytes();
+ let text = text.as_bytes();
+ let mut pattern_index = 0;
+ let mut text_index = 0;
+ let mut star_index: Option = None;
+ let mut match_index = 0;
+
+ while text_index < text.len() {
+ if pattern_index < pattern.len()
+ && (pattern[pattern_index] == b'?' || pattern[pattern_index] == text[text_index])
+ {
+ pattern_index += 1;
+ text_index += 1;
+ continue;
+ }
+
+ if pattern_index < pattern.len() && pattern[pattern_index] == b'*' {
+ star_index = Some(pattern_index);
+ match_index = text_index;
+ pattern_index += 1;
+ continue;
+ }
+
+ if let Some(star) = star_index {
+ pattern_index = star + 1;
+ match_index += 1;
+ text_index = match_index;
+ continue;
+ }
+
+ return false;
+ }
+
+ while pattern_index < pattern.len() && pattern[pattern_index] == b'*' {
+ pattern_index += 1;
+ }
+
+ pattern_index == pattern.len()
+}
+
+fn matches_find_query(candidate: &str, query: &str, use_wildcards: bool) -> bool {
+ if use_wildcards {
+ return wildcard_match(query, candidate);
+ }
+ candidate.contains(query)
+}
+
+fn parse_find_entry_type(
+ entry_type: Option<&str>,
+ dirs: Option,
+) -> Result<(bool, bool), (StatusCode, Json)> {
+ match entry_type {
+ Some("file") => Ok((true, false)),
+ Some("directory") => Ok((false, true)),
+ Some(_) => Err(bad_request("type must be file or directory")),
+ None => Ok((true, dirs.unwrap_or(false))),
+ }
+}
+
+fn find_files_in_root(
+ root: &FsPath,
+ query: &str,
+ include_files: bool,
+ include_dirs: bool,
+ limit: usize,
+) -> Vec {
+ let mut results = Vec::new();
+ let mut queue = VecDeque::new();
+ let query_lower = query.to_ascii_lowercase();
+ let use_wildcards = has_wildcards(&query_lower);
+
+ queue.push_back(root.to_path_buf());
+
+ while let Some(dir) = queue.pop_front() {
+ let entries = match fs::read_dir(&dir) {
+ Ok(entries) => entries,
+ Err(_) => continue,
+ };
+
+ for entry in entries.flatten() {
+ let path = entry.path();
+ let file_name = entry.file_name().to_string_lossy().to_string();
+ let file_name_lower = file_name.to_ascii_lowercase();
+
+ if path.is_dir() {
+ if FIND_IGNORE_DIRS.iter().any(|name| *name == file_name) {
+ continue;
+ }
+ queue.push_back(path.clone());
+ }
+
+ let is_file = path.is_file();
+ let is_dir = path.is_dir();
+ if (is_file && !include_files) || (is_dir && !include_dirs) {
+ continue;
+ }
+
+ let relative = path.strip_prefix(root).unwrap_or(&path);
+ let relative_text = normalize_path(relative);
+ if relative_text.is_empty() {
+ continue;
+ }
+ let relative_lower = relative_text.to_ascii_lowercase();
+
+ if matches_find_query(&relative_lower, &query_lower, use_wildcards)
+ || matches_find_query(&file_name_lower, &query_lower, use_wildcards)
+ {
+ results.push(relative_text);
+ if results.len() >= limit {
+ return results;
+ }
+ }
+ }
+ }
+
+ results
+}
+
+async fn rg_matches(
+ root: &FsPath,
+ pattern: &str,
+ limit: usize,
+ case_sensitive: Option,
+) -> Result, (StatusCode, Json)> {
+ let mut command = Command::new("rg");
+ command
+ .arg("--json")
+ .arg("--line-number")
+ .arg("--byte-offset")
+ .arg("--with-filename")
+ .arg("--max-count")
+ .arg(limit.to_string());
+ if case_sensitive == Some(false) {
+ command.arg("--ignore-case");
+ }
+ command.arg(pattern);
+ command.current_dir(root);
+
+ let output = command
+ .output()
+ .await
+ .map_err(|_| internal_error("ripgrep failed"))?;
+ if !output.status.success() {
+ if output.status.code() != Some(1) {
+ return Err(internal_error("ripgrep failed"));
+ }
+ }
+
+ let mut results = Vec::new();
+ for line in output.stdout.split(|b| *b == b'\n') {
+ if line.is_empty() {
+ continue;
+ }
+ let value: Value = serde_json::from_slice(line)
+ .map_err(|_| internal_error("invalid ripgrep output"))?;
+ if value.get("type").and_then(|v| v.as_str()) != Some("match") {
+ continue;
+ }
+ if let Some(data) = value.get("data") {
+ results.push(data.clone());
+ if results.len() >= limit {
+ break;
+ }
+ }
+ }
+
+ Ok(results)
+}
+
+fn symbol_from_match(root: &FsPath, data: &Value) -> Option {
+ let path_text = data
+ .get("path")
+ .and_then(|v| v.get("text"))
+ .and_then(|v| v.as_str())?;
+ let line_number = data.get("line_number").and_then(|v| v.as_u64())?;
+ let submatch = data
+ .get("submatches")
+ .and_then(|v| v.as_array())
+ .and_then(|v| v.first())?;
+ let match_text = submatch
+ .get("match")
+ .and_then(|v| v.get("text"))
+ .and_then(|v| v.as_str())?;
+ let start = submatch.get("start").and_then(|v| v.as_u64())?;
+ let end = submatch.get("end").and_then(|v| v.as_u64())?;
+
+ let path = FsPath::new(path_text);
+ let absolute = if path.is_absolute() {
+ path.to_path_buf()
+ } else {
+ root.join(path)
+ };
+ let uri = format!("file://{}", normalize_path(&absolute));
+ let line = line_number.saturating_sub(1);
+
+ Some(json!({
+ "name": match_text,
+ "kind": 12,
+ "location": {
+ "uri": uri,
+ "range": {
+ "start": {"line": line, "character": start},
+ "end": {"line": line, "character": end}
+ }
+ }
+ }))
+}
+
+async fn rg_symbols(
+ root: &FsPath,
+ query: &str,
+ limit: usize,
+) -> Result, (StatusCode, Json)> {
+ let matches = rg_matches(root, query, limit, None).await?;
+ let mut symbols = Vec::new();
+ for data in matches {
+ if let Some(symbol) = symbol_from_match(root, &data) {
+ symbols.push(symbol);
+ if symbols.len() >= limit {
+ break;
+ }
+ }
+ }
+ Ok(symbols)
+}
+
#[utoipa::path(
get,
path = "/find",
responses((status = 200)),
tag = "opencode"
)]
-async fn oc_find_text(Query(query): Query) -> impl IntoResponse {
- if query.pattern.is_none() {
- return bad_request("pattern is required").into_response();
+async fn oc_find_text(
+ State(state): State>,
+ headers: HeaderMap,
+ Query(query): Query,
+) -> impl IntoResponse {
+ let pattern = match query.pattern.as_deref().map(str::trim).filter(|v| !v.is_empty()) {
+ Some(value) => value,
+ None => return bad_request("pattern is required").into_response(),
+ };
+ let limit = match query.limit {
+ Some(value) if value == 0 || value > 200 => {
+ return bad_request("limit must be between 1 and 200").into_response();
+ }
+ Some(value) => Some(value),
+ None => None,
+ };
+ let directory = state.opencode.directory_for(&headers, query.directory.as_ref());
+ match state
+ .inner
+ .session_manager()
+ .find_text(&directory, pattern, query.case_sensitive, limit)
+ .await
+ {
+ Ok(results) => (StatusCode::OK, Json(results)).into_response(),
+ Err(err) => sandbox_error_response(err).into_response(),
}
- (StatusCode::OK, Json(json!([]))).into_response()
}
#[utoipa::path(
@@ -3807,11 +4868,43 @@ async fn oc_find_text(Query(query): Query) -> impl IntoResponse {
responses((status = 200)),
tag = "opencode"
)]
-async fn oc_find_files(Query(query): Query) -> impl IntoResponse {
- if query.query.is_none() {
- return bad_request("query is required").into_response();
+async fn oc_find_files(
+ State(state): State>,
+ headers: HeaderMap,
+ Query(query): Query,
+) -> impl IntoResponse {
+ let query_value = match query.query.as_deref().map(str::trim).filter(|v| !v.is_empty()) {
+ Some(value) => value,
+ None => return bad_request("query is required").into_response(),
+ };
+ let kind = match query.entry_type.as_deref() {
+ Some("file") => FindFileKind::File,
+ Some("directory") => FindFileKind::Directory,
+ Some(_) => return bad_request("type must be file or directory").into_response(),
+ None => {
+ if query.dirs.unwrap_or(false) {
+ FindFileKind::Any
+ } else {
+ FindFileKind::File
+ }
+ }
+ };
+ let limit = query.limit.unwrap_or(200).min(200).max(1);
+ let options = FindFileOptions {
+ kind,
+ case_sensitive: false,
+ limit: Some(limit),
+ };
+ let directory = state.opencode.directory_for(&headers, query.directory.as_ref());
+ match state
+ .inner
+ .session_manager()
+ .find_files(&directory, query_value, options)
+ .await
+ {
+ Ok(results) => (StatusCode::OK, Json(results)).into_response(),
+ Err(err) => sandbox_error_response(err).into_response(),
}
- (StatusCode::OK, Json(json!([]))).into_response()
}
#[utoipa::path(
@@ -3820,11 +4913,54 @@ async fn oc_find_files(Query(query): Query) -> impl IntoResponse
responses((status = 200)),
tag = "opencode"
)]
-async fn oc_find_symbols(Query(query): Query) -> impl IntoResponse {
- if query.query.is_none() {
- return bad_request("query is required").into_response();
+async fn oc_find_symbols(
+ State(state): State>,
+ headers: HeaderMap,
+ Query(query): Query,
+) -> impl IntoResponse {
+ let query_value = match query.query.as_deref().map(str::trim).filter(|v| !v.is_empty()) {
+ Some(value) => value,
+ None => return bad_request("query is required").into_response(),
+ };
+ let limit = match query.limit {
+ Some(value) if value == 0 || value > 200 => {
+ return bad_request("limit must be between 1 and 200").into_response();
+ }
+ Some(value) => Some(value),
+ None => None,
+ };
+ let directory = state.opencode.directory_for(&headers, query.directory.as_ref());
+ match state
+ .inner
+ .session_manager()
+ .find_symbols(&directory, query_value, limit)
+ .await
+ {
+ Ok(results) => {
+ let root = PathBuf::from(&directory);
+ let symbols: Vec = results
+ .into_iter()
+ .map(|symbol| {
+ let path = PathBuf::from(&symbol.path);
+ let absolute = if path.is_absolute() { path } else { root.join(&symbol.path) };
+ let uri = format!("file://{}", normalize_path(&absolute));
+ json!({
+ "name": symbol.name,
+ "kind": opencode_symbol_kind(&symbol.kind),
+ "location": {
+ "uri": uri,
+ "range": {
+ "start": {"line": symbol.line.saturating_sub(1), "character": symbol.column.saturating_sub(1)},
+ "end": {"line": symbol.line.saturating_sub(1), "character": symbol.column.saturating_sub(1)}
+ }
+ }
+ })
+ })
+ .collect();
+ (StatusCode::OK, Json(symbols)).into_response()
+ }
+ Err(err) => sandbox_error_response(err).into_response(),
}
- (StatusCode::OK, Json(json!([]))).into_response()
}
#[utoipa::path(
@@ -3833,18 +4969,41 @@ async fn oc_find_symbols(Query(query): Query) -> impl IntoResp
responses((status = 200)),
tag = "opencode"
)]
-async fn oc_mcp_list() -> impl IntoResponse {
- (StatusCode::OK, Json(json!({})))
+async fn oc_mcp_list(State(state): State>) -> impl IntoResponse {
+ let statuses = state.inner.session_manager().mcp_statuses().await;
+ let mut map = serde_json::Map::new();
+ for (name, status) in statuses {
+ map.insert(name, status.to_value());
+ }
+ (StatusCode::OK, Json(Value::Object(map)))
}
#[utoipa::path(
post,
path = "/mcp",
+ request_body = OpenCodeMcpRegisterRequest,
responses((status = 200)),
tag = "opencode"
)]
-async fn oc_mcp_register() -> impl IntoResponse {
- (StatusCode::OK, Json(json!({})))
+async fn oc_mcp_register(
+ State(state): State>,
+ Json(body): Json,
+) -> impl IntoResponse {
+ match state
+ .inner
+ .session_manager()
+ .mcp_register(body.name, body.config)
+ .await
+ {
+ Ok(statuses) => {
+ let mut map = serde_json::Map::new();
+ for (name, status) in statuses {
+ map.insert(name, status.to_value());
+ }
+ (StatusCode::OK, Json(Value::Object(map))).into_response()
+ }
+ Err(err) => mcp_error_response(err).into_response(),
+ }
}
#[utoipa::path(
@@ -3855,10 +5014,18 @@ async fn oc_mcp_register() -> impl IntoResponse {
tag = "opencode"
)]
async fn oc_mcp_auth(
- Path(_name): Path,
+ State(state): State>,
+ Path(name): Path,
_body: Option>,
) -> impl IntoResponse {
- (StatusCode::OK, Json(json!({"status": "needs_auth"})))
+ match state.inner.session_manager().mcp_auth_start(&name).await {
+ Ok(url) => (
+ StatusCode::OK,
+ Json(json!({"authorizationUrl": url})),
+ )
+ .into_response(),
+ Err(err) => mcp_error_response(err).into_response(),
+ }
}
#[utoipa::path(
@@ -3868,37 +5035,56 @@ async fn oc_mcp_auth(
responses((status = 200)),
tag = "opencode"
)]
-async fn oc_mcp_auth_remove(Path(_name): Path) -> impl IntoResponse {
- (StatusCode::OK, Json(json!({"status": "disabled"})))
+async fn oc_mcp_auth_remove(
+ State(state): State>,
+ Path(name): Path,
+) -> impl IntoResponse {
+ match state.inner.session_manager().mcp_auth_remove(&name).await {
+ Ok(_) => (StatusCode::OK, Json(json!({"success": true}))).into_response(),
+ Err(err) => mcp_error_response(err).into_response(),
+ }
}
#[utoipa::path(
post,
path = "/mcp/{name}/auth/callback",
params(("name" = String, Path, description = "MCP server name")),
+ request_body = OpenCodeMcpAuthCallbackRequest,
responses((status = 200)),
tag = "opencode"
)]
async fn oc_mcp_auth_callback(
- Path(_name): Path,
- _body: Option>,
+ State(state): State>,
+ Path(name): Path,
+ Json(body): Json,
) -> impl IntoResponse {
- (StatusCode::OK, Json(json!({"status": "needs_auth"})))
+ match state
+ .inner
+ .session_manager()
+ .mcp_auth_callback(&name, body.code)
+ .await
+ {
+ Ok(status) => (StatusCode::OK, Json(status.to_value())).into_response(),
+ Err(err) => mcp_error_response(err).into_response(),
+ }
}
#[utoipa::path(
post,
path = "/mcp/{name}/auth/authenticate",
params(("name" = String, Path, description = "MCP server name")),
- request_body = String,
responses((status = 200)),
tag = "opencode"
)]
async fn oc_mcp_authenticate(
- Path(_name): Path,
+ State(state): State>,
+ Path(name): Path,
_body: Option>,
) -> impl IntoResponse {
- (StatusCode::OK, Json(json!({"status": "needs_auth"})))
+ match state.inner.session_manager().mcp_auth_authenticate(&name).await {
+ Ok(status) => (StatusCode::OK, Json(status.to_value())).into_response(),
+ Err(err) => mcp_error_response(err).into_response(),
+ }
}
#[utoipa::path(
@@ -3908,8 +5094,14 @@ async fn oc_mcp_authenticate(
responses((status = 200)),
tag = "opencode"
)]
-async fn oc_mcp_connect(Path(_name): Path) -> impl IntoResponse {
- bool_ok(true)
+async fn oc_mcp_connect(
+ State(state): State>,
+ Path(name): Path,
+) -> impl IntoResponse {
+ match state.inner.session_manager().mcp_connect(&name).await {
+ Ok(_) => bool_ok(true).into_response(),
+ Err(err) => mcp_error_response(err).into_response(),
+ }
}
#[utoipa::path(
@@ -3919,8 +5111,14 @@ async fn oc_mcp_connect(Path(_name): Path) -> impl IntoResponse {
responses((status = 200)),
tag = "opencode"
)]
-async fn oc_mcp_disconnect(Path(_name): Path) -> impl IntoResponse {
- bool_ok(true)
+async fn oc_mcp_disconnect(
+ State(state): State>,
+ Path(name): Path,
+) -> impl IntoResponse {
+ match state.inner.session_manager().mcp_disconnect(&name).await {
+ Ok(_) => bool_ok(true).into_response(),
+ Err(err) => mcp_error_response(err).into_response(),
+ }
}
#[utoipa::path(
@@ -3929,8 +5127,9 @@ async fn oc_mcp_disconnect(Path(_name): Path) -> impl IntoResponse {
responses((status = 200)),
tag = "opencode"
)]
-async fn oc_tool_ids() -> impl IntoResponse {
- (StatusCode::OK, Json(json!([])))
+async fn oc_tool_ids(State(state): State>) -> impl IntoResponse {
+ let tool_ids = state.inner.session_manager().mcp_tool_ids().await;
+ (StatusCode::OK, Json(json!(tool_ids)))
}
#[utoipa::path(
@@ -3939,11 +5138,16 @@ async fn oc_tool_ids() -> impl IntoResponse {
responses((status = 200)),
tag = "opencode"
)]
-async fn oc_tool_list(Query(query): Query) -> impl IntoResponse {
+async fn oc_tool_list(
+ State(state): State>,
+ Query(query): Query,
+) -> impl IntoResponse {
if query.provider.is_none() || query.model.is_none() {
return bad_request("provider and model are required").into_response();
}
- (StatusCode::OK, Json(json!([]))).into_response()
+ let tools = state.inner.session_manager().mcp_tools().await;
+ let values: Vec = tools.into_iter().map(|tool| tool.to_tool_list_item()).collect();
+ (StatusCode::OK, Json(json!(values))).into_response()
}
#[utoipa::path(
@@ -4267,7 +5471,9 @@ async fn oc_tui_select_session() -> impl IntoResponse {
PermissionReplyRequest,
PermissionGlobalReplyRequest,
QuestionReplyBody,
- PtyCreateRequest
+ PtyCreateRequest,
+ PtySizeRequest,
+ PtyUpdateRequest
)),
tags((name = "opencode", description = "OpenCode compatibility API"))
)]
diff --git a/server/packages/sandbox-agent/src/pty.rs b/server/packages/sandbox-agent/src/pty.rs
new file mode 100644
index 0000000..64a4ecd
--- /dev/null
+++ b/server/packages/sandbox-agent/src/pty.rs
@@ -0,0 +1,354 @@
+use std::collections::HashMap;
+use std::io::{Read, Write};
+use std::sync::{Arc, Mutex};
+use std::time::Duration;
+
+use portable_pty::{native_pty_system, CommandBuilder, MasterPty, PtySize};
+use tokio::sync::{broadcast, mpsc, Mutex as AsyncMutex};
+
+use sandbox_agent_error::SandboxError;
+
+const DEFAULT_ROWS: u16 = 24;
+const DEFAULT_COLS: u16 = 80;
+const OUTPUT_BUFFER_SIZE: usize = 8192;
+const OUTPUT_CHANNEL_CAPACITY: usize = 256;
+const INPUT_CHANNEL_CAPACITY: usize = 256;
+const EXIT_POLL_INTERVAL_MS: u64 = 200;
+
+#[derive(Debug, Clone)]
+pub struct PtyRecord {
+ pub id: String,
+ pub title: String,
+ pub command: String,
+ pub args: Vec,
+ pub cwd: String,
+ pub status: PtyStatus,
+ pub pid: i64,
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum PtyStatus {
+ Running,
+ Exited,
+}
+
+impl PtyStatus {
+ pub fn as_str(&self) -> &'static str {
+ match self {
+ PtyStatus::Running => "running",
+ PtyStatus::Exited => "exited",
+ }
+ }
+}
+
+#[derive(Debug, Clone)]
+pub struct PtySizeSpec {
+ pub rows: u16,
+ pub cols: u16,
+}
+
+#[derive(Debug, Clone)]
+pub struct PtyCreateOptions {
+ pub id: String,
+ pub title: String,
+ pub command: String,
+ pub args: Vec,
+ pub cwd: String,
+ pub env: HashMap,
+ pub owner_session_id: Option,
+}
+
+#[derive(Debug, Clone)]
+pub struct PtyUpdateOptions {
+ pub title: Option,
+ pub size: Option,
+}
+
+#[derive(Debug)]
+pub struct PtyIo {
+ pub output: mpsc::Receiver>,
+ pub input: mpsc::Sender>,
+}
+
+#[derive(Debug, Clone)]
+pub enum PtyEvent {
+ Exited { id: String, exit_code: i32 },
+}
+
+#[derive(Debug)]
+pub struct PtyManager {
+ ptys: AsyncMutex>>,
+ event_tx: broadcast::Sender,
+}
+
+#[derive(Debug)]
+struct PtyHandle {
+ record: Mutex,
+ master: Mutex>,
+ input_tx: mpsc::Sender>,
+ output_listeners: Mutex>>>,
+ owner_session_id: Option,
+ child: Mutex>,
+}
+
+#[derive(Debug, Clone)]
+struct PtyRecordState {
+ record: PtyRecord,
+ exit_code: Option,
+}
+
+impl PtyRecordState {
+ fn snapshot(&self) -> PtyRecord {
+ self.record.clone()
+ }
+}
+
+impl PtyManager {
+ pub fn new() -> Self {
+ let (event_tx, _) = broadcast::channel(128);
+ Self {
+ ptys: AsyncMutex::new(HashMap::new()),
+ event_tx,
+ }
+ }
+
+ pub fn subscribe(&self) -> broadcast::Receiver {
+ self.event_tx.subscribe()
+ }
+
+ pub async fn list(&self) -> Vec {
+ let ptys = self.ptys.lock().await;
+ ptys.values()
+ .map(|handle| handle.record.lock().unwrap().snapshot())
+ .collect()
+ }
+
+ pub async fn get(&self, id: &str) -> Option {
+ let ptys = self.ptys.lock().await;
+ ptys.get(id)
+ .map(|handle| handle.record.lock().unwrap().snapshot())
+ }
+
+ pub async fn connect(&self, id: &str) -> Option {
+ let ptys = self.ptys.lock().await;
+ let handle = ptys.get(id)?.clone();
+ drop(ptys);
+
+ let (output_tx, output_rx) = mpsc::channel(OUTPUT_CHANNEL_CAPACITY);
+ handle
+ .output_listeners
+ .lock()
+ .unwrap()
+ .push(output_tx);
+
+ Some(PtyIo {
+ output: output_rx,
+ input: handle.input_tx.clone(),
+ })
+ }
+
+ pub async fn create(&self, options: PtyCreateOptions) -> Result {
+ let pty_system = native_pty_system();
+ let pty_size = PtySize {
+ rows: DEFAULT_ROWS,
+ cols: DEFAULT_COLS,
+ pixel_width: 0,
+ pixel_height: 0,
+ };
+ let pair = pty_system.openpty(pty_size).map_err(|err| SandboxError::StreamError {
+ message: format!("failed to open pty: {err}"),
+ })?;
+
+ let mut cmd = CommandBuilder::new(&options.command);
+ cmd.args(&options.args);
+ cmd.cwd(&options.cwd);
+ for (key, value) in &options.env {
+ cmd.env(key, value);
+ }
+
+ let child = pair
+ .slave
+ .spawn_command(cmd)
+ .map_err(|err| SandboxError::StreamError {
+ message: format!("failed to spawn pty command: {err}"),
+ })?;
+ let pid = child
+ .process_id()
+ .map(|value| value as i64)
+ .unwrap_or(0);
+
+ let record = PtyRecord {
+ id: options.id.clone(),
+ title: options.title.clone(),
+ command: options.command.clone(),
+ args: options.args.clone(),
+ cwd: options.cwd.clone(),
+ status: PtyStatus::Running,
+ pid,
+ };
+ let record_state = PtyRecordState {
+ record: record.clone(),
+ exit_code: None,
+ };
+
+ let mut master = pair.master;
+ let reader = master
+ .try_clone_reader()
+ .map_err(|err| SandboxError::StreamError {
+ message: format!("failed to clone pty reader: {err}"),
+ })?;
+ let writer = master
+ .take_writer()
+ .map_err(|err| SandboxError::StreamError {
+ message: format!("failed to take pty writer: {err}"),
+ })?;
+
+ let (input_tx, input_rx) = mpsc::channel::>(INPUT_CHANNEL_CAPACITY);
+ let output_listeners = Mutex::new(Vec::new());
+ let handle = Arc::new(PtyHandle {
+ record: Mutex::new(record_state),
+ master: Mutex::new(master),
+ input_tx,
+ output_listeners,
+ owner_session_id: options.owner_session_id.clone(),
+ child: Mutex::new(child),
+ });
+
+ self.spawn_output_reader(handle.clone(), reader);
+ self.spawn_input_writer(writer, input_rx);
+ self.spawn_exit_watcher(handle.clone());
+
+ let mut ptys = self.ptys.lock().await;
+ ptys.insert(options.id.clone(), handle);
+ drop(ptys);
+
+ Ok(record)
+ }
+
+ pub async fn update(&self, id: &str, options: PtyUpdateOptions) -> Result