From 4dc5e1b376987535e5a9295f22c8e3b6adf2afd8 Mon Sep 17 00:00:00 2001 From: Harivansh Rathi Date: Thu, 12 Mar 2026 00:40:39 -0400 Subject: [PATCH] feat: add structured part events to gateway for teamActivity, media, and error parts Extends GatewayEvent union with structured_part variants, adds matching HistoryPart types, wires SSE emission in runtime handleChat, and maps agent-core content parts to history in session-state. --- .../src/core/gateway/internal-types.ts | 24 +++++- .../coding-agent/src/core/gateway/runtime.ts | 78 +++++++++++++++++++ .../src/core/gateway/session-state.ts | 35 +++++++++ .../coding-agent/src/core/gateway/types.ts | 10 ++- .../src/core/gateway/vercel-ai-stream.ts | 20 +++++ 5 files changed, 165 insertions(+), 2 deletions(-) diff --git a/packages/coding-agent/src/core/gateway/internal-types.ts b/packages/coding-agent/src/core/gateway/internal-types.ts index 40c1732..88274d4 100644 --- a/packages/coding-agent/src/core/gateway/internal-types.ts +++ b/packages/coding-agent/src/core/gateway/internal-types.ts @@ -55,7 +55,29 @@ export type GatewayEvent = } | { type: "message_complete"; sessionKey: string; text: string } | { type: "error"; sessionKey: string; error: string } - | { type: "aborted"; sessionKey: string }; + | { type: "aborted"; sessionKey: string } + | { + type: "structured_part"; + sessionKey: string; + partType: "teamActivity"; + payload: { + teamId: string; + status: string; + members: Array<{ id: string; name: string; role?: string; status: string; message?: string }>; + }; + } + | { + type: "structured_part"; + sessionKey: string; + partType: "media"; + payload: { url: string; mimeType?: string }; + } + | { + type: "structured_part"; + sessionKey: string; + partType: "error"; + payload: { code: string; message: string }; + }; export interface ManagedGatewaySession { sessionKey: string; diff --git a/packages/coding-agent/src/core/gateway/runtime.ts b/packages/coding-agent/src/core/gateway/runtime.ts index e8e8b3d..d86105e 100644 --- a/packages/coding-agent/src/core/gateway/runtime.ts +++ b/packages/coding-agent/src/core/gateway/runtime.ts @@ -7,6 +7,7 @@ import { import { rm } from "node:fs/promises"; import { join } from "node:path"; import { URL } from "node:url"; +import type { AgentMessage } from "@mariozechner/companion-agent-core"; import type { AgentSession, AgentSessionEvent } from "../agent-session.js"; import type { Settings } from "../settings-manager.js"; import { extractMessageText, getLastAssistantText } from "./helpers.js"; @@ -29,6 +30,7 @@ import type { ModelInfo, } from "./types.js"; import { + createGatewayStructuredPartListener, createVercelStreamListener, errorVercelStream, extractUserText, @@ -567,6 +569,7 @@ export class GatewayRuntime { sessionKey: managedSession.sessionKey, text: extractMessageText(event.message), }); + this.emitStructuredParts(managedSession, event.message); return; } if (event.message.role === "toolResult") { @@ -654,6 +657,73 @@ export class GatewayRuntime { }); } + private emitStructuredParts( + managedSession: ManagedGatewaySession, + message: AgentMessage, + ): void { + const content = message.content; + if (!Array.isArray(content)) return; + + for (const part of content) { + if (typeof part !== "object" || part === null) continue; + const p = part as Record; + + if (p.type === "teamActivity") { + const teamId = typeof p.teamId === "string" ? p.teamId : ""; + const status = typeof p.status === "string" ? p.status : "running"; + if (!teamId) continue; + const rawMembers = Array.isArray(p.members) ? p.members : []; + const members = rawMembers + .filter((m): m is Record => typeof m === "object" && m !== null) + .map((m) => ({ + id: typeof m.id === "string" ? m.id : "", + name: typeof m.name === "string" ? m.name : "Teammate", + ...(typeof m.role === "string" ? { role: m.role } : {}), + status: typeof m.status === "string" ? m.status : "running", + ...(typeof m.message === "string" ? { message: m.message } : {}), + })) + .filter((m) => m.id.length > 0); + this.emit(managedSession, { + type: "structured_part", + sessionKey: managedSession.sessionKey, + partType: "teamActivity", + payload: { teamId, status, members }, + }); + continue; + } + + if (p.type === "image") { + const url = typeof p.url === "string" ? p.url : ""; + if (!url) continue; + this.emit(managedSession, { + type: "structured_part", + sessionKey: managedSession.sessionKey, + partType: "media", + payload: { + url, + ...(typeof p.mimeType === "string" ? { mimeType: p.mimeType } : {}), + }, + }); + continue; + } + + if (p.type === "error") { + const errorMessage = typeof p.message === "string" ? p.message : ""; + if (!errorMessage) continue; + this.emit(managedSession, { + type: "structured_part", + sessionKey: managedSession.sessionKey, + partType: "error", + payload: { + code: typeof p.code === "string" ? p.code : "unknown", + message: errorMessage, + }, + }); + continue; + } + } + } + private createSessionState( managedSession: ManagedGatewaySession, ): GatewaySessionState { @@ -1106,7 +1176,9 @@ export class GatewayRuntime { response.write("\n"); const listener = createVercelStreamListener(response); + const structuredPartListener = createGatewayStructuredPartListener(response); let unsubscribe: (() => void) | undefined; + let unsubscribeStructured: (() => void) | undefined; let streamingActive = false; const stopStreaming = () => { @@ -1114,6 +1186,8 @@ export class GatewayRuntime { streamingActive = false; unsubscribe?.(); unsubscribe = undefined; + unsubscribeStructured?.(); + unsubscribeStructured = undefined; }; // Clean up on client disconnect @@ -1135,6 +1209,10 @@ export class GatewayRuntime { onStart: () => { if (clientDisconnected || streamingActive) return; unsubscribe = managedSession.session.subscribe(listener); + managedSession.listeners.add(structuredPartListener); + unsubscribeStructured = () => { + managedSession.listeners.delete(structuredPartListener); + }; streamingActive = true; }, onFinish: () => { diff --git a/packages/coding-agent/src/core/gateway/session-state.ts b/packages/coding-agent/src/core/gateway/session-state.ts index a2534cc..78e48db 100644 --- a/packages/coding-agent/src/core/gateway/session-state.ts +++ b/packages/coding-agent/src/core/gateway/session-state.ts @@ -85,6 +85,41 @@ export function messageContentToHistoryParts(msg: AgentMessage): HistoryPart[] { args: toolCall.arguments, state: "call", }); + } else if (contentPart.type === "teamActivity") { + const activity = contentPart as { + type: "teamActivity"; + teamId: string; + status: string; + members?: Array<{ id: string; name: string; role?: string; status: string; message?: string }>; + }; + parts.push({ + type: "teamActivity", + teamId: activity.teamId, + status: activity.status, + members: Array.isArray(activity.members) ? activity.members : [], + }); + } else if (contentPart.type === "image") { + const image = contentPart as { + type: "image"; + url: string; + mimeType?: string; + }; + parts.push({ + type: "media", + url: image.url, + mimeType: image.mimeType, + }); + } else if (contentPart.type === "error") { + const error = contentPart as { + type: "error"; + code?: string; + message: string; + }; + parts.push({ + type: "error", + code: typeof error.code === "string" ? error.code : "unknown", + message: error.message, + }); } } return parts; diff --git a/packages/coding-agent/src/core/gateway/types.ts b/packages/coding-agent/src/core/gateway/types.ts index 77c8091..938bdd4 100644 --- a/packages/coding-agent/src/core/gateway/types.ts +++ b/packages/coding-agent/src/core/gateway/types.ts @@ -77,7 +77,15 @@ export type HistoryPart = args: unknown; state: string; result?: unknown; - }; + } + | { + type: "teamActivity"; + teamId: string; + status: string; + members: Array<{ id: string; name: string; role?: string; status: string; message?: string }>; + } + | { type: "media"; url: string; mimeType?: string } + | { type: "error"; code: string; message: string }; export interface ChannelStatus { id: string; diff --git a/packages/coding-agent/src/core/gateway/vercel-ai-stream.ts b/packages/coding-agent/src/core/gateway/vercel-ai-stream.ts index 3fcaab7..a1a7262 100644 --- a/packages/coding-agent/src/core/gateway/vercel-ai-stream.ts +++ b/packages/coding-agent/src/core/gateway/vercel-ai-stream.ts @@ -1,6 +1,7 @@ import { randomUUID } from "node:crypto"; import type { ServerResponse } from "node:http"; import type { AgentSessionEvent } from "../agent-session.js"; +import type { GatewayEvent } from "./internal-types.js"; type TextStreamState = { started: boolean; @@ -324,3 +325,22 @@ export function errorVercelStream( writeChunk(response, "[DONE]"); response.end(); } + +/** + * Create a GatewayEvent listener that forwards `structured_part` events to the + * response as custom SSE chunks. Returns the listener function so the caller + * can subscribe it to managedSession.listeners and unsubscribe on cleanup. + */ +export function createGatewayStructuredPartListener( + response: ServerResponse, +): (event: GatewayEvent) => void { + return (event: GatewayEvent) => { + if (response.writableEnded) return; + if (event.type !== "structured_part") return; + writeChunk(response, { + type: "structured-part", + partType: event.partType, + payload: event.payload, + }); + }; +}