feat: add structured part events to gateway for teamActivity, media, and error parts

Extends GatewayEvent union with structured_part variants, adds matching
HistoryPart types, wires SSE emission in runtime handleChat, and maps
agent-core content parts to history in session-state.
This commit is contained in:
Harivansh Rathi 2026-03-12 00:40:39 -04:00
parent a5d70ce55e
commit 4dc5e1b376
5 changed files with 165 additions and 2 deletions

View file

@ -55,7 +55,29 @@ export type GatewayEvent =
} }
| { type: "message_complete"; sessionKey: string; text: string } | { type: "message_complete"; sessionKey: string; text: string }
| { type: "error"; sessionKey: string; error: string } | { type: "error"; sessionKey: string; error: string }
| { type: "aborted"; sessionKey: string }; | { type: "aborted"; sessionKey: string }
| {
type: "structured_part";
sessionKey: string;
partType: "teamActivity";
payload: {
teamId: string;
status: string;
members: Array<{ id: string; name: string; role?: string; status: string; message?: string }>;
};
}
| {
type: "structured_part";
sessionKey: string;
partType: "media";
payload: { url: string; mimeType?: string };
}
| {
type: "structured_part";
sessionKey: string;
partType: "error";
payload: { code: string; message: string };
};
export interface ManagedGatewaySession { export interface ManagedGatewaySession {
sessionKey: string; sessionKey: string;

View file

@ -7,6 +7,7 @@ import {
import { rm } from "node:fs/promises"; import { rm } from "node:fs/promises";
import { join } from "node:path"; import { join } from "node:path";
import { URL } from "node:url"; import { URL } from "node:url";
import type { AgentMessage } from "@mariozechner/companion-agent-core";
import type { AgentSession, AgentSessionEvent } from "../agent-session.js"; import type { AgentSession, AgentSessionEvent } from "../agent-session.js";
import type { Settings } from "../settings-manager.js"; import type { Settings } from "../settings-manager.js";
import { extractMessageText, getLastAssistantText } from "./helpers.js"; import { extractMessageText, getLastAssistantText } from "./helpers.js";
@ -29,6 +30,7 @@ import type {
ModelInfo, ModelInfo,
} from "./types.js"; } from "./types.js";
import { import {
createGatewayStructuredPartListener,
createVercelStreamListener, createVercelStreamListener,
errorVercelStream, errorVercelStream,
extractUserText, extractUserText,
@ -567,6 +569,7 @@ export class GatewayRuntime {
sessionKey: managedSession.sessionKey, sessionKey: managedSession.sessionKey,
text: extractMessageText(event.message), text: extractMessageText(event.message),
}); });
this.emitStructuredParts(managedSession, event.message);
return; return;
} }
if (event.message.role === "toolResult") { if (event.message.role === "toolResult") {
@ -654,6 +657,73 @@ export class GatewayRuntime {
}); });
} }
private emitStructuredParts(
managedSession: ManagedGatewaySession,
message: AgentMessage,
): void {
const content = message.content;
if (!Array.isArray(content)) return;
for (const part of content) {
if (typeof part !== "object" || part === null) continue;
const p = part as Record<string, unknown>;
if (p.type === "teamActivity") {
const teamId = typeof p.teamId === "string" ? p.teamId : "";
const status = typeof p.status === "string" ? p.status : "running";
if (!teamId) continue;
const rawMembers = Array.isArray(p.members) ? p.members : [];
const members = rawMembers
.filter((m): m is Record<string, unknown> => typeof m === "object" && m !== null)
.map((m) => ({
id: typeof m.id === "string" ? m.id : "",
name: typeof m.name === "string" ? m.name : "Teammate",
...(typeof m.role === "string" ? { role: m.role } : {}),
status: typeof m.status === "string" ? m.status : "running",
...(typeof m.message === "string" ? { message: m.message } : {}),
}))
.filter((m) => m.id.length > 0);
this.emit(managedSession, {
type: "structured_part",
sessionKey: managedSession.sessionKey,
partType: "teamActivity",
payload: { teamId, status, members },
});
continue;
}
if (p.type === "image") {
const url = typeof p.url === "string" ? p.url : "";
if (!url) continue;
this.emit(managedSession, {
type: "structured_part",
sessionKey: managedSession.sessionKey,
partType: "media",
payload: {
url,
...(typeof p.mimeType === "string" ? { mimeType: p.mimeType } : {}),
},
});
continue;
}
if (p.type === "error") {
const errorMessage = typeof p.message === "string" ? p.message : "";
if (!errorMessage) continue;
this.emit(managedSession, {
type: "structured_part",
sessionKey: managedSession.sessionKey,
partType: "error",
payload: {
code: typeof p.code === "string" ? p.code : "unknown",
message: errorMessage,
},
});
continue;
}
}
}
private createSessionState( private createSessionState(
managedSession: ManagedGatewaySession, managedSession: ManagedGatewaySession,
): GatewaySessionState { ): GatewaySessionState {
@ -1106,7 +1176,9 @@ export class GatewayRuntime {
response.write("\n"); response.write("\n");
const listener = createVercelStreamListener(response); const listener = createVercelStreamListener(response);
const structuredPartListener = createGatewayStructuredPartListener(response);
let unsubscribe: (() => void) | undefined; let unsubscribe: (() => void) | undefined;
let unsubscribeStructured: (() => void) | undefined;
let streamingActive = false; let streamingActive = false;
const stopStreaming = () => { const stopStreaming = () => {
@ -1114,6 +1186,8 @@ export class GatewayRuntime {
streamingActive = false; streamingActive = false;
unsubscribe?.(); unsubscribe?.();
unsubscribe = undefined; unsubscribe = undefined;
unsubscribeStructured?.();
unsubscribeStructured = undefined;
}; };
// Clean up on client disconnect // Clean up on client disconnect
@ -1135,6 +1209,10 @@ export class GatewayRuntime {
onStart: () => { onStart: () => {
if (clientDisconnected || streamingActive) return; if (clientDisconnected || streamingActive) return;
unsubscribe = managedSession.session.subscribe(listener); unsubscribe = managedSession.session.subscribe(listener);
managedSession.listeners.add(structuredPartListener);
unsubscribeStructured = () => {
managedSession.listeners.delete(structuredPartListener);
};
streamingActive = true; streamingActive = true;
}, },
onFinish: () => { onFinish: () => {

View file

@ -85,6 +85,41 @@ export function messageContentToHistoryParts(msg: AgentMessage): HistoryPart[] {
args: toolCall.arguments, args: toolCall.arguments,
state: "call", state: "call",
}); });
} else if (contentPart.type === "teamActivity") {
const activity = contentPart as {
type: "teamActivity";
teamId: string;
status: string;
members?: Array<{ id: string; name: string; role?: string; status: string; message?: string }>;
};
parts.push({
type: "teamActivity",
teamId: activity.teamId,
status: activity.status,
members: Array.isArray(activity.members) ? activity.members : [],
});
} else if (contentPart.type === "image") {
const image = contentPart as {
type: "image";
url: string;
mimeType?: string;
};
parts.push({
type: "media",
url: image.url,
mimeType: image.mimeType,
});
} else if (contentPart.type === "error") {
const error = contentPart as {
type: "error";
code?: string;
message: string;
};
parts.push({
type: "error",
code: typeof error.code === "string" ? error.code : "unknown",
message: error.message,
});
} }
} }
return parts; return parts;

View file

@ -77,7 +77,15 @@ export type HistoryPart =
args: unknown; args: unknown;
state: string; state: string;
result?: unknown; result?: unknown;
}; }
| {
type: "teamActivity";
teamId: string;
status: string;
members: Array<{ id: string; name: string; role?: string; status: string; message?: string }>;
}
| { type: "media"; url: string; mimeType?: string }
| { type: "error"; code: string; message: string };
export interface ChannelStatus { export interface ChannelStatus {
id: string; id: string;

View file

@ -1,6 +1,7 @@
import { randomUUID } from "node:crypto"; import { randomUUID } from "node:crypto";
import type { ServerResponse } from "node:http"; import type { ServerResponse } from "node:http";
import type { AgentSessionEvent } from "../agent-session.js"; import type { AgentSessionEvent } from "../agent-session.js";
import type { GatewayEvent } from "./internal-types.js";
type TextStreamState = { type TextStreamState = {
started: boolean; started: boolean;
@ -324,3 +325,22 @@ export function errorVercelStream(
writeChunk(response, "[DONE]"); writeChunk(response, "[DONE]");
response.end(); response.end();
} }
/**
* Create a GatewayEvent listener that forwards `structured_part` events to the
* response as custom SSE chunks. Returns the listener function so the caller
* can subscribe it to managedSession.listeners and unsubscribe on cleanup.
*/
export function createGatewayStructuredPartListener(
response: ServerResponse,
): (event: GatewayEvent) => void {
return (event: GatewayEvent) => {
if (response.writableEnded) return;
if (event.type !== "structured_part") return;
writeChunk(response, {
type: "structured-part",
partType: event.partType,
payload: event.payload,
});
};
}