wip: telemetry progress

This commit is contained in:
Yeachan-Heo
2026-04-01 04:30:29 +00:00
parent ac6c5d00a8
commit 5170718306
12 changed files with 1060 additions and 70 deletions

View File

@@ -10,6 +10,7 @@ reqwest = { version = "0.12", default-features = false, features = ["json", "rus
runtime = { path = "../runtime" }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
telemetry = { path = "../telemetry" }
tokio = { version = "1", features = ["io-util", "macros", "net", "rt-multi-thread", "time"] }
[lints]

View File

@@ -6,13 +6,15 @@ use runtime::{
OAuthTokenExchangeRequest,
};
use serde::Deserialize;
use serde_json::{Map, Value};
use telemetry::{AnthropicRequestProfile, ClientIdentity, SessionTracer};
use crate::error::ApiError;
use crate::sse::SseParser;
use crate::types::{MessageRequest, MessageResponse, StreamEvent};
const DEFAULT_BASE_URL: &str = "https://api.anthropic.com";
const ANTHROPIC_VERSION: &str = "2023-06-01";
const MESSAGES_PATH: &str = "/v1/messages";
const REQUEST_ID_HEADER: &str = "request-id";
const ALT_REQUEST_ID_HEADER: &str = "x-request-id";
const DEFAULT_INITIAL_BACKOFF: Duration = Duration::from_millis(200);
@@ -108,6 +110,8 @@ pub struct AnthropicClient {
max_retries: u32,
initial_backoff: Duration,
max_backoff: Duration,
request_profile: AnthropicRequestProfile,
session_tracer: Option<SessionTracer>,
}
impl AnthropicClient {
@@ -120,6 +124,8 @@ impl AnthropicClient {
max_retries: DEFAULT_MAX_RETRIES,
initial_backoff: DEFAULT_INITIAL_BACKOFF,
max_backoff: DEFAULT_MAX_BACKOFF,
request_profile: AnthropicRequestProfile::default(),
session_tracer: None,
}
}
@@ -132,6 +138,8 @@ impl AnthropicClient {
max_retries: DEFAULT_MAX_RETRIES,
initial_backoff: DEFAULT_INITIAL_BACKOFF,
max_backoff: DEFAULT_MAX_BACKOFF,
request_profile: AnthropicRequestProfile::default(),
session_tracer: None,
}
}
@@ -176,6 +184,39 @@ impl AnthropicClient {
self
}
#[must_use]
pub fn with_request_profile(mut self, request_profile: AnthropicRequestProfile) -> Self {
self.request_profile = request_profile;
self
}
#[must_use]
pub fn with_client_identity(mut self, client_identity: ClientIdentity) -> Self {
self.request_profile.client_identity = client_identity;
self
}
#[must_use]
pub fn with_beta(mut self, beta: impl Into<String>) -> Self {
let beta = beta.into();
if !self.request_profile.betas.contains(&beta) {
self.request_profile.betas.push(beta);
}
self
}
#[must_use]
pub fn with_extra_body_param(mut self, key: impl Into<String>, value: Value) -> Self {
self.request_profile.extra_body.insert(key.into(), value);
self
}
#[must_use]
pub fn with_session_tracer(mut self, session_tracer: SessionTracer) -> Self {
self.session_tracer = Some(session_tracer);
self
}
#[must_use]
pub fn with_retry_policy(
mut self,
@@ -279,18 +320,30 @@ impl AnthropicClient {
loop {
attempts += 1;
self.record_request_started(request, attempts);
match self.send_raw_request(request).await {
Ok(response) => match expect_success(response).await {
Ok(response) => return Ok(response),
Ok(response) => {
self.record_request_succeeded(request, attempts, &response);
return Ok(response);
}
Err(error) if error.is_retryable() && attempts <= self.max_retries + 1 => {
self.record_request_failed(request, attempts, &error);
last_error = Some(error);
}
Err(error) => return Err(error),
Err(error) => {
self.record_request_failed(request, attempts, &error);
return Err(error);
}
},
Err(error) if error.is_retryable() && attempts <= self.max_retries + 1 => {
self.record_request_failed(request, attempts, &error);
last_error = Some(error);
}
Err(error) => return Err(error),
Err(error) => {
self.record_request_failed(request, attempts, &error);
return Err(error);
}
}
if attempts > self.max_retries {
@@ -310,18 +363,131 @@ impl AnthropicClient {
&self,
request: &MessageRequest,
) -> Result<reqwest::Response, ApiError> {
let request_url = format!("{}/v1/messages", self.base_url.trim_end_matches('/'));
let request_builder = self
let request_url = format!("{}{}", self.base_url.trim_end_matches('/'), MESSAGES_PATH);
let mut request_builder = self
.http
.post(&request_url)
.header("anthropic-version", ANTHROPIC_VERSION)
.header("content-type", "application/json");
for (name, value) in self.request_profile.header_pairs() {
request_builder = request_builder.header(name, value);
}
let mut request_builder = self.auth.apply(request_builder);
request_builder = request_builder.json(request);
let request_body = self.request_profile.render_json_body(request)?;
request_builder = request_builder.json(&request_body);
request_builder.send().await.map_err(ApiError::from)
}
fn record_request_started(&self, request: &MessageRequest, attempt: u32) {
if let Some(tracer) = &self.session_tracer {
tracer.record_http_request_started(
attempt,
"POST",
MESSAGES_PATH,
self.request_attributes(request),
);
}
}
fn record_request_succeeded(
&self,
request: &MessageRequest,
attempt: u32,
response: &reqwest::Response,
) {
if let Some(tracer) = &self.session_tracer {
tracer.record_http_request_succeeded(
attempt,
"POST",
MESSAGES_PATH,
response.status().as_u16(),
request_id_from_headers(response.headers()),
self.request_attributes(request),
);
}
}
fn record_request_failed(&self, request: &MessageRequest, attempt: u32, error: &ApiError) {
if let Some(tracer) = &self.session_tracer {
tracer.record_http_request_failed(
attempt,
"POST",
MESSAGES_PATH,
error.to_string(),
error.is_retryable(),
self.error_attributes(request, error),
);
}
}
fn request_attributes(&self, request: &MessageRequest) -> Map<String, Value> {
let mut attributes = Map::new();
attributes.insert("model".to_string(), Value::String(request.model.clone()));
attributes.insert("stream".to_string(), Value::Bool(request.stream));
attributes.insert("max_tokens".to_string(), Value::from(request.max_tokens));
attributes.insert(
"message_count".to_string(),
Value::from(u64::try_from(request.messages.len()).unwrap_or(u64::MAX)),
);
attributes.insert(
"tool_count".to_string(),
Value::from(
u64::try_from(request.tools.as_ref().map_or(0, Vec::len)).unwrap_or(u64::MAX),
),
);
attributes.insert(
"beta_count".to_string(),
Value::from(u64::try_from(self.request_profile.betas.len()).unwrap_or(u64::MAX)),
);
if !self.request_profile.extra_body.is_empty() {
attributes.insert(
"extra_body_keys".to_string(),
Value::Array(
self.request_profile
.extra_body
.keys()
.cloned()
.map(Value::String)
.collect(),
),
);
}
attributes
}
fn error_attributes(&self, request: &MessageRequest, error: &ApiError) -> Map<String, Value> {
let mut attributes = self.request_attributes(request);
match error {
ApiError::Api {
status,
error_type,
message,
..
} => {
attributes.insert("status".to_string(), Value::from(status.as_u16()));
if let Some(error_type) = error_type {
attributes.insert("error_type".to_string(), Value::String(error_type.clone()));
}
if let Some(message) = message {
attributes.insert("api_message".to_string(), Value::String(message.clone()));
}
}
ApiError::Http(_) => {
attributes.insert("error_type".to_string(), Value::String("http".to_string()));
}
ApiError::Json(_) => {
attributes.insert("error_type".to_string(), Value::String("json".to_string()));
}
_ => {
attributes.insert(
"error_type".to_string(),
Value::String("client".to_string()),
);
}
}
attributes
}
fn backoff_for_attempt(&self, attempt: u32) -> Result<Duration, ApiError> {
let Some(multiplier) = 1_u32.checked_shl(attempt.saturating_sub(1)) else {
return Err(ApiError::BackoffOverflow {

View File

@@ -15,3 +15,9 @@ pub use types::{
MessageResponse, MessageStartEvent, MessageStopEvent, OutputContentBlock, StreamEvent,
ToolChoice, ToolDefinition, ToolResultContentBlock, Usage,
};
pub use telemetry::{
AnalyticsEvent, AnthropicRequestProfile, ClientIdentity, JsonlTelemetrySink,
MemoryTelemetrySink, SessionTraceRecord, SessionTracer, TelemetryEvent, TelemetrySink,
DEFAULT_ANTHROPIC_VERSION,
};

View File

@@ -8,6 +8,7 @@ use api::{
StreamEvent, ToolChoice, ToolDefinition,
};
use serde_json::json;
use telemetry::{ClientIdentity, MemoryTelemetrySink, SessionTracer, TelemetryEvent};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;
use tokio::sync::Mutex;
@@ -64,6 +65,14 @@ async fn send_message_posts_json_and_parses_response() {
request.headers.get("authorization").map(String::as_str),
Some("Bearer proxy-token")
);
assert_eq!(
request.headers.get("anthropic-version").map(String::as_str),
Some("2023-06-01")
);
assert_eq!(
request.headers.get("user-agent").map(String::as_str),
Some("clawd-code/0.1.0 (rust)")
);
let body: serde_json::Value =
serde_json::from_str(&request.body).expect("request body should be json");
assert_eq!(
@@ -75,6 +84,90 @@ async fn send_message_posts_json_and_parses_response() {
assert_eq!(body["tool_choice"]["type"], json!("auto"));
}
#[tokio::test]
async fn send_message_applies_request_profile_and_records_telemetry() {
let state = Arc::new(Mutex::new(Vec::<CapturedRequest>::new()));
let server = spawn_server(
state.clone(),
vec![http_response_with_headers(
"200 OK",
"application/json",
concat!(
"{",
"\"id\":\"msg_profile\",",
"\"type\":\"message\",",
"\"role\":\"assistant\",",
"\"content\":[{\"type\":\"text\",\"text\":\"ok\"}],",
"\"model\":\"claude-3-7-sonnet-latest\",",
"\"stop_reason\":\"end_turn\",",
"\"stop_sequence\":null,",
"\"usage\":{\"input_tokens\":1,\"output_tokens\":1}",
"}"
),
&[("request-id", "req_profile_123")],
)],
)
.await;
let sink = Arc::new(MemoryTelemetrySink::default());
let client = AnthropicClient::new("test-key")
.with_base_url(server.base_url())
.with_client_identity(ClientIdentity::new("clawd-code", "9.9.9").with_runtime("rust-cli"))
.with_beta("tools-2026-04-01")
.with_extra_body_param("metadata", json!({"source": "clawd-code"}))
.with_session_tracer(SessionTracer::new("session-telemetry", sink.clone()));
let response = client
.send_message(&sample_request(false))
.await
.expect("request should succeed");
assert_eq!(response.request_id.as_deref(), Some("req_profile_123"));
let captured = state.lock().await;
let request = captured.first().expect("server should capture request");
assert_eq!(
request.headers.get("anthropic-beta").map(String::as_str),
Some("tools-2026-04-01")
);
assert_eq!(
request.headers.get("user-agent").map(String::as_str),
Some("clawd-code/9.9.9 (rust-cli)")
);
let body: serde_json::Value =
serde_json::from_str(&request.body).expect("request body should be json");
assert_eq!(body["metadata"]["source"], json!("clawd-code"));
let events = sink.events();
assert_eq!(events.len(), 4);
assert!(matches!(
&events[0],
TelemetryEvent::HttpRequestStarted {
session_id,
attempt: 1,
method,
path,
..
} if session_id == "session-telemetry" && method == "POST" && path == "/v1/messages"
));
assert!(matches!(
&events[1],
TelemetryEvent::SessionTrace(trace) if trace.name == "http_request_started"
));
assert!(matches!(
&events[2],
TelemetryEvent::HttpRequestSucceeded {
request_id,
status: 200,
..
} if request_id.as_deref() == Some("req_profile_123")
));
assert!(matches!(
&events[3],
TelemetryEvent::SessionTrace(trace) if trace.name == "http_request_succeeded"
));
}
#[tokio::test]
async fn stream_message_parses_sse_events_with_tool_use() {
let state = Arc::new(Mutex::new(Vec::<CapturedRequest>::new()));