Merge pull request #568 from tmustier/gemini-raw-stream

fix: restore ESC interrupt after auto-retry and correct retry abort messaging
This commit is contained in:
Mario Zechner 2026-01-08 18:50:47 +01:00 committed by GitHub
commit 7a2c19cdf0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 199 additions and 146 deletions

View file

@ -305,8 +305,11 @@ export const streamGoogleGeminiCli: StreamFunction<"google-gemini-cli"> = (
// Not retryable or max retries exceeded
throw new Error(`Cloud Code Assist API error (${response.status}): ${errorText}`);
} catch (error) {
if (error instanceof Error && error.message === "Request was aborted") {
throw error;
// Check for abort - fetch throws AbortError, our code throws "Request was aborted"
if (error instanceof Error) {
if (error.name === "AbortError" || error.message === "Request was aborted") {
throw new Error("Request was aborted");
}
}
lastError = error instanceof Error ? error : new Error(String(error));
// Network errors are retryable
@ -338,46 +341,109 @@ export const streamGoogleGeminiCli: StreamFunction<"google-gemini-cli"> = (
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
// Set up abort handler to cancel reader when signal fires
const abortHandler = () => {
void reader.cancel().catch(() => {});
};
options?.signal?.addEventListener("abort", abortHandler);
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split("\n");
buffer = lines.pop() || "";
for (const line of lines) {
if (!line.startsWith("data:")) continue;
const jsonStr = line.slice(5).trim();
if (!jsonStr) continue;
let chunk: CloudCodeAssistResponseChunk;
try {
chunk = JSON.parse(jsonStr);
} catch {
continue;
try {
while (true) {
// Check abort signal before each read
if (options?.signal?.aborted) {
throw new Error("Request was aborted");
}
// Unwrap the response
const responseData = chunk.response;
if (!responseData) continue;
const { done, value } = await reader.read();
if (done) break;
const candidate = responseData.candidates?.[0];
if (candidate?.content?.parts) {
for (const part of candidate.content.parts) {
if (part.text !== undefined) {
const isThinking = isThinkingPart(part);
if (
!currentBlock ||
(isThinking && currentBlock.type !== "thinking") ||
(!isThinking && currentBlock.type !== "text")
) {
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split("\n");
buffer = lines.pop() || "";
for (const line of lines) {
if (!line.startsWith("data:")) continue;
const jsonStr = line.slice(5).trim();
if (!jsonStr) continue;
let chunk: CloudCodeAssistResponseChunk;
try {
chunk = JSON.parse(jsonStr);
} catch {
continue;
}
// Unwrap the response
const responseData = chunk.response;
if (!responseData) continue;
const candidate = responseData.candidates?.[0];
if (candidate?.content?.parts) {
for (const part of candidate.content.parts) {
if (part.text !== undefined) {
const isThinking = isThinkingPart(part);
if (
!currentBlock ||
(isThinking && currentBlock.type !== "thinking") ||
(!isThinking && currentBlock.type !== "text")
) {
if (currentBlock) {
if (currentBlock.type === "text") {
stream.push({
type: "text_end",
contentIndex: blocks.length - 1,
content: currentBlock.text,
partial: output,
});
} else {
stream.push({
type: "thinking_end",
contentIndex: blockIndex(),
content: currentBlock.thinking,
partial: output,
});
}
}
if (isThinking) {
currentBlock = { type: "thinking", thinking: "", thinkingSignature: undefined };
output.content.push(currentBlock);
stream.push({ type: "thinking_start", contentIndex: blockIndex(), partial: output });
} else {
currentBlock = { type: "text", text: "" };
output.content.push(currentBlock);
stream.push({ type: "text_start", contentIndex: blockIndex(), partial: output });
}
}
if (currentBlock.type === "thinking") {
currentBlock.thinking += part.text;
currentBlock.thinkingSignature = retainThoughtSignature(
currentBlock.thinkingSignature,
part.thoughtSignature,
);
stream.push({
type: "thinking_delta",
contentIndex: blockIndex(),
delta: part.text,
partial: output,
});
} else {
currentBlock.text += part.text;
stream.push({
type: "text_delta",
contentIndex: blockIndex(),
delta: part.text,
partial: output,
});
}
}
if (part.functionCall) {
if (currentBlock) {
if (currentBlock.type === "text") {
stream.push({
type: "text_end",
contentIndex: blocks.length - 1,
contentIndex: blockIndex(),
content: currentBlock.text,
partial: output,
});
@ -389,118 +455,70 @@ export const streamGoogleGeminiCli: StreamFunction<"google-gemini-cli"> = (
partial: output,
});
}
currentBlock = null;
}
if (isThinking) {
currentBlock = { type: "thinking", thinking: "", thinkingSignature: undefined };
output.content.push(currentBlock);
stream.push({ type: "thinking_start", contentIndex: blockIndex(), partial: output });
} else {
currentBlock = { type: "text", text: "" };
output.content.push(currentBlock);
stream.push({ type: "text_start", contentIndex: blockIndex(), partial: output });
}
}
if (currentBlock.type === "thinking") {
currentBlock.thinking += part.text;
currentBlock.thinkingSignature = retainThoughtSignature(
currentBlock.thinkingSignature,
part.thoughtSignature,
);
const providedId = part.functionCall.id;
const needsNewId =
!providedId || output.content.some((b) => b.type === "toolCall" && b.id === providedId);
const toolCallId = needsNewId
? `${part.functionCall.name}_${Date.now()}_${++toolCallCounter}`
: providedId;
const toolCall: ToolCall = {
type: "toolCall",
id: toolCallId,
name: part.functionCall.name || "",
arguments: part.functionCall.args as Record<string, unknown>,
...(part.thoughtSignature && { thoughtSignature: part.thoughtSignature }),
};
output.content.push(toolCall);
stream.push({ type: "toolcall_start", contentIndex: blockIndex(), partial: output });
stream.push({
type: "thinking_delta",
type: "toolcall_delta",
contentIndex: blockIndex(),
delta: part.text,
partial: output,
});
} else {
currentBlock.text += part.text;
stream.push({
type: "text_delta",
contentIndex: blockIndex(),
delta: part.text,
delta: JSON.stringify(toolCall.arguments),
partial: output,
});
stream.push({ type: "toolcall_end", contentIndex: blockIndex(), toolCall, partial: output });
}
}
}
if (part.functionCall) {
if (currentBlock) {
if (currentBlock.type === "text") {
stream.push({
type: "text_end",
contentIndex: blockIndex(),
content: currentBlock.text,
partial: output,
});
} else {
stream.push({
type: "thinking_end",
contentIndex: blockIndex(),
content: currentBlock.thinking,
partial: output,
});
}
currentBlock = null;
}
const providedId = part.functionCall.id;
const needsNewId =
!providedId || output.content.some((b) => b.type === "toolCall" && b.id === providedId);
const toolCallId = needsNewId
? `${part.functionCall.name}_${Date.now()}_${++toolCallCounter}`
: providedId;
const toolCall: ToolCall = {
type: "toolCall",
id: toolCallId,
name: part.functionCall.name || "",
arguments: part.functionCall.args as Record<string, unknown>,
...(part.thoughtSignature && { thoughtSignature: part.thoughtSignature }),
};
output.content.push(toolCall);
stream.push({ type: "toolcall_start", contentIndex: blockIndex(), partial: output });
stream.push({
type: "toolcall_delta",
contentIndex: blockIndex(),
delta: JSON.stringify(toolCall.arguments),
partial: output,
});
stream.push({ type: "toolcall_end", contentIndex: blockIndex(), toolCall, partial: output });
if (candidate?.finishReason) {
output.stopReason = mapStopReasonString(candidate.finishReason);
if (output.content.some((b) => b.type === "toolCall")) {
output.stopReason = "toolUse";
}
}
}
if (candidate?.finishReason) {
output.stopReason = mapStopReasonString(candidate.finishReason);
if (output.content.some((b) => b.type === "toolCall")) {
output.stopReason = "toolUse";
}
}
if (responseData.usageMetadata) {
// promptTokenCount includes cachedContentTokenCount, so subtract to get fresh input
const promptTokens = responseData.usageMetadata.promptTokenCount || 0;
const cacheReadTokens = responseData.usageMetadata.cachedContentTokenCount || 0;
output.usage = {
input: promptTokens - cacheReadTokens,
output:
(responseData.usageMetadata.candidatesTokenCount || 0) +
(responseData.usageMetadata.thoughtsTokenCount || 0),
cacheRead: cacheReadTokens,
cacheWrite: 0,
totalTokens: responseData.usageMetadata.totalTokenCount || 0,
cost: {
input: 0,
output: 0,
cacheRead: 0,
if (responseData.usageMetadata) {
// promptTokenCount includes cachedContentTokenCount, so subtract to get fresh input
const promptTokens = responseData.usageMetadata.promptTokenCount || 0;
const cacheReadTokens = responseData.usageMetadata.cachedContentTokenCount || 0;
output.usage = {
input: promptTokens - cacheReadTokens,
output:
(responseData.usageMetadata.candidatesTokenCount || 0) +
(responseData.usageMetadata.thoughtsTokenCount || 0),
cacheRead: cacheReadTokens,
cacheWrite: 0,
total: 0,
},
};
calculateCost(model, output.usage);
totalTokens: responseData.usageMetadata.totalTokenCount || 0,
cost: {
input: 0,
output: 0,
cacheRead: 0,
cacheWrite: 0,
total: 0,
},
};
calculateCost(model, output.usage);
}
}
}
} finally {
options?.signal?.removeEventListener("abort", abortHandler);
}
if (currentBlock) {

View file

@ -435,6 +435,11 @@ export class AgentSession {
return this.agent.state.isStreaming;
}
/** Current retry attempt (0 if not retrying) */
get retryAttempt(): number {
return this._retryAttempt;
}
/**
* Get the names of currently active tools.
* Returns the names of tools currently set on the agent.
@ -1567,7 +1572,7 @@ export class AgentSession {
*/
abortRetry(): void {
this._retryAbortController?.abort();
this._retryAttempt = 0;
// Note: _retryAttempt is reset in the catch block of _autoRetry
this._resolveRetry();
}

View file

@ -31,12 +31,11 @@ export class AssistantMessageComponent extends Container {
// Clear content container
this.contentContainer.clear();
if (
message.content.length > 0 &&
message.content.some(
(c) => (c.type === "text" && c.text.trim()) || (c.type === "thinking" && c.thinking.trim()),
)
) {
const hasVisibleContent = message.content.some(
(c) => (c.type === "text" && c.text.trim()) || (c.type === "thinking" && c.thinking.trim()),
);
if (hasVisibleContent) {
this.contentContainer.addChild(new Spacer(1));
}
@ -75,7 +74,12 @@ export class AssistantMessageComponent extends Container {
const hasToolCalls = message.content.some((c) => c.type === "toolCall");
if (!hasToolCalls) {
if (message.stopReason === "aborted") {
this.contentContainer.addChild(new Text(theme.fg("error", "\nAborted"), 1, 0));
const abortMessage =
message.errorMessage && message.errorMessage !== "Request was aborted"
? message.errorMessage
: "Operation aborted";
const prefix = hasVisibleContent ? "\n" : "";
this.contentContainer.addChild(new Text(theme.fg("error", `${prefix}${abortMessage}`), 1, 0));
} else if (message.stopReason === "error") {
const errorMsg = message.errorMessage || "Unknown error";
this.contentContainer.addChild(new Spacer(1));

View file

@ -1481,6 +1481,16 @@ export class InteractiveMode {
switch (event.type) {
case "agent_start":
// Restore main escape handler if retry handler is still active
// (retry success event fires later, but we need main handler now)
if (this.retryEscapeHandler) {
this.defaultEditor.onEscape = this.retryEscapeHandler;
this.retryEscapeHandler = undefined;
}
if (this.retryLoader) {
this.retryLoader.stop();
this.retryLoader = undefined;
}
if (this.loadingAnimation) {
this.loadingAnimation.stop();
}
@ -1549,13 +1559,21 @@ export class InteractiveMode {
if (event.message.role === "user") break;
if (this.streamingComponent && event.message.role === "assistant") {
this.streamingMessage = event.message;
let errorMessage: string | undefined;
if (this.streamingMessage.stopReason === "aborted") {
const retryAttempt = this.session.retryAttempt;
errorMessage =
retryAttempt > 0
? `Aborted after ${retryAttempt} retry attempt${retryAttempt > 1 ? "s" : ""}`
: "Operation aborted";
this.streamingMessage.errorMessage = errorMessage;
}
this.streamingComponent.updateContent(this.streamingMessage);
if (this.streamingMessage.stopReason === "aborted" || this.streamingMessage.stopReason === "error") {
const errorMessage =
this.streamingMessage.stopReason === "aborted"
? "Operation aborted"
: this.streamingMessage.errorMessage || "Error";
if (!errorMessage) {
errorMessage = this.streamingMessage.errorMessage || "Error";
}
for (const [, component] of this.pendingTools.entries()) {
component.updateResult({
content: [{ type: "text", text: errorMessage }],
@ -1862,8 +1880,16 @@ export class InteractiveMode {
this.chatContainer.addChild(component);
if (message.stopReason === "aborted" || message.stopReason === "error") {
const errorMessage =
message.stopReason === "aborted" ? "Operation aborted" : message.errorMessage || "Error";
let errorMessage: string;
if (message.stopReason === "aborted") {
const retryAttempt = this.session.retryAttempt;
errorMessage =
retryAttempt > 0
? `Aborted after ${retryAttempt} retry attempt${retryAttempt > 1 ? "s" : ""}`
: "Operation aborted";
} else {
errorMessage = message.errorMessage || "Error";
}
component.updateResult({ content: [{ type: "text", text: errorMessage }], isError: true });
} else {
this.pendingTools.set(content.id, component);