diff --git a/resources/agent-schemas/artifacts/json-schema/amp.json b/resources/agent-schemas/artifacts/json-schema/amp.json index 78f0e84..97c5b16 100644 --- a/resources/agent-schemas/artifacts/json-schema/amp.json +++ b/resources/agent-schemas/artifacts/json-schema/amp.json @@ -9,6 +9,10 @@ "type": { "type": "string", "enum": [ + "system", + "user", + "assistant", + "result", "message", "tool_call", "tool_result", @@ -27,6 +31,45 @@ }, "error": { "type": "string" + }, + "subtype": { + "type": "string" + }, + "cwd": { + "type": "string" + }, + "session_id": { + "type": "string" + }, + "tools": { + "type": "array", + "items": { + "type": "string" + } + }, + "mcp_servers": { + "type": "array", + "items": { + "type": "object" + } + }, + "message": { + "type": "object" + }, + "parent_tool_use_id": { + "type": "string" + }, + "duration_ms": { + "type": "number" + }, + "is_error": { + "type": "boolean" + }, + "num_turns": { + "type": "number" + }, + "result": { + "type": "string" } }, "required": [ diff --git a/resources/agent-schemas/src/amp.ts b/resources/agent-schemas/src/amp.ts index cee5e78..f70fc59 100644 --- a/resources/agent-schemas/src/amp.ts +++ b/resources/agent-schemas/src/amp.ts @@ -204,12 +204,27 @@ function createFallbackSchema(): NormalizedSchema { properties: { type: { type: "string", - enum: ["message", "tool_call", "tool_result", "error", "done"], + enum: ["system", "user", "assistant", "result", "message", "tool_call", "tool_result", "error", "done"], }, + // Common fields id: { type: "string" }, content: { type: "string" }, tool_call: { $ref: "#/definitions/ToolCall" }, error: { type: "string" }, + // System message fields + subtype: { type: "string" }, + cwd: { type: "string" }, + session_id: { type: "string" }, + tools: { type: "array", items: { type: "string" } }, + mcp_servers: { type: "array", items: { type: "object" } }, + // User/Assistant message fields + message: { type: "object" }, + parent_tool_use_id: { type: "string" }, + // Result fields + duration_ms: { type: "number" }, + is_error: { type: "boolean" }, + num_turns: { type: "number" }, + result: { type: "string" }, }, required: ["type"], }, diff --git a/server/packages/agent-management/src/agents.rs b/server/packages/agent-management/src/agents.rs index ffdd09e..1260604 100644 --- a/server/packages/agent-management/src/agents.rs +++ b/server/packages/agent-management/src/agents.rs @@ -1058,26 +1058,21 @@ fn spawn_amp( let mut args: Vec<&str> = Vec::new(); if flags.execute { args.push("--execute"); - } else if flags.print { - args.push("--print"); + args.push(&options.prompt); } if flags.output_format { - args.push("--output-format"); - args.push("stream-json"); + args.push("--stream-json"); } if flags.dangerously_skip_permissions && options.permission_mode.as_deref() == Some("bypass") { - args.push("--dangerously-skip-permissions"); + args.push("--dangerously-allow-all"); } let mut command = Command::new(path); command.current_dir(working_dir); - if let Some(model) = options.model.as_deref() { - command.arg("--model").arg(model); - } if let Some(session_id) = options.session_id.as_deref() { command.arg("--continue").arg(session_id); } - command.args(&args).arg(&options.prompt); + command.args(&args); for (key, value) in &options.env { command.env(key, value); } @@ -1101,24 +1096,19 @@ fn build_amp_command(path: &Path, working_dir: &Path, options: &SpawnOptions) -> let flags = detect_amp_flags(path, working_dir).unwrap_or_default(); let mut command = Command::new(path); command.current_dir(working_dir); - if let Some(model) = options.model.as_deref() { - command.arg("--model").arg(model); - } if let Some(session_id) = options.session_id.as_deref() { command.arg("--continue").arg(session_id); } if flags.execute { command.arg("--execute"); - } else if flags.print { - command.arg("--print"); + command.arg(&options.prompt); } if flags.output_format { - command.arg("--output-format").arg("stream-json"); + command.arg("--stream-json"); } if flags.dangerously_skip_permissions && options.permission_mode.as_deref() == Some("bypass") { - command.arg("--dangerously-skip-permissions"); + command.arg("--dangerously-allow-all"); } - command.arg(&options.prompt); for (key, value) in &options.env { command.env(key, value); } @@ -1128,7 +1118,6 @@ fn build_amp_command(path: &Path, working_dir: &Path, options: &SpawnOptions) -> #[derive(Debug, Default, Clone, Copy)] struct AmpFlags { execute: bool, - print: bool, output_format: bool, dangerously_skip_permissions: bool, } @@ -1146,9 +1135,8 @@ fn detect_amp_flags(path: &Path, working_dir: &Path) -> Option { ); Some(AmpFlags { execute: text.contains("--execute"), - print: text.contains("--print"), - output_format: text.contains("--output-format"), - dangerously_skip_permissions: text.contains("--dangerously-skip-permissions"), + output_format: text.contains("--stream-json"), + dangerously_skip_permissions: text.contains("--dangerously-allow-all"), }) } @@ -1157,23 +1145,19 @@ fn spawn_amp_fallback( working_dir: &Path, options: &SpawnOptions, ) -> Result { - let mut attempts = vec![ + let mut attempts: Vec> = vec![ vec!["--execute"], - vec!["--print", "--output-format", "stream-json"], - vec!["--output-format", "stream-json"], - vec!["--dangerously-skip-permissions"], + vec!["stream-json"], + vec!["--dangerously-allow-all"], vec![], ]; if options.permission_mode.as_deref() != Some("bypass") { - attempts.retain(|args| !args.contains(&"--dangerously-skip-permissions")); + attempts.retain(|args| !args.contains(&"--dangerously-allow-all")); } for args in attempts { let mut command = Command::new(path); command.current_dir(working_dir); - if let Some(model) = options.model.as_deref() { - command.arg("--model").arg(model); - } if let Some(session_id) = options.session_id.as_deref() { command.arg("--continue").arg(session_id); } @@ -1192,9 +1176,6 @@ fn spawn_amp_fallback( let mut command = Command::new(path); command.current_dir(working_dir); - if let Some(model) = options.model.as_deref() { - command.arg("--model").arg(model); - } if let Some(session_id) = options.session_id.as_deref() { command.arg("--continue").arg(session_id); } diff --git a/server/packages/extracted-agent-schemas/tests/schema_roundtrip.rs b/server/packages/extracted-agent-schemas/tests/schema_roundtrip.rs index db7003b..8b9148a 100644 --- a/server/packages/extracted-agent-schemas/tests/schema_roundtrip.rs +++ b/server/packages/extracted-agent-schemas/tests/schema_roundtrip.rs @@ -73,3 +73,32 @@ fn test_amp_message() { assert!(json.contains("user")); assert!(json.contains("Hello")); } + +#[test] +fn test_amp_stream_json_message_types() { + // Test that all new message types can be parsed + let system_msg = r#"{"type":"system","subtype":"init","cwd":"/tmp","session_id":"sess-1","tools":["Bash"],"mcp_servers":[]}"#; + let parsed: amp::StreamJsonMessage = serde_json::from_str(system_msg).unwrap(); + assert!(matches!(parsed.type_, amp::StreamJsonMessageType::System)); + + let user_msg = r#"{"type":"user","message":{"role":"user","content":"Hello"},"session_id":"sess-1"}"#; + let parsed: amp::StreamJsonMessage = serde_json::from_str(user_msg).unwrap(); + assert!(matches!(parsed.type_, amp::StreamJsonMessageType::User)); + + let assistant_msg = r#"{"type":"assistant","message":{"role":"assistant","content":"Hi there"},"session_id":"sess-1"}"#; + let parsed: amp::StreamJsonMessage = serde_json::from_str(assistant_msg).unwrap(); + assert!(matches!(parsed.type_, amp::StreamJsonMessageType::Assistant)); + + let result_msg = r#"{"type":"result","subtype":"success","duration_ms":1000,"is_error":false,"num_turns":1,"result":"Done","session_id":"sess-1"}"#; + let parsed: amp::StreamJsonMessage = serde_json::from_str(result_msg).unwrap(); + assert!(matches!(parsed.type_, amp::StreamJsonMessageType::Result)); + + // Test legacy types still work + let message_msg = r#"{"type":"message","id":"msg-1","content":"Hello"}"#; + let parsed: amp::StreamJsonMessage = serde_json::from_str(message_msg).unwrap(); + assert!(matches!(parsed.type_, amp::StreamJsonMessageType::Message)); + + let done_msg = r#"{"type":"done"}"#; + let parsed: amp::StreamJsonMessage = serde_json::from_str(done_msg).unwrap(); + assert!(matches!(parsed.type_, amp::StreamJsonMessageType::Done)); +} diff --git a/server/packages/universal-agent-schema/src/agents/amp.rs b/server/packages/universal-agent-schema/src/agents/amp.rs index d811d31..a305796 100644 --- a/server/packages/universal-agent-schema/src/agents/amp.rs +++ b/server/packages/universal-agent-schema/src/agents/amp.rs @@ -21,6 +21,72 @@ pub fn event_to_universal( ) -> Result, String> { let mut events = Vec::new(); match event.type_ { + // System init message - contains metadata like cwd, tools, session_id + // We skip this as it's not a user-facing event + schema::StreamJsonMessageType::System => {} + // User message - extract content from the nested message field + schema::StreamJsonMessageType::User => { + if !event.message.is_empty() { + let text = event + .message + .get("content") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + let item = UniversalItem { + item_id: next_temp_id("tmp_amp_user"), + native_item_id: event.session_id.clone(), + parent_id: None, + kind: ItemKind::Message, + role: Some(ItemRole::User), + content: vec![ContentPart::Text { text: text.clone() }], + status: ItemStatus::Completed, + }; + events.extend(message_events(item, text)); + } + } + // Assistant message - extract content from the nested message field + schema::StreamJsonMessageType::Assistant => { + if !event.message.is_empty() { + let text = event + .message + .get("content") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + let item = UniversalItem { + item_id: next_temp_id("tmp_amp_assistant"), + native_item_id: event.session_id.clone(), + parent_id: None, + kind: ItemKind::Message, + role: Some(ItemRole::Assistant), + content: vec![ContentPart::Text { text: text.clone() }], + status: ItemStatus::Completed, + }; + events.extend(message_events(item, text)); + } + } + // Result message - signals completion + schema::StreamJsonMessageType::Result => { + events.push(turn_ended_event(None, None).synthetic()); + events.push( + EventConversion::new( + UniversalEventType::SessionEnded, + UniversalEventData::SessionEnded(SessionEndedData { + reason: if event.is_error.unwrap_or(false) { + SessionEndReason::Error + } else { + SessionEndReason::Completed + }, + terminated_by: TerminatedBy::Agent, + message: event.result.clone(), + exit_code: None, + stderr: None, + }), + ) + .with_raw(serde_json::to_value(event).ok()), + ); + } schema::StreamJsonMessageType::Message => { let text = event.content.clone().unwrap_or_default(); let item = UniversalItem {