- Session
+ {sessionId ? "Session" : "No Session"}
{sessionId && {sessionId}}
+ {sessionId && {agentLabel}}
+
+
+
+
+
+
- {polling &&
Live}
@@ -109,6 +156,7 @@ const ChatPanel = ({
)}
@@ -135,27 +183,24 @@ const ChatPanel = ({
onSendMessage={onSendMessage}
onKeyDown={onKeyDown}
placeholder={sessionId ? "Send a message..." : "Select or create a session first"}
- disabled={!sessionId}
+ disabled={!sessionId || turnStreaming}
/>
);
diff --git a/frontend/packages/inspector/src/components/chat/ChatSetup.tsx b/frontend/packages/inspector/src/components/chat/ChatSetup.tsx
index 00f7341..0a0eb97 100644
--- a/frontend/packages/inspector/src/components/chat/ChatSetup.tsx
+++ b/frontend/packages/inspector/src/components/chat/ChatSetup.tsx
@@ -1,60 +1,53 @@
-import { PauseCircle, PlayCircle } from "lucide-react";
import type { AgentModeInfo } from "sandbox-agent";
const ChatSetup = ({
- agentId,
+ agentLabel,
agentMode,
permissionMode,
model,
variant,
- streamMode,
- polling,
- availableAgents,
activeModes,
currentAgentVersion,
- onAgentChange,
+ hasSession,
+ modesLoading,
+ modesError,
onAgentModeChange,
onPermissionModeChange,
onModelChange,
- onVariantChange,
- onStreamModeChange,
- onToggleStream
+ onVariantChange
}: {
- agentId: string;
+ agentLabel: string;
agentMode: string;
permissionMode: string;
model: string;
variant: string;
- streamMode: "poll" | "sse";
- polling: boolean;
- availableAgents: string[];
activeModes: AgentModeInfo[];
currentAgentVersion?: string | null;
- onAgentChange: (value: string) => void;
+ hasSession: boolean;
+ modesLoading: boolean;
+ modesError: string | null;
onAgentModeChange: (value: string) => void;
onPermissionModeChange: (value: string) => void;
onModelChange: (value: string) => void;
onVariantChange: (value: string) => void;
- onStreamModeChange: (value: "poll" | "sse") => void;
- onToggleStream: () => void;
}) => {
+ const agentVersionLabel = currentAgentVersion
+ ? `${agentLabel} v${currentAgentVersion}`
+ : agentLabel;
return (
-
-
diff --git a/frontend/packages/inspector/src/components/debug/AgentsTab.tsx b/frontend/packages/inspector/src/components/debug/AgentsTab.tsx
index 6485c62..545b734 100644
--- a/frontend/packages/inspector/src/components/debug/AgentsTab.tsx
+++ b/frontend/packages/inspector/src/components/debug/AgentsTab.tsx
@@ -8,23 +8,31 @@ const AgentsTab = ({
defaultAgents,
modesByAgent,
onRefresh,
- onInstall
+ onInstall,
+ loading,
+ error
}: {
agents: AgentInfo[];
defaultAgents: string[];
modesByAgent: Record
;
onRefresh: () => void;
onInstall: (agentId: string, reinstall: boolean) => void;
+ loading: boolean;
+ error: string | null;
}) => {
return (
<>
-
- {agents.length === 0 && No agents reported. Click refresh to check.
}
+ {error && {error}
}
+ {loading && Loading agents...
}
+ {!loading && agents.length === 0 && (
+ No agents reported. Click refresh to check.
+ )}
{(agents.length
? agents
diff --git a/frontend/packages/inspector/src/components/debug/DebugPanel.tsx b/frontend/packages/inspector/src/components/debug/DebugPanel.tsx
index ff44e73..7a1beba 100644
--- a/frontend/packages/inspector/src/components/debug/DebugPanel.tsx
+++ b/frontend/packages/inspector/src/components/debug/DebugPanel.tsx
@@ -14,6 +14,8 @@ const DebugPanel = ({
offset,
onFetchEvents,
onResetEvents,
+ eventsLoading,
+ eventsError,
requestLog,
copiedLogId,
onClearRequestLog,
@@ -22,7 +24,9 @@ const DebugPanel = ({
defaultAgents,
modesByAgent,
onRefreshAgents,
- onInstallAgent
+ onInstallAgent,
+ agentsLoading,
+ agentsError
}: {
debugTab: DebugTab;
onDebugTabChange: (tab: DebugTab) => void;
@@ -30,6 +34,8 @@ const DebugPanel = ({
offset: number;
onFetchEvents: () => void;
onResetEvents: () => void;
+ eventsLoading: boolean;
+ eventsError: string | null;
requestLog: RequestLog[];
copiedLogId: number | null;
onClearRequestLog: () => void;
@@ -39,6 +45,8 @@ const DebugPanel = ({
modesByAgent: Record;
onRefreshAgents: () => void;
onInstallAgent: (agentId: string, reinstall: boolean) => void;
+ agentsLoading: boolean;
+ agentsError: string | null;
}) => {
return (
@@ -69,7 +77,14 @@ const DebugPanel = ({
)}
{debugTab === "events" && (
-
+
)}
{debugTab === "agents" && (
@@ -79,6 +94,8 @@ const DebugPanel = ({
modesByAgent={modesByAgent}
onRefresh={onRefreshAgents}
onInstall={onInstallAgent}
+ loading={agentsLoading}
+ error={agentsError}
/>
)}
diff --git a/frontend/packages/inspector/src/components/debug/EventsTab.tsx b/frontend/packages/inspector/src/components/debug/EventsTab.tsx
index 9f992d6..0f68fdc 100644
--- a/frontend/packages/inspector/src/components/debug/EventsTab.tsx
+++ b/frontend/packages/inspector/src/components/debug/EventsTab.tsx
@@ -8,12 +8,16 @@ const EventsTab = ({
events,
offset,
onFetch,
- onClear
+ onClear,
+ loading,
+ error
}: {
events: UniversalEvent[];
offset: number;
onFetch: () => void;
onClear: () => void;
+ loading: boolean;
+ error: string | null;
}) => {
const [collapsedEvents, setCollapsedEvents] = useState>({});
@@ -28,8 +32,8 @@ const EventsTab = ({
Offset: {offset}
-
- Fetch
+
+ {loading ? "Loading..." : "Fetch"}
Clear
@@ -37,8 +41,12 @@ const EventsTab = ({
+ {error && {error}
}
+
{events.length === 0 ? (
- No events yet. Start streaming to receive events.
+
+ {loading ? "Loading events..." : "No events yet. Start streaming to receive events."}
+
) : (
{[...events].reverse().map((event) => {
diff --git a/research/agents/codex.md b/research/agents/codex.md
index 2686a4a..b0e4098 100644
--- a/research/agents/codex.md
+++ b/research/agents/codex.md
@@ -254,6 +254,70 @@ Codex output is converted via `convertCodexOutput()`:
- Use `resumeThread(threadId)` to continue conversation
- Thread ID is captured from `thread.started` event or thread object
+## Shared App-Server Architecture (Daemon Implementation)
+
+The sandbox daemon uses a **single shared Codex app-server process** to handle multiple sessions, similar to OpenCode's server model. This differs from Claude/Amp which spawn a new process per turn.
+
+### Architecture Comparison
+
+| Agent | Model | Process Lifetime | Session ID |
+|-------|-------|------------------|------------|
+| Claude | Subprocess | Per-turn (killed on TurnCompleted) | `--resume` flag |
+| Amp | Subprocess | Per-turn | `--continue` flag |
+| OpenCode | HTTP Server | Daemon lifetime | Session ID via API |
+| **Codex** | **Stdio Server** | **Daemon lifetime** | **Thread ID via JSON-RPC** |
+
+### Daemon Flow
+
+1. **First Codex session created**: Spawns `codex app-server` process, performs `initialize`/`initialized` handshake
+2. **Session creation**: Sends `thread/start` request, captures `thread_id` as `native_session_id`
+3. **Message sent**: Sends `turn/start` request with `thread_id`, streams notifications back to session
+4. **Multi-turn**: Reuses same `thread_id`, process stays alive, no respawn needed
+5. **Daemon shutdown**: Process terminated with daemon
+
+### Why This Approach?
+
+1. **Performance**: No process spawn overhead per message
+2. **Multi-turn support**: Thread persists in server memory, no resume needed
+3. **Consistent with OpenCode**: Similar server-based pattern reduces code complexity
+4. **API alignment**: Matches Codex's intended app-server usage pattern
+
+### Protocol Details
+
+The shared server uses JSON-RPC 2.0 for request/response correlation:
+
+```
+Daemon Codex App-Server
+ | |
+ |-- initialize {id: 1} ------------>|
+ |<-- response {id: 1} --------------|
+ |-- initialized (notification) ---->|
+ | |
+ |-- thread/start {id: 2} ---------->|
+ |<-- response {id: 2, thread.id} ---|
+ |<-- thread/started (notification) -|
+ | |
+ |-- turn/start {id: 3, threadId} -->|
+ |<-- turn/started (notification) ---|
+ |<-- item/* (notifications) --------|
+ |<-- turn/completed (notification) -|
+```
+
+### Thread-to-Session Routing
+
+Notifications are routed to the correct session by extracting `threadId` from each notification:
+
+```rust
+fn codex_thread_id_from_server_notification(notification) -> Option
{
+ // All thread-scoped notifications include threadId field
+ match notification {
+ TurnStarted(params) => Some(params.thread_id),
+ ItemCompleted(params) => Some(params.thread_id),
+ // ... etc
+ }
+}
+```
+
## Notes
- SDK is dynamically imported to reduce bundle size
diff --git a/scripts/release/main.ts b/scripts/release/main.ts
index 179e756..b0558c1 100755
--- a/scripts/release/main.ts
+++ b/scripts/release/main.ts
@@ -148,6 +148,19 @@ function isStable(version: string) {
return parseSemver(version).prerelease.length === 0;
}
+function getNpmTag(version: string, latest: boolean) {
+ if (latest) return null;
+ const prerelease = parseSemver(version).prerelease;
+ if (prerelease.length === 0) {
+ return "next";
+ }
+ const hasRc = prerelease.some((part) => part.toLowerCase().startsWith("rc"));
+ if (hasRc) {
+ return "rc";
+ }
+ throw new Error(`Prerelease versions must use rc tag when not latest: ${version}`);
+}
+
function getAllGitVersions() {
try {
execFileSync("git", ["fetch", "--tags", "--force", "--quiet"], {
@@ -411,18 +424,22 @@ function publishCrates(rootDir: string, version: string) {
}
}
-function publishNpmSdk(rootDir: string, version: string) {
+function publishNpmSdk(rootDir: string, version: string, latest: boolean) {
const sdkDir = path.join(rootDir, "sdks", "typescript");
console.log("==> Publishing TypeScript SDK to npm");
+ const npmTag = getNpmTag(version, latest);
run("npm", ["version", version, "--no-git-tag-version", "--allow-same-version"], { cwd: sdkDir });
run("pnpm", ["install"], { cwd: sdkDir });
run("pnpm", ["run", "build"], { cwd: sdkDir });
- run("npm", ["publish", "--access", "public"], { cwd: sdkDir });
+ const publishArgs = ["publish", "--access", "public"];
+ if (npmTag) publishArgs.push("--tag", npmTag);
+ run("npm", publishArgs, { cwd: sdkDir });
}
-function publishNpmCli(rootDir: string, version: string) {
+function publishNpmCli(rootDir: string, version: string, latest: boolean) {
const cliDir = path.join(rootDir, "sdks", "cli");
const distDir = path.join(rootDir, "dist");
+ const npmTag = getNpmTag(version, latest);
for (const [target, info] of Object.entries(PLATFORM_MAP)) {
const platformDir = path.join(cliDir, "platforms", info.pkg);
@@ -436,7 +453,9 @@ function publishNpmCli(rootDir: string, version: string) {
console.log(`==> Publishing @sandbox-agent/cli-${info.pkg}`);
run("npm", ["version", version, "--no-git-tag-version", "--allow-same-version"], { cwd: platformDir });
- run("npm", ["publish", "--access", "public"], { cwd: platformDir });
+ const publishArgs = ["publish", "--access", "public"];
+ if (npmTag) publishArgs.push("--tag", npmTag);
+ run("npm", publishArgs, { cwd: platformDir });
}
console.log("==> Publishing @sandbox-agent/cli");
@@ -447,7 +466,9 @@ function publishNpmCli(rootDir: string, version: string) {
pkg.optionalDependencies[dep] = version;
}
fs.writeFileSync(pkgPath, JSON.stringify(pkg, null, 2) + "\n");
- run("npm", ["publish", "--access", "public"], { cwd: cliDir });
+ const publishArgs = ["publish", "--access", "public"];
+ if (npmTag) publishArgs.push("--tag", npmTag);
+ run("npm", publishArgs, { cwd: cliDir });
}
function validateGit(rootDir: string) {
@@ -542,10 +563,10 @@ async function main() {
publishCrates(rootDir, version);
}
if (flags.has("--publish-npm-sdk")) {
- publishNpmSdk(rootDir, version);
+ publishNpmSdk(rootDir, version, latest);
}
if (flags.has("--publish-npm-cli")) {
- publishNpmCli(rootDir, version);
+ publishNpmCli(rootDir, version, latest);
}
if (flags.has("--upload-typescript")) {
uploadTypescriptArtifacts(rootDir, version, latest);
@@ -626,11 +647,11 @@ async function main() {
}
if (shouldRun("publish-npm-sdk")) {
- publishNpmSdk(rootDir, version);
+ publishNpmSdk(rootDir, version, latest);
}
if (shouldRun("publish-npm-cli")) {
- publishNpmCli(rootDir, version);
+ publishNpmCli(rootDir, version, latest);
}
if (shouldRun("upload-typescript")) {
diff --git a/sdks/typescript/src/client.ts b/sdks/typescript/src/client.ts
index 5ad325d..2e96cf8 100644
--- a/sdks/typescript/src/client.ts
+++ b/sdks/typescript/src/client.ts
@@ -13,6 +13,7 @@ import type {
ProblemDetails,
QuestionReplyRequest,
SessionListResponse,
+ TurnStreamQuery,
UniversalEvent,
} from "./types.ts";
@@ -142,45 +143,37 @@ export class SandboxAgent {
});
}
+ async postMessageStream(
+ sessionId: string,
+ request: MessageRequest,
+ query?: TurnStreamQuery,
+ signal?: AbortSignal,
+ ): Promise {
+ return this.requestRaw("POST", `${API_PREFIX}/sessions/${encodeURIComponent(sessionId)}/messages/stream`, {
+ query,
+ body: request,
+ accept: "text/event-stream",
+ signal,
+ });
+ }
+
async *streamEvents(
sessionId: string,
query?: EventsQuery,
signal?: AbortSignal,
): AsyncGenerator {
const response = await this.getEventsSse(sessionId, query, signal);
- if (!response.body) {
- throw new Error("SSE stream is not readable in this environment.");
- }
+ yield* this.parseSseStream(response);
+ }
- const reader = response.body.getReader();
- const decoder = new TextDecoder();
- let buffer = "";
-
- while (true) {
- const { done, value } = await reader.read();
- if (done) {
- break;
- }
- // Normalize CRLF to LF for consistent parsing
- buffer += decoder.decode(value, { stream: true }).replace(/\r\n/g, "\n");
- let index = buffer.indexOf("\n\n");
- while (index !== -1) {
- const chunk = buffer.slice(0, index);
- buffer = buffer.slice(index + 2);
- const dataLines = chunk
- .split("\n")
- .filter((line) => line.startsWith("data:"));
- if (dataLines.length > 0) {
- const payload = dataLines
- .map((line) => line.slice(5).trim())
- .join("\n");
- if (payload) {
- yield JSON.parse(payload) as UniversalEvent;
- }
- }
- index = buffer.indexOf("\n\n");
- }
- }
+ async *streamTurn(
+ sessionId: string,
+ request: MessageRequest,
+ query?: TurnStreamQuery,
+ signal?: AbortSignal,
+ ): AsyncGenerator {
+ const response = await this.postMessageStream(sessionId, request, query, signal);
+ yield* this.parseSseStream(response);
}
async replyQuestion(
@@ -297,6 +290,42 @@ export class SandboxAgent {
return undefined;
}
}
+
+ private async *parseSseStream(response: Response): AsyncGenerator {
+ if (!response.body) {
+ throw new Error("SSE stream is not readable in this environment.");
+ }
+
+ const reader = response.body.getReader();
+ const decoder = new TextDecoder();
+ let buffer = "";
+
+ while (true) {
+ const { done, value } = await reader.read();
+ if (done) {
+ break;
+ }
+ // Normalize CRLF to LF for consistent parsing
+ buffer += decoder.decode(value, { stream: true }).replace(/\r\n/g, "\n");
+ let index = buffer.indexOf("\n\n");
+ while (index !== -1) {
+ const chunk = buffer.slice(0, index);
+ buffer = buffer.slice(index + 2);
+ const dataLines = chunk
+ .split("\n")
+ .filter((line) => line.startsWith("data:"));
+ if (dataLines.length > 0) {
+ const payload = dataLines
+ .map((line) => line.slice(5).trim())
+ .join("\n");
+ if (payload) {
+ yield JSON.parse(payload) as UniversalEvent;
+ }
+ }
+ index = buffer.indexOf("\n\n");
+ }
+ }
+ }
}
const normalizeSpawnOptions = (
diff --git a/sdks/typescript/src/generated/openapi.ts b/sdks/typescript/src/generated/openapi.ts
index 3d7eb62..bbdc5ea 100644
--- a/sdks/typescript/src/generated/openapi.ts
+++ b/sdks/typescript/src/generated/openapi.ts
@@ -32,6 +32,9 @@ export interface paths {
"/v1/sessions/{session_id}/messages": {
post: operations["post_message"];
};
+ "/v1/sessions/{session_id}/messages/stream": {
+ post: operations["post_message_stream"];
+ };
"/v1/sessions/{session_id}/permissions/{permission_id}/reply": {
post: operations["reply_permission"];
};
@@ -258,6 +261,9 @@ export interface components {
};
/** @enum {string} */
TerminatedBy: "agent" | "daemon";
+ TurnStreamQuery: {
+ includeRaw?: boolean | null;
+ };
UniversalEvent: {
data: components["schemas"]["UniversalEventData"];
event_id: string;
@@ -480,6 +486,34 @@ export interface operations {
};
};
};
+ post_message_stream: {
+ parameters: {
+ query?: {
+ /** @description Include raw provider payloads */
+ include_raw?: boolean | null;
+ };
+ path: {
+ /** @description Session id */
+ session_id: string;
+ };
+ };
+ requestBody: {
+ content: {
+ "application/json": components["schemas"]["MessageRequest"];
+ };
+ };
+ responses: {
+ /** @description SSE event stream */
+ 200: {
+ content: never;
+ };
+ 404: {
+ content: {
+ "application/json": components["schemas"]["ProblemDetails"];
+ };
+ };
+ };
+ };
reply_permission: {
parameters: {
path: {
diff --git a/sdks/typescript/src/index.ts b/sdks/typescript/src/index.ts
index 8bc9f91..e734f42 100644
--- a/sdks/typescript/src/index.ts
+++ b/sdks/typescript/src/index.ts
@@ -41,6 +41,7 @@ export type {
SessionListResponse,
SessionStartedData,
TerminatedBy,
+ TurnStreamQuery,
UniversalEvent,
UniversalEventData,
UniversalEventType,
diff --git a/sdks/typescript/src/types.ts b/sdks/typescript/src/types.ts
index ebdf04f..e0c43df 100644
--- a/sdks/typescript/src/types.ts
+++ b/sdks/typescript/src/types.ts
@@ -39,6 +39,7 @@ export type SessionInfo = S["SessionInfo"];
export type SessionListResponse = S["SessionListResponse"];
export type SessionStartedData = S["SessionStartedData"];
export type TerminatedBy = S["TerminatedBy"];
+export type TurnStreamQuery = S["TurnStreamQuery"];
export type UniversalEvent = S["UniversalEvent"];
export type UniversalEventData = S["UniversalEventData"];
export type UniversalEventType = S["UniversalEventType"];
diff --git a/sdks/typescript/tests/client.test.ts b/sdks/typescript/tests/client.test.ts
index 8aab690..f7d6314 100644
--- a/sdks/typescript/tests/client.test.ts
+++ b/sdks/typescript/tests/client.test.ts
@@ -164,6 +164,31 @@ describe("SandboxAgent", () => {
});
});
+ describe("postMessageStream", () => {
+ it("posts message and requests SSE", async () => {
+ const mockFetch = vi.fn().mockResolvedValue(
+ new Response("", {
+ status: 200,
+ headers: { "Content-Type": "text/event-stream" },
+ })
+ );
+ const client = await SandboxAgent.connect({
+ baseUrl: "http://localhost:8080",
+ fetch: mockFetch,
+ });
+
+ await client.postMessageStream("test-session", { message: "Hello" }, { includeRaw: true });
+
+ expect(mockFetch).toHaveBeenCalledWith(
+ "http://localhost:8080/v1/sessions/test-session/messages/stream?includeRaw=true",
+ expect.objectContaining({
+ method: "POST",
+ body: JSON.stringify({ message: "Hello" }),
+ })
+ );
+ });
+ });
+
describe("getEvents", () => {
it("returns events", async () => {
const events = { events: [], hasMore: false };
diff --git a/server/packages/sandbox-agent/src/main.rs b/server/packages/sandbox-agent/src/main.rs
index ab5ba71..972c697 100644
--- a/server/packages/sandbox-agent/src/main.rs
+++ b/server/packages/sandbox-agent/src/main.rs
@@ -122,6 +122,9 @@ enum SessionsCommand {
#[command(name = "send-message")]
/// Send a message to an existing session.
SendMessage(SessionMessageArgs),
+ #[command(name = "send-message-stream")]
+ /// Send a message and stream the response for one turn.
+ SendMessageStream(SessionMessageStreamArgs),
#[command(name = "terminate")]
/// Terminate a session.
Terminate(SessionTerminateArgs),
@@ -195,6 +198,17 @@ struct SessionMessageArgs {
client: ClientArgs,
}
+#[derive(Args, Debug)]
+struct SessionMessageStreamArgs {
+ session_id: String,
+ #[arg(long, short = 'm')]
+ message: String,
+ #[arg(long)]
+ include_raw: bool,
+ #[command(flatten)]
+ client: ClientArgs,
+}
+
#[derive(Args, Debug)]
struct SessionEventsArgs {
session_id: String,
@@ -443,6 +457,22 @@ fn run_sessions(command: &SessionsCommand, cli: &Cli) -> Result<(), CliError> {
let response = ctx.post(&path, &body)?;
print_empty_response(response)
}
+ SessionsCommand::SendMessageStream(args) => {
+ let ctx = ClientContext::new(cli, &args.client)?;
+ let body = MessageRequest {
+ message: args.message.clone(),
+ };
+ let path = format!("{API_PREFIX}/sessions/{}/messages/stream", args.session_id);
+ let response = ctx.post_with_query(
+ &path,
+ &body,
+ &[(
+ "include_raw",
+ if args.include_raw { Some("true".to_string()) } else { None },
+ )],
+ )?;
+ print_text_response(response)
+ }
SessionsCommand::Terminate(args) => {
let ctx = ClientContext::new(cli, &args.client)?;
let path = format!("{API_PREFIX}/sessions/{}/terminate", args.session_id);
@@ -850,6 +880,21 @@ impl ClientContext {
Ok(self.request(Method::POST, path).json(body).send()?)
}
+ fn post_with_query(
+ &self,
+ path: &str,
+ body: &T,
+ query: &[(&str, Option)],
+ ) -> Result {
+ let mut request = self.request(Method::POST, path).json(body);
+ for (key, value) in query {
+ if let Some(value) = value {
+ request = request.query(&[(key, value)]);
+ }
+ }
+ Ok(request.send()?)
+ }
+
fn post_empty(&self, path: &str) -> Result {
Ok(self.request(Method::POST, path).send()?)
}
diff --git a/server/packages/sandbox-agent/src/router.rs b/server/packages/sandbox-agent/src/router.rs
index 5e18db2..b3fc715 100644
--- a/server/packages/sandbox-agent/src/router.rs
+++ b/server/packages/sandbox-agent/src/router.rs
@@ -1,8 +1,9 @@
-use std::collections::{HashMap, HashSet};
+use std::collections::{HashMap, HashSet, VecDeque};
use std::convert::Infallible;
use std::io::{BufRead, BufReader, Write};
use std::net::TcpListener;
use std::process::Stdio;
+use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::Arc;
use std::time::Duration;
@@ -28,7 +29,7 @@ use sandbox_agent_universal_agent_schema::{
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
-use tokio::sync::{broadcast, mpsc, Mutex};
+use tokio::sync::{broadcast, mpsc, oneshot, Mutex};
use tokio::time::sleep;
use tokio_stream::wrappers::BroadcastStream;
use tower_http::trace::TraceLayer;
@@ -89,6 +90,7 @@ pub fn build_router(state: AppState) -> Router {
.route("/sessions", get(list_sessions))
.route("/sessions/:session_id", post(create_session))
.route("/sessions/:session_id/messages", post(post_message))
+ .route("/sessions/:session_id/messages/stream", post(post_message_stream))
.route("/sessions/:session_id/terminate", post(terminate_session))
.route("/sessions/:session_id/events", get(get_events))
.route("/sessions/:session_id/events/sse", get(get_events_sse))
@@ -129,6 +131,7 @@ pub fn build_router(state: AppState) -> Router {
list_sessions,
create_session,
post_message,
+ post_message_stream,
terminate_session,
get_events,
get_events_sse,
@@ -151,6 +154,7 @@ pub fn build_router(state: AppState) -> Router {
CreateSessionResponse,
MessageRequest,
EventsQuery,
+ TurnStreamQuery,
EventsResponse,
UniversalEvent,
UniversalEventData,
@@ -488,6 +492,14 @@ impl SessionState {
}
fn ended_error(&self) -> Option {
+ self.ended_error_for_messages(false)
+ }
+
+ /// Returns an error if the session cannot accept new messages.
+ /// `for_new_message` should be true when checking before sending a new message -
+ /// this allows agents that support resumption (Claude, Amp, OpenCode) to continue
+ /// after their process exits successfully.
+ fn ended_error_for_messages(&self, for_new_message: bool) -> Option {
if !self.ended {
return None;
}
@@ -496,6 +508,15 @@ impl SessionState {
message: "session terminated".to_string(),
});
}
+ // For agents that support resumption (Claude, Amp, OpenCode), allow new messages
+ // after the process exits with success (Completed reason). The new message will
+ // spawn a fresh process with --resume/--continue to continue the conversation.
+ if for_new_message
+ && matches!(self.ended_reason, Some(SessionEndReason::Completed))
+ && agent_supports_resume(self.agent)
+ {
+ return None;
+ }
Some(SandboxError::AgentProcessExited {
agent: self.agent.as_str().to_string(),
exit_code: self.ended_exit_code,
@@ -542,8 +563,9 @@ impl SessionState {
#[derive(Debug)]
struct SessionManager {
agent_manager: Arc,
- sessions: Mutex>,
+ sessions: Mutex>,
opencode_server: Mutex