feat(opencode): add SSE event replay with Last-Event-ID support

This commit is contained in:
Nathan Flurry 2026-02-07 12:57:58 -08:00
parent 52f5d07185
commit 783e2d6692
19 changed files with 337 additions and 69 deletions

View file

@ -410,7 +410,9 @@ pub fn stop(host: &str, port: u16) -> Result<(), CliError> {
// No PID file - but check if daemon is actually running via health check
// This can happen if PID file was deleted but daemon is still running
if check_health(&base_url, None)? {
eprintln!("daemon is running but PID file missing; finding process on port {port}...");
eprintln!(
"daemon is running but PID file missing; finding process on port {port}..."
);
if let Some(pid) = find_process_on_port(port) {
eprintln!("found daemon process {pid}");
return stop_process(pid, host, port, &pid_path);

View file

@ -4,11 +4,12 @@
//! stubbed responses with deterministic helpers for snapshot testing. A minimal
//! in-memory state tracks sessions/messages/ptys to keep behavior coherent.
use std::collections::{BTreeMap, HashMap, HashSet};
use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
use std::convert::Infallible;
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::sync::Mutex as StdMutex;
use axum::body::Body;
use axum::extract::{Path, Query, State};
@ -45,10 +46,18 @@ static MESSAGE_COUNTER: AtomicU64 = AtomicU64::new(1);
static PART_COUNTER: AtomicU64 = AtomicU64::new(1);
static PTY_COUNTER: AtomicU64 = AtomicU64::new(1);
static PROJECT_COUNTER: AtomicU64 = AtomicU64::new(1);
const OPENCODE_EVENT_CHANNEL_SIZE: usize = 2048;
const OPENCODE_EVENT_LOG_SIZE: usize = 4096;
const OPENCODE_DEFAULT_MODEL_ID: &str = "mock";
const OPENCODE_DEFAULT_PROVIDER_ID: &str = "mock";
const OPENCODE_DEFAULT_AGENT_MODE: &str = "build";
#[derive(Clone, Debug)]
struct OpenCodeStreamEvent {
id: u64,
payload: Value,
}
#[derive(Clone, Debug)]
struct OpenCodeCompatConfig {
fixed_time_ms: Option<i64>,
@ -278,13 +287,15 @@ pub struct OpenCodeState {
questions: Mutex<HashMap<String, OpenCodeQuestionRecord>>,
session_runtime: Mutex<HashMap<String, OpenCodeSessionRuntime>>,
session_streams: Mutex<HashMap<String, bool>>,
event_broadcaster: broadcast::Sender<Value>,
event_broadcaster: broadcast::Sender<OpenCodeStreamEvent>,
event_log: StdMutex<VecDeque<OpenCodeStreamEvent>>,
next_event_id: AtomicU64,
model_cache: Mutex<Option<OpenCodeModelCache>>,
}
impl OpenCodeState {
pub fn new() -> Self {
let (event_broadcaster, _) = broadcast::channel(256);
let (event_broadcaster, _) = broadcast::channel(OPENCODE_EVENT_CHANNEL_SIZE);
let project_id = format!("proj_{}", PROJECT_COUNTER.fetch_add(1, Ordering::Relaxed));
Self {
config: OpenCodeCompatConfig::from_env(),
@ -297,16 +308,44 @@ impl OpenCodeState {
session_runtime: Mutex::new(HashMap::new()),
session_streams: Mutex::new(HashMap::new()),
event_broadcaster,
event_log: StdMutex::new(VecDeque::new()),
next_event_id: AtomicU64::new(1),
model_cache: Mutex::new(None),
}
}
pub fn subscribe(&self) -> broadcast::Receiver<Value> {
fn subscribe(&self) -> broadcast::Receiver<OpenCodeStreamEvent> {
self.event_broadcaster.subscribe()
}
pub fn emit_event(&self, event: Value) {
let _ = self.event_broadcaster.send(event);
let stream_event = OpenCodeStreamEvent {
id: self.next_event_id.fetch_add(1, Ordering::Relaxed),
payload: event,
};
if let Ok(mut log) = self.event_log.lock() {
log.push_back(stream_event.clone());
if log.len() > OPENCODE_EVENT_LOG_SIZE {
let overflow = log.len() - OPENCODE_EVENT_LOG_SIZE;
for _ in 0..overflow {
let _ = log.pop_front();
}
}
}
let _ = self.event_broadcaster.send(stream_event);
}
fn buffered_events_after(&self, last_event_id: Option<u64>) -> Vec<OpenCodeStreamEvent> {
let Some(last_event_id) = last_event_id else {
return Vec::new();
};
let Ok(log) = self.event_log.lock() else {
return Vec::new();
};
log.iter()
.filter(|event| event.id > last_event_id)
.cloned()
.collect()
}
fn now_ms(&self) -> i64 {
@ -2986,6 +3025,13 @@ async fn oc_config_providers(State(state): State<Arc<OpenCodeAppState>>) -> impl
(StatusCode::OK, Json(providers))
}
fn parse_last_event_id(headers: &HeaderMap) -> Option<u64> {
headers
.get("last-event-id")
.and_then(|value| value.to_str().ok())
.and_then(|value| value.trim().parse::<u64>().ok())
}
#[utoipa::path(
get,
path = "/event",
@ -2997,6 +3043,7 @@ async fn oc_event_subscribe(
headers: HeaderMap,
Query(query): Query<DirectoryQuery>,
) -> Sse<impl futures::Stream<Item = Result<Event, Infallible>>> {
let last_event_id = parse_last_event_id(&headers);
let receiver = state.opencode.subscribe();
let directory = state
.opencode
@ -3013,35 +3060,61 @@ async fn oc_event_subscribe(
"branch": branch,
}
}));
let replay_events = state.opencode.buffered_events_after(last_event_id);
let replay_cursor = replay_events
.last()
.map(|event| event.id)
.or(last_event_id)
.unwrap_or(0);
let heartbeat_payload = json!({
"type": "server.heartbeat",
"properties": {}
});
let stream = stream::unfold(
(receiver, interval(std::time::Duration::from_secs(30))),
move |(mut rx, mut ticker)| {
(
receiver,
interval(std::time::Duration::from_secs(30)),
VecDeque::from(replay_events),
replay_cursor,
),
move |(mut rx, mut ticker, mut replay, replay_cursor)| {
let heartbeat = heartbeat_payload.clone();
async move {
tokio::select! {
_ = ticker.tick() => {
let sse_event = Event::default()
.json_data(&heartbeat)
.unwrap_or_else(|_| Event::default().data("{}"));
Some((Ok(sse_event), (rx, ticker)))
}
event = rx.recv() => {
match event {
Ok(event) => {
let sse_event = Event::default()
.json_data(&event)
.unwrap_or_else(|_| Event::default().data("{}"));
Some((Ok(sse_event), (rx, ticker)))
if let Some(event) = replay.pop_front() {
let sse_event = Event::default()
.id(event.id.to_string())
.json_data(&event.payload)
.unwrap_or_else(|_| Event::default().data("{}"));
return Some((Ok(sse_event), (rx, ticker, replay, replay_cursor)));
}
loop {
tokio::select! {
_ = ticker.tick() => {
let sse_event = Event::default()
.json_data(&heartbeat)
.unwrap_or_else(|_| Event::default().data("{}"));
return Some((Ok(sse_event), (rx, ticker, replay, replay_cursor)));
}
event = rx.recv() => {
match event {
Ok(event) => {
if event.id <= replay_cursor {
continue;
}
let sse_event = Event::default()
.id(event.id.to_string())
.json_data(&event.payload)
.unwrap_or_else(|_| Event::default().data("{}"));
return Some((Ok(sse_event), (rx, ticker, replay, replay_cursor)));
}
Err(broadcast::error::RecvError::Lagged(skipped)) => {
warn!(skipped, "opencode event stream lagged");
return Some((Ok(Event::default().comment("lagged")), (rx, ticker, replay, replay_cursor)));
}
Err(broadcast::error::RecvError::Closed) => return None,
}
Err(broadcast::error::RecvError::Lagged(_)) => {
Some((Ok(Event::default().comment("lagged")), (rx, ticker)))
}
Err(broadcast::error::RecvError::Closed) => None,
}
}
}
@ -3063,6 +3136,7 @@ async fn oc_global_event(
headers: HeaderMap,
Query(query): Query<DirectoryQuery>,
) -> Sse<impl futures::Stream<Item = Result<Event, Infallible>>> {
let last_event_id = parse_last_event_id(&headers);
let receiver = state.opencode.subscribe();
let directory = state
.opencode
@ -3079,6 +3153,12 @@ async fn oc_global_event(
"branch": branch,
}
}));
let replay_events = state.opencode.buffered_events_after(last_event_id);
let replay_cursor = replay_events
.last()
.map(|event| event.id)
.or(last_event_id)
.unwrap_or(0);
let heartbeat_payload = json!({
"payload": {
@ -3087,31 +3167,52 @@ async fn oc_global_event(
}
});
let stream = stream::unfold(
(receiver, interval(std::time::Duration::from_secs(30))),
move |(mut rx, mut ticker)| {
(
receiver,
interval(std::time::Duration::from_secs(30)),
VecDeque::from(replay_events),
replay_cursor,
),
move |(mut rx, mut ticker, mut replay, replay_cursor)| {
let directory = directory.clone();
let heartbeat = heartbeat_payload.clone();
async move {
tokio::select! {
_ = ticker.tick() => {
let sse_event = Event::default()
.json_data(&heartbeat)
.unwrap_or_else(|_| Event::default().data("{}"));
Some((Ok(sse_event), (rx, ticker)))
}
event = rx.recv() => {
match event {
Ok(event) => {
let payload = json!({"directory": directory, "payload": event});
let sse_event = Event::default()
.json_data(&payload)
.unwrap_or_else(|_| Event::default().data("{}"));
Some((Ok(sse_event), (rx, ticker)))
if let Some(event) = replay.pop_front() {
let payload = json!({"directory": directory, "payload": event.payload});
let sse_event = Event::default()
.id(event.id.to_string())
.json_data(&payload)
.unwrap_or_else(|_| Event::default().data("{}"));
return Some((Ok(sse_event), (rx, ticker, replay, replay_cursor)));
}
loop {
tokio::select! {
_ = ticker.tick() => {
let sse_event = Event::default()
.json_data(&heartbeat)
.unwrap_or_else(|_| Event::default().data("{}"));
return Some((Ok(sse_event), (rx, ticker, replay, replay_cursor)));
}
event = rx.recv() => {
match event {
Ok(event) => {
if event.id <= replay_cursor {
continue;
}
let payload = json!({"directory": directory, "payload": event.payload});
let sse_event = Event::default()
.id(event.id.to_string())
.json_data(&payload)
.unwrap_or_else(|_| Event::default().data("{}"));
return Some((Ok(sse_event), (rx, ticker, replay, replay_cursor)));
}
Err(broadcast::error::RecvError::Lagged(skipped)) => {
warn!(skipped, "opencode global event stream lagged");
return Some((Ok(Event::default().comment("lagged")), (rx, ticker, replay, replay_cursor)));
}
Err(broadcast::error::RecvError::Closed) => return None,
}
Err(broadcast::error::RecvError::Lagged(_)) => {
Some((Ok(Event::default().comment("lagged")), (rx, ticker)))
}
Err(broadcast::error::RecvError::Closed) => None,
}
}
}