clanker-agent/packages/tui/test/stdin-buffer.test.ts
Harivansh Rathi 0250f72976 move pi-mono into companion-cloud as apps/companion-os
- Copy all pi-mono source into apps/companion-os/
- Update Dockerfile to COPY pre-built binary instead of downloading from GitHub Releases
- Update deploy-staging.yml to build pi from source (bun compile) before Docker build
- Add apps/companion-os/** to path triggers
- No more cross-repo dispatch needed

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-07 09:22:50 -08:00

450 lines
13 KiB
TypeScript

/**
* Tests for StdinBuffer
*
* Based on code from OpenTUI (https://github.com/anomalyco/opentui)
* MIT License - Copyright (c) 2025 opentui
*/
import assert from "node:assert";
import { beforeEach, describe, it } from "node:test";
import { StdinBuffer } from "../src/stdin-buffer.js";
describe("StdinBuffer", () => {
let buffer: StdinBuffer;
let emittedSequences: string[];
beforeEach(() => {
buffer = new StdinBuffer({ timeout: 10 });
// Collect emitted sequences
emittedSequences = [];
buffer.on("data", (sequence) => {
emittedSequences.push(sequence);
});
});
// Helper to process data through the buffer
function processInput(data: string | Buffer): void {
buffer.process(data);
}
// Helper to wait for async operations
async function wait(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
describe("Regular Characters", () => {
it("should pass through regular characters immediately", () => {
processInput("a");
assert.deepStrictEqual(emittedSequences, ["a"]);
});
it("should pass through multiple regular characters", () => {
processInput("abc");
assert.deepStrictEqual(emittedSequences, ["a", "b", "c"]);
});
it("should handle unicode characters", () => {
processInput("hello 世界");
assert.deepStrictEqual(emittedSequences, [
"h",
"e",
"l",
"l",
"o",
" ",
"世",
"界",
]);
});
});
describe("Complete Escape Sequences", () => {
it("should pass through complete mouse SGR sequences", () => {
const mouseSeq = "\x1b[<35;20;5m";
processInput(mouseSeq);
assert.deepStrictEqual(emittedSequences, [mouseSeq]);
});
it("should pass through complete arrow key sequences", () => {
const upArrow = "\x1b[A";
processInput(upArrow);
assert.deepStrictEqual(emittedSequences, [upArrow]);
});
it("should pass through complete function key sequences", () => {
const f1 = "\x1b[11~";
processInput(f1);
assert.deepStrictEqual(emittedSequences, [f1]);
});
it("should pass through meta key sequences", () => {
const metaA = "\x1ba";
processInput(metaA);
assert.deepStrictEqual(emittedSequences, [metaA]);
});
it("should pass through SS3 sequences", () => {
const ss3 = "\x1bOA";
processInput(ss3);
assert.deepStrictEqual(emittedSequences, [ss3]);
});
});
describe("Partial Escape Sequences", () => {
it("should buffer incomplete mouse SGR sequence", async () => {
processInput("\x1b");
assert.deepStrictEqual(emittedSequences, []);
assert.strictEqual(buffer.getBuffer(), "\x1b");
processInput("[<35");
assert.deepStrictEqual(emittedSequences, []);
assert.strictEqual(buffer.getBuffer(), "\x1b[<35");
processInput(";20;5m");
assert.deepStrictEqual(emittedSequences, ["\x1b[<35;20;5m"]);
assert.strictEqual(buffer.getBuffer(), "");
});
it("should buffer incomplete CSI sequence", () => {
processInput("\x1b[");
assert.deepStrictEqual(emittedSequences, []);
processInput("1;");
assert.deepStrictEqual(emittedSequences, []);
processInput("5H");
assert.deepStrictEqual(emittedSequences, ["\x1b[1;5H"]);
});
it("should buffer split across many chunks", () => {
processInput("\x1b");
processInput("[");
processInput("<");
processInput("3");
processInput("5");
processInput(";");
processInput("2");
processInput("0");
processInput(";");
processInput("5");
processInput("m");
assert.deepStrictEqual(emittedSequences, ["\x1b[<35;20;5m"]);
});
it("should flush incomplete sequence after timeout", async () => {
processInput("\x1b[<35");
assert.deepStrictEqual(emittedSequences, []);
// Wait for timeout
await wait(15);
assert.deepStrictEqual(emittedSequences, ["\x1b[<35"]);
});
});
describe("Mixed Content", () => {
it("should handle characters followed by escape sequence", () => {
processInput("abc\x1b[A");
assert.deepStrictEqual(emittedSequences, ["a", "b", "c", "\x1b[A"]);
});
it("should handle escape sequence followed by characters", () => {
processInput("\x1b[Aabc");
assert.deepStrictEqual(emittedSequences, ["\x1b[A", "a", "b", "c"]);
});
it("should handle multiple complete sequences", () => {
processInput("\x1b[A\x1b[B\x1b[C");
assert.deepStrictEqual(emittedSequences, ["\x1b[A", "\x1b[B", "\x1b[C"]);
});
it("should handle partial sequence with preceding characters", () => {
processInput("abc\x1b[<35");
assert.deepStrictEqual(emittedSequences, ["a", "b", "c"]);
assert.strictEqual(buffer.getBuffer(), "\x1b[<35");
processInput(";20;5m");
assert.deepStrictEqual(emittedSequences, [
"a",
"b",
"c",
"\x1b[<35;20;5m",
]);
});
});
describe("Kitty Keyboard Protocol", () => {
it("should handle Kitty CSI u press events", () => {
// Press 'a' in Kitty protocol
processInput("\x1b[97u");
assert.deepStrictEqual(emittedSequences, ["\x1b[97u"]);
});
it("should handle Kitty CSI u release events", () => {
// Release 'a' in Kitty protocol
processInput("\x1b[97;1:3u");
assert.deepStrictEqual(emittedSequences, ["\x1b[97;1:3u"]);
});
it("should handle batched Kitty press and release", () => {
// Press 'a', release 'a' batched together (common over SSH)
processInput("\x1b[97u\x1b[97;1:3u");
assert.deepStrictEqual(emittedSequences, ["\x1b[97u", "\x1b[97;1:3u"]);
});
it("should handle multiple batched Kitty events", () => {
// Press 'a', release 'a', press 'b', release 'b'
processInput("\x1b[97u\x1b[97;1:3u\x1b[98u\x1b[98;1:3u");
assert.deepStrictEqual(emittedSequences, [
"\x1b[97u",
"\x1b[97;1:3u",
"\x1b[98u",
"\x1b[98;1:3u",
]);
});
it("should handle Kitty arrow keys with event type", () => {
// Up arrow press with event type
processInput("\x1b[1;1:1A");
assert.deepStrictEqual(emittedSequences, ["\x1b[1;1:1A"]);
});
it("should handle Kitty functional keys with event type", () => {
// Delete key release
processInput("\x1b[3;1:3~");
assert.deepStrictEqual(emittedSequences, ["\x1b[3;1:3~"]);
});
it("should handle plain characters mixed with Kitty sequences", () => {
// Plain 'a' followed by Kitty release
processInput("a\x1b[97;1:3u");
assert.deepStrictEqual(emittedSequences, ["a", "\x1b[97;1:3u"]);
});
it("should handle Kitty sequence followed by plain characters", () => {
processInput("\x1b[97ua");
assert.deepStrictEqual(emittedSequences, ["\x1b[97u", "a"]);
});
it("should handle rapid typing simulation with Kitty protocol", () => {
// Simulates typing "hi" quickly with releases interleaved
processInput("\x1b[104u\x1b[104;1:3u\x1b[105u\x1b[105;1:3u");
assert.deepStrictEqual(emittedSequences, [
"\x1b[104u",
"\x1b[104;1:3u",
"\x1b[105u",
"\x1b[105;1:3u",
]);
});
});
describe("Mouse Events", () => {
it("should handle mouse press event", () => {
processInput("\x1b[<0;10;5M");
assert.deepStrictEqual(emittedSequences, ["\x1b[<0;10;5M"]);
});
it("should handle mouse release event", () => {
processInput("\x1b[<0;10;5m");
assert.deepStrictEqual(emittedSequences, ["\x1b[<0;10;5m"]);
});
it("should handle mouse move event", () => {
processInput("\x1b[<35;20;5m");
assert.deepStrictEqual(emittedSequences, ["\x1b[<35;20;5m"]);
});
it("should handle split mouse events", () => {
processInput("\x1b[<3");
processInput("5;1");
processInput("5;");
processInput("10m");
assert.deepStrictEqual(emittedSequences, ["\x1b[<35;15;10m"]);
});
it("should handle multiple mouse events", () => {
processInput("\x1b[<35;1;1m\x1b[<35;2;2m\x1b[<35;3;3m");
assert.deepStrictEqual(emittedSequences, [
"\x1b[<35;1;1m",
"\x1b[<35;2;2m",
"\x1b[<35;3;3m",
]);
});
it("should handle old-style mouse sequence (ESC[M + 3 bytes)", () => {
processInput("\x1b[M abc");
assert.deepStrictEqual(emittedSequences, ["\x1b[M ab", "c"]);
});
it("should buffer incomplete old-style mouse sequence", () => {
processInput("\x1b[M");
assert.strictEqual(buffer.getBuffer(), "\x1b[M");
processInput(" a");
assert.strictEqual(buffer.getBuffer(), "\x1b[M a");
processInput("b");
assert.deepStrictEqual(emittedSequences, ["\x1b[M ab"]);
});
});
describe("Edge Cases", () => {
it("should handle empty input", () => {
processInput("");
// Empty string emits an empty data event
assert.deepStrictEqual(emittedSequences, [""]);
});
it("should handle lone escape character with timeout", async () => {
processInput("\x1b");
assert.deepStrictEqual(emittedSequences, []);
// After timeout, should emit
await wait(15);
assert.deepStrictEqual(emittedSequences, ["\x1b"]);
});
it("should handle lone escape character with explicit flush", () => {
processInput("\x1b");
assert.deepStrictEqual(emittedSequences, []);
const flushed = buffer.flush();
assert.deepStrictEqual(flushed, ["\x1b"]);
});
it("should handle buffer input", () => {
processInput(Buffer.from("\x1b[A"));
assert.deepStrictEqual(emittedSequences, ["\x1b[A"]);
});
it("should handle very long sequences", () => {
const longSeq = `\x1b[${"1;".repeat(50)}H`;
processInput(longSeq);
assert.deepStrictEqual(emittedSequences, [longSeq]);
});
});
describe("Flush", () => {
it("should flush incomplete sequences", () => {
processInput("\x1b[<35");
const flushed = buffer.flush();
assert.deepStrictEqual(flushed, ["\x1b[<35"]);
assert.strictEqual(buffer.getBuffer(), "");
});
it("should return empty array if nothing to flush", () => {
const flushed = buffer.flush();
assert.deepStrictEqual(flushed, []);
});
it("should emit flushed data via timeout", async () => {
processInput("\x1b[<35");
assert.deepStrictEqual(emittedSequences, []);
// Wait for timeout to flush
await wait(15);
assert.deepStrictEqual(emittedSequences, ["\x1b[<35"]);
});
});
describe("Clear", () => {
it("should clear buffered content without emitting", () => {
processInput("\x1b[<35");
assert.strictEqual(buffer.getBuffer(), "\x1b[<35");
buffer.clear();
assert.strictEqual(buffer.getBuffer(), "");
assert.deepStrictEqual(emittedSequences, []);
});
});
describe("Bracketed Paste", () => {
let emittedPaste: string[] = [];
beforeEach(() => {
buffer = new StdinBuffer({ timeout: 10 });
// Collect emitted sequences
emittedSequences = [];
buffer.on("data", (sequence) => {
emittedSequences.push(sequence);
});
// Collect paste events
emittedPaste = [];
buffer.on("paste", (data) => {
emittedPaste.push(data);
});
});
it("should emit paste event for complete bracketed paste", () => {
const pasteStart = "\x1b[200~";
const pasteEnd = "\x1b[201~";
const content = "hello world";
processInput(pasteStart + content + pasteEnd);
assert.deepStrictEqual(emittedPaste, ["hello world"]);
assert.deepStrictEqual(emittedSequences, []); // No data events during paste
});
it("should handle paste arriving in chunks", () => {
processInput("\x1b[200~");
assert.deepStrictEqual(emittedPaste, []);
processInput("hello ");
assert.deepStrictEqual(emittedPaste, []);
processInput("world\x1b[201~");
assert.deepStrictEqual(emittedPaste, ["hello world"]);
assert.deepStrictEqual(emittedSequences, []);
});
it("should handle paste with input before and after", () => {
processInput("a");
processInput("\x1b[200~pasted\x1b[201~");
processInput("b");
assert.deepStrictEqual(emittedSequences, ["a", "b"]);
assert.deepStrictEqual(emittedPaste, ["pasted"]);
});
it("should handle paste with newlines", () => {
processInput("\x1b[200~line1\nline2\nline3\x1b[201~");
assert.deepStrictEqual(emittedPaste, ["line1\nline2\nline3"]);
assert.deepStrictEqual(emittedSequences, []);
});
it("should handle paste with unicode", () => {
processInput("\x1b[200~Hello 世界 🎉\x1b[201~");
assert.deepStrictEqual(emittedPaste, ["Hello 世界 🎉"]);
assert.deepStrictEqual(emittedSequences, []);
});
});
describe("Destroy", () => {
it("should clear buffer on destroy", () => {
processInput("\x1b[<35");
assert.strictEqual(buffer.getBuffer(), "\x1b[<35");
buffer.destroy();
assert.strictEqual(buffer.getBuffer(), "");
});
it("should clear pending timeouts on destroy", async () => {
processInput("\x1b[<35");
buffer.destroy();
// Wait longer than timeout
await wait(15);
// Should not have emitted anything
assert.deepStrictEqual(emittedSequences, []);
});
});
});