diff --git a/rust/crates/commands/src/lib.rs b/rust/crates/commands/src/lib.rs index b396bb0..b1ae4b0 100644 --- a/rust/crates/commands/src/lib.rs +++ b/rust/crates/commands/src/lib.rs @@ -392,19 +392,17 @@ mod tests { #[test] fn compacts_sessions_via_slash_command() { - let session = Session { - version: 1, - messages: vec![ - ConversationMessage::user_text("a ".repeat(200)), - ConversationMessage::assistant(vec![ContentBlock::Text { - text: "b ".repeat(200), - }]), - ConversationMessage::tool_result("1", "bash", "ok ".repeat(200), false), - ConversationMessage::assistant(vec![ContentBlock::Text { - text: "recent".to_string(), - }]), - ], - }; + let mut session = Session::new(); + session.messages = vec![ + ConversationMessage::user_text("a ".repeat(200)), + ConversationMessage::assistant(vec![ContentBlock::Text { + text: "b ".repeat(200), + }]), + ConversationMessage::tool_result("1", "bash", "ok ".repeat(200), false), + ConversationMessage::assistant(vec![ContentBlock::Text { + text: "recent".to_string(), + }]), + ]; let result = handle_slash_command( "/compact", diff --git a/rust/crates/runtime/src/compact.rs b/rust/crates/runtime/src/compact.rs index e227019..571a43c 100644 --- a/rust/crates/runtime/src/compact.rs +++ b/rust/crates/runtime/src/compact.rs @@ -99,13 +99,14 @@ pub fn compact_session(session: &Session, config: CompactionConfig) -> Compactio }]; compacted_messages.extend(preserved); + let mut compacted_session = session.clone(); + compacted_session.messages = compacted_messages; + compacted_session.record_compaction(summary.clone(), removed.len()); + CompactionResult { summary, formatted_summary, - compacted_session: Session { - version: session.version, - messages: compacted_messages, - }, + compacted_session, removed_message_count: removed.len(), } } @@ -390,10 +391,8 @@ mod tests { #[test] fn leaves_small_sessions_unchanged() { - let session = Session { - version: 1, - messages: vec![ConversationMessage::user_text("hello")], - }; + let mut session = Session::new(); + session.messages = vec![ConversationMessage::user_text("hello")]; let result = compact_session(&session, CompactionConfig::default()); assert_eq!(result.removed_message_count, 0); @@ -404,23 +403,21 @@ mod tests { #[test] fn compacts_older_messages_into_a_system_summary() { - let session = Session { - version: 1, - messages: vec![ - ConversationMessage::user_text("one ".repeat(200)), - ConversationMessage::assistant(vec![ContentBlock::Text { - text: "two ".repeat(200), - }]), - ConversationMessage::tool_result("1", "bash", "ok ".repeat(200), false), - ConversationMessage { - role: MessageRole::Assistant, - blocks: vec![ContentBlock::Text { - text: "recent".to_string(), - }], - usage: None, - }, - ], - }; + let mut session = Session::new(); + session.messages = vec![ + ConversationMessage::user_text("one ".repeat(200)), + ConversationMessage::assistant(vec![ContentBlock::Text { + text: "two ".repeat(200), + }]), + ConversationMessage::tool_result("1", "bash", "ok ".repeat(200), false), + ConversationMessage { + role: MessageRole::Assistant, + blocks: vec![ContentBlock::Text { + text: "recent".to_string(), + }], + usage: None, + }, + ]; let result = compact_session( &session, diff --git a/rust/crates/runtime/src/conversation.rs b/rust/crates/runtime/src/conversation.rs index 4ffbabc..94cf9ba 100644 --- a/rust/crates/runtime/src/conversation.rs +++ b/rust/crates/runtime/src/conversation.rs @@ -156,8 +156,8 @@ where mut prompter: Option<&mut dyn PermissionPrompter>, ) -> Result { self.session - .messages - .push(ConversationMessage::user_text(user_input.into())); + .push_user_text(user_input.into()) + .map_err(|error| RuntimeError::new(error.to_string()))?; let mut assistant_messages = Vec::new(); let mut tool_results = Vec::new(); @@ -191,7 +191,9 @@ where }) .collect::>(); - self.session.messages.push(assistant_message.clone()); + self.session + .push_message(assistant_message.clone()) + .map_err(|error| RuntimeError::new(error.to_string()))?; assistant_messages.push(assistant_message); if pending_tool_uses.is_empty() { @@ -249,7 +251,9 @@ where ConversationMessage::tool_result(tool_use_id, tool_name, reason, true) } }; - self.session.messages.push(result_message.clone()); + self.session + .push_message(result_message.clone()) + .map_err(|error| RuntimeError::new(error.to_string()))?; tool_results.push(result_message); } } @@ -408,7 +412,9 @@ mod tests { use crate::prompt::{ProjectContext, SystemPromptBuilder}; use crate::session::{ContentBlock, MessageRole, Session}; use crate::usage::TokenUsage; + use std::fs; use std::path::PathBuf; + use std::time::{SystemTime, UNIX_EPOCH}; struct ScriptedApiClient { call_count: usize, @@ -787,6 +793,57 @@ mod tests { result.compacted_session.messages[0].role, MessageRole::System ); + assert_eq!( + result.compacted_session.session_id, + runtime.session().session_id + ); + assert!(result.compacted_session.compaction.is_some()); + } + + #[test] + fn persists_conversation_turn_messages_to_jsonl_session() { + struct SimpleApi; + impl ApiClient for SimpleApi { + fn stream( + &mut self, + _request: ApiRequest, + ) -> Result, RuntimeError> { + Ok(vec![ + AssistantEvent::TextDelta("done".to_string()), + AssistantEvent::MessageStop, + ]) + } + } + + let path = temp_session_path("persisted-turn"); + let session = Session::new().with_persistence_path(path.clone()); + let mut runtime = ConversationRuntime::new( + session, + SimpleApi, + StaticToolExecutor::new(), + PermissionPolicy::new(PermissionMode::DangerFullAccess), + vec!["system".to_string()], + ); + + runtime + .run_turn("persist this turn", None) + .expect("turn should succeed"); + + let restored = Session::load_from_path(&path).expect("persisted session should reload"); + fs::remove_file(&path).expect("temp session file should be removable"); + + assert_eq!(restored.messages.len(), 2); + assert_eq!(restored.messages[0].role, MessageRole::User); + assert_eq!(restored.messages[1].role, MessageRole::Assistant); + assert_eq!(restored.session_id, runtime.session().session_id); + } + + fn temp_session_path(label: &str) -> PathBuf { + let nanos = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("system time should be after epoch") + .as_nanos(); + std::env::temp_dir().join(format!("runtime-conversation-{label}-{nanos}.json")) } #[cfg(windows)] diff --git a/rust/crates/runtime/src/lib.rs b/rust/crates/runtime/src/lib.rs index da745e5..6e1a50a 100644 --- a/rust/crates/runtime/src/lib.rs +++ b/rust/crates/runtime/src/lib.rs @@ -76,7 +76,9 @@ pub use remote::{ RemoteSessionContext, UpstreamProxyBootstrap, UpstreamProxyState, DEFAULT_REMOTE_BASE_URL, DEFAULT_SESSION_TOKEN_PATH, DEFAULT_SYSTEM_CA_BUNDLE, NO_PROXY_HOSTS, UPSTREAM_PROXY_ENV_KEYS, }; -pub use session::{ContentBlock, ConversationMessage, MessageRole, Session, SessionError}; +pub use session::{ + ContentBlock, ConversationMessage, MessageRole, Session, SessionCompaction, SessionError, +}; pub use usage::{ format_usd, pricing_for_model, ModelPricing, TokenUsage, UsageCostEstimate, UsageTracker, }; diff --git a/rust/crates/runtime/src/session.rs b/rust/crates/runtime/src/session.rs index beaa435..46f0fc0 100644 --- a/rust/crates/runtime/src/session.rs +++ b/rust/crates/runtime/src/session.rs @@ -1,11 +1,19 @@ use std::collections::BTreeMap; use std::fmt::{Display, Formatter}; -use std::fs; -use std::path::Path; +use std::fs::{self, OpenOptions}; +use std::io::Write; +use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::time::{SystemTime, UNIX_EPOCH}; use crate::json::{JsonError, JsonValue}; use crate::usage::TokenUsage; +const SESSION_VERSION: u32 = 1; +const ROTATE_AFTER_BYTES: u64 = 256 * 1024; +const MAX_ROTATED_FILES: usize = 3; +static SESSION_ID_COUNTER: AtomicU64 = AtomicU64::new(0); + #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum MessageRole { System, @@ -40,11 +48,41 @@ pub struct ConversationMessage { } #[derive(Debug, Clone, PartialEq, Eq)] +pub struct SessionCompaction { + pub count: u32, + pub removed_message_count: usize, + pub summary: String, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +struct SessionPersistence { + path: PathBuf, +} + +#[derive(Debug, Clone)] pub struct Session { pub version: u32, + pub session_id: String, + pub created_at_ms: u64, + pub updated_at_ms: u64, pub messages: Vec, + pub compaction: Option, + persistence: Option, } +impl PartialEq for Session { + fn eq(&self, other: &Self) -> bool { + self.version == other.version + && self.session_id == other.session_id + && self.created_at_ms == other.created_at_ms + && self.updated_at_ms == other.updated_at_ms + && self.messages == other.messages + && self.compaction == other.compaction + } +} + +impl Eq for Session {} + #[derive(Debug)] pub enum SessionError { Io(std::io::Error), @@ -79,20 +117,65 @@ impl From for SessionError { impl Session { #[must_use] pub fn new() -> Self { + let now = current_time_millis(); Self { - version: 1, + version: SESSION_VERSION, + session_id: generate_session_id(), + created_at_ms: now, + updated_at_ms: now, messages: Vec::new(), + compaction: None, + persistence: None, } } + #[must_use] + pub fn with_persistence_path(mut self, path: impl Into) -> Self { + self.persistence = Some(SessionPersistence { path: path.into() }); + self + } + + #[must_use] + pub fn persistence_path(&self) -> Option<&Path> { + self.persistence.as_ref().map(|value| value.path.as_path()) + } + pub fn save_to_path(&self, path: impl AsRef) -> Result<(), SessionError> { - fs::write(path, self.to_json().render())?; + let path = path.as_ref(); + rotate_session_file_if_needed(path)?; + write_atomic(path, &self.render_jsonl_snapshot())?; + cleanup_rotated_logs(path)?; Ok(()) } pub fn load_from_path(path: impl AsRef) -> Result { + let path = path.as_ref(); let contents = fs::read_to_string(path)?; - Self::from_json(&JsonValue::parse(&contents)?) + let session = match JsonValue::parse(&contents) { + Ok(value) => Self::from_json(&value)?, + Err(_) => Self::from_jsonl(&contents)?, + }; + Ok(session.with_persistence_path(path.to_path_buf())) + } + + pub fn push_message(&mut self, message: ConversationMessage) -> Result<(), SessionError> { + self.touch(); + self.messages.push(message.clone()); + self.append_persisted_message(&message) + } + + pub fn push_user_text(&mut self, text: impl Into) -> Result<(), SessionError> { + self.push_message(ConversationMessage::user_text(text)) + } + + pub fn record_compaction(&mut self, summary: impl Into, removed_message_count: usize) { + self.touch(); + let count = self.compaction.as_ref().map_or(1, |value| value.count + 1); + self.compaction = Some(SessionCompaction { + count, + removed_message_count, + summary: summary.into(), + }); } #[must_use] @@ -102,6 +185,18 @@ impl Session { "version".to_string(), JsonValue::Number(i64::from(self.version)), ); + object.insert( + "session_id".to_string(), + JsonValue::String(self.session_id.clone()), + ); + object.insert( + "created_at_ms".to_string(), + JsonValue::Number(i64_from_u64(self.created_at_ms, "created_at_ms")), + ); + object.insert( + "updated_at_ms".to_string(), + JsonValue::Number(i64_from_u64(self.updated_at_ms, "updated_at_ms")), + ); object.insert( "messages".to_string(), JsonValue::Array( @@ -111,6 +206,9 @@ impl Session { .collect(), ), ); + if let Some(compaction) = &self.compaction { + object.insert("compaction".to_string(), compaction.to_json()); + } JsonValue::Object(object) } @@ -131,7 +229,171 @@ impl Session { .iter() .map(ConversationMessage::from_json) .collect::, _>>()?; - Ok(Self { version, messages }) + let now = current_time_millis(); + let session_id = object + .get("session_id") + .and_then(JsonValue::as_str) + .map(ToOwned::to_owned) + .unwrap_or_else(generate_session_id); + let created_at_ms = object + .get("created_at_ms") + .map(|value| required_u64_from_value(value, "created_at_ms")) + .transpose()? + .unwrap_or(now); + let updated_at_ms = object + .get("updated_at_ms") + .map(|value| required_u64_from_value(value, "updated_at_ms")) + .transpose()? + .unwrap_or(created_at_ms); + let compaction = object + .get("compaction") + .map(SessionCompaction::from_json) + .transpose()?; + Ok(Self { + version, + session_id, + created_at_ms, + updated_at_ms, + messages, + compaction, + persistence: None, + }) + } + + fn from_jsonl(contents: &str) -> Result { + let mut version = SESSION_VERSION; + let mut session_id = None; + let mut created_at_ms = None; + let mut updated_at_ms = None; + let mut messages = Vec::new(); + let mut compaction = None; + + for (line_number, raw_line) in contents.lines().enumerate() { + let line = raw_line.trim(); + if line.is_empty() { + continue; + } + let value = JsonValue::parse(line).map_err(|error| { + SessionError::Format(format!( + "invalid JSONL record at line {}: {}", + line_number + 1, + error + )) + })?; + let object = value.as_object().ok_or_else(|| { + SessionError::Format(format!( + "JSONL record at line {} must be an object", + line_number + 1 + )) + })?; + match object + .get("type") + .and_then(JsonValue::as_str) + .ok_or_else(|| { + SessionError::Format(format!( + "JSONL record at line {} missing type", + line_number + 1 + )) + })? { + "session_meta" => { + version = required_u32(object, "version")?; + session_id = Some(required_string(object, "session_id")?); + created_at_ms = Some(required_u64(object, "created_at_ms")?); + updated_at_ms = Some(required_u64(object, "updated_at_ms")?); + } + "message" => { + let message_value = object.get("message").ok_or_else(|| { + SessionError::Format(format!( + "JSONL record at line {} missing message", + line_number + 1 + )) + })?; + messages.push(ConversationMessage::from_json(message_value)?); + } + "compaction" => { + compaction = Some(SessionCompaction::from_json(&JsonValue::Object( + object.clone(), + ))?); + } + other => { + return Err(SessionError::Format(format!( + "unsupported JSONL record type at line {}: {other}", + line_number + 1 + ))) + } + } + } + + let now = current_time_millis(); + Ok(Self { + version, + session_id: session_id.unwrap_or_else(generate_session_id), + created_at_ms: created_at_ms.unwrap_or(now), + updated_at_ms: updated_at_ms.unwrap_or(created_at_ms.unwrap_or(now)), + messages, + compaction, + persistence: None, + }) + } + + fn render_jsonl_snapshot(&self) -> String { + let mut lines = vec![self.meta_record().render()]; + if let Some(compaction) = &self.compaction { + lines.push(compaction.to_jsonl_record().render()); + } + lines.extend( + self.messages + .iter() + .map(|message| message_record(message).render()), + ); + let mut rendered = lines.join("\n"); + rendered.push('\n'); + rendered + } + + fn append_persisted_message(&self, message: &ConversationMessage) -> Result<(), SessionError> { + let Some(path) = self.persistence_path() else { + return Ok(()); + }; + + let needs_bootstrap = !path.exists() || fs::metadata(path)?.len() == 0; + if needs_bootstrap { + self.save_to_path(path)?; + return Ok(()); + } + + let mut file = OpenOptions::new().append(true).open(path)?; + writeln!(file, "{}", message_record(message).render())?; + Ok(()) + } + + fn meta_record(&self) -> JsonValue { + let mut object = BTreeMap::new(); + object.insert( + "type".to_string(), + JsonValue::String("session_meta".to_string()), + ); + object.insert( + "version".to_string(), + JsonValue::Number(i64::from(self.version)), + ); + object.insert( + "session_id".to_string(), + JsonValue::String(self.session_id.clone()), + ); + object.insert( + "created_at_ms".to_string(), + JsonValue::Number(i64_from_u64(self.created_at_ms, "created_at_ms")), + ); + object.insert( + "updated_at_ms".to_string(), + JsonValue::Number(i64_from_u64(self.updated_at_ms, "updated_at_ms")), + ); + JsonValue::Object(object) + } + + fn touch(&mut self) { + self.updated_at_ms = current_time_millis(); } } @@ -324,6 +586,61 @@ impl ContentBlock { } } +impl SessionCompaction { + #[must_use] + pub fn to_json(&self) -> JsonValue { + let mut object = BTreeMap::new(); + object.insert( + "count".to_string(), + JsonValue::Number(i64::from(self.count)), + ); + object.insert( + "removed_message_count".to_string(), + JsonValue::Number(i64_from_usize( + self.removed_message_count, + "removed_message_count", + )), + ); + object.insert( + "summary".to_string(), + JsonValue::String(self.summary.clone()), + ); + JsonValue::Object(object) + } + + #[must_use] + pub fn to_jsonl_record(&self) -> JsonValue { + let mut object = self + .to_json() + .as_object() + .cloned() + .expect("compaction should render to object"); + object.insert( + "type".to_string(), + JsonValue::String("compaction".to_string()), + ); + JsonValue::Object(object) + } + + fn from_json(value: &JsonValue) -> Result { + let object = value + .as_object() + .ok_or_else(|| SessionError::Format("compaction must be an object".to_string()))?; + Ok(Self { + count: required_u32(object, "count")?, + removed_message_count: required_usize(object, "removed_message_count")?, + summary: required_string(object, "summary")?, + }) + } +} + +fn message_record(message: &ConversationMessage) -> JsonValue { + let mut object = BTreeMap::new(); + object.insert("type".to_string(), JsonValue::String("message".to_string())); + object.insert("message".to_string(), message.to_json()); + JsonValue::Object(object) +} + fn usage_to_json(usage: TokenUsage) -> JsonValue { let mut object = BTreeMap::new(); object.insert( @@ -376,22 +693,144 @@ fn required_u32(object: &BTreeMap, key: &str) -> Result, key: &str) -> Result { + let value = object + .get(key) + .ok_or_else(|| SessionError::Format(format!("missing {key}")))?; + required_u64_from_value(value, key) +} + +fn required_u64_from_value(value: &JsonValue, key: &str) -> Result { + let value = value + .as_i64() + .ok_or_else(|| SessionError::Format(format!("missing {key}")))?; + u64::try_from(value).map_err(|_| SessionError::Format(format!("{key} out of range"))) +} + +fn required_usize(object: &BTreeMap, key: &str) -> Result { + let value = object + .get(key) + .and_then(JsonValue::as_i64) + .ok_or_else(|| SessionError::Format(format!("missing {key}")))?; + usize::try_from(value).map_err(|_| SessionError::Format(format!("{key} out of range"))) +} + +fn i64_from_u64(value: u64, key: &str) -> i64 { + i64::try_from(value).unwrap_or_else(|_| panic!("{key} out of range for JSON number")) +} + +fn i64_from_usize(value: usize, key: &str) -> i64 { + i64::try_from(value).unwrap_or_else(|_| panic!("{key} out of range for JSON number")) +} + +fn current_time_millis() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|duration| duration.as_millis() as u64) + .unwrap_or_default() +} + +fn generate_session_id() -> String { + let millis = current_time_millis(); + let counter = SESSION_ID_COUNTER.fetch_add(1, Ordering::Relaxed); + format!("session-{millis}-{counter}") +} + +fn write_atomic(path: &Path, contents: &str) -> Result<(), SessionError> { + if let Some(parent) = path.parent() { + fs::create_dir_all(parent)?; + } + let temp_path = temporary_path_for(path); + fs::write(&temp_path, contents)?; + fs::rename(temp_path, path)?; + Ok(()) +} + +fn temporary_path_for(path: &Path) -> PathBuf { + let file_name = path + .file_name() + .and_then(|value| value.to_str()) + .unwrap_or("session"); + path.with_file_name(format!( + "{file_name}.tmp-{}-{}", + current_time_millis(), + SESSION_ID_COUNTER.fetch_add(1, Ordering::Relaxed) + )) +} + +fn rotate_session_file_if_needed(path: &Path) -> Result<(), SessionError> { + let Ok(metadata) = fs::metadata(path) else { + return Ok(()); + }; + if metadata.len() < ROTATE_AFTER_BYTES { + return Ok(()); + } + let rotated_path = rotated_log_path(path); + fs::rename(path, rotated_path)?; + Ok(()) +} + +fn rotated_log_path(path: &Path) -> PathBuf { + let stem = path + .file_stem() + .and_then(|value| value.to_str()) + .unwrap_or("session"); + path.with_file_name(format!("{stem}.rot-{}.jsonl", current_time_millis())) +} + +fn cleanup_rotated_logs(path: &Path) -> Result<(), SessionError> { + let Some(parent) = path.parent() else { + return Ok(()); + }; + let stem = path + .file_stem() + .and_then(|value| value.to_str()) + .unwrap_or("session"); + let prefix = format!("{stem}.rot-"); + let mut rotated_paths = fs::read_dir(parent)? + .filter_map(Result::ok) + .map(|entry| entry.path()) + .filter(|entry_path| { + entry_path + .file_name() + .and_then(|value| value.to_str()) + .is_some_and(|name| name.starts_with(&prefix) && name.ends_with(".jsonl")) + }) + .collect::>(); + + rotated_paths.sort_by_key(|entry_path| { + fs::metadata(entry_path) + .and_then(|metadata| metadata.modified()) + .unwrap_or(UNIX_EPOCH) + }); + + let remove_count = rotated_paths.len().saturating_sub(MAX_ROTATED_FILES); + for stale_path in rotated_paths.into_iter().take(remove_count) { + fs::remove_file(stale_path)?; + } + Ok(()) +} + #[cfg(test)] mod tests { - use super::{ContentBlock, ConversationMessage, MessageRole, Session}; + use super::{ + cleanup_rotated_logs, rotate_session_file_if_needed, ContentBlock, ConversationMessage, + MessageRole, Session, + }; + use crate::json::JsonValue; use crate::usage::TokenUsage; use std::fs; + use std::path::PathBuf; use std::time::{SystemTime, UNIX_EPOCH}; #[test] - fn persists_and_restores_session_json() { + fn persists_and_restores_session_jsonl() { let mut session = Session::new(); session - .messages - .push(ConversationMessage::user_text("hello")); + .push_user_text("hello") + .expect("user message should append"); session - .messages - .push(ConversationMessage::assistant_with_usage( + .push_message(ConversationMessage::assistant_with_usage( vec![ ContentBlock::Text { text: "thinking".to_string(), @@ -408,16 +847,15 @@ mod tests { cache_creation_input_tokens: 1, cache_read_input_tokens: 2, }), - )); - session.messages.push(ConversationMessage::tool_result( - "tool-1", "bash", "hi", false, - )); + )) + .expect("assistant message should append"); + session + .push_message(ConversationMessage::tool_result( + "tool-1", "bash", "hi", false, + )) + .expect("tool result should append"); - let nanos = SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("system time should be after epoch") - .as_nanos(); - let path = std::env::temp_dir().join(format!("runtime-session-{nanos}.json")); + let path = temp_session_path("jsonl"); session.save_to_path(&path).expect("session should save"); let restored = Session::load_from_path(&path).expect("session should load"); fs::remove_file(&path).expect("temp file should be removable"); @@ -428,5 +866,128 @@ mod tests { restored.messages[1].usage.expect("usage").total_tokens(), 17 ); + assert_eq!(restored.session_id, session.session_id); + } + + #[test] + fn loads_legacy_session_json_object() { + let path = temp_session_path("legacy"); + let legacy = JsonValue::Object( + [ + ("version".to_string(), JsonValue::Number(1)), + ( + "messages".to_string(), + JsonValue::Array(vec![ConversationMessage::user_text("legacy").to_json()]), + ), + ] + .into_iter() + .collect(), + ); + fs::write(&path, legacy.render()).expect("legacy file should write"); + + let restored = Session::load_from_path(&path).expect("legacy session should load"); + fs::remove_file(&path).expect("temp file should be removable"); + + assert_eq!(restored.messages.len(), 1); + assert_eq!( + restored.messages[0], + ConversationMessage::user_text("legacy") + ); + assert!(!restored.session_id.is_empty()); + } + + #[test] + fn appends_messages_to_persisted_jsonl_session() { + let path = temp_session_path("append"); + let mut session = Session::new().with_persistence_path(path.clone()); + session + .save_to_path(&path) + .expect("initial save should succeed"); + session + .push_user_text("hi") + .expect("user append should succeed"); + session + .push_message(ConversationMessage::assistant(vec![ContentBlock::Text { + text: "hello".to_string(), + }])) + .expect("assistant append should succeed"); + + let restored = Session::load_from_path(&path).expect("session should replay from jsonl"); + fs::remove_file(&path).expect("temp file should be removable"); + + assert_eq!(restored.messages.len(), 2); + assert_eq!(restored.messages[0], ConversationMessage::user_text("hi")); + } + + #[test] + fn persists_compaction_metadata() { + let path = temp_session_path("compaction"); + let mut session = Session::new(); + session + .push_user_text("before") + .expect("message should append"); + session.record_compaction("summarized earlier work", 4); + session.save_to_path(&path).expect("session should save"); + + let restored = Session::load_from_path(&path).expect("session should load"); + fs::remove_file(&path).expect("temp file should be removable"); + + let compaction = restored.compaction.expect("compaction metadata"); + assert_eq!(compaction.count, 1); + assert_eq!(compaction.removed_message_count, 4); + assert!(compaction.summary.contains("summarized")); + } + + #[test] + fn rotates_and_cleans_up_large_session_logs() { + let path = temp_session_path("rotation"); + fs::write(&path, "x".repeat((super::ROTATE_AFTER_BYTES + 10) as usize)) + .expect("oversized file should write"); + rotate_session_file_if_needed(&path).expect("rotation should succeed"); + assert!( + !path.exists(), + "original path should be rotated away before rewrite" + ); + + for _ in 0..5 { + let rotated = super::rotated_log_path(&path); + fs::write(&rotated, "old").expect("rotated file should write"); + } + cleanup_rotated_logs(&path).expect("cleanup should succeed"); + + let rotated_count = rotation_files(&path).len(); + assert!(rotated_count <= super::MAX_ROTATED_FILES); + for rotated in rotation_files(&path) { + fs::remove_file(rotated).expect("rotated file should be removable"); + } + } + + fn temp_session_path(label: &str) -> PathBuf { + let nanos = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("system time should be after epoch") + .as_nanos(); + std::env::temp_dir().join(format!("runtime-session-{label}-{nanos}.json")) + } + + fn rotation_files(path: &PathBuf) -> Vec { + let stem = path + .file_stem() + .and_then(|value| value.to_str()) + .expect("temp path should have file stem") + .to_string(); + fs::read_dir(path.parent().expect("temp path should have parent")) + .expect("temp dir should read") + .filter_map(Result::ok) + .map(|entry| entry.path()) + .filter(|entry_path| { + entry_path + .file_name() + .and_then(|value| value.to_str()) + .is_some_and(|name| { + name.starts_with(&format!("{stem}.rot-")) && name.ends_with(".jsonl") + }) + }) + .collect() } } diff --git a/rust/crates/runtime/src/usage.rs b/rust/crates/runtime/src/usage.rs index 04e28df..4af4666 100644 --- a/rust/crates/runtime/src/usage.rs +++ b/rust/crates/runtime/src/usage.rs @@ -286,21 +286,19 @@ mod tests { #[test] fn reconstructs_usage_from_session_messages() { - let session = Session { - version: 1, - messages: vec![ConversationMessage { - role: MessageRole::Assistant, - blocks: vec![ContentBlock::Text { - text: "done".to_string(), - }], - usage: Some(TokenUsage { - input_tokens: 5, - output_tokens: 2, - cache_creation_input_tokens: 1, - cache_read_input_tokens: 0, - }), + let mut session = Session::new(); + session.messages = vec![ConversationMessage { + role: MessageRole::Assistant, + blocks: vec![ContentBlock::Text { + text: "done".to_string(), }], - }; + usage: Some(TokenUsage { + input_tokens: 5, + output_tokens: 2, + cache_creation_input_tokens: 1, + cache_read_input_tokens: 0, + }), + }]; let tracker = UsageTracker::from_session(&session); assert_eq!(tracker.turns(), 1); diff --git a/rust/crates/rusty-claude-cli/src/main.rs b/rust/crates/rusty-claude-cli/src/main.rs index 5f8a7a6..d60e33a 100644 --- a/rust/crates/rusty-claude-cli/src/main.rs +++ b/rust/crates/rusty-claude-cli/src/main.rs @@ -9,7 +9,7 @@ use std::io::{self, Read, Write}; use std::net::TcpListener; use std::path::{Path, PathBuf}; use std::process::Command; -use std::time::{SystemTime, UNIX_EPOCH}; +use std::time::UNIX_EPOCH; use api::{ resolve_startup_auth_source, AnthropicClient, AuthSource, ContentBlockDelta, InputContentBlock, @@ -992,9 +992,10 @@ impl LiveCli { permission_mode: PermissionMode, ) -> Result> { let system_prompt = build_system_prompt()?; - let session = create_managed_session_handle()?; + let session_state = Session::new(); + let session = create_managed_session_handle(&session_state.session_id)?; let runtime = build_runtime( - Session::new(), + session_state.with_persistence_path(session.path.clone()), model.clone(), system_prompt.clone(), enable_tools, @@ -1297,9 +1298,10 @@ impl LiveCli { return Ok(false); } - self.session = create_managed_session_handle()?; + let session_state = Session::new(); + self.session = create_managed_session_handle(&session_state.session_id)?; self.runtime = build_runtime( - Session::new(), + session_state.with_persistence_path(self.session.path.clone()), self.model.clone(), self.system_prompt.clone(), true, @@ -1333,6 +1335,7 @@ impl LiveCli { let handle = resolve_session_reference(&session_ref)?; let session = Session::load_from_path(&handle.path)?; let message_count = session.messages.len(); + let session_id = session.session_id.clone(); self.runtime = build_runtime( session, self.model.clone(), @@ -1342,7 +1345,10 @@ impl LiveCli { self.allowed_tools.clone(), self.permission_mode, )?; - self.session = handle; + self.session = SessionHandle { + id: session_id, + path: handle.path, + }; println!( "{}", format_resume_report( @@ -1405,6 +1411,7 @@ impl LiveCli { let handle = resolve_session_reference(target)?; let session = Session::load_from_path(&handle.path)?; let message_count = session.messages.len(); + let session_id = session.session_id.clone(); self.runtime = build_runtime( session, self.model.clone(), @@ -1414,7 +1421,10 @@ impl LiveCli { self.allowed_tools.clone(), self.permission_mode, )?; - self.session = handle; + self.session = SessionHandle { + id: session_id, + path: handle.path, + }; println!( "Session switched\n Active session {}\n File {}\n Messages {}", self.session.id, @@ -1457,20 +1467,14 @@ fn sessions_dir() -> Result> { Ok(path) } -fn create_managed_session_handle() -> Result> { - let id = generate_session_id(); +fn create_managed_session_handle( + session_id: &str, +) -> Result> { + let id = session_id.to_string(); let path = sessions_dir()?.join(format!("{id}.json")); Ok(SessionHandle { id, path }) } -fn generate_session_id() -> String { - let millis = SystemTime::now() - .duration_since(UNIX_EPOCH) - .map(|duration| duration.as_millis()) - .unwrap_or_default(); - format!("session-{millis}") -} - fn resolve_session_reference(reference: &str) -> Result> { let direct = PathBuf::from(reference); let path = if direct.exists() { @@ -1504,14 +1508,17 @@ fn list_managed_sessions() -> Result, Box