feat: add private mcp tunnel tool

This commit is contained in:
Nathan Flurry 2026-02-04 14:04:51 -08:00
parent cc5a9e0d73
commit e3aee90cf4
13 changed files with 1923 additions and 614 deletions

View file

@ -63,6 +63,7 @@ Universal schema guidance:
- `sandbox-agent api sessions reply-question``POST /v1/sessions/{sessionId}/questions/{questionId}/reply`
- `sandbox-agent api sessions reject-question``POST /v1/sessions/{sessionId}/questions/{questionId}/reject`
- `sandbox-agent api sessions reply-permission``POST /v1/sessions/{sessionId}/permissions/{permissionId}/reply`
- `sandbox-agent api sessions reply-mcp-tunnel``POST /v1/sessions/{sessionId}/mcp-tunnel/calls/{callId}/response`
## Post-Release Testing

View file

@ -162,6 +162,7 @@ sandbox-agent api sessions create <SESSION_ID> [OPTIONS]
| `-m, --model <MODEL>` | Model override |
| `-v, --variant <VARIANT>` | Model variant |
| `-A, --agent-version <VERSION>` | Agent version |
| `--mcp-tunnel-tools <JSON>` | JSON array of MCP tool definitions |
```bash
sandbox-agent api sessions create my-session \
@ -289,6 +290,22 @@ sandbox-agent api sessions reply-permission <SESSION_ID> <PERMISSION_ID> [OPTION
sandbox-agent api sessions reply-permission my-session perm1 --reply once
```
#### Reply to MCP Tunnel Tool Call
```bash
sandbox-agent api sessions reply-mcp-tunnel <SESSION_ID> <CALL_ID> [OPTIONS]
```
| Option | Description |
|--------|-------------|
| `-o, --output <TEXT>` | Tool output (required) |
| `--is-error` | Mark tool result as error |
| `--content <JSON>` | Optional MCP content payload |
```bash
sandbox-agent api sessions reply-mcp-tunnel my-session call-1 --output "ok"
```
---
## CLI to HTTP Mapping
@ -308,3 +325,4 @@ sandbox-agent api sessions reply-permission my-session perm1 --reply once
| `api sessions reply-question` | `POST /v1/sessions/{sessionId}/questions/{questionId}/reply` |
| `api sessions reject-question` | `POST /v1/sessions/{sessionId}/questions/{questionId}/reject` |
| `api sessions reply-permission` | `POST /v1/sessions/{sessionId}/permissions/{permissionId}/reply` |
| `api sessions reply-mcp-tunnel` | `POST /v1/sessions/{sessionId}/mcp-tunnel/calls/{callId}/response` |

View file

@ -365,6 +365,69 @@
}
}
},
"/v1/sessions/{session_id}/mcp-tunnel/calls/{call_id}/response": {
"post": {
"tags": [
"sessions"
],
"operationId": "reply_mcp_tunnel_call",
"parameters": [
{
"name": "session_id",
"in": "path",
"description": "Session id",
"required": true,
"schema": {
"type": "string"
}
},
{
"name": "call_id",
"in": "path",
"description": "MCP call id",
"required": true,
"schema": {
"type": "string"
}
}
],
"requestBody": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/McpTunnelToolResponseRequest"
}
}
},
"required": true
},
"responses": {
"204": {
"description": "MCP tool call responded"
},
"400": {
"description": "",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ProblemDetails"
}
}
}
},
"404": {
"description": "",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ProblemDetails"
}
}
}
}
}
}
},
"/v1/sessions/{session_id}/messages": {
"post": {
"tags": [
@ -1063,6 +1126,14 @@
"type": "string",
"nullable": true
},
"mcpTunnel": {
"allOf": [
{
"$ref": "#/components/schemas/McpTunnelConfig"
}
],
"nullable": true
},
"model": {
"type": "string",
"nullable": true
@ -1258,6 +1329,56 @@
"failed"
]
},
"McpTunnelConfig": {
"type": "object",
"required": [
"tools"
],
"properties": {
"tools": {
"type": "array",
"items": {
"$ref": "#/components/schemas/McpTunnelTool"
}
}
}
},
"McpTunnelTool": {
"type": "object",
"required": [
"name"
],
"properties": {
"description": {
"type": "string",
"nullable": true
},
"inputSchema": {
"nullable": true
},
"name": {
"type": "string"
}
}
},
"McpTunnelToolResponseRequest": {
"type": "object",
"required": [
"output"
],
"properties": {
"content": {
"nullable": true
},
"isError": {
"type": "boolean",
"nullable": true
},
"output": {
"type": "string"
}
}
},
"MessageRequest": {
"type": "object",
"required": [

View file

@ -9,6 +9,7 @@ import type {
EventsResponse,
HealthResponse,
MessageRequest,
McpTunnelToolResponseRequest,
PermissionReplyRequest,
ProblemDetails,
QuestionReplyRequest,
@ -207,6 +208,18 @@ export class SandboxAgent {
);
}
async replyMcpTunnelCall(
sessionId: string,
callId: string,
request: McpTunnelToolResponseRequest,
): Promise<void> {
await this.requestJson(
"POST",
`${API_PREFIX}/sessions/${encodeURIComponent(sessionId)}/mcp-tunnel/calls/${encodeURIComponent(callId)}/response`,
{ body: request },
);
}
async terminateSession(sessionId: string): Promise<void> {
await this.requestJson("POST", `${API_PREFIX}/sessions/${encodeURIComponent(sessionId)}/terminate`);
}

File diff suppressed because it is too large Load diff

View file

@ -28,6 +28,9 @@ export type {
ItemRole,
ItemStatus,
MessageRequest,
McpTunnelConfig,
McpTunnelTool,
McpTunnelToolResponseRequest,
PermissionEventData,
PermissionReply,
PermissionReplyRequest,

View file

@ -24,6 +24,9 @@ export type ItemKind = S["ItemKind"];
export type ItemRole = S["ItemRole"];
export type ItemStatus = S["ItemStatus"];
export type MessageRequest = S["MessageRequest"];
export type McpTunnelConfig = S["McpTunnelConfig"];
export type McpTunnelTool = S["McpTunnelTool"];
export type McpTunnelToolResponseRequest = S["McpTunnelToolResponseRequest"];
export type PermissionEventData = S["PermissionEventData"];
export type PermissionReply = S["PermissionReply"];
export type PermissionReplyRequest = S["PermissionReplyRequest"];

View file

@ -13,8 +13,9 @@ use reqwest::blocking::Client as HttpClient;
use reqwest::Method;
use sandbox_agent::router::{build_router_with_state, shutdown_servers};
use sandbox_agent::router::{
AgentInstallRequest, AppState, AuthConfig, CreateSessionRequest, MessageRequest,
PermissionReply, PermissionReplyRequest, QuestionReplyRequest,
AgentInstallRequest, AppState, AuthConfig, CreateSessionRequest, McpTunnelConfig,
McpTunnelTool, McpTunnelToolResponseRequest, MessageRequest, PermissionReply,
PermissionReplyRequest, QuestionReplyRequest,
};
use sandbox_agent::router::{
AgentListResponse, AgentModesResponse, CreateSessionResponse, EventsResponse,
@ -172,6 +173,9 @@ enum SessionsCommand {
#[command(name = "reply-permission")]
/// Reply to a permission request.
ReplyPermission(PermissionReplyArgs),
#[command(name = "reply-mcp-tunnel")]
/// Reply to an MCP tunnel tool call.
ReplyMcpTunnel(McpTunnelReplyArgs),
}
#[derive(Args, Debug, Clone)]
@ -218,6 +222,8 @@ struct CreateSessionArgs {
variant: Option<String>,
#[arg(long, short = 'A')]
agent_version: Option<String>,
#[arg(long)]
mcp_tunnel_tools: Option<String>,
#[command(flatten)]
client: ClientArgs,
}
@ -301,6 +307,20 @@ struct PermissionReplyArgs {
client: ClientArgs,
}
#[derive(Args, Debug)]
struct McpTunnelReplyArgs {
session_id: String,
call_id: String,
#[arg(long, short = 'o')]
output: String,
#[arg(long)]
is_error: bool,
#[arg(long)]
content: Option<String>,
#[command(flatten)]
client: ClientArgs,
}
#[derive(Args, Debug)]
struct CredentialsExtractArgs {
#[arg(long, short = 'a', value_enum)]
@ -486,6 +506,12 @@ fn run_sessions(command: &SessionsCommand, cli: &Cli) -> Result<(), CliError> {
}
SessionsCommand::Create(args) => {
let ctx = ClientContext::new(cli, &args.client)?;
let mcp_tunnel = if let Some(raw) = args.mcp_tunnel_tools.as_deref() {
let tools: Vec<McpTunnelTool> = serde_json::from_str(raw)?;
Some(McpTunnelConfig { tools })
} else {
None
};
let body = CreateSessionRequest {
agent: args.agent.clone(),
agent_mode: args.agent_mode.clone(),
@ -493,6 +519,7 @@ fn run_sessions(command: &SessionsCommand, cli: &Cli) -> Result<(), CliError> {
model: args.model.clone(),
variant: args.variant.clone(),
agent_version: args.agent_version.clone(),
mcp_tunnel,
};
let path = format!("{API_PREFIX}/sessions/{}", args.session_id);
let response = ctx.post(&path, &body)?;
@ -604,6 +631,24 @@ fn run_sessions(command: &SessionsCommand, cli: &Cli) -> Result<(), CliError> {
let response = ctx.post(&path, &body)?;
print_empty_response(response)
}
SessionsCommand::ReplyMcpTunnel(args) => {
let ctx = ClientContext::new(cli, &args.client)?;
let content = match args.content.as_deref() {
Some(value) => Some(serde_json::from_str(value)?),
None => None,
};
let body = McpTunnelToolResponseRequest {
output: args.output.clone(),
is_error: if args.is_error { Some(true) } else { None },
content,
};
let path = format!(
"{API_PREFIX}/sessions/{}/mcp-tunnel/calls/{}/response",
args.session_id, args.call_id
);
let response = ctx.post(&path, &body)?;
print_empty_response(response)
}
}
}

View file

@ -117,6 +117,10 @@ pub fn build_router_with_state(shared: Arc<AppState>) -> (Router, Arc<AppState>)
"/sessions/:session_id/permissions/:permission_id/reply",
post(reply_permission),
)
.route(
"/sessions/:session_id/mcp-tunnel/calls/:call_id/response",
post(reply_mcp_tunnel_call),
)
.with_state(shared.clone());
if shared.auth.token.is_some() {
@ -128,13 +132,16 @@ pub fn build_router_with_state(shared: Arc<AppState>) -> (Router, Arc<AppState>)
let mut router = Router::new()
.route("/", get(get_root))
.route("/mcp/:session_id", post(mcp_tunnel_request))
.nest("/v1", v1_router)
.fallback(not_found);
if ui::is_enabled() {
router = router.merge(ui::router());
router = router.merge(ui::router().with_state::<Arc<AppState>>(()));
}
let router = router.with_state(shared.clone());
(router.layer(TraceLayer::new_for_http()), shared)
}
@ -158,7 +165,8 @@ pub async fn shutdown_servers(state: &Arc<AppState>) {
get_events_sse,
reply_question,
reject_question,
reply_permission
reply_permission,
reply_mcp_tunnel_call
),
components(
schemas(
@ -173,9 +181,12 @@ pub async fn shutdown_servers(state: &Arc<AppState>) {
SessionInfo,
SessionListResponse,
HealthResponse,
McpTunnelTool,
McpTunnelConfig,
CreateSessionRequest,
CreateSessionResponse,
MessageRequest,
McpTunnelToolResponseRequest,
EventsQuery,
TurnStreamQuery,
EventsResponse,
@ -264,6 +275,7 @@ struct SessionState {
events: Vec<UniversalEvent>,
pending_questions: HashMap<String, PendingQuestion>,
pending_permissions: HashMap<String, PendingPermission>,
mcp_tunnel: Option<McpTunnelState>,
item_started: HashSet<String>,
item_delta_seen: HashSet<String>,
item_map: HashMap<String, String>,
@ -291,6 +303,34 @@ struct PendingQuestion {
options: Vec<String>,
}
#[derive(Debug)]
struct McpTunnelState {
tools: Vec<McpTunnelTool>,
pending_calls: HashMap<String, PendingMcpCall>,
}
#[derive(Debug)]
struct PendingMcpCall {
responder: oneshot::Sender<McpTunnelToolResponse>,
}
#[derive(Debug, Clone)]
struct McpTunnelToolResponse {
output: String,
is_error: bool,
content: Option<Value>,
}
impl From<McpTunnelToolResponseRequest> for McpTunnelToolResponse {
fn from(request: McpTunnelToolResponseRequest) -> Self {
Self {
output: request.output,
is_error: request.is_error.unwrap_or(false),
content: request.content,
}
}
}
impl SessionState {
fn new(
session_id: String,
@ -302,6 +342,16 @@ impl SessionState {
request.agent_mode.as_deref(),
request.permission_mode.as_deref(),
)?;
let mut mcp_tool_names = HashSet::new();
if let Some(mcp_tunnel) = request.mcp_tunnel.as_ref() {
for tool in &mcp_tunnel.tools {
if !mcp_tool_names.insert(tool.name.clone()) {
return Err(SandboxError::InvalidRequest {
message: format!("duplicate MCP tool name: {}", tool.name),
});
}
}
}
let (broadcaster, _rx) = broadcast::channel(256);
Ok(Self {
@ -322,6 +372,10 @@ impl SessionState {
events: Vec::new(),
pending_questions: HashMap::new(),
pending_permissions: HashMap::new(),
mcp_tunnel: request.mcp_tunnel.as_ref().map(|config| McpTunnelState {
tools: config.tools.clone(),
pending_calls: HashMap::new(),
}),
item_started: HashSet::new(),
item_delta_seen: HashSet::new(),
item_map: HashMap::new(),
@ -1529,6 +1583,7 @@ impl SessionManager {
model: session.model.clone(),
variant: session.variant.clone(),
native_session_id: None,
mcp_tunnel: session.mcp_tunnel.is_some(),
};
let thread_id = self.create_codex_thread(&session_id, &snapshot).await?;
session.native_session_id = Some(thread_id);
@ -1537,13 +1592,21 @@ impl SessionManager {
session.native_session_id = Some(format!("mock-{session_id}"));
}
let metadata = json!({
let mut metadata = json!({
"agent": request.agent,
"agentMode": session.agent_mode,
"permissionMode": session.permission_mode,
"model": request.model,
"variant": request.variant,
});
if request.mcp_tunnel.is_some() {
if let Some(map) = metadata.as_object_mut() {
map.insert(
"mcpTunnel".to_string(),
json!({ "url": mcp_tunnel_url(&session_id) }),
);
}
}
let started = EventConversion::new(
UniversalEventType::SessionStarted,
UniversalEventData::SessionStarted(SessionStartedData {
@ -2189,6 +2252,222 @@ impl SessionManager {
Ok(())
}
async fn handle_mcp_tunnel_request(
&self,
session_id: &str,
request: McpJsonRpcRequest,
) -> Result<Option<Value>, SandboxError> {
if request.id.is_none() && request.method == "initialized" {
return Ok(None);
}
if let Some(version) = request.jsonrpc.as_deref() {
if version != "2.0" {
return Err(SandboxError::InvalidRequest {
message: format!("unsupported JSON-RPC version: {version}"),
});
}
}
let request_id = request.id.ok_or_else(|| SandboxError::InvalidRequest {
message: "missing JSON-RPC id".to_string(),
})?;
match request.method.as_str() {
"initialize" => {
let protocol_version = request
.params
.as_ref()
.and_then(|params| params.get("protocolVersion"))
.and_then(Value::as_str)
.unwrap_or("2024-11-05");
let result = json!({
"protocolVersion": protocol_version,
"serverInfo": {
"name": "sandbox-agent",
"version": env!("CARGO_PKG_VERSION"),
},
"capabilities": {
"tools": {},
}
});
Ok(Some(mcp_jsonrpc_result(request_id, result)))
}
"tools/list" => {
let tools = self.mcp_tunnel_list_tools(session_id).await?;
let tool_defs = tools
.into_iter()
.map(|tool| {
let mut map = serde_json::Map::new();
map.insert("name".to_string(), Value::String(tool.name));
if let Some(description) = tool.description {
map.insert("description".to_string(), Value::String(description));
}
if let Some(input_schema) = tool.input_schema {
map.insert("inputSchema".to_string(), input_schema);
}
Value::Object(map)
})
.collect::<Vec<_>>();
Ok(Some(mcp_jsonrpc_result(
request_id,
json!({ "tools": tool_defs }),
)))
}
"tools/call" => {
let params = request.params.ok_or_else(|| SandboxError::InvalidRequest {
message: "missing tools/call params".to_string(),
})?;
let tool_name = params
.get("name")
.and_then(Value::as_str)
.ok_or_else(|| SandboxError::InvalidRequest {
message: "missing tools/call name".to_string(),
})?
.to_string();
let arguments = params.get("arguments").cloned().unwrap_or(Value::Null);
let call_id = mcp_call_id_from_request(&request_id)?;
let response = self
.mcp_tunnel_call(session_id, &call_id, tool_name, arguments)
.await?;
let content = response
.content
.clone()
.unwrap_or_else(|| json!([{ "type": "text", "text": response.output }]));
let mut result = json!({ "content": content });
if response.is_error {
if let Some(map) = result.as_object_mut() {
map.insert("isError".to_string(), Value::Bool(true));
}
}
Ok(Some(mcp_jsonrpc_result(request_id, result)))
}
_ => Err(SandboxError::InvalidRequest {
message: format!("unknown MCP method: {}", request.method),
}),
}
}
async fn mcp_tunnel_list_tools(
&self,
session_id: &str,
) -> Result<Vec<McpTunnelTool>, SandboxError> {
let sessions = self.sessions.lock().await;
let session = Self::session_ref(&sessions, session_id).ok_or_else(|| {
SandboxError::SessionNotFound {
session_id: session_id.to_string(),
}
})?;
let tunnel = session.mcp_tunnel.as_ref().ok_or_else(|| SandboxError::InvalidRequest {
message: "MCP tunnel not configured".to_string(),
})?;
Ok(tunnel.tools.clone())
}
async fn mcp_tunnel_call(
&self,
session_id: &str,
call_id: &str,
tool_name: String,
arguments: Value,
) -> Result<McpTunnelToolResponse, SandboxError> {
let receiver = {
let mut sessions = self.sessions.lock().await;
let session = Self::session_mut(&mut sessions, session_id).ok_or_else(|| {
SandboxError::SessionNotFound {
session_id: session_id.to_string(),
}
})?;
if let Some(err) = session.ended_error() {
return Err(err);
}
let tunnel = session
.mcp_tunnel
.as_mut()
.ok_or_else(|| SandboxError::InvalidRequest {
message: "MCP tunnel not configured".to_string(),
})?;
if !tunnel.tools.iter().any(|tool| tool.name == tool_name) {
return Err(SandboxError::InvalidRequest {
message: format!("unknown MCP tool: {tool_name}"),
});
}
if tunnel.pending_calls.contains_key(call_id) {
return Err(SandboxError::InvalidRequest {
message: format!("duplicate MCP call id: {call_id}"),
});
}
let (tx, rx) = oneshot::channel();
tunnel.pending_calls.insert(
call_id.to_string(),
PendingMcpCall { responder: tx },
);
rx
};
let tool_call_events = mcp_tool_call_events(call_id, &tool_name, &arguments);
let _ = self.record_conversions(session_id, tool_call_events).await?;
let response = match tokio::time::timeout(Duration::from_secs(120), receiver).await {
Ok(Ok(response)) => response,
Ok(Err(_)) => {
self.remove_mcp_tunnel_call(session_id, call_id).await;
return Err(SandboxError::InvalidRequest {
message: "MCP call response dropped".to_string(),
});
}
Err(_) => {
self.remove_mcp_tunnel_call(session_id, call_id).await;
return Err(SandboxError::InvalidRequest {
message: "MCP call timed out".to_string(),
});
}
};
Ok(response)
}
async fn reply_mcp_tunnel_call(
&self,
session_id: &str,
call_id: &str,
response: McpTunnelToolResponse,
) -> Result<(), SandboxError> {
let pending = {
let mut sessions = self.sessions.lock().await;
let session = Self::session_mut(&mut sessions, session_id).ok_or_else(|| {
SandboxError::SessionNotFound {
session_id: session_id.to_string(),
}
})?;
let tunnel = session
.mcp_tunnel
.as_mut()
.ok_or_else(|| SandboxError::InvalidRequest {
message: "MCP tunnel not configured".to_string(),
})?;
tunnel.pending_calls.remove(call_id).ok_or_else(|| {
SandboxError::InvalidRequest {
message: format!("unknown MCP call id: {call_id}"),
}
})?
};
let _ = pending.responder.send(response.clone());
let tool_result_events = mcp_tool_result_events(call_id, &response);
let _ = self.record_conversions(session_id, tool_result_events).await?;
Ok(())
}
async fn remove_mcp_tunnel_call(&self, session_id: &str, call_id: &str) {
let mut sessions = self.sessions.lock().await;
let session = Self::session_mut(&mut sessions, session_id);
if let Some(session) = session {
if let Some(tunnel) = session.mcp_tunnel.as_mut() {
tunnel.pending_calls.remove(call_id);
}
}
}
/// Gets a session snapshot for sending a new message.
/// Uses the `for_new_message` check which allows agents that support resumption
/// (Claude, Amp, OpenCode) to continue after their process exits successfully.
@ -2232,16 +2511,63 @@ impl SessionManager {
if !trimmed.is_empty() {
conversions.extend(mock_user_message(&prefix, trimmed));
}
conversions.extend(mock_command_conversions(&prefix, trimmed));
let is_mcp_command = trimmed.eq_ignore_ascii_case("mcp");
if !is_mcp_command {
conversions.extend(mock_command_conversions(&prefix, trimmed));
}
let manager = Arc::clone(self);
let events_session_id = session_id.clone();
tokio::spawn(async move {
manager.emit_mock_events(session_id, conversions).await;
manager.emit_mock_events(events_session_id, conversions).await;
});
if is_mcp_command {
let manager = Arc::clone(self);
let session_id = session_id.clone();
let prefix = prefix.clone();
tokio::spawn(async move {
manager.emit_mock_mcp_call(session_id, prefix).await;
});
}
Ok(())
}
async fn emit_mock_mcp_call(&self, session_id: String, prefix: String) {
let tool_name = {
let sessions = self.sessions.lock().await;
let session = Self::session_ref(&sessions, &session_id);
session
.and_then(|session| session.mcp_tunnel.as_ref())
.and_then(|tunnel| tunnel.tools.first())
.map(|tool| tool.name.clone())
};
let Some(tool_name) = tool_name else {
self.record_error(
&session_id,
"mock MCP tool call requested without MCP tunnel".to_string(),
Some("mcp_tunnel".to_string()),
None,
)
.await;
return;
};
let request = McpJsonRpcRequest {
jsonrpc: Some("2.0".to_string()),
id: Some(Value::String(format!("{prefix}_mcp_call"))),
method: "tools/call".to_string(),
params: Some(json!({
"name": tool_name,
"arguments": { "query": "example" }
})),
};
let _ = self.handle_mcp_tunnel_request(&session_id, request).await;
}
async fn emit_mock_events(
self: Arc<Self>,
session_id: String,
@ -3420,6 +3746,22 @@ pub struct HealthResponse {
pub status: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct McpTunnelTool {
pub name: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub description: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub input_schema: Option<Value>,
}
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct McpTunnelConfig {
pub tools: Vec<McpTunnelTool>,
}
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct CreateSessionRequest {
@ -3434,6 +3776,8 @@ pub struct CreateSessionRequest {
pub variant: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub agent_version: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub mcp_tunnel: Option<McpTunnelConfig>,
}
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)]
@ -3452,6 +3796,27 @@ pub struct MessageRequest {
pub message: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct McpTunnelToolResponseRequest {
pub output: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub is_error: Option<bool>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub content: Option<Value>,
}
#[derive(Debug, Clone, Deserialize)]
struct McpJsonRpcRequest {
#[serde(default)]
jsonrpc: Option<String>,
#[serde(default)]
id: Option<Value>,
method: String,
#[serde(default)]
params: Option<Value>,
}
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct EventsQuery {
@ -3589,6 +3954,30 @@ async fn get_root() -> &'static str {
SERVER_INFO
}
async fn mcp_tunnel_request(
State(state): State<Arc<AppState>>,
Path(session_id): Path<String>,
Json(request): Json<McpJsonRpcRequest>,
) -> Result<Response, ApiError> {
let request_id = request.id.clone();
let response = state
.session_manager
.handle_mcp_tunnel_request(&session_id, request)
.await;
match response {
Ok(Some(payload)) => Ok((StatusCode::OK, Json(payload)).into_response()),
Ok(None) => Ok(StatusCode::NO_CONTENT.into_response()),
Err(err) => {
if request_id.is_none() {
return Ok(StatusCode::NO_CONTENT.into_response());
}
let payload = mcp_jsonrpc_error(request_id, mcp_error_code(&err), err.to_string());
Ok((StatusCode::OK, Json(payload)).into_response())
}
}
}
async fn not_found() -> (StatusCode, String) {
(
StatusCode::NOT_FOUND,
@ -3933,6 +4322,33 @@ async fn reply_permission(
Ok(StatusCode::NO_CONTENT)
}
#[utoipa::path(
post,
path = "/v1/sessions/{session_id}/mcp-tunnel/calls/{call_id}/response",
request_body = McpTunnelToolResponseRequest,
responses(
(status = 204, description = "MCP tool call responded"),
(status = 400, body = ProblemDetails),
(status = 404, body = ProblemDetails)
),
params(
("session_id" = String, Path, description = "Session id"),
("call_id" = String, Path, description = "MCP call id")
),
tag = "sessions"
)]
async fn reply_mcp_tunnel_call(
State(state): State<Arc<AppState>>,
Path((session_id, call_id)): Path<(String, String)>,
Json(request): Json<McpTunnelToolResponseRequest>,
) -> Result<StatusCode, ApiError> {
state
.session_manager
.reply_mcp_tunnel_call(&session_id, &call_id, request.into())
.await?;
Ok(StatusCode::NO_CONTENT)
}
fn all_agents() -> [AgentId; 5] {
[
AgentId::Claude,
@ -4313,6 +4729,11 @@ fn build_spawn_options(
.entry("CODEX_API_KEY".to_string())
.or_insert(openai.api_key);
}
if session.mcp_tunnel {
options.env.entry("SANDBOX_AGENT_MCP_TUNNEL_URL".to_string()).or_insert(
mcp_tunnel_url(&session.session_id),
);
}
options
}
@ -5143,6 +5564,7 @@ pub mod test_utils {
model: None,
variant: None,
agent_version: None,
mcp_tunnel: None,
};
let mut session =
SessionState::new(session_id.to_string(), agent, &request).expect("session");
@ -5414,6 +5836,101 @@ fn text_delta_from_parts(parts: &[ContentPart]) -> Option<String> {
}
}
fn mcp_call_id_from_request(id: &Value) -> Result<String, SandboxError> {
if let Some(value) = id.as_str() {
return Ok(value.to_string());
}
if let Some(value) = id.as_i64() {
return Ok(value.to_string());
}
if let Some(value) = id.as_u64() {
return Ok(value.to_string());
}
Err(SandboxError::InvalidRequest {
message: "invalid JSON-RPC id".to_string(),
})
}
fn mcp_arguments_to_string(arguments: &Value) -> String {
if let Some(value) = arguments.as_str() {
return value.to_string();
}
serde_json::to_string(arguments).unwrap_or_else(|_| arguments.to_string())
}
fn mcp_tool_call_events(call_id: &str, tool_name: &str, arguments: &Value) -> Vec<EventConversion> {
let call_id = call_id.to_string();
let arguments = mcp_arguments_to_string(arguments);
let tool_call_part = ContentPart::ToolCall {
name: tool_name.to_string(),
arguments,
call_id: call_id.clone(),
};
let item = UniversalItem {
item_id: String::new(),
native_item_id: Some(call_id.clone()),
parent_id: None,
kind: ItemKind::ToolCall,
role: Some(ItemRole::Assistant),
content: vec![tool_call_part.clone()],
status: ItemStatus::InProgress,
};
let completed_item = UniversalItem {
status: ItemStatus::Completed,
..item.clone()
};
vec![
EventConversion::new(
UniversalEventType::ItemStarted,
UniversalEventData::Item(ItemEventData { item }),
),
EventConversion::new(
UniversalEventType::ItemCompleted,
UniversalEventData::Item(ItemEventData {
item: completed_item,
}),
),
]
}
fn mcp_tool_result_events(call_id: &str, response: &McpTunnelToolResponse) -> Vec<EventConversion> {
let result_id = format!("{call_id}_result");
let output = response.output.clone();
let tool_result_part = ContentPart::ToolResult {
call_id: call_id.to_string(),
output,
};
let item = UniversalItem {
item_id: String::new(),
native_item_id: Some(result_id),
parent_id: Some(call_id.to_string()),
kind: ItemKind::ToolResult,
role: Some(ItemRole::Tool),
content: vec![tool_result_part.clone()],
status: ItemStatus::InProgress,
};
let completed_item = UniversalItem {
status: if response.is_error {
ItemStatus::Failed
} else {
ItemStatus::Completed
},
..item.clone()
};
vec![
EventConversion::new(
UniversalEventType::ItemStarted,
UniversalEventData::Item(ItemEventData { item }),
),
EventConversion::new(
UniversalEventType::ItemCompleted,
UniversalEventData::Item(ItemEventData {
item: completed_item,
}),
),
]
}
const MOCK_OK_PROMPT: &str = "Reply with exactly the single word OK.";
const MOCK_FIRST_PROMPT: &str = "Reply with exactly the word FIRST.";
const MOCK_SECOND_PROMPT: &str = "Reply with exactly the word SECOND.";
@ -5564,6 +6081,7 @@ fn mock_help_message(prefix: &str) -> Vec<EventConversion> {
"- demo: run a full UI coverage sequence with markers.",
"- markdown: streaming markdown fixture.",
"- tool: tool call + tool result with file refs.",
"- mcp: call MCP tunnel tool.",
"- status: status item updates.",
"- image: message with image content part.",
"- unknown: item.kind=unknown example.",
@ -6311,6 +6829,33 @@ fn to_sse_event(event: UniversalEvent) -> Event {
.unwrap_or_else(|_| Event::default().data("{}"))
}
fn mcp_jsonrpc_result(id: Value, result: Value) -> Value {
json!({
"jsonrpc": "2.0",
"id": id,
"result": result,
})
}
fn mcp_jsonrpc_error(id: Option<Value>, code: i64, message: String) -> Value {
json!({
"jsonrpc": "2.0",
"id": id.unwrap_or(Value::Null),
"error": {
"code": code,
"message": message,
}
})
}
fn mcp_error_code(error: &SandboxError) -> i64 {
match error {
SandboxError::SessionNotFound { .. } => -32001,
SandboxError::InvalidRequest { .. } => -32602,
_ => -32603,
}
}
#[derive(Clone, Debug)]
struct SessionSnapshot {
session_id: String,
@ -6320,6 +6865,7 @@ struct SessionSnapshot {
model: Option<String>,
variant: Option<String>,
native_session_id: Option<String>,
mcp_tunnel: bool,
}
impl From<&SessionState> for SessionSnapshot {
@ -6332,6 +6878,7 @@ impl From<&SessionState> for SessionSnapshot {
model: session.model.clone(),
variant: session.variant.clone(),
native_session_id: session.native_session_id.clone(),
mcp_tunnel: session.mcp_tunnel.is_some(),
}
}
}
@ -6342,3 +6889,9 @@ pub fn add_token_header(headers: &mut HeaderMap, token: &str) {
headers.insert(axum::http::header::AUTHORIZATION, header);
}
}
fn mcp_tunnel_url(session_id: &str) -> String {
let base = std::env::var("SANDBOX_AGENT_MCP_BASE_URL")
.unwrap_or_else(|_| "http://127.0.0.1:2468".to_string());
format!("{}/mcp/{}", base.trim_end_matches('/'), session_id)
}

View file

@ -1,4 +1,5 @@
use std::collections::HashMap;
use std::collections::HashMap;
use std::time::{Duration, Instant};
use axum::body::Body;

View file

@ -0,0 +1,125 @@
include!("../common/http.rs");
fn session_snapshot_suffix(prefix: &str) -> String {
snapshot_name(prefix, Some(AgentId::Mock))
}
fn assert_session_snapshot(prefix: &str, value: Value) {
insta::with_settings!({
snapshot_suffix => session_snapshot_suffix(prefix),
}, {
insta::assert_yaml_snapshot!(value);
});
}
fn has_item_kind(events: &[Value], kind: &str) -> bool {
events.iter().any(|event| {
event
.get("type")
.and_then(Value::as_str)
.is_some_and(|event_type| event_type == "item.completed")
&& event
.get("data")
.and_then(|data| data.get("item"))
.and_then(|item| item.get("kind"))
.and_then(Value::as_str)
.is_some_and(|item_kind| item_kind == kind)
})
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn mcp_tunnel_end_to_end() {
let app = TestApp::new();
let session_id = "mcp-tunnel";
let (status, _created) = send_json(
&app.app,
Method::POST,
&format!("/v1/sessions/{session_id}"),
Some(json!({
"agent": "mock",
"permissionMode": "bypass",
"mcpTunnel": {
"tools": [
{
"name": "private.lookup",
"description": "Lookup data",
"inputSchema": {
"type": "object",
"properties": {
"id": { "type": "string" }
},
"required": ["id"]
}
}
]
}
})),
)
.await;
assert_eq!(status, StatusCode::OK, "create session");
let (status, list_response) = send_json(
&app.app,
Method::POST,
&format!("/mcp/{session_id}"),
Some(json!({
"jsonrpc": "2.0",
"id": "list",
"method": "tools/list"
})),
)
.await;
assert_eq!(status, StatusCode::OK, "list tools");
let tools = list_response
.get("result")
.and_then(|result| result.get("tools"))
.and_then(Value::as_array)
.cloned()
.unwrap_or_default();
assert_eq!(tools.len(), 1, "tools list length");
let call_request = json!({
"jsonrpc": "2.0",
"id": "call-1",
"method": "tools/call",
"params": {
"name": "private.lookup",
"arguments": { "id": "123" }
}
});
let app_clone = app.app.clone();
let call_task = tokio::spawn(async move {
send_json(
&app_clone,
Method::POST,
&format!("/mcp/{session_id}"),
Some(call_request),
)
.await
});
let _ = poll_events_until_match(&app.app, session_id, Duration::from_secs(10), |events| {
has_item_kind(events, "tool_call")
})
.await;
let status = send_status(
&app.app,
Method::POST,
&format!("/v1/sessions/{session_id}/mcp-tunnel/calls/call-1/response"),
Some(json!({ "output": "lookup ok" })),
)
.await;
assert_eq!(status, StatusCode::NO_CONTENT, "reply mcp tunnel");
let (status, call_response) = call_task.await.expect("call task");
assert_eq!(status, StatusCode::OK, "mcp call response");
assert!(call_response.get("result").is_some(), "mcp result missing");
let events = poll_events_until_match(&app.app, session_id, Duration::from_secs(10), |events| {
has_item_kind(events, "tool_result")
})
.await;
assert_session_snapshot("mcp_tunnel", normalize_events(&events));
}

View file

@ -1,4 +1,5 @@
mod multi_turn;
mod mcp_tunnel;
mod permissions;
mod questions;
mod reasoning;

View file

@ -0,0 +1,40 @@
---
source: server/packages/sandbox-agent/tests/sessions/mcp_tunnel.rs
expression: value
---
- metadata: true
seq: 1
session: started
type: session.started
- item:
content_types:
- tool_call
kind: tool_call
role: assistant
status: in_progress
seq: 2
type: item.started
- item:
content_types:
- tool_call
kind: tool_call
role: assistant
status: completed
seq: 3
type: item.completed
- item:
content_types:
- tool_result
kind: tool_result
role: tool
status: in_progress
seq: 4
type: item.started
- item:
content_types:
- tool_result
kind: tool_result
role: tool
status: completed
seq: 5
type: item.completed