mirror of
https://github.com/getcompanion-ai/co-mono.git
synced 2026-04-15 13:03:42 +00:00
fix: ESC key not interrupting during Working... state
Three related fixes:
1. google-gemini-cli: Handle abort signal in stream reading loop
- Add abort event listener to cancel reader immediately when signal fires
- Fix AbortError detection in retry catch block (fetch throws AbortError,
not our custom message)
- Swallow reader.cancel() rejection to avoid unhandled promise
2. agent-session: Fix retry attempt counter showing 0 on cancel
- abortRetry() was resetting _retryAttempt before the catch block could
read it for the error message
3. interactive-mode: Restore main escape handler on agent_start
- When auto-retry starts, onEscape is replaced with retry-specific handler
- auto_retry_end (which restores it) fires on turn_end, after streaming begins
- Now restore immediately on agent_start if retry handler is still active
Amended: suppress reader.cancel() rejection on abort.
This commit is contained in:
parent
cfa63c255d
commit
a65da1c14b
3 changed files with 161 additions and 133 deletions
|
|
@ -305,8 +305,11 @@ export const streamGoogleGeminiCli: StreamFunction<"google-gemini-cli"> = (
|
|||
// Not retryable or max retries exceeded
|
||||
throw new Error(`Cloud Code Assist API error (${response.status}): ${errorText}`);
|
||||
} catch (error) {
|
||||
if (error instanceof Error && error.message === "Request was aborted") {
|
||||
throw error;
|
||||
// Check for abort - fetch throws AbortError, our code throws "Request was aborted"
|
||||
if (error instanceof Error) {
|
||||
if (error.name === "AbortError" || error.message === "Request was aborted") {
|
||||
throw new Error("Request was aborted");
|
||||
}
|
||||
}
|
||||
lastError = error instanceof Error ? error : new Error(String(error));
|
||||
// Network errors are retryable
|
||||
|
|
@ -338,46 +341,109 @@ export const streamGoogleGeminiCli: StreamFunction<"google-gemini-cli"> = (
|
|||
const decoder = new TextDecoder();
|
||||
let buffer = "";
|
||||
|
||||
while (true) {
|
||||
const { done, value } = await reader.read();
|
||||
if (done) break;
|
||||
// Set up abort handler to cancel reader when signal fires
|
||||
const abortHandler = () => {
|
||||
void reader.cancel().catch(() => {});
|
||||
};
|
||||
options?.signal?.addEventListener("abort", abortHandler);
|
||||
|
||||
buffer += decoder.decode(value, { stream: true });
|
||||
const lines = buffer.split("\n");
|
||||
buffer = lines.pop() || "";
|
||||
|
||||
for (const line of lines) {
|
||||
if (!line.startsWith("data:")) continue;
|
||||
|
||||
const jsonStr = line.slice(5).trim();
|
||||
if (!jsonStr) continue;
|
||||
|
||||
let chunk: CloudCodeAssistResponseChunk;
|
||||
try {
|
||||
chunk = JSON.parse(jsonStr);
|
||||
} catch {
|
||||
continue;
|
||||
try {
|
||||
while (true) {
|
||||
// Check abort signal before each read
|
||||
if (options?.signal?.aborted) {
|
||||
throw new Error("Request was aborted");
|
||||
}
|
||||
|
||||
// Unwrap the response
|
||||
const responseData = chunk.response;
|
||||
if (!responseData) continue;
|
||||
const { done, value } = await reader.read();
|
||||
if (done) break;
|
||||
|
||||
const candidate = responseData.candidates?.[0];
|
||||
if (candidate?.content?.parts) {
|
||||
for (const part of candidate.content.parts) {
|
||||
if (part.text !== undefined) {
|
||||
const isThinking = isThinkingPart(part);
|
||||
if (
|
||||
!currentBlock ||
|
||||
(isThinking && currentBlock.type !== "thinking") ||
|
||||
(!isThinking && currentBlock.type !== "text")
|
||||
) {
|
||||
buffer += decoder.decode(value, { stream: true });
|
||||
const lines = buffer.split("\n");
|
||||
buffer = lines.pop() || "";
|
||||
|
||||
for (const line of lines) {
|
||||
if (!line.startsWith("data:")) continue;
|
||||
|
||||
const jsonStr = line.slice(5).trim();
|
||||
if (!jsonStr) continue;
|
||||
|
||||
let chunk: CloudCodeAssistResponseChunk;
|
||||
try {
|
||||
chunk = JSON.parse(jsonStr);
|
||||
} catch {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Unwrap the response
|
||||
const responseData = chunk.response;
|
||||
if (!responseData) continue;
|
||||
|
||||
const candidate = responseData.candidates?.[0];
|
||||
if (candidate?.content?.parts) {
|
||||
for (const part of candidate.content.parts) {
|
||||
if (part.text !== undefined) {
|
||||
const isThinking = isThinkingPart(part);
|
||||
if (
|
||||
!currentBlock ||
|
||||
(isThinking && currentBlock.type !== "thinking") ||
|
||||
(!isThinking && currentBlock.type !== "text")
|
||||
) {
|
||||
if (currentBlock) {
|
||||
if (currentBlock.type === "text") {
|
||||
stream.push({
|
||||
type: "text_end",
|
||||
contentIndex: blocks.length - 1,
|
||||
content: currentBlock.text,
|
||||
partial: output,
|
||||
});
|
||||
} else {
|
||||
stream.push({
|
||||
type: "thinking_end",
|
||||
contentIndex: blockIndex(),
|
||||
content: currentBlock.thinking,
|
||||
partial: output,
|
||||
});
|
||||
}
|
||||
}
|
||||
if (isThinking) {
|
||||
currentBlock = { type: "thinking", thinking: "", thinkingSignature: undefined };
|
||||
output.content.push(currentBlock);
|
||||
stream.push({ type: "thinking_start", contentIndex: blockIndex(), partial: output });
|
||||
} else {
|
||||
currentBlock = { type: "text", text: "" };
|
||||
output.content.push(currentBlock);
|
||||
stream.push({ type: "text_start", contentIndex: blockIndex(), partial: output });
|
||||
}
|
||||
}
|
||||
if (currentBlock.type === "thinking") {
|
||||
currentBlock.thinking += part.text;
|
||||
currentBlock.thinkingSignature = retainThoughtSignature(
|
||||
currentBlock.thinkingSignature,
|
||||
part.thoughtSignature,
|
||||
);
|
||||
stream.push({
|
||||
type: "thinking_delta",
|
||||
contentIndex: blockIndex(),
|
||||
delta: part.text,
|
||||
partial: output,
|
||||
});
|
||||
} else {
|
||||
currentBlock.text += part.text;
|
||||
stream.push({
|
||||
type: "text_delta",
|
||||
contentIndex: blockIndex(),
|
||||
delta: part.text,
|
||||
partial: output,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
if (part.functionCall) {
|
||||
if (currentBlock) {
|
||||
if (currentBlock.type === "text") {
|
||||
stream.push({
|
||||
type: "text_end",
|
||||
contentIndex: blocks.length - 1,
|
||||
contentIndex: blockIndex(),
|
||||
content: currentBlock.text,
|
||||
partial: output,
|
||||
});
|
||||
|
|
@ -389,118 +455,70 @@ export const streamGoogleGeminiCli: StreamFunction<"google-gemini-cli"> = (
|
|||
partial: output,
|
||||
});
|
||||
}
|
||||
currentBlock = null;
|
||||
}
|
||||
if (isThinking) {
|
||||
currentBlock = { type: "thinking", thinking: "", thinkingSignature: undefined };
|
||||
output.content.push(currentBlock);
|
||||
stream.push({ type: "thinking_start", contentIndex: blockIndex(), partial: output });
|
||||
} else {
|
||||
currentBlock = { type: "text", text: "" };
|
||||
output.content.push(currentBlock);
|
||||
stream.push({ type: "text_start", contentIndex: blockIndex(), partial: output });
|
||||
}
|
||||
}
|
||||
if (currentBlock.type === "thinking") {
|
||||
currentBlock.thinking += part.text;
|
||||
currentBlock.thinkingSignature = retainThoughtSignature(
|
||||
currentBlock.thinkingSignature,
|
||||
part.thoughtSignature,
|
||||
);
|
||||
|
||||
const providedId = part.functionCall.id;
|
||||
const needsNewId =
|
||||
!providedId || output.content.some((b) => b.type === "toolCall" && b.id === providedId);
|
||||
const toolCallId = needsNewId
|
||||
? `${part.functionCall.name}_${Date.now()}_${++toolCallCounter}`
|
||||
: providedId;
|
||||
|
||||
const toolCall: ToolCall = {
|
||||
type: "toolCall",
|
||||
id: toolCallId,
|
||||
name: part.functionCall.name || "",
|
||||
arguments: part.functionCall.args as Record<string, unknown>,
|
||||
...(part.thoughtSignature && { thoughtSignature: part.thoughtSignature }),
|
||||
};
|
||||
|
||||
output.content.push(toolCall);
|
||||
stream.push({ type: "toolcall_start", contentIndex: blockIndex(), partial: output });
|
||||
stream.push({
|
||||
type: "thinking_delta",
|
||||
type: "toolcall_delta",
|
||||
contentIndex: blockIndex(),
|
||||
delta: part.text,
|
||||
partial: output,
|
||||
});
|
||||
} else {
|
||||
currentBlock.text += part.text;
|
||||
stream.push({
|
||||
type: "text_delta",
|
||||
contentIndex: blockIndex(),
|
||||
delta: part.text,
|
||||
delta: JSON.stringify(toolCall.arguments),
|
||||
partial: output,
|
||||
});
|
||||
stream.push({ type: "toolcall_end", contentIndex: blockIndex(), toolCall, partial: output });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (part.functionCall) {
|
||||
if (currentBlock) {
|
||||
if (currentBlock.type === "text") {
|
||||
stream.push({
|
||||
type: "text_end",
|
||||
contentIndex: blockIndex(),
|
||||
content: currentBlock.text,
|
||||
partial: output,
|
||||
});
|
||||
} else {
|
||||
stream.push({
|
||||
type: "thinking_end",
|
||||
contentIndex: blockIndex(),
|
||||
content: currentBlock.thinking,
|
||||
partial: output,
|
||||
});
|
||||
}
|
||||
currentBlock = null;
|
||||
}
|
||||
|
||||
const providedId = part.functionCall.id;
|
||||
const needsNewId =
|
||||
!providedId || output.content.some((b) => b.type === "toolCall" && b.id === providedId);
|
||||
const toolCallId = needsNewId
|
||||
? `${part.functionCall.name}_${Date.now()}_${++toolCallCounter}`
|
||||
: providedId;
|
||||
|
||||
const toolCall: ToolCall = {
|
||||
type: "toolCall",
|
||||
id: toolCallId,
|
||||
name: part.functionCall.name || "",
|
||||
arguments: part.functionCall.args as Record<string, unknown>,
|
||||
...(part.thoughtSignature && { thoughtSignature: part.thoughtSignature }),
|
||||
};
|
||||
|
||||
output.content.push(toolCall);
|
||||
stream.push({ type: "toolcall_start", contentIndex: blockIndex(), partial: output });
|
||||
stream.push({
|
||||
type: "toolcall_delta",
|
||||
contentIndex: blockIndex(),
|
||||
delta: JSON.stringify(toolCall.arguments),
|
||||
partial: output,
|
||||
});
|
||||
stream.push({ type: "toolcall_end", contentIndex: blockIndex(), toolCall, partial: output });
|
||||
if (candidate?.finishReason) {
|
||||
output.stopReason = mapStopReasonString(candidate.finishReason);
|
||||
if (output.content.some((b) => b.type === "toolCall")) {
|
||||
output.stopReason = "toolUse";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (candidate?.finishReason) {
|
||||
output.stopReason = mapStopReasonString(candidate.finishReason);
|
||||
if (output.content.some((b) => b.type === "toolCall")) {
|
||||
output.stopReason = "toolUse";
|
||||
}
|
||||
}
|
||||
|
||||
if (responseData.usageMetadata) {
|
||||
// promptTokenCount includes cachedContentTokenCount, so subtract to get fresh input
|
||||
const promptTokens = responseData.usageMetadata.promptTokenCount || 0;
|
||||
const cacheReadTokens = responseData.usageMetadata.cachedContentTokenCount || 0;
|
||||
output.usage = {
|
||||
input: promptTokens - cacheReadTokens,
|
||||
output:
|
||||
(responseData.usageMetadata.candidatesTokenCount || 0) +
|
||||
(responseData.usageMetadata.thoughtsTokenCount || 0),
|
||||
cacheRead: cacheReadTokens,
|
||||
cacheWrite: 0,
|
||||
totalTokens: responseData.usageMetadata.totalTokenCount || 0,
|
||||
cost: {
|
||||
input: 0,
|
||||
output: 0,
|
||||
cacheRead: 0,
|
||||
if (responseData.usageMetadata) {
|
||||
// promptTokenCount includes cachedContentTokenCount, so subtract to get fresh input
|
||||
const promptTokens = responseData.usageMetadata.promptTokenCount || 0;
|
||||
const cacheReadTokens = responseData.usageMetadata.cachedContentTokenCount || 0;
|
||||
output.usage = {
|
||||
input: promptTokens - cacheReadTokens,
|
||||
output:
|
||||
(responseData.usageMetadata.candidatesTokenCount || 0) +
|
||||
(responseData.usageMetadata.thoughtsTokenCount || 0),
|
||||
cacheRead: cacheReadTokens,
|
||||
cacheWrite: 0,
|
||||
total: 0,
|
||||
},
|
||||
};
|
||||
calculateCost(model, output.usage);
|
||||
totalTokens: responseData.usageMetadata.totalTokenCount || 0,
|
||||
cost: {
|
||||
input: 0,
|
||||
output: 0,
|
||||
cacheRead: 0,
|
||||
cacheWrite: 0,
|
||||
total: 0,
|
||||
},
|
||||
};
|
||||
calculateCost(model, output.usage);
|
||||
}
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
options?.signal?.removeEventListener("abort", abortHandler);
|
||||
}
|
||||
|
||||
if (currentBlock) {
|
||||
|
|
|
|||
|
|
@ -1567,7 +1567,7 @@ export class AgentSession {
|
|||
*/
|
||||
abortRetry(): void {
|
||||
this._retryAbortController?.abort();
|
||||
this._retryAttempt = 0;
|
||||
// Note: _retryAttempt is reset in the catch block of _autoRetry
|
||||
this._resolveRetry();
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1481,6 +1481,16 @@ export class InteractiveMode {
|
|||
|
||||
switch (event.type) {
|
||||
case "agent_start":
|
||||
// Restore main escape handler if retry handler is still active
|
||||
// (retry success event fires later, but we need main handler now)
|
||||
if (this.retryEscapeHandler) {
|
||||
this.defaultEditor.onEscape = this.retryEscapeHandler;
|
||||
this.retryEscapeHandler = undefined;
|
||||
}
|
||||
if (this.retryLoader) {
|
||||
this.retryLoader.stop();
|
||||
this.retryLoader = undefined;
|
||||
}
|
||||
if (this.loadingAnimation) {
|
||||
this.loadingAnimation.stop();
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue