Unified AI API Design Plan
Based on a comprehensive investigation of the OpenAI, Anthropic, and Gemini SDKs, with actual implementation examples.
Key API Differences Summary
OpenAI
- Dual APIs: Chat Completions (broad support) vs Responses API (o1/o3 thinking content)
- Thinking: Only the Responses API returns actual thinking content; Chat Completions reports only token counts
- Roles: system, user, assistant, tool (o1/o3 use developer instead of system)
- Streaming: Deltas in chunks, with stream_options.include_usage for token usage
Anthropic
- Single API: Messages API with comprehensive streaming
- Content Blocks: Always arrays, even for simple text
- System: Separate parameter, not in messages array
- Tool Use: Content blocks, not separate message role
- Thinking: Explicit budget allocation, appears as content blocks
- Caching: Per-block cache control with TTL options
Gemini
- Parts System: All content split into typed parts
- System: Separate systemInstruction parameter
- Roles: Uses model instead of assistant
- Thinking: part.thought: true flag identifies reasoning
- Streaming: Returns complete responses, not deltas
- Function Calls: Embedded in parts array
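To make the differences concrete, here is the same single-turn request expressed in each provider's native shape. This is a schematic sketch: model names are placeholders, and the Gemini example uses the REST-style shape (the @google/genai SDK nests options under config).

const openaiRequest = {
  model: 'gpt-4o', // placeholder
  messages: [
    { role: 'system', content: 'Be brief.' }, // o1/o3 would use role: 'developer'
    { role: 'user', content: 'Hello' }
  ]
};
const anthropicRequest = {
  model: 'claude-sonnet-4-20250514', // placeholder
  max_tokens: 1024,
  system: 'Be brief.', // system is a separate parameter, not a message
  messages: [
    { role: 'user', content: [{ type: 'text', text: 'Hello' }] } // always content blocks
  ]
};
const geminiRequest = {
  model: 'gemini-2.0-flash', // placeholder
  systemInstruction: { parts: [{ text: 'Be brief.' }] },
  contents: [
    { role: 'user', parts: [{ text: 'Hello' }] } // 'model' replaces 'assistant' in replies
  ]
};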
Unified API Design
Core Client
interface AIConfig {
provider: 'openai' | 'anthropic' | 'gemini';
apiKey: string;
model: string;
baseURL?: string; // For OpenAI-compatible endpoints
}
interface ModelInfo {
id: string;
name: string;
provider: string;
capabilities: {
reasoning: boolean;
toolCall: boolean;
vision: boolean;
audio?: boolean;
};
cost: {
input: number; // per million tokens
output: number; // per million tokens
cacheRead?: number;
cacheWrite?: number;
};
limits: {
context: number;
output: number;
};
knowledge?: string; // Knowledge cutoff date
}
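For illustration, a populated entry might look like the following; the numbers are placeholders, with real data coming from models.dev as described below.

const exampleModel: ModelInfo = {
  id: 'gpt-4o',
  name: 'GPT-4o',
  provider: 'openai',
  capabilities: { reasoning: false, toolCall: true, vision: true },
  cost: { input: 2.5, output: 10, cacheRead: 1.25 }, // USD per million tokens (placeholder)
  limits: { context: 128_000, output: 16_384 },
  knowledge: '2023-10' // knowledge cutoff
};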
class AI {
constructor(config: AIConfig);
// Main streaming interface - everything else builds on this
async *stream(request: Request): AsyncGenerator<Event>;
// Convenience method for non-streaming
async complete(request: Request): Promise<Response>;
// Get model information
getModelInfo(): ModelInfo;
// Abort current request
abort(): void;
}
Message Format
type Message =
| {
role: 'user';
content: string | Content[];
}
| {
role: 'assistant';
content: string | Content[];
model: string;
usage: TokenUsage;
      toolCalls?: ToolCall[];
}
| {
role: 'tool';
content: string | Content[];
toolCallId: string;
};
interface ToolCall {
  id: string;
  name: string;
  arguments: Record<string, any>;
}

type Content =
  | { type: 'text'; text: string }
  | { type: 'image'; image: { data: string; mimeType: string } };
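For illustration, a complete tool round-trip in this format (ids, model name, and token counts are hypothetical):

const conversation: Message[] = [
  { role: 'user', content: 'What is the weather in Paris?' },
  {
    role: 'assistant',
    content: '',
    model: 'gpt-4o',
    usage: { input: 42, output: 12, total: 54 },
    toolCalls: [{ id: 'call_1', name: 'weather', arguments: { location: 'Paris' } }]
  },
  { role: 'tool', content: '{"tempC": 18}', toolCallId: 'call_1' }
];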
Request Format
interface Request {
messages: Message[];
// System prompt (separated for Anthropic/Gemini compatibility)
systemPrompt?: string;
// Common parameters
temperature?: number;
maxTokens?: number;
stopSequences?: string[];
// Tools
tools?: {
name: string;
description: string;
parameters: Record<string, any>; // JSON Schema
}[];
toolChoice?: 'auto' | 'none' | 'required' | { name: string };
// Thinking/reasoning
reasoning?: {
enabled: boolean;
effort?: 'low' | 'medium' | 'high'; // OpenAI reasoning_effort
maxTokens?: number; // Anthropic thinking budget
};
// Abort signal
signal?: AbortSignal;
}
Event Stream
type Event =
| { type: 'start'; model: string; provider: string }
| { type: 'text'; content: string; delta: string }
| { type: 'thinking'; content: string; delta: string }
  | { type: 'toolCall'; toolCall: ToolCall; partial?: boolean }
| { type: 'usage'; usage: TokenUsage }
| { type: 'done'; reason: StopReason; message: Message } // message includes model and usage
| { type: 'error'; error: Error };
interface TokenUsage {
input: number;
output: number;
total: number;
thinking?: number;
cacheRead?: number;
cacheWrite?: number;
cost?: {
input: number;
output: number;
cache?: number;
total: number;
};
}
type StopReason = 'stop' | 'length' | 'toolUse' | 'safety' | 'error';
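For orientation, a short streamed reply produces a sequence like this (values are illustrative):

// { type: 'start', model: 'gpt-4o', provider: 'openai' }
// { type: 'text', content: 'Hel', delta: 'Hel' }
// { type: 'text', content: 'Hello!', delta: 'lo!' }
// { type: 'usage', usage: { input: 12, output: 3, total: 15 } }
// { type: 'done', reason: 'stop', message: /* final assistant Message */ }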
Caching Strategy
Caching is handled automatically by each provider adapter:
- OpenAI: Automatic prompt caching (no configuration needed)
- Gemini: Automatic context caching (no configuration needed)
- Anthropic: We automatically add cache_control to the system prompt and older messages
class AnthropicAdapter {
  private addCaching(messages: Message[]): any[] {
    const anthropicMessages = [];
    // Anthropic allows at most 4 cache breakpoints, and a breakpoint caches the
    // entire prefix before it, so a single breakpoint at the end of the stable
    // prefix is enough (assuming incremental context).
    const lastCachedIndex = messages.length - 3; // cache all but the last 2 messages
    for (let i = 0; i < messages.length; i++) {
      const msg = messages[i];
      // Convert to Anthropic format with automatic caching
      const blocks = this.toContentBlocks(msg);
      if (i === lastCachedIndex && blocks.length > 0) {
        blocks[blocks.length - 1].cache_control = { type: 'ephemeral' };
      }
      anthropicMessages.push({
        role: msg.role === 'assistant' ? 'assistant' : 'user',
        content: blocks
      });
    }
    return anthropicMessages;
  }
}
Provider Adapter Implementation
OpenAI Adapter
class OpenAIAdapter {
private client: OpenAI;
private useResponsesAPI: boolean = false;
async *stream(request: Request): AsyncGenerator<Event> {
// Determine which API to use
if (request.reasoning?.enabled && this.isReasoningModel()) {
yield* this.streamResponsesAPI(request);
} else {
yield* this.streamChatCompletions(request);
}
}
private async *streamChatCompletions(request: Request) {
const stream = await this.client.chat.completions.create({
model: this.model,
messages: this.toOpenAIMessages(request),
tools: this.toOpenAITools(request.tools),
reasoning_effort: request.reasoning?.effort,
stream: true,
stream_options: { include_usage: true }
});
let content = '';
let toolCalls: any[] = [];
for await (const chunk of stream) {
if (chunk.choices[0]?.delta?.content) {
const delta = chunk.choices[0].delta.content;
content += delta;
yield { type: 'text', content, delta };
}
if (chunk.choices[0]?.delta?.tool_calls) {
// Accumulate tool calls
this.mergeToolCalls(toolCalls, chunk.choices[0].delta.tool_calls);
for (const tc of toolCalls) {
yield { type: 'toolCall', toolCall: tc, partial: true };
}
}
if (chunk.usage) {
yield {
type: 'usage',
usage: {
input: chunk.usage.prompt_tokens,
output: chunk.usage.completion_tokens,
total: chunk.usage.total_tokens,
thinking: chunk.usage.completion_tokens_details?.reasoning_tokens
}
};
}
}
}
private async *streamResponsesAPI(request: Request) {
// Use Responses API for actual thinking content
const response = await this.client.responses.create({
model: this.model,
input: this.toResponsesInput(request),
tools: this.toResponsesTools(request.tools),
stream: true
});
for await (const event of response) {
if (event.type === 'response.reasoning_text.delta') {
yield {
type: 'thinking',
content: event.text,
delta: event.delta
};
}
// Handle other event types...
}
}
private toOpenAIMessages(request: Request): any[] {
const messages: any[] = [];
// Handle system prompt
if (request.systemPrompt) {
const role = this.isReasoningModel() ? 'developer' : 'system';
messages.push({ role, content: request.systemPrompt });
}
// Convert unified messages
for (const msg of request.messages) {
      if (msg.role === 'tool') {
        messages.push({
          role: 'tool',
          content: this.contentToString(msg.content), // tool content must be a string
          tool_call_id: msg.toolCallId
        });
      } else {
        messages.push({
          role: msg.role,
          content: this.contentToString(msg.content),
          // OpenAI expects { id, type: 'function', function: { name, arguments } },
          // with arguments serialized as a JSON string
          tool_calls: msg.role === 'assistant'
            ? msg.toolCalls?.map(tc => ({
                id: tc.id,
                type: 'function',
                function: { name: tc.name, arguments: JSON.stringify(tc.arguments) }
              }))
            : undefined
        });
      }
}
return messages;
}
}
Anthropic Adapter
class AnthropicAdapter {
private client: Anthropic;
async *stream(request: Request): AsyncGenerator<Event> {
    // The MessageStream helper (client.messages.stream) exposes .on() callbacks,
    // but callbacks cannot yield into an async generator, so we use raw streaming.
    const params = {
      model: this.model,
      max_tokens: request.maxTokens || 1024,
      messages: this.addCaching(request.messages),
      system: request.systemPrompt,
      tools: this.toAnthropicTools(request.tools),
      thinking: request.reasoning?.enabled ? {
        type: 'enabled' as const,
        budget_tokens: request.reasoning.maxTokens || 2000
      } : undefined
    };
    const rawStream = await this.client.messages.create({
      ...params,
      stream: true
    });
    let content = '';
    let thinking = '';
for await (const chunk of rawStream) {
switch (chunk.type) {
        case 'content_block_delta':
          if (chunk.delta.type === 'text_delta') {
            content += chunk.delta.text;
            yield {
              type: 'text',
              content,
              delta: chunk.delta.text
            };
          } else if (chunk.delta.type === 'thinking_delta') {
            thinking += chunk.delta.thinking;
            yield {
              type: 'thinking',
              content: thinking,
              delta: chunk.delta.thinking
            };
          }
          break;
case 'message_delta':
if (chunk.usage) {
yield {
type: 'usage',
usage: {
input: chunk.usage.input_tokens,
output: chunk.usage.output_tokens,
total: chunk.usage.input_tokens + chunk.usage.output_tokens,
cacheRead: chunk.usage.cache_read_input_tokens,
cacheWrite: chunk.usage.cache_creation_input_tokens
}
};
}
break;
}
}
}
private toAnthropicMessages(request: Request): any[] {
return request.messages.map(msg => {
if (msg.role === 'tool') {
// Tool results go as user messages with tool_result blocks
return {
role: 'user',
content: [{
type: 'tool_result',
tool_use_id: msg.toolCallId,
content: msg.content
}]
};
}
// Always use content blocks
const blocks: any[] = [];
if (typeof msg.content === 'string') {
      // Caching is applied centrally in addCaching, not per message
      blocks.push({
        type: 'text',
        text: msg.content
      });
} else {
// Convert unified content to blocks
for (const part of msg.content) {
if (part.type === 'text') {
blocks.push({ type: 'text', text: part.text });
} else if (part.type === 'image') {
blocks.push({
type: 'image',
source: {
type: 'base64',
media_type: part.image.mimeType,
data: part.image.data
}
});
}
}
}
// Add tool calls as blocks
if (msg.toolCalls) {
for (const tc of msg.toolCalls) {
blocks.push({
type: 'tool_use',
id: tc.id,
name: tc.name,
input: tc.arguments
});
}
}
return {
role: msg.role === 'assistant' ? 'assistant' : 'user',
content: blocks
};
});
}
}
Gemini Adapter
class GeminiAdapter {
private client: GoogleGenAI;
async *stream(request: Request): AsyncGenerator<Event> {
    // In the @google/genai SDK, generation options are nested under config
    const stream = await this.client.models.generateContentStream({
      model: this.model,
      contents: this.toGeminiContents(request),
      config: {
        systemInstruction: request.systemPrompt ? {
          parts: [{ text: request.systemPrompt }]
        } : undefined,
        tools: this.toGeminiTools(request.tools),
        abortSignal: request.signal
      }
    });
let content = '';
let thinking = '';
for await (const chunk of stream) {
const candidate = chunk.candidates?.[0];
if (!candidate?.content?.parts) continue;
for (const part of candidate.content.parts) {
if (part.text && !part.thought) {
content += part.text;
yield {
type: 'text',
content,
delta: part.text
};
} else if (part.text && part.thought) {
thinking += part.text;
yield {
type: 'thinking',
content: thinking,
delta: part.text
};
} else if (part.functionCall) {
yield {
type: 'toolCall',
toolCall: {
id: part.functionCall.id || crypto.randomUUID(),
name: part.functionCall.name,
arguments: part.functionCall.args
}
};
}
}
if (chunk.usageMetadata) {
yield {
type: 'usage',
usage: {
input: chunk.usageMetadata.promptTokenCount || 0,
output: chunk.usageMetadata.candidatesTokenCount || 0,
total: chunk.usageMetadata.totalTokenCount || 0,
thinking: chunk.usageMetadata.thoughtsTokenCount,
cacheRead: chunk.usageMetadata.cachedContentTokenCount
}
};
}
}
}
private toGeminiContents(request: Request): any[] {
return request.messages.map(msg => {
const parts: any[] = [];
if (typeof msg.content === 'string') {
parts.push({ text: msg.content });
} else {
for (const part of msg.content) {
if (part.type === 'text') {
parts.push({ text: part.text });
} else if (part.type === 'image') {
parts.push({
inlineData: {
mimeType: part.image.mimeType,
data: part.image.data
}
});
}
}
}
// Add function calls as parts
if (msg.toolCalls) {
for (const tc of msg.toolCalls) {
parts.push({
functionCall: {
name: tc.name,
args: tc.arguments
}
});
}
}
      // Add tool results as function responses. Gemini matches responses by
      // function name, so toolCallId must carry the function name here.
      if (msg.role === 'tool') {
        parts.push({
          functionResponse: {
            name: msg.toolCallId,
            response: { result: msg.content }
          }
        });
      }
return {
role: msg.role === 'assistant' ? 'model' : msg.role === 'tool' ? 'user' : msg.role,
parts
};
});
}
}
Usage Examples
Basic Streaming
const ai = new AI({
provider: 'openai',
  apiKey: process.env.OPENAI_API_KEY!,
model: 'gpt-4'
});
const stream = ai.stream({
messages: [
{ role: 'user', content: 'Write a haiku about coding' }
],
systemPrompt: 'You are a poetic programmer'
});
for await (const event of stream) {
switch (event.type) {
case 'text':
process.stdout.write(event.delta);
break;
case 'usage':
console.log(`\nTokens: ${event.usage.total}`);
break;
case 'done':
console.log(`\nFinished: ${event.reason}`);
break;
}
}
Cross-Provider Tool Calling
async function callWithTools(provider: 'openai' | 'anthropic' | 'gemini') {
const ai = new AI({
provider,
    apiKey: process.env[`${provider.toUpperCase()}_API_KEY`]!,
model: getDefaultModel(provider)
});
const messages: Message[] = [{
role: 'user',
content: 'What is the weather in Paris and calculate 15 * 23?'
}];
const stream = ai.stream({
messages,
tools: [
{
name: 'weather',
description: 'Get weather for a location',
parameters: {
type: 'object',
properties: {
location: { type: 'string' }
},
required: ['location']
}
},
{
name: 'calculator',
description: 'Calculate math expressions',
parameters: {
type: 'object',
properties: {
expression: { type: 'string' }
},
required: ['expression']
}
}
]
});
  const toolCalls: ToolCall[] = [];
  for await (const event of stream) {
    if (event.type === 'toolCall') {
      toolCalls.push(event.toolCall);
    } else if (event.type === 'done') {
      // The final assistant message already carries model, usage, and tool calls
      messages.push(event.message);
    }
  }
  // Execute tools and append their results to the conversation
  for (const toolCall of toolCalls) {
    const result = await executeToolCall(toolCall);
    messages.push({
      role: 'tool',
      content: JSON.stringify(result),
      toolCallId: toolCall.id
    });
  }
// Continue conversation with tool results
if (toolCalls.length > 0) {
const finalStream = ai.stream({ messages });
for await (const event of finalStream) {
if (event.type === 'text') {
process.stdout.write(event.delta);
}
}
}
}
Thinking/Reasoning
async function withThinking() {
  // OpenAI o1
  const openai = new AI({
    provider: 'openai',
    apiKey: process.env.OPENAI_API_KEY!,
    model: 'o1-preview'
  });
  // Anthropic Claude
  const anthropic = new AI({
    provider: 'anthropic',
    apiKey: process.env.ANTHROPIC_API_KEY!,
    model: 'claude-3-opus-20240229'
  });
  // Gemini thinking model
  const gemini = new AI({
    provider: 'gemini',
    apiKey: process.env.GEMINI_API_KEY!,
    model: 'gemini-2.0-flash-thinking-exp-1219'
  });
for (const ai of [openai, anthropic, gemini]) {
const stream = ai.stream({
messages: [{
role: 'user',
content: 'Solve this step by step: If a tree falls in a forest...'
}],
reasoning: {
enabled: true,
effort: 'high', // OpenAI reasoning_effort
maxTokens: 2000 // Anthropic budget
}
});
for await (const event of stream) {
if (event.type === 'thinking') {
console.log('[THINKING]', event.delta);
} else if (event.type === 'text') {
console.log('[RESPONSE]', event.delta);
} else if (event.type === 'done') {
// Final message includes model and usage with cost
console.log('Model:', event.message.model);
console.log('Tokens:', event.message.usage?.total);
console.log('Cost: $', event.message.usage?.cost?.total);
}
}
}
}
Implementation Notes
Critical Decisions
- Streaming First: All providers support streaming; non-streaming is just collected events (see the sketch after this list)
- Unified Events: Same event types across all providers for consistent handling
- Separate System Prompt: Required for Anthropic/Gemini compatibility
- Tool Role: Unified way to handle tool responses across providers
- Content Arrays: Support both string and structured content
- Thinking Extraction: Normalize reasoning across different provider formats
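As one consequence of the streaming-first decision, complete() can be a thin wrapper that drains stream(). A minimal sketch, assuming Response is simply { message, reason }:

async complete(request: Request): Promise<Response> {
  for await (const event of this.stream(request)) {
    if (event.type === 'error') throw event.error;
    if (event.type === 'done') {
      // The final message already carries model and usage
      return { message: event.message, reason: event.reason };
    }
  }
  throw new Error('Stream ended without a done event');
}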
Provider-Specific Handling
OpenAI:
- Choose between Chat Completions and Responses API based on model and thinking needs
- Map developer role for o1/o3 models
- Handle streaming tool call deltas
Anthropic:
- Convert to content blocks (always arrays)
- Tool results as user messages with tool_result blocks
- Handle MessageStream events or raw streaming
Gemini:
- Convert to parts system
- Extract thinking from part.thought flag
- Map assistant to model role
- Handle function calls/responses in parts
Error Handling
class AIError extends Error {
constructor(
message: string,
public code: string,
public provider: string,
public retryable: boolean,
public statusCode?: number
) {
super(message);
}
}
// In adapters
try {
// API call
} catch (error) {
  // e.g. the provider SDK's rate-limit error class
  if (error instanceof RateLimitError) {
throw new AIError(
'Rate limit exceeded',
'rate_limit',
this.provider,
true,
429
);
}
// Map other errors...
}
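Because AIError carries a retryable flag, callers can wrap any request in a simple backoff loop. An illustrative sketch (attempt counts and delays are arbitrary):

async function withRetries<T>(fn: () => Promise<T>, maxAttempts = 3): Promise<T> {
  for (let attempt = 1; ; attempt++) {
    try {
      return await fn();
    } catch (error) {
      const retryable = error instanceof AIError && error.retryable;
      if (!retryable || attempt >= maxAttempts) throw error;
      // Exponential backoff: 1s, 2s, 4s, ...
      await new Promise(resolve => setTimeout(resolve, 1000 * 2 ** (attempt - 1)));
    }
  }
}

// Usage: const response = await withRetries(() => ai.complete(request));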
Model Information & Cost Tracking
Models Database
We cache the models.dev API data at build time for fast, offline access:
// scripts/update-models.ts - Run during build or manually
async function updateModels() {
const response = await fetch('https://models.dev/api.json');
const data = await response.json();
// Transform to our format
const models: ModelsDatabase = transformModelsData(data);
// Generate TypeScript file
const content = `// Auto-generated from models.dev API
// Last updated: ${new Date().toISOString()}
// Run 'npm run update-models' to refresh
export const MODELS_DATABASE: ModelsDatabase = ${JSON.stringify(models, null, 2)};
`;
await fs.writeFile('src/models-data.ts', content);
}
// src/models.ts - Runtime model lookup
import { MODELS_DATABASE } from './models-data.js';
// Simple lookup with fallback
export function getModelInfo(provider: string, model: string): ModelInfo {
  // Check runtime overrides first (see registerModel below)
  const override = runtimeOverrides.get(`${provider}:${model}`);
  if (override) return override;
  const info = MODELS_DATABASE.providers[provider]?.models[model];
if (!info) {
// Fallback for unknown models
return {
id: model,
name: model,
provider,
capabilities: {
reasoning: false,
toolCall: true,
vision: false
},
cost: { input: 0, output: 0 },
limits: { context: 128000, output: 4096 }
};
}
return info;
}
// Optional: Runtime override for testing new models
const runtimeOverrides = new Map<string, ModelInfo>();
export function registerModel(provider: string, model: string, info: ModelInfo) {
runtimeOverrides.set(`${provider}:${model}`, info);
}
Cost Calculation
class CostTracker {
  private usage: TokenUsage = {
    input: 0,
    output: 0,
    total: 0,
    thinking: 0,
    cacheRead: 0,
    cacheWrite: 0
  };
private modelInfo: ModelInfo;
constructor(modelInfo: ModelInfo) {
this.modelInfo = modelInfo;
}
addUsage(tokens: Partial<TokenUsage>): TokenUsage {
this.usage.input += tokens.input || 0;
this.usage.output += tokens.output || 0;
this.usage.thinking += tokens.thinking || 0;
this.usage.cacheRead += tokens.cacheRead || 0;
this.usage.cacheWrite += tokens.cacheWrite || 0;
this.usage.total = this.usage.input + this.usage.output + (this.usage.thinking || 0);
// Calculate costs (per million tokens)
const cost = this.modelInfo.cost;
this.usage.cost = {
input: (this.usage.input / 1_000_000) * cost.input,
output: (this.usage.output / 1_000_000) * cost.output,
cache:
((this.usage.cacheRead || 0) / 1_000_000) * (cost.cacheRead || 0) +
((this.usage.cacheWrite || 0) / 1_000_000) * (cost.cacheWrite || 0),
total: 0
};
this.usage.cost.total =
this.usage.cost.input +
this.usage.cost.output +
this.usage.cost.cache;
return { ...this.usage };
}
getTotalCost(): number {
return this.usage.cost?.total || 0;
}
getUsageSummary(): string {
return `Tokens: ${this.usage.total} (${this.usage.input}→${this.usage.output}) | Cost: $${this.getTotalCost().toFixed(4)}`;
}
}
Integration in Adapters
class OpenAIAdapter {
private costTracker: CostTracker;
constructor(config: AIConfig) {
const modelInfo = getModelInfo('openai', config.model);
this.costTracker = new CostTracker(modelInfo);
}
async *stream(request: Request): AsyncGenerator<Event> {
// ... streaming logic ...
if (chunk.usage) {
const usage = this.costTracker.addUsage({
input: chunk.usage.prompt_tokens,
output: chunk.usage.completion_tokens,
thinking: chunk.usage.completion_tokens_details?.reasoning_tokens,
cacheRead: chunk.usage.prompt_tokens_details?.cached_tokens
});
yield { type: 'usage', usage };
}
}
}
Next Steps
- Create models.ts with models.dev integration
- Implement base AI class with adapter pattern
- Create three provider adapters with full streaming support
- Add comprehensive error mapping
- Implement token counting and cost tracking
- Add test suite for each provider
- Create migration guide from native SDKs