stabilize (#3)

* specs

* Stabilize deskctl runtime foundation

Co-authored-by: Codex <noreply@openai.com>

* opsx archive

---------

Co-authored-by: Codex <noreply@openai.com>
This commit is contained in:
Hari 2026-03-25 18:31:08 -04:00 committed by GitHub
parent d487a60209
commit 6dce22eaef
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
22 changed files with 1289 additions and 295 deletions

View file

@ -1,41 +1,56 @@
use std::io::{BufRead, BufReader, Write};
use std::os::unix::net::UnixStream;
use std::os::unix::process::CommandExt;
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use std::thread;
use std::time::Duration;
use anyhow::{bail, Context, Result};
use anyhow::{Context, Result, bail};
use crate::cli::GlobalOpts;
use crate::core::doctor::{DoctorReport, run as run_doctor_report};
use crate::core::paths::{pid_path_for_session, socket_dir, socket_path_for_session};
use crate::core::protocol::{Request, Response};
fn socket_dir() -> PathBuf {
if let Ok(dir) = std::env::var("DESKCTL_SOCKET_DIR") {
return PathBuf::from(dir);
}
if let Ok(runtime) = std::env::var("XDG_RUNTIME_DIR") {
return PathBuf::from(runtime).join("deskctl");
}
dirs::home_dir()
.unwrap_or_else(|| PathBuf::from("/tmp"))
.join(".deskctl")
}
fn socket_path(opts: &GlobalOpts) -> PathBuf {
if let Some(ref path) = opts.socket {
return path.clone();
}
socket_dir().join(format!("{}.sock", opts.session))
socket_path_for_session(&opts.session)
}
fn pid_path(opts: &GlobalOpts) -> PathBuf {
socket_dir().join(format!("{}.pid", opts.session))
pid_path_for_session(&opts.session)
}
fn try_connect(opts: &GlobalOpts) -> Option<UnixStream> {
UnixStream::connect(socket_path(opts)).ok()
fn connect_socket(path: &Path) -> Result<UnixStream> {
UnixStream::connect(path).with_context(|| format!("Failed to connect to {}", path.display()))
}
fn is_stale_socket_error(error: &std::io::Error) -> bool {
matches!(
error.kind(),
std::io::ErrorKind::ConnectionRefused | std::io::ErrorKind::NotFound
)
}
fn cleanup_stale_socket(opts: &GlobalOpts) -> Result<bool> {
let path = socket_path(opts);
if !path.exists() {
return Ok(false);
}
match UnixStream::connect(&path) {
Ok(_) => Ok(false),
Err(error) if is_stale_socket_error(&error) => {
std::fs::remove_file(&path)
.with_context(|| format!("Failed to remove stale socket {}", path.display()))?;
Ok(true)
}
Err(error) => Err(error)
.with_context(|| format!("Failed to inspect daemon socket {}", path.display())),
}
}
fn spawn_daemon(opts: &GlobalOpts) -> Result<()> {
@ -51,9 +66,8 @@ fn spawn_daemon(opts: &GlobalOpts) -> Result<()> {
.env("DESKCTL_PID_PATH", pid_path(opts))
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::piped());
.stderr(Stdio::null());
// Detach the daemon process on Unix
unsafe {
cmd.pre_exec(|| {
libc::setsid();
@ -65,82 +79,120 @@ fn spawn_daemon(opts: &GlobalOpts) -> Result<()> {
Ok(())
}
fn send_request_over_stream(mut stream: UnixStream, request: &Request) -> Result<Response> {
stream.set_read_timeout(Some(Duration::from_secs(30)))?;
stream.set_write_timeout(Some(Duration::from_secs(5)))?;
let json = serde_json::to_string(request)?;
writeln!(stream, "{json}")?;
stream.flush()?;
let mut reader = BufReader::new(&stream);
let mut line = String::new();
reader.read_line(&mut line)?;
serde_json::from_str(line.trim()).context("Failed to parse daemon response")
}
fn ping_daemon(opts: &GlobalOpts) -> Result<()> {
let response = send_request_over_stream(connect_socket(&socket_path(opts))?, &Request::new("ping"))?;
if response.success {
Ok(())
} else {
bail!(
"{}",
response
.error
.unwrap_or_else(|| "Daemon health probe failed".to_string())
);
}
}
fn ensure_daemon(opts: &GlobalOpts) -> Result<UnixStream> {
// Try connecting first
if let Some(stream) = try_connect(opts) {
return Ok(stream);
if ping_daemon(opts).is_ok() {
return connect_socket(&socket_path(opts));
}
let removed_stale_socket = cleanup_stale_socket(opts)?;
if removed_stale_socket && ping_daemon(opts).is_ok() {
return connect_socket(&socket_path(opts));
}
// Spawn daemon
spawn_daemon(opts)?;
// Retry with backoff
let max_retries = 20;
let base_delay = Duration::from_millis(50);
for i in 0..max_retries {
thread::sleep(base_delay * (i + 1).min(4));
if let Some(stream) = try_connect(opts) {
return Ok(stream);
for attempt in 0..max_retries {
thread::sleep(base_delay * (attempt + 1).min(4));
if ping_daemon(opts).is_ok() {
return connect_socket(&socket_path(opts));
}
}
bail!(
"Failed to connect to daemon after {} retries.\n\
Socket path: {}",
"Failed to start a healthy daemon after {} retries.\nSocket path: {}",
max_retries,
socket_path(opts).display()
);
}
pub fn send_command(opts: &GlobalOpts, request: &Request) -> Result<Response> {
let mut stream = ensure_daemon(opts)?;
stream.set_read_timeout(Some(Duration::from_secs(30)))?;
stream.set_write_timeout(Some(Duration::from_secs(5)))?;
// Send NDJSON request
let json = serde_json::to_string(request)?;
writeln!(stream, "{json}")?;
stream.flush()?;
// Read NDJSON response
let mut reader = BufReader::new(&stream);
let mut line = String::new();
reader.read_line(&mut line)?;
let response: Response =
serde_json::from_str(line.trim()).context("Failed to parse daemon response")?;
Ok(response)
send_request_over_stream(ensure_daemon(opts)?, request)
}
pub fn start_daemon(opts: &GlobalOpts) -> Result<()> {
if try_connect(opts).is_some() {
println!("Daemon already running ({})", socket_path(opts).display());
return Ok(());
}
spawn_daemon(opts)?;
// Wait briefly and verify
thread::sleep(Duration::from_millis(200));
if try_connect(opts).is_some() {
println!("Daemon started ({})", socket_path(opts).display());
} else {
bail!("Daemon failed to start");
pub fn run_doctor(opts: &GlobalOpts) -> Result<()> {
let report = run_doctor_report(&socket_path(opts));
print_doctor_report(&report, opts.json)?;
if !report.healthy {
std::process::exit(1);
}
Ok(())
}
pub fn stop_daemon(opts: &GlobalOpts) -> Result<()> {
match try_connect(opts) {
Some(mut stream) => {
let req = Request::new("shutdown");
let json = serde_json::to_string(&req)?;
writeln!(stream, "{json}")?;
stream.flush()?;
println!("Daemon stopped");
pub fn start_daemon(opts: &GlobalOpts) -> Result<()> {
if ping_daemon(opts).is_ok() {
println!("Daemon already running ({})", socket_path(opts).display());
return Ok(());
}
if cleanup_stale_socket(opts)? {
println!("Removed stale socket: {}", socket_path(opts).display());
}
spawn_daemon(opts)?;
let max_retries = 20;
for attempt in 0..max_retries {
thread::sleep(Duration::from_millis(50 * (attempt + 1).min(4) as u64));
if ping_daemon(opts).is_ok() {
println!("Daemon started ({})", socket_path(opts).display());
return Ok(());
}
None => {
// Try to clean up stale socket
let path = socket_path(opts);
}
bail!(
"Daemon failed to become healthy.\nSocket path: {}",
socket_path(opts).display()
);
}
pub fn stop_daemon(opts: &GlobalOpts) -> Result<()> {
let path = socket_path(opts);
match UnixStream::connect(&path) {
Ok(stream) => {
let response = send_request_over_stream(stream, &Request::new("shutdown"))?;
if response.success {
println!("Daemon stopped");
} else {
bail!(
"{}",
response
.error
.unwrap_or_else(|| "Failed to stop daemon".to_string())
);
}
}
Err(error) if is_stale_socket_error(&error) => {
if path.exists() {
std::fs::remove_file(&path)?;
println!("Removed stale socket: {}", path.display());
@ -148,15 +200,72 @@ pub fn stop_daemon(opts: &GlobalOpts) -> Result<()> {
println!("Daemon not running");
}
}
Err(error) => {
return Err(error)
.with_context(|| format!("Failed to inspect daemon socket {}", path.display()));
}
}
Ok(())
}
pub fn daemon_status(opts: &GlobalOpts) -> Result<()> {
if try_connect(opts).is_some() {
println!("Daemon running ({})", socket_path(opts).display());
} else {
println!("Daemon not running");
let path = socket_path(opts);
match ping_daemon(opts) {
Ok(()) => println!("Daemon running ({})", path.display()),
Err(_) if path.exists() => println!("Daemon socket exists but is unhealthy ({})", path.display()),
Err(_) => println!("Daemon not running"),
}
Ok(())
}
fn print_doctor_report(report: &DoctorReport, json_output: bool) -> Result<()> {
if json_output {
println!("{}", serde_json::to_string_pretty(report)?);
return Ok(());
}
println!(
"deskctl doctor: {}",
if report.healthy { "healthy" } else { "issues found" }
);
for check in &report.checks {
let status = if check.ok { "OK" } else { "FAIL" };
println!("[{status}] {}: {}", check.name, check.details);
if let Some(fix) = &check.fix {
println!(" fix: {fix}");
}
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::{cleanup_stale_socket, socket_path};
use crate::cli::GlobalOpts;
use std::time::{SystemTime, UNIX_EPOCH};
#[test]
fn cleanup_stale_socket_removes_refused_socket_path() {
let temp = std::env::temp_dir().join(format!(
"deskctl-test-{}",
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_nanos()
));
std::fs::create_dir_all(&temp).unwrap();
let opts = GlobalOpts {
socket: Some(temp.join("stale.sock")),
session: "test".to_string(),
json: false,
};
let listener = std::os::unix::net::UnixListener::bind(socket_path(&opts)).unwrap();
drop(listener);
assert!(cleanup_stale_socket(&opts).unwrap());
assert!(!socket_path(&opts).exists());
let _ = std::fs::remove_dir_all(&temp);
}
}

View file

@ -100,6 +100,8 @@ pub enum Command {
GetScreenSize,
/// Get current mouse position
GetMousePosition,
/// Diagnose X11 runtime, screenshot, and daemon health
Doctor,
/// Take a screenshot without window tree
Screenshot {
/// Save path (default: /tmp/deskctl-{timestamp}.png)
@ -179,6 +181,10 @@ pub fn run() -> Result<()> {
};
}
if let Command::Doctor = app.command {
return connection::run_doctor(&app.global);
}
// All other commands need a daemon connection
let request = build_request(&app.command)?;
let response = connection::send_command(&app.global, &request)?;
@ -237,6 +243,7 @@ fn build_request(cmd: &Command) -> Result<Request> {
Command::ListWindows => Request::new("list-windows"),
Command::GetScreenSize => Request::new("get-screen-size"),
Command::GetMousePosition => Request::new("get-mouse-position"),
Command::Doctor => unreachable!(),
Command::Screenshot { path, annotate } => {
let mut req = Request::new("screenshot").with_extra("annotate", json!(annotate));
if let Some(p) = path {
@ -261,7 +268,7 @@ fn print_response(cmd: &Command, response: &Response) -> Result<()> {
}
if let Some(ref data) = response.data {
// For snapshot, print compact text format
if matches!(cmd, Command::Snapshot { .. }) {
if matches!(cmd, Command::Snapshot { .. } | Command::ListWindows) {
if let Some(screenshot) = data.get("screenshot").and_then(|v| v.as_str()) {
println!("Screenshot: {screenshot}");
}