diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 5507dca..b4dbf7e 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -25,6 +25,7 @@ dependencies = [ "runtime", "serde", "serde_json", + "telemetry", "tokio", ] @@ -1428,6 +1429,14 @@ dependencies = [ "yaml-rust", ] +[[package]] +name = "telemetry" +version = "0.1.0" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "thiserror" version = "2.0.18" diff --git a/rust/crates/api/Cargo.toml b/rust/crates/api/Cargo.toml index c5e152e..d3c4115 100644 --- a/rust/crates/api/Cargo.toml +++ b/rust/crates/api/Cargo.toml @@ -10,6 +10,7 @@ reqwest = { version = "0.12", default-features = false, features = ["json", "rus runtime = { path = "../runtime" } serde = { version = "1", features = ["derive"] } serde_json = "1" +telemetry = { path = "../telemetry" } tokio = { version = "1", features = ["io-util", "macros", "net", "rt-multi-thread", "time"] } [lints] diff --git a/rust/crates/api/src/client.rs b/rust/crates/api/src/client.rs index 7ef7e83..d7dacbe 100644 --- a/rust/crates/api/src/client.rs +++ b/rust/crates/api/src/client.rs @@ -6,13 +6,15 @@ use runtime::{ OAuthTokenExchangeRequest, }; use serde::Deserialize; +use serde_json::{Map, Value}; +use telemetry::{AnthropicRequestProfile, ClientIdentity, SessionTracer}; use crate::error::ApiError; use crate::sse::SseParser; use crate::types::{MessageRequest, MessageResponse, StreamEvent}; const DEFAULT_BASE_URL: &str = "https://api.anthropic.com"; -const ANTHROPIC_VERSION: &str = "2023-06-01"; +const MESSAGES_PATH: &str = "/v1/messages"; const REQUEST_ID_HEADER: &str = "request-id"; const ALT_REQUEST_ID_HEADER: &str = "x-request-id"; const DEFAULT_INITIAL_BACKOFF: Duration = Duration::from_millis(200); @@ -108,6 +110,8 @@ pub struct AnthropicClient { max_retries: u32, initial_backoff: Duration, max_backoff: Duration, + request_profile: AnthropicRequestProfile, + session_tracer: Option, } impl AnthropicClient { @@ -120,6 +124,8 @@ impl AnthropicClient { max_retries: DEFAULT_MAX_RETRIES, initial_backoff: DEFAULT_INITIAL_BACKOFF, max_backoff: DEFAULT_MAX_BACKOFF, + request_profile: AnthropicRequestProfile::default(), + session_tracer: None, } } @@ -132,6 +138,8 @@ impl AnthropicClient { max_retries: DEFAULT_MAX_RETRIES, initial_backoff: DEFAULT_INITIAL_BACKOFF, max_backoff: DEFAULT_MAX_BACKOFF, + request_profile: AnthropicRequestProfile::default(), + session_tracer: None, } } @@ -176,6 +184,39 @@ impl AnthropicClient { self } + #[must_use] + pub fn with_request_profile(mut self, request_profile: AnthropicRequestProfile) -> Self { + self.request_profile = request_profile; + self + } + + #[must_use] + pub fn with_client_identity(mut self, client_identity: ClientIdentity) -> Self { + self.request_profile.client_identity = client_identity; + self + } + + #[must_use] + pub fn with_beta(mut self, beta: impl Into) -> Self { + let beta = beta.into(); + if !self.request_profile.betas.contains(&beta) { + self.request_profile.betas.push(beta); + } + self + } + + #[must_use] + pub fn with_extra_body_param(mut self, key: impl Into, value: Value) -> Self { + self.request_profile.extra_body.insert(key.into(), value); + self + } + + #[must_use] + pub fn with_session_tracer(mut self, session_tracer: SessionTracer) -> Self { + self.session_tracer = Some(session_tracer); + self + } + #[must_use] pub fn with_retry_policy( mut self, @@ -279,18 +320,30 @@ impl AnthropicClient { loop { attempts += 1; + self.record_request_started(request, attempts); match self.send_raw_request(request).await { Ok(response) => match expect_success(response).await { - Ok(response) => return Ok(response), + Ok(response) => { + self.record_request_succeeded(request, attempts, &response); + return Ok(response); + } Err(error) if error.is_retryable() && attempts <= self.max_retries + 1 => { + self.record_request_failed(request, attempts, &error); last_error = Some(error); } - Err(error) => return Err(error), + Err(error) => { + self.record_request_failed(request, attempts, &error); + return Err(error); + } }, Err(error) if error.is_retryable() && attempts <= self.max_retries + 1 => { + self.record_request_failed(request, attempts, &error); last_error = Some(error); } - Err(error) => return Err(error), + Err(error) => { + self.record_request_failed(request, attempts, &error); + return Err(error); + } } if attempts > self.max_retries { @@ -310,18 +363,131 @@ impl AnthropicClient { &self, request: &MessageRequest, ) -> Result { - let request_url = format!("{}/v1/messages", self.base_url.trim_end_matches('/')); - let request_builder = self + let request_url = format!("{}{}", self.base_url.trim_end_matches('/'), MESSAGES_PATH); + let mut request_builder = self .http .post(&request_url) - .header("anthropic-version", ANTHROPIC_VERSION) .header("content-type", "application/json"); + for (name, value) in self.request_profile.header_pairs() { + request_builder = request_builder.header(name, value); + } let mut request_builder = self.auth.apply(request_builder); - request_builder = request_builder.json(request); + let request_body = self.request_profile.render_json_body(request)?; + request_builder = request_builder.json(&request_body); request_builder.send().await.map_err(ApiError::from) } + fn record_request_started(&self, request: &MessageRequest, attempt: u32) { + if let Some(tracer) = &self.session_tracer { + tracer.record_http_request_started( + attempt, + "POST", + MESSAGES_PATH, + self.request_attributes(request), + ); + } + } + + fn record_request_succeeded( + &self, + request: &MessageRequest, + attempt: u32, + response: &reqwest::Response, + ) { + if let Some(tracer) = &self.session_tracer { + tracer.record_http_request_succeeded( + attempt, + "POST", + MESSAGES_PATH, + response.status().as_u16(), + request_id_from_headers(response.headers()), + self.request_attributes(request), + ); + } + } + + fn record_request_failed(&self, request: &MessageRequest, attempt: u32, error: &ApiError) { + if let Some(tracer) = &self.session_tracer { + tracer.record_http_request_failed( + attempt, + "POST", + MESSAGES_PATH, + error.to_string(), + error.is_retryable(), + self.error_attributes(request, error), + ); + } + } + + fn request_attributes(&self, request: &MessageRequest) -> Map { + let mut attributes = Map::new(); + attributes.insert("model".to_string(), Value::String(request.model.clone())); + attributes.insert("stream".to_string(), Value::Bool(request.stream)); + attributes.insert("max_tokens".to_string(), Value::from(request.max_tokens)); + attributes.insert( + "message_count".to_string(), + Value::from(u64::try_from(request.messages.len()).unwrap_or(u64::MAX)), + ); + attributes.insert( + "tool_count".to_string(), + Value::from( + u64::try_from(request.tools.as_ref().map_or(0, Vec::len)).unwrap_or(u64::MAX), + ), + ); + attributes.insert( + "beta_count".to_string(), + Value::from(u64::try_from(self.request_profile.betas.len()).unwrap_or(u64::MAX)), + ); + if !self.request_profile.extra_body.is_empty() { + attributes.insert( + "extra_body_keys".to_string(), + Value::Array( + self.request_profile + .extra_body + .keys() + .cloned() + .map(Value::String) + .collect(), + ), + ); + } + attributes + } + + fn error_attributes(&self, request: &MessageRequest, error: &ApiError) -> Map { + let mut attributes = self.request_attributes(request); + match error { + ApiError::Api { + status, + error_type, + message, + .. + } => { + attributes.insert("status".to_string(), Value::from(status.as_u16())); + if let Some(error_type) = error_type { + attributes.insert("error_type".to_string(), Value::String(error_type.clone())); + } + if let Some(message) = message { + attributes.insert("api_message".to_string(), Value::String(message.clone())); + } + } + ApiError::Http(_) => { + attributes.insert("error_type".to_string(), Value::String("http".to_string())); + } + ApiError::Json(_) => { + attributes.insert("error_type".to_string(), Value::String("json".to_string())); + } + _ => { + attributes.insert( + "error_type".to_string(), + Value::String("client".to_string()), + ); + } + } + attributes + } + fn backoff_for_attempt(&self, attempt: u32) -> Result { let Some(multiplier) = 1_u32.checked_shl(attempt.saturating_sub(1)) else { return Err(ApiError::BackoffOverflow { diff --git a/rust/crates/api/src/lib.rs b/rust/crates/api/src/lib.rs index 4108187..9c3b688 100644 --- a/rust/crates/api/src/lib.rs +++ b/rust/crates/api/src/lib.rs @@ -15,3 +15,9 @@ pub use types::{ MessageResponse, MessageStartEvent, MessageStopEvent, OutputContentBlock, StreamEvent, ToolChoice, ToolDefinition, ToolResultContentBlock, Usage, }; + +pub use telemetry::{ + AnalyticsEvent, AnthropicRequestProfile, ClientIdentity, JsonlTelemetrySink, + MemoryTelemetrySink, SessionTraceRecord, SessionTracer, TelemetryEvent, TelemetrySink, + DEFAULT_ANTHROPIC_VERSION, +}; diff --git a/rust/crates/api/tests/client_integration.rs b/rust/crates/api/tests/client_integration.rs index c37fa99..d95dba8 100644 --- a/rust/crates/api/tests/client_integration.rs +++ b/rust/crates/api/tests/client_integration.rs @@ -8,6 +8,7 @@ use api::{ StreamEvent, ToolChoice, ToolDefinition, }; use serde_json::json; +use telemetry::{ClientIdentity, MemoryTelemetrySink, SessionTracer, TelemetryEvent}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpListener; use tokio::sync::Mutex; @@ -64,6 +65,14 @@ async fn send_message_posts_json_and_parses_response() { request.headers.get("authorization").map(String::as_str), Some("Bearer proxy-token") ); + assert_eq!( + request.headers.get("anthropic-version").map(String::as_str), + Some("2023-06-01") + ); + assert_eq!( + request.headers.get("user-agent").map(String::as_str), + Some("clawd-code/0.1.0 (rust)") + ); let body: serde_json::Value = serde_json::from_str(&request.body).expect("request body should be json"); assert_eq!( @@ -75,6 +84,90 @@ async fn send_message_posts_json_and_parses_response() { assert_eq!(body["tool_choice"]["type"], json!("auto")); } +#[tokio::test] +async fn send_message_applies_request_profile_and_records_telemetry() { + let state = Arc::new(Mutex::new(Vec::::new())); + let server = spawn_server( + state.clone(), + vec![http_response_with_headers( + "200 OK", + "application/json", + concat!( + "{", + "\"id\":\"msg_profile\",", + "\"type\":\"message\",", + "\"role\":\"assistant\",", + "\"content\":[{\"type\":\"text\",\"text\":\"ok\"}],", + "\"model\":\"claude-3-7-sonnet-latest\",", + "\"stop_reason\":\"end_turn\",", + "\"stop_sequence\":null,", + "\"usage\":{\"input_tokens\":1,\"output_tokens\":1}", + "}" + ), + &[("request-id", "req_profile_123")], + )], + ) + .await; + let sink = Arc::new(MemoryTelemetrySink::default()); + + let client = AnthropicClient::new("test-key") + .with_base_url(server.base_url()) + .with_client_identity(ClientIdentity::new("clawd-code", "9.9.9").with_runtime("rust-cli")) + .with_beta("tools-2026-04-01") + .with_extra_body_param("metadata", json!({"source": "clawd-code"})) + .with_session_tracer(SessionTracer::new("session-telemetry", sink.clone())); + + let response = client + .send_message(&sample_request(false)) + .await + .expect("request should succeed"); + + assert_eq!(response.request_id.as_deref(), Some("req_profile_123")); + + let captured = state.lock().await; + let request = captured.first().expect("server should capture request"); + assert_eq!( + request.headers.get("anthropic-beta").map(String::as_str), + Some("tools-2026-04-01") + ); + assert_eq!( + request.headers.get("user-agent").map(String::as_str), + Some("clawd-code/9.9.9 (rust-cli)") + ); + let body: serde_json::Value = + serde_json::from_str(&request.body).expect("request body should be json"); + assert_eq!(body["metadata"]["source"], json!("clawd-code")); + + let events = sink.events(); + assert_eq!(events.len(), 4); + assert!(matches!( + &events[0], + TelemetryEvent::HttpRequestStarted { + session_id, + attempt: 1, + method, + path, + .. + } if session_id == "session-telemetry" && method == "POST" && path == "/v1/messages" + )); + assert!(matches!( + &events[1], + TelemetryEvent::SessionTrace(trace) if trace.name == "http_request_started" + )); + assert!(matches!( + &events[2], + TelemetryEvent::HttpRequestSucceeded { + request_id, + status: 200, + .. + } if request_id.as_deref() == Some("req_profile_123") + )); + assert!(matches!( + &events[3], + TelemetryEvent::SessionTrace(trace) if trace.name == "http_request_succeeded" + )); +} + #[tokio::test] async fn stream_message_parses_sse_events_with_tool_use() { let state = Arc::new(Mutex::new(Vec::::new())); diff --git a/rust/crates/runtime/Cargo.toml b/rust/crates/runtime/Cargo.toml index 7ce7cd8..cf46f2d 100644 --- a/rust/crates/runtime/Cargo.toml +++ b/rust/crates/runtime/Cargo.toml @@ -11,6 +11,7 @@ glob = "0.3" regex = "1" serde = { version = "1", features = ["derive"] } serde_json = "1" +telemetry = { path = "../telemetry" } tokio = { version = "1", features = ["io-util", "macros", "process", "rt", "rt-multi-thread", "time"] } walkdir = "2" diff --git a/rust/crates/runtime/src/conversation.rs b/rust/crates/runtime/src/conversation.rs index 4ffbabc..ccaf064 100644 --- a/rust/crates/runtime/src/conversation.rs +++ b/rust/crates/runtime/src/conversation.rs @@ -1,6 +1,9 @@ use std::collections::BTreeMap; use std::fmt::{Display, Formatter}; +use serde_json::{Map, Value}; +use telemetry::SessionTracer; + use crate::compact::{ compact_session, estimate_session_tokens, CompactionConfig, CompactionResult, }; @@ -97,6 +100,7 @@ pub struct ConversationRuntime { max_iterations: usize, usage_tracker: UsageTracker, hook_runner: HookRunner, + session_tracer: Option, } impl ConversationRuntime @@ -118,7 +122,7 @@ where tool_executor, permission_policy, system_prompt, - RuntimeFeatureConfig::default(), + &RuntimeFeatureConfig::default(), ) } @@ -129,7 +133,7 @@ where tool_executor: T, permission_policy: PermissionPolicy, system_prompt: Vec, - feature_config: RuntimeFeatureConfig, + feature_config: &RuntimeFeatureConfig, ) -> Self { let usage_tracker = UsageTracker::from_session(&session); Self { @@ -140,7 +144,8 @@ where system_prompt, max_iterations: usize::MAX, usage_tracker, - hook_runner: HookRunner::from_feature_config(&feature_config), + hook_runner: HookRunner::from_feature_config(feature_config), + session_tracer: None, } } @@ -150,14 +155,22 @@ where self } + #[must_use] + pub fn with_session_tracer(mut self, session_tracer: SessionTracer) -> Self { + self.session_tracer = Some(session_tracer); + self + } + pub fn run_turn( &mut self, user_input: impl Into, mut prompter: Option<&mut dyn PermissionPrompter>, ) -> Result { + let user_input = user_input.into(); + self.record_turn_started(&user_input); self.session .messages - .push(ConversationMessage::user_text(user_input.into())); + .push(ConversationMessage::user_text(user_input)); let mut assistant_messages = Vec::new(); let mut tool_results = Vec::new(); @@ -166,16 +179,24 @@ where loop { iterations += 1; if iterations > self.max_iterations { - return Err(RuntimeError::new( + let error = RuntimeError::new( "conversation loop exceeded the maximum number of iterations", - )); + ); + self.record_turn_failed(iterations, &error); + return Err(error); } let request = ApiRequest { system_prompt: self.system_prompt.clone(), messages: self.session.messages.clone(), }; - let events = self.api_client.stream(request)?; + let events = match self.api_client.stream(request) { + Ok(events) => events, + Err(error) => { + self.record_turn_failed(iterations, &error); + return Err(error); + } + }; let (assistant_message, usage) = build_assistant_message(events)?; if let Some(usage) = usage { self.usage_tracker.record(usage); @@ -190,6 +211,7 @@ where _ => None, }) .collect::>(); + self.record_assistant_iteration(iterations, &assistant_message, pending_tool_uses.len()); self.session.messages.push(assistant_message.clone()); assistant_messages.push(assistant_message); @@ -199,6 +221,7 @@ where } for (tool_use_id, tool_name, input) in pending_tool_uses { + self.record_tool_started(iterations, &tool_name); let permission_outcome = if let Some(prompt) = prompter.as_mut() { self.permission_policy .authorize(&tool_name, &input, Some(*prompt)) @@ -249,17 +272,20 @@ where ConversationMessage::tool_result(tool_use_id, tool_name, reason, true) } }; + self.record_tool_finished(iterations, &result_message); self.session.messages.push(result_message.clone()); tool_results.push(result_message); } } - Ok(TurnSummary { + let summary = TurnSummary { assistant_messages, tool_results, iterations, usage: self.usage_tracker.cumulative_usage(), - }) + }; + self.record_turn_completed(&summary); + Ok(summary) } #[must_use] @@ -286,6 +312,125 @@ where pub fn into_session(self) -> Session { self.session } + + fn record_turn_started(&self, user_input: &str) { + if let Some(tracer) = &self.session_tracer { + let mut attributes = Map::new(); + attributes.insert( + "message_count_before".to_string(), + Value::from(u64::try_from(self.session.messages.len()).unwrap_or(u64::MAX)), + ); + attributes.insert( + "input_chars".to_string(), + Value::from(u64::try_from(user_input.chars().count()).unwrap_or(u64::MAX)), + ); + tracer.record("turn_started", attributes); + } + } + + fn record_assistant_iteration( + &self, + iteration: usize, + assistant_message: &ConversationMessage, + pending_tool_count: usize, + ) { + if let Some(tracer) = &self.session_tracer { + let mut attributes = Map::new(); + attributes.insert( + "iteration".to_string(), + Value::from(u64::try_from(iteration).unwrap_or(u64::MAX)), + ); + attributes.insert( + "block_count".to_string(), + Value::from(u64::try_from(assistant_message.blocks.len()).unwrap_or(u64::MAX)), + ); + attributes.insert( + "pending_tool_count".to_string(), + Value::from(u64::try_from(pending_tool_count).unwrap_or(u64::MAX)), + ); + tracer.record("assistant_iteration_completed", attributes); + } + } + + fn record_tool_started(&self, iteration: usize, tool_name: &str) { + if let Some(tracer) = &self.session_tracer { + let mut attributes = Map::new(); + attributes.insert( + "iteration".to_string(), + Value::from(u64::try_from(iteration).unwrap_or(u64::MAX)), + ); + attributes.insert("tool_name".to_string(), Value::String(tool_name.to_string())); + tracer.record("tool_execution_started", attributes); + } + } + + fn record_tool_finished(&self, iteration: usize, result_message: &ConversationMessage) { + let Some(tracer) = &self.session_tracer else { + return; + }; + let Some(ContentBlock::ToolResult { + tool_name, + is_error, + output, + .. + }) = result_message.blocks.first() + else { + return; + }; + let mut attributes = Map::new(); + attributes.insert( + "iteration".to_string(), + Value::from(u64::try_from(iteration).unwrap_or(u64::MAX)), + ); + attributes.insert("tool_name".to_string(), Value::String(tool_name.clone())); + attributes.insert("is_error".to_string(), Value::Bool(*is_error)); + attributes.insert( + "output_chars".to_string(), + Value::from(u64::try_from(output.chars().count()).unwrap_or(u64::MAX)), + ); + tracer.record("tool_execution_finished", attributes); + } + + fn record_turn_completed(&self, summary: &TurnSummary) { + if let Some(tracer) = &self.session_tracer { + let mut attributes = Map::new(); + attributes.insert( + "assistant_message_count".to_string(), + Value::from( + u64::try_from(summary.assistant_messages.len()).unwrap_or(u64::MAX), + ), + ); + attributes.insert( + "tool_result_count".to_string(), + Value::from(u64::try_from(summary.tool_results.len()).unwrap_or(u64::MAX)), + ); + attributes.insert( + "iterations".to_string(), + Value::from(u64::try_from(summary.iterations).unwrap_or(u64::MAX)), + ); + attributes.insert( + "total_input_tokens".to_string(), + Value::from(summary.usage.input_tokens), + ); + attributes.insert( + "total_output_tokens".to_string(), + Value::from(summary.usage.output_tokens), + ); + tracer.record("turn_completed", attributes); + } + } + + fn record_turn_failed(&self, iteration: usize, error: &RuntimeError) { + if let Some(tracer) = &self.session_tracer { + let mut attributes = Map::new(); + attributes.insert( + "iteration".to_string(), + Value::from(u64::try_from(iteration).unwrap_or(u64::MAX)), + ); + attributes.insert("error".to_string(), Value::String(error.to_string())); + tracer.record("turn_failed", attributes); + } + } } fn build_assistant_message( @@ -609,7 +754,7 @@ mod tests { }), PermissionPolicy::new(PermissionMode::DangerFullAccess), vec!["system".to_string()], - RuntimeFeatureConfig::default().with_hooks(RuntimeHookConfig::new( + &RuntimeFeatureConfig::default().with_hooks(RuntimeHookConfig::new( vec![shell_snippet("printf 'blocked by hook'; exit 2")], Vec::new(), )), @@ -675,7 +820,7 @@ mod tests { StaticToolExecutor::new().register("add", |_input| Ok("4".to_string())), PermissionPolicy::new(PermissionMode::DangerFullAccess), vec!["system".to_string()], - RuntimeFeatureConfig::default().with_hooks(RuntimeHookConfig::new( + &RuntimeFeatureConfig::default().with_hooks(RuntimeHookConfig::new( vec![shell_snippet("printf 'pre hook ran'")], vec![shell_snippet("printf 'post hook ran'")], )), @@ -697,7 +842,7 @@ mod tests { "post hook should preserve non-error result: {output:?}" ); assert!( - output.contains("4"), + output.contains('4'), "tool output missing value: {output:?}" ); assert!( diff --git a/rust/crates/runtime/src/hooks.rs b/rust/crates/runtime/src/hooks.rs index 36756a0..f218768 100644 --- a/rust/crates/runtime/src/hooks.rs +++ b/rust/crates/runtime/src/hooks.rs @@ -64,7 +64,7 @@ impl HookRunner { #[must_use] pub fn run_pre_tool_use(&self, tool_name: &str, tool_input: &str) -> HookRunResult { - self.run_commands( + Self::run_commands( HookEvent::PreToolUse, self.config.pre_tool_use(), tool_name, @@ -82,7 +82,7 @@ impl HookRunner { tool_output: &str, is_error: bool, ) -> HookRunResult { - self.run_commands( + Self::run_commands( HookEvent::PostToolUse, self.config.post_tool_use(), tool_name, @@ -93,7 +93,6 @@ impl HookRunner { } fn run_commands( - &self, event: HookEvent, commands: &[String], tool_name: &str, @@ -114,19 +113,19 @@ impl HookRunner { "tool_result_is_error": is_error, }) .to_string(); + let invocation = HookInvocation { + event, + tool_name, + tool_input, + tool_output, + is_error, + payload: &payload, + }; let mut messages = Vec::new(); for command in commands { - match self.run_command( - command, - event, - tool_name, - tool_input, - tool_output, - is_error, - &payload, - ) { + match Self::run_command(command, &invocation) { HookCommandOutcome::Allow { message } => { if let Some(message) = message { messages.push(message); @@ -149,29 +148,23 @@ impl HookRunner { HookRunResult::allow(messages) } - fn run_command( - &self, - command: &str, - event: HookEvent, - tool_name: &str, - tool_input: &str, - tool_output: Option<&str>, - is_error: bool, - payload: &str, - ) -> HookCommandOutcome { + fn run_command(command: &str, invocation: &HookInvocation<'_>) -> HookCommandOutcome { let mut child = shell_command(command); child.stdin(std::process::Stdio::piped()); child.stdout(std::process::Stdio::piped()); child.stderr(std::process::Stdio::piped()); - child.env("HOOK_EVENT", event.as_str()); - child.env("HOOK_TOOL_NAME", tool_name); - child.env("HOOK_TOOL_INPUT", tool_input); - child.env("HOOK_TOOL_IS_ERROR", if is_error { "1" } else { "0" }); - if let Some(tool_output) = tool_output { + child.env("HOOK_EVENT", invocation.event.as_str()); + child.env("HOOK_TOOL_NAME", invocation.tool_name); + child.env("HOOK_TOOL_INPUT", invocation.tool_input); + child.env( + "HOOK_TOOL_IS_ERROR", + if invocation.is_error { "1" } else { "0" }, + ); + if let Some(tool_output) = invocation.tool_output { child.env("HOOK_TOOL_OUTPUT", tool_output); } - match child.output_with_stdin(payload.as_bytes()) { + match child.output_with_stdin(invocation.payload.as_bytes()) { Ok(output) => { let stdout = String::from_utf8_lossy(&output.stdout).trim().to_string(); let stderr = String::from_utf8_lossy(&output.stderr).trim().to_string(); @@ -189,8 +182,9 @@ impl HookRunner { }, None => HookCommandOutcome::Warn { message: format!( - "{} hook `{command}` terminated by signal while handling `{tool_name}`", - event.as_str() + "{} hook `{command}` terminated by signal while handling `{}`", + invocation.event.as_str(), + invocation.tool_name ), }, } @@ -198,13 +192,23 @@ impl HookRunner { Err(error) => HookCommandOutcome::Warn { message: format!( "{} hook `{command}` failed to start for `{tool_name}`: {error}", - event.as_str() + invocation.event.as_str(), + tool_name = invocation.tool_name ), }, } } } +struct HookInvocation<'a> { + event: HookEvent, + tool_name: &'a str, + tool_input: &'a str, + tool_output: Option<&'a str>, + is_error: bool, + payload: &'a str, +} + enum HookCommandOutcome { Allow { message: Option }, Deny { message: Option }, diff --git a/rust/crates/rusty-claude-cli/src/main.rs b/rust/crates/rusty-claude-cli/src/main.rs index 5f8a7a6..b645a89 100644 --- a/rust/crates/rusty-claude-cli/src/main.rs +++ b/rust/crates/rusty-claude-cli/src/main.rs @@ -4,6 +4,7 @@ mod render; use std::collections::{BTreeMap, BTreeSet}; use std::env; +use std::fmt::Write as _; use std::fs; use std::io::{self, Read, Write}; use std::net::TcpListener; @@ -13,8 +14,9 @@ use std::time::{SystemTime, UNIX_EPOCH}; use api::{ resolve_startup_auth_source, AnthropicClient, AuthSource, ContentBlockDelta, InputContentBlock, - InputMessage, MessageRequest, MessageResponse, OutputContentBlock, - StreamEvent as ApiStreamEvent, ToolChoice, ToolDefinition, ToolResultContentBlock, + InputMessage, JsonlTelemetrySink, MessageRequest, MessageResponse, OutputContentBlock, + SessionTracer, StreamEvent as ApiStreamEvent, ToolChoice, ToolDefinition, + ToolResultContentBlock, }; use commands::{ @@ -44,6 +46,7 @@ fn max_tokens_for_model(model: &str) -> u32 { } const DEFAULT_DATE: &str = "2026-03-31"; const DEFAULT_OAUTH_CALLBACK_PORT: u16 = 4545; +const TELEMETRY_LOG_PATH_ENV: &str = "CLAW_TELEMETRY_LOG_PATH"; const VERSION: &str = env!("CARGO_PKG_VERSION"); const BUILD_TARGET: Option<&str> = option_env!("TARGET"); const GIT_SHA: Option<&str> = option_env!("GIT_SHA"); @@ -995,6 +998,7 @@ impl LiveCli { let session = create_managed_session_handle()?; let runtime = build_runtime( Session::new(), + &session.id, model.clone(), system_prompt.clone(), enable_tools, @@ -1086,6 +1090,7 @@ impl LiveCli { let session = self.runtime.session().clone(); let mut runtime = build_runtime( session, + &self.session.id, self.model.clone(), self.system_prompt.clone(), true, @@ -1275,6 +1280,7 @@ impl LiveCli { self.permission_mode = permission_mode_from_label(normalized); self.runtime = build_runtime( session, + &self.session.id, self.model.clone(), self.system_prompt.clone(), true, @@ -1300,6 +1306,7 @@ impl LiveCli { self.session = create_managed_session_handle()?; self.runtime = build_runtime( Session::new(), + &self.session.id, self.model.clone(), self.system_prompt.clone(), true, @@ -1335,6 +1342,7 @@ impl LiveCli { let message_count = session.messages.len(); self.runtime = build_runtime( session, + &self.session.id, self.model.clone(), self.system_prompt.clone(), true, @@ -1407,6 +1415,7 @@ impl LiveCli { let message_count = session.messages.len(); self.runtime = build_runtime( session, + &handle.id, self.model.clone(), self.system_prompt.clone(), true, @@ -1437,6 +1446,7 @@ impl LiveCli { let skipped = removed == 0; self.runtime = build_runtime( result.compacted_session, + &self.session.id, self.model.clone(), self.system_prompt.clone(), true, @@ -1914,6 +1924,7 @@ fn build_runtime_feature_config( fn build_runtime( session: Session, + session_id: &str, model: String, system_prompt: Vec, enable_tools: bool, @@ -1922,14 +1933,42 @@ fn build_runtime( permission_mode: PermissionMode, ) -> Result, Box> { - Ok(ConversationRuntime::new_with_features( + let session_tracer = build_session_tracer(session_id)?; + let api_client = match session_tracer.clone() { + Some(session_tracer) => AnthropicRuntimeClient::new( + model, + enable_tools, + emit_output, + allowed_tools.clone(), + )? + .with_session_tracer(session_tracer), + None => AnthropicRuntimeClient::new(model, enable_tools, emit_output, allowed_tools.clone())?, + }; + let runtime = ConversationRuntime::new_with_features( session, - AnthropicRuntimeClient::new(model, enable_tools, emit_output, allowed_tools.clone())?, + api_client, CliToolExecutor::new(allowed_tools, emit_output), permission_policy(permission_mode), system_prompt, - build_runtime_feature_config()?, - )) + &build_runtime_feature_config()?, + ); + Ok(match session_tracer { + Some(session_tracer) => runtime.with_session_tracer(session_tracer), + None => runtime, + }) +} + +fn build_session_tracer( + session_id: &str, +) -> Result, Box> { + let Some(path) = env::var_os(TELEMETRY_LOG_PATH_ENV) else { + return Ok(None); + }; + let sink = JsonlTelemetrySink::new(PathBuf::from(path))?; + Ok(Some(SessionTracer::new( + session_id.to_string(), + std::sync::Arc::new(sink), + ))) } struct CliPermissionPrompter { @@ -2004,6 +2043,11 @@ impl AnthropicRuntimeClient { allowed_tools, }) } + + fn with_session_tracer(mut self, session_tracer: SessionTracer) -> Self { + self.client = self.client.with_session_tracer(session_tracer); + self + } } fn resolve_cli_auth_source() -> Result> { @@ -2364,13 +2408,13 @@ fn format_bash_result(icon: &str, parsed: &serde_json::Value) -> String { .get("backgroundTaskId") .and_then(|value| value.as_str()) { - lines[0].push_str(&format!(" backgrounded ({task_id})")); + write!(&mut lines[0], " backgrounded ({task_id})").expect("write to string"); } else if let Some(status) = parsed .get("returnCodeInterpretation") .and_then(|value| value.as_str()) .filter(|status| !status.is_empty()) { - lines[0].push_str(&format!(" {status}")); + write!(&mut lines[0], " {status}").expect("write to string"); } if let Some(stdout) = parsed.get("stdout").and_then(|value| value.as_str()) { @@ -2392,15 +2436,15 @@ fn format_read_result(icon: &str, parsed: &serde_json::Value) -> String { let path = extract_tool_path(file); let start_line = file .get("startLine") - .and_then(|value| value.as_u64()) + .and_then(serde_json::Value::as_u64) .unwrap_or(1); let num_lines = file .get("numLines") - .and_then(|value| value.as_u64()) + .and_then(serde_json::Value::as_u64) .unwrap_or(0); let total_lines = file .get("totalLines") - .and_then(|value| value.as_u64()) + .and_then(serde_json::Value::as_u64) .unwrap_or(num_lines); let content = file .get("content") @@ -2426,8 +2470,7 @@ fn format_write_result(icon: &str, parsed: &serde_json::Value) -> String { let line_count = parsed .get("content") .and_then(|value| value.as_str()) - .map(|content| content.lines().count()) - .unwrap_or(0); + .map_or(0, |content| content.lines().count()); format!( "{icon} \x1b[1;32m✏️ {} {path}\x1b[0m \x1b[2m({line_count} lines)\x1b[0m", if kind == "create" { "Wrote" } else { "Updated" }, @@ -2458,7 +2501,7 @@ fn format_edit_result(icon: &str, parsed: &serde_json::Value) -> String { let path = extract_tool_path(parsed); let suffix = if parsed .get("replaceAll") - .and_then(|value| value.as_bool()) + .and_then(serde_json::Value::as_bool) .unwrap_or(false) { " (replace all)" @@ -2486,7 +2529,7 @@ fn format_edit_result(icon: &str, parsed: &serde_json::Value) -> String { fn format_glob_result(icon: &str, parsed: &serde_json::Value) -> String { let num_files = parsed .get("numFiles") - .and_then(|value| value.as_u64()) + .and_then(serde_json::Value::as_u64) .unwrap_or(0); let filenames = parsed .get("filenames") @@ -2510,11 +2553,11 @@ fn format_glob_result(icon: &str, parsed: &serde_json::Value) -> String { fn format_grep_result(icon: &str, parsed: &serde_json::Value) -> String { let num_matches = parsed .get("numMatches") - .and_then(|value| value.as_u64()) + .and_then(serde_json::Value::as_u64) .unwrap_or(0); let num_files = parsed .get("numFiles") - .and_then(|value| value.as_u64()) + .and_then(serde_json::Value::as_u64) .unwrap_or(0); let content = parsed .get("content") diff --git a/rust/crates/rusty-claude-cli/src/render.rs b/rust/crates/rusty-claude-cli/src/render.rs index 465c5a4..d8d8796 100644 --- a/rust/crates/rusty-claude-cli/src/render.rs +++ b/rust/crates/rusty-claude-cli/src/render.rs @@ -286,7 +286,7 @@ impl TerminalRenderer { ) { match event { Event::Start(Tag::Heading { level, .. }) => { - self.start_heading(state, level as u8, output) + Self::start_heading(state, level as u8, output); } Event::End(TagEnd::Paragraph) => output.push_str("\n\n"), Event::Start(Tag::BlockQuote(..)) => self.start_quote(state, output), @@ -426,7 +426,7 @@ impl TerminalRenderer { } } - fn start_heading(&self, state: &mut RenderState, level: u8, output: &mut String) { + fn start_heading(state: &mut RenderState, level: u8, output: &mut String) { state.heading_level = Some(level); if !output.is_empty() { output.push('\n'); diff --git a/rust/crates/telemetry/Cargo.toml b/rust/crates/telemetry/Cargo.toml new file mode 100644 index 0000000..d501850 --- /dev/null +++ b/rust/crates/telemetry/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "telemetry" +version.workspace = true +edition.workspace = true +license.workspace = true +publish.workspace = true + +[dependencies] +serde = { version = "1", features = ["derive"] } +serde_json = "1" + +[lints] +workspace = true diff --git a/rust/crates/telemetry/src/lib.rs b/rust/crates/telemetry/src/lib.rs new file mode 100644 index 0000000..e548f45 --- /dev/null +++ b/rust/crates/telemetry/src/lib.rs @@ -0,0 +1,509 @@ +use std::fmt::{Debug, Formatter}; +use std::fs::{File, OpenOptions}; +use std::io::Write; +use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::{Arc, Mutex}; +use std::time::{SystemTime, UNIX_EPOCH}; + +use serde::{Deserialize, Serialize}; +use serde_json::{Map, Value}; + +pub const DEFAULT_ANTHROPIC_VERSION: &str = "2023-06-01"; +pub const DEFAULT_APP_NAME: &str = "clawd-code"; +pub const DEFAULT_RUNTIME: &str = "rust"; + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct ClientIdentity { + pub app_name: String, + pub app_version: String, + pub runtime: String, +} + +impl ClientIdentity { + #[must_use] + pub fn new(app_name: impl Into, app_version: impl Into) -> Self { + Self { + app_name: app_name.into(), + app_version: app_version.into(), + runtime: DEFAULT_RUNTIME.to_string(), + } + } + + #[must_use] + pub fn with_runtime(mut self, runtime: impl Into) -> Self { + self.runtime = runtime.into(); + self + } + + #[must_use] + pub fn user_agent(&self) -> String { + format!("{}/{} ({})", self.app_name, self.app_version, self.runtime) + } +} + +impl Default for ClientIdentity { + fn default() -> Self { + Self::new(DEFAULT_APP_NAME, env!("CARGO_PKG_VERSION")) + } +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct AnthropicRequestProfile { + pub anthropic_version: String, + pub client_identity: ClientIdentity, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub betas: Vec, + #[serde(default, skip_serializing_if = "Map::is_empty")] + pub extra_body: Map, +} + +impl AnthropicRequestProfile { + #[must_use] + pub fn new(client_identity: ClientIdentity) -> Self { + Self { + anthropic_version: DEFAULT_ANTHROPIC_VERSION.to_string(), + client_identity, + betas: Vec::new(), + extra_body: Map::new(), + } + } + + #[must_use] + pub fn with_beta(mut self, beta: impl Into) -> Self { + let beta = beta.into(); + if !self.betas.contains(&beta) { + self.betas.push(beta); + } + self + } + + #[must_use] + pub fn with_extra_body(mut self, key: impl Into, value: Value) -> Self { + self.extra_body.insert(key.into(), value); + self + } + + #[must_use] + pub fn header_pairs(&self) -> Vec<(String, String)> { + let mut headers = vec![ + ( + "anthropic-version".to_string(), + self.anthropic_version.clone(), + ), + ("user-agent".to_string(), self.client_identity.user_agent()), + ]; + if !self.betas.is_empty() { + headers.push(("anthropic-beta".to_string(), self.betas.join(","))); + } + headers + } + + pub fn render_json_body(&self, request: &T) -> Result { + let mut body = serde_json::to_value(request)?; + let object = body.as_object_mut().ok_or_else(|| { + serde_json::Error::io(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "request body must serialize to a JSON object", + )) + })?; + for (key, value) in &self.extra_body { + object.insert(key.clone(), value.clone()); + } + Ok(body) + } +} + +impl Default for AnthropicRequestProfile { + fn default() -> Self { + Self::new(ClientIdentity::default()) + } +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct AnalyticsEvent { + pub namespace: String, + pub action: String, + #[serde(default, skip_serializing_if = "Map::is_empty")] + pub properties: Map, +} + +impl AnalyticsEvent { + #[must_use] + pub fn new(namespace: impl Into, action: impl Into) -> Self { + Self { + namespace: namespace.into(), + action: action.into(), + properties: Map::new(), + } + } + + #[must_use] + pub fn with_property(mut self, key: impl Into, value: Value) -> Self { + self.properties.insert(key.into(), value); + self + } +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct SessionTraceRecord { + pub session_id: String, + pub sequence: u64, + pub name: String, + pub timestamp_ms: u64, + #[serde(default, skip_serializing_if = "Map::is_empty")] + pub attributes: Map, +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum TelemetryEvent { + HttpRequestStarted { + session_id: String, + attempt: u32, + method: String, + path: String, + #[serde(default, skip_serializing_if = "Map::is_empty")] + attributes: Map, + }, + HttpRequestSucceeded { + session_id: String, + attempt: u32, + method: String, + path: String, + status: u16, + #[serde(default, skip_serializing_if = "Option::is_none")] + request_id: Option, + #[serde(default, skip_serializing_if = "Map::is_empty")] + attributes: Map, + }, + HttpRequestFailed { + session_id: String, + attempt: u32, + method: String, + path: String, + error: String, + retryable: bool, + #[serde(default, skip_serializing_if = "Map::is_empty")] + attributes: Map, + }, + Analytics(AnalyticsEvent), + SessionTrace(SessionTraceRecord), +} + +pub trait TelemetrySink: Send + Sync { + fn record(&self, event: TelemetryEvent); +} + +#[derive(Default)] +pub struct MemoryTelemetrySink { + events: Mutex>, +} + +impl MemoryTelemetrySink { + #[must_use] + pub fn events(&self) -> Vec { + self.events + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .clone() + } +} + +impl TelemetrySink for MemoryTelemetrySink { + fn record(&self, event: TelemetryEvent) { + self.events + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + .push(event); + } +} + +pub struct JsonlTelemetrySink { + path: PathBuf, + file: Mutex, +} + +impl Debug for JsonlTelemetrySink { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("JsonlTelemetrySink") + .field("path", &self.path) + .finish_non_exhaustive() + } +} + +impl JsonlTelemetrySink { + pub fn new(path: impl AsRef) -> Result { + let path = path.as_ref().to_path_buf(); + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent)?; + } + let file = OpenOptions::new().create(true).append(true).open(&path)?; + Ok(Self { + path, + file: Mutex::new(file), + }) + } + + #[must_use] + pub fn path(&self) -> &Path { + &self.path + } +} + +impl TelemetrySink for JsonlTelemetrySink { + fn record(&self, event: TelemetryEvent) { + let Ok(line) = serde_json::to_string(&event) else { + return; + }; + let mut file = self + .file + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner); + let _ = writeln!(file, "{line}"); + let _ = file.flush(); + } +} + +#[derive(Clone)] +pub struct SessionTracer { + session_id: String, + sequence: Arc, + sink: Arc, +} + +impl Debug for SessionTracer { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SessionTracer") + .field("session_id", &self.session_id) + .finish_non_exhaustive() + } +} + +impl SessionTracer { + #[must_use] + pub fn new(session_id: impl Into, sink: Arc) -> Self { + Self { + session_id: session_id.into(), + sequence: Arc::new(AtomicU64::new(0)), + sink, + } + } + + #[must_use] + pub fn session_id(&self) -> &str { + &self.session_id + } + + pub fn record(&self, name: impl Into, attributes: Map) { + let record = SessionTraceRecord { + session_id: self.session_id.clone(), + sequence: self.sequence.fetch_add(1, Ordering::Relaxed), + name: name.into(), + timestamp_ms: current_timestamp_ms(), + attributes, + }; + self.sink.record(TelemetryEvent::SessionTrace(record)); + } + + pub fn record_http_request_started( + &self, + attempt: u32, + method: impl Into, + path: impl Into, + attributes: Map, + ) { + let method = method.into(); + let path = path.into(); + self.sink.record(TelemetryEvent::HttpRequestStarted { + session_id: self.session_id.clone(), + attempt, + method: method.clone(), + path: path.clone(), + attributes: attributes.clone(), + }); + self.record( + "http_request_started", + merge_trace_fields(method, path, attempt, attributes), + ); + } + + pub fn record_http_request_succeeded( + &self, + attempt: u32, + method: impl Into, + path: impl Into, + status: u16, + request_id: Option, + attributes: Map, + ) { + let method = method.into(); + let path = path.into(); + self.sink.record(TelemetryEvent::HttpRequestSucceeded { + session_id: self.session_id.clone(), + attempt, + method: method.clone(), + path: path.clone(), + status, + request_id: request_id.clone(), + attributes: attributes.clone(), + }); + let mut trace_attributes = merge_trace_fields(method, path, attempt, attributes); + trace_attributes.insert("status".to_string(), Value::from(status)); + if let Some(request_id) = request_id { + trace_attributes.insert("request_id".to_string(), Value::String(request_id)); + } + self.record("http_request_succeeded", trace_attributes); + } + + pub fn record_http_request_failed( + &self, + attempt: u32, + method: impl Into, + path: impl Into, + error: impl Into, + retryable: bool, + attributes: Map, + ) { + let method = method.into(); + let path = path.into(); + let error = error.into(); + self.sink.record(TelemetryEvent::HttpRequestFailed { + session_id: self.session_id.clone(), + attempt, + method: method.clone(), + path: path.clone(), + error: error.clone(), + retryable, + attributes: attributes.clone(), + }); + let mut trace_attributes = merge_trace_fields(method, path, attempt, attributes); + trace_attributes.insert("error".to_string(), Value::String(error)); + trace_attributes.insert("retryable".to_string(), Value::Bool(retryable)); + self.record("http_request_failed", trace_attributes); + } + + pub fn record_analytics(&self, event: AnalyticsEvent) { + let mut attributes = event.properties.clone(); + attributes.insert( + "namespace".to_string(), + Value::String(event.namespace.clone()), + ); + attributes.insert("action".to_string(), Value::String(event.action.clone())); + self.sink.record(TelemetryEvent::Analytics(event)); + self.record("analytics", attributes); + } +} + +fn merge_trace_fields( + method: String, + path: String, + attempt: u32, + mut attributes: Map, +) -> Map { + attributes.insert("method".to_string(), Value::String(method)); + attributes.insert("path".to_string(), Value::String(path)); + attributes.insert("attempt".to_string(), Value::from(attempt)); + attributes +} + +fn current_timestamp_ms() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis() + .try_into() + .unwrap_or(u64::MAX) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn request_profile_emits_headers_and_merges_body() { + let profile = AnthropicRequestProfile::new( + ClientIdentity::new("clawd-code", "1.2.3").with_runtime("rust-cli"), + ) + .with_beta("tools-2026-04-01") + .with_extra_body("metadata", serde_json::json!({"source": "test"})); + + assert_eq!( + profile.header_pairs(), + vec![ + ( + "anthropic-version".to_string(), + DEFAULT_ANTHROPIC_VERSION.to_string() + ), + ( + "user-agent".to_string(), + "clawd-code/1.2.3 (rust-cli)".to_string() + ), + ("anthropic-beta".to_string(), "tools-2026-04-01".to_string(),), + ] + ); + + let body = profile + .render_json_body(&serde_json::json!({"model": "claude-sonnet"})) + .expect("body should serialize"); + assert_eq!( + body["metadata"]["source"], + Value::String("test".to_string()) + ); + } + + #[test] + fn session_tracer_records_structured_events_and_trace_sequence() { + let sink = Arc::new(MemoryTelemetrySink::default()); + let tracer = SessionTracer::new("session-123", sink.clone()); + + tracer.record_http_request_started(1, "POST", "/v1/messages", Map::new()); + tracer.record_analytics( + AnalyticsEvent::new("cli", "prompt_sent") + .with_property("model", Value::String("claude-opus".to_string())), + ); + + let events = sink.events(); + assert!(matches!( + &events[0], + TelemetryEvent::HttpRequestStarted { + session_id, + attempt: 1, + method, + path, + .. + } if session_id == "session-123" && method == "POST" && path == "/v1/messages" + )); + assert!(matches!( + &events[1], + TelemetryEvent::SessionTrace(SessionTraceRecord { sequence: 0, name, .. }) + if name == "http_request_started" + )); + assert!(matches!(&events[2], TelemetryEvent::Analytics(_))); + assert!(matches!( + &events[3], + TelemetryEvent::SessionTrace(SessionTraceRecord { sequence: 1, name, .. }) + if name == "analytics" + )); + } + + #[test] + fn jsonl_sink_persists_events() { + let path = std::env::temp_dir().join(format!( + "telemetry-jsonl-{}.log", + current_timestamp_ms() + )); + let sink = JsonlTelemetrySink::new(&path).expect("sink should create file"); + + sink.record(TelemetryEvent::Analytics( + AnalyticsEvent::new("cli", "turn_completed") + .with_property("ok", Value::Bool(true)), + )); + + let contents = std::fs::read_to_string(&path).expect("telemetry log should be readable"); + assert!(contents.contains("\"type\":\"analytics\"")); + assert!(contents.contains("\"action\":\"turn_completed\"")); + + let _ = std::fs::remove_file(path); + } +}