diff --git a/.gitignore b/.gitignore index a06c1377..7b88789e 100644 --- a/.gitignore +++ b/.gitignore @@ -33,3 +33,6 @@ pi-*.html out.html packages/coding-agent/binaries/ todo.md + +# Riptide artifacts (cloud-synced) +.humanlayer/tasks/ diff --git a/packages/coding-agent/src/core/gateway-runtime.ts b/packages/coding-agent/src/core/gateway-runtime.ts index 84cd28bc..f3c03e54 100644 --- a/packages/coding-agent/src/core/gateway-runtime.ts +++ b/packages/coding-agent/src/core/gateway-runtime.ts @@ -4,7 +4,12 @@ import { URL } from "node:url"; import type { ImageContent } from "@mariozechner/pi-ai"; import type { AgentSession, AgentSessionEvent } from "./agent-session.js"; import { SessionManager } from "./session-manager.js"; -import { createVercelStreamListener, errorVercelStream, extractUserText, finishVercelStream } from "./vercel-ai-stream.js"; +import { + createVercelStreamListener, + errorVercelStream, + extractUserText, + finishVercelStream, +} from "./vercel-ai-stream.js"; export interface GatewayConfig { bind: string; @@ -59,6 +64,8 @@ export interface GatewayRuntimeOptions { interface GatewayQueuedMessage { request: GatewayMessageRequest; resolve: (result: GatewayMessageResult) => void; + onStart?: () => void; + onFinish?: () => void; } type GatewayEvent = @@ -186,18 +193,26 @@ export class GatewayRuntime { } async enqueueMessage(request: GatewayMessageRequest): Promise { - const managedSession = await this.ensureSession(request.sessionKey); + return this.enqueueManagedMessage({ request }); + } + + private async enqueueManagedMessage(queuedMessage: { + request: GatewayMessageRequest; + onStart?: () => void; + onFinish?: () => void; + }): Promise { + const managedSession = await this.ensureSession(queuedMessage.request.sessionKey); if (managedSession.queue.length >= this.config.session.maxQueuePerSession) { return { ok: false, response: "", error: `Queue full (${this.config.session.maxQueuePerSession} pending).`, - sessionKey: request.sessionKey, + sessionKey: queuedMessage.request.sessionKey, }; } return new Promise((resolve) => { - managedSession.queue.push({ request, resolve }); + managedSession.queue.push({ ...queuedMessage, resolve }); this.emitState(managedSession); void this.processNext(managedSession); }); @@ -303,6 +318,7 @@ export class GatewayRuntime { this.emitState(managedSession); try { + queued.onStart?.(); await managedSession.session.prompt(queued.request.text, { images: queued.request.images, source: queued.request.source ?? "extension", @@ -327,6 +343,7 @@ export class GatewayRuntime { sessionKey: managedSession.sessionKey, }); } finally { + queued.onFinish?.(); managedSession.processing = false; managedSession.lastActiveAt = Date.now(); this.emitState(managedSession); @@ -593,11 +610,7 @@ export class GatewayRuntime { }); } - private async handleChat( - sessionKey: string, - request: IncomingMessage, - response: ServerResponse, - ): Promise { + private async handleChat(sessionKey: string, request: IncomingMessage, response: ServerResponse): Promise { const body = await this.readJsonBody(request); const text = extractUserText(body); if (!text) { @@ -614,27 +627,44 @@ export class GatewayRuntime { }); response.write("\n"); - // Subscribe to session events for Vercel AI SDK translation const managedSession = await this.ensureSession(sessionKey); const listener = createVercelStreamListener(response); - const unsubscribe = managedSession.session.subscribe(listener); + let unsubscribe: (() => void) | undefined; + let streamingActive = false; + + const stopStreaming = () => { + if (!streamingActive) return; + streamingActive = false; + unsubscribe?.(); + unsubscribe = undefined; + }; // Clean up on client disconnect let clientDisconnected = false; request.on("close", () => { clientDisconnected = true; - unsubscribe(); + stopStreaming(); }); // Drive the session through the existing queue infrastructure try { - const result = await this.enqueueMessage({ - sessionKey, - text, - source: "extension", + const result = await this.enqueueManagedMessage({ + request: { + sessionKey, + text, + source: "extension", + }, + onStart: () => { + if (clientDisconnected || streamingActive) return; + unsubscribe = managedSession.session.subscribe(listener); + streamingActive = true; + }, + onFinish: () => { + stopStreaming(); + }, }); if (!clientDisconnected) { - unsubscribe(); + stopStreaming(); if (result.ok) { finishVercelStream(response, "stop"); } else { @@ -648,7 +678,7 @@ export class GatewayRuntime { } } catch (error) { if (!clientDisconnected) { - unsubscribe(); + stopStreaming(); const message = error instanceof Error ? error.message : String(error); errorVercelStream(response, message); } diff --git a/packages/coding-agent/src/core/vercel-ai-stream.ts b/packages/coding-agent/src/core/vercel-ai-stream.ts index 3f6dfdea..2326cbb6 100644 --- a/packages/coding-agent/src/core/vercel-ai-stream.ts +++ b/packages/coding-agent/src/core/vercel-ai-stream.ts @@ -58,20 +58,32 @@ export function createVercelStreamListener( response: ServerResponse, messageId?: string, ): (event: AgentSessionEvent) => void { - let started = false; + // Gate: only forward events within a single prompt's agent_start -> agent_end lifecycle. + // handleChat now subscribes this listener immediately before the queued prompt starts, + // so these guards only need to bound the stream to that prompt's event span. + let active = false; const msgId = messageId ?? randomUUID(); return (event: AgentSessionEvent) => { if (response.writableEnded) return; - switch (event.type) { - case "agent_start": - if (!started) { - writeChunk(response, { type: "start", messageId: msgId }); - started = true; - } - return; + // Activate on our agent_start, deactivate on agent_end + if (event.type === "agent_start") { + if (!active) { + active = true; + writeChunk(response, { type: "start", messageId: msgId }); + } + return; + } + if (event.type === "agent_end") { + active = false; + return; + } + // Drop events that don't belong to our message + if (!active) return; + + switch (event.type) { case "turn_start": writeChunk(response, { type: "start-step" }); return; @@ -169,10 +181,7 @@ export function createVercelStreamListener( /** * Write the terminal finish sequence and end the response. */ -export function finishVercelStream( - response: ServerResponse, - finishReason: string = "stop", -): void { +export function finishVercelStream(response: ServerResponse, finishReason: string = "stop"): void { if (response.writableEnded) return; writeChunk(response, { type: "finish", finishReason }); writeChunk(response, "[DONE]"); @@ -182,10 +191,7 @@ export function finishVercelStream( /** * Write an error chunk and end the response. */ -export function errorVercelStream( - response: ServerResponse, - errorText: string, -): void { +export function errorVercelStream(response: ServerResponse, errorText: string): void { if (response.writableEnded) return; writeChunk(response, { type: "error", errorText }); writeChunk(response, "[DONE]"); diff --git a/packages/coding-agent/test/vercel-ai-stream.test.ts b/packages/coding-agent/test/vercel-ai-stream.test.ts index c90ca646..1c3ea750 100644 --- a/packages/coding-agent/test/vercel-ai-stream.test.ts +++ b/packages/coding-agent/test/vercel-ai-stream.test.ts @@ -1,13 +1,11 @@ -import { describe, it, expect } from "vitest"; +import { describe, expect, it } from "vitest"; import type { AgentSessionEvent } from "../src/core/agent-session.js"; -import { extractUserText, createVercelStreamListener, finishVercelStream } from "../src/core/vercel-ai-stream.js"; +import { createVercelStreamListener, extractUserText } from "../src/core/vercel-ai-stream.js"; describe("extractUserText", () => { it("extracts text from useChat v5+ format with parts", () => { const body = { - messages: [ - { role: "user", parts: [{ type: "text", text: "hello world" }] }, - ], + messages: [{ role: "user", parts: [{ type: "text", text: "hello world" }] }], }; expect(extractUserText(body)).toBe("hello world"); }); @@ -70,7 +68,9 @@ describe("createVercelStreamListener", () => { this.writableEnded = true; }, chunks, - get ended() { return ended; }, + get ended() { + return ended; + }, } as any; } @@ -79,8 +79,11 @@ describe("createVercelStreamListener", () => { .filter((c) => c.startsWith("data: ")) .map((c) => { const payload = c.replace(/^data: /, "").replace(/\n\n$/, ""); - try { return JSON.parse(payload); } - catch { return payload; } + try { + return JSON.parse(payload); + } catch { + return payload; + } }); } @@ -129,4 +132,18 @@ describe("createVercelStreamListener", () => { const parsed = parseChunks(response.chunks); expect(parsed).toEqual([{ type: "start", messageId: "test-msg-id" }]); }); + + it("ignores events outside the active prompt lifecycle", () => { + const response = createMockResponse(); + const listener = createVercelStreamListener(response, "test-msg-id"); + + listener({ type: "turn_start", turnIndex: 0, timestamp: Date.now() } as AgentSessionEvent); + listener({ type: "agent_start" } as AgentSessionEvent); + listener({ type: "turn_start", turnIndex: 0, timestamp: Date.now() } as AgentSessionEvent); + listener({ type: "agent_end", messages: [] } as AgentSessionEvent); + listener({ type: "turn_start", turnIndex: 1, timestamp: Date.now() } as AgentSessionEvent); + + const parsed = parseChunks(response.chunks); + expect(parsed).toEqual([{ type: "start", messageId: "test-msg-id" }, { type: "start-step" }]); + }); });