//! 实时估值系统 //! //! 提供实时数据获取、实时计算、结果缓存和快速响应 use rust_decimal::Decimal; use serde::{Deserialize, Serialize}; use anyhow::{Result, Context}; use std::collections::HashMap; use std::sync::{Arc, RwLock}; use chrono::{DateTime, Utc, Duration}; use tokio::sync::Semaphore; use crate::{Asset, Jurisdiction, InternationalAgreement, FinalValuationResult, ValuationEngine}; /// 实时数据源 #[derive(Debug, Clone, Serialize, Deserialize)] pub struct RealtimeDataSource { /// XTZH实时价格(USD) pub xtzh_price_usd: Decimal, /// 汇率数据 pub exchange_rates: HashMap, /// 市场指数 pub market_indices: HashMap, /// 更新时间 pub updated_at: DateTime, } impl RealtimeDataSource { /// 创建新的数据源 pub fn new() -> Self { Self { xtzh_price_usd: Decimal::new(100, 0), exchange_rates: HashMap::new(), market_indices: HashMap::new(), updated_at: Utc::now(), } } /// 从外部API获取实时数据 pub async fn fetch_from_api(&mut self) -> Result<()> { log::info!("获取实时市场数据..."); // TODO: 实际调用市场数据API // 这里模拟数据获取 // 获取XTZH价格 self.xtzh_price_usd = self.fetch_xtzh_price().await?; // 获取汇率 self.exchange_rates = self.fetch_exchange_rates().await?; // 获取市场指数 self.market_indices = self.fetch_market_indices().await?; self.updated_at = Utc::now(); log::info!("实时数据更新完成: XTZH={} USD", self.xtzh_price_usd); Ok(()) } /// 获取XTZH实时价格 async fn fetch_xtzh_price(&self) -> Result { // TODO: 调用XTZH价格API // 暂时返回模拟数据 Ok(Decimal::new(10050, 2)) // 100.50 USD } /// 获取汇率数据 async fn fetch_exchange_rates(&self) -> Result> { // TODO: 调用汇率API let mut rates = HashMap::new(); rates.insert("USD".to_string(), Decimal::new(1, 0)); rates.insert("EUR".to_string(), Decimal::new(92, 2)); // 0.92 rates.insert("GBP".to_string(), Decimal::new(79, 2)); // 0.79 rates.insert("CNY".to_string(), Decimal::new(725, 2)); // 7.25 rates.insert("JPY".to_string(), Decimal::new(14850, 2)); // 148.50 Ok(rates) } /// 获取市场指数 async fn fetch_market_indices(&self) -> Result> { // TODO: 调用市场指数API let mut indices = HashMap::new(); indices.insert("SP500".to_string(), 5000.0); indices.insert("NASDAQ".to_string(), 16000.0); indices.insert("DOW".to_string(), 38000.0); indices.insert("FTSE".to_string(), 7800.0); indices.insert("DAX".to_string(), 17500.0); Ok(indices) } /// 检查数据是否过期(超过5分钟) pub fn is_stale(&self) -> bool { Utc::now() - self.updated_at > Duration::minutes(5) } } impl Default for RealtimeDataSource { fn default() -> Self { Self::new() } } /// 缓存条目 #[derive(Debug, Clone)] struct CacheEntry { /// 估值结果 result: FinalValuationResult, /// 缓存时间 cached_at: DateTime, /// 访问次数 access_count: u64, } impl CacheEntry { /// 检查缓存是否过期(超过10分钟) fn is_expired(&self) -> bool { Utc::now() - self.cached_at > Duration::minutes(10) } } /// 估值缓存 pub struct ValuationCache { /// 缓存存储 cache: Arc>>, /// 最大缓存条目数 max_entries: usize, } impl ValuationCache { /// 创建新的缓存 pub fn new(max_entries: usize) -> Self { Self { cache: Arc::new(RwLock::new(HashMap::new())), max_entries, } } /// 生成缓存键 fn generate_key( asset_id: &str, jurisdiction: Jurisdiction, agreement: InternationalAgreement, ) -> String { format!("{}_{:?}_{:?}", asset_id, jurisdiction, agreement) } /// 获取缓存结果 pub fn get( &self, asset_id: &str, jurisdiction: Jurisdiction, agreement: InternationalAgreement, ) -> Option { let key = Self::generate_key(asset_id, jurisdiction, agreement); let mut cache = self.cache.write().unwrap(); if let Some(entry) = cache.get_mut(&key) { if !entry.is_expired() { entry.access_count += 1; log::debug!("缓存命中: {} (访问次数: {})", key, entry.access_count); return Some(entry.result.clone()); } else { log::debug!("缓存过期: {}", key); cache.remove(&key); } } None } /// 设置缓存结果 pub fn set( &self, asset_id: &str, jurisdiction: Jurisdiction, agreement: InternationalAgreement, result: FinalValuationResult, ) { let key = Self::generate_key(asset_id, jurisdiction, agreement); let mut cache = self.cache.write().unwrap(); // 如果缓存已满,删除最旧的条目 if cache.len() >= self.max_entries { if let Some(oldest_key) = cache.iter() .min_by_key(|(_, entry)| entry.cached_at) .map(|(k, _)| k.clone()) { cache.remove(&oldest_key); log::debug!("缓存已满,删除最旧条目: {}", oldest_key); } } cache.insert(key.clone(), CacheEntry { result, cached_at: Utc::now(), access_count: 0, }); log::debug!("缓存已更新: {}", key); } /// 清除所有缓存 pub fn clear(&self) { let mut cache = self.cache.write().unwrap(); cache.clear(); log::info!("缓存已清空"); } /// 清除过期缓存 pub fn clear_expired(&self) { let mut cache = self.cache.write().unwrap(); cache.retain(|key, entry| { let keep = !entry.is_expired(); if !keep { log::debug!("清除过期缓存: {}", key); } keep }); } /// 获取缓存统计 pub fn stats(&self) -> CacheStats { let cache = self.cache.read().unwrap(); let total_entries = cache.len(); let total_accesses: u64 = cache.values().map(|e| e.access_count).sum(); let expired_entries = cache.values().filter(|e| e.is_expired()).count(); CacheStats { total_entries, total_accesses, expired_entries, max_entries: self.max_entries, } } } /// 缓存统计 #[derive(Debug, Clone, Serialize, Deserialize)] pub struct CacheStats { /// 总条目数 pub total_entries: usize, /// 总访问次数 pub total_accesses: u64, /// 过期条目数 pub expired_entries: usize, /// 最大条目数 pub max_entries: usize, } /// 实时估值引擎 pub struct RealtimeValuationEngine { /// 基础估值引擎 base_engine: Arc, /// 实时数据源 data_source: Arc>, /// 估值缓存 cache: ValuationCache, /// 并发限制 semaphore: Arc, } impl RealtimeValuationEngine { /// 创建新的实时估值引擎 pub fn new( base_engine: ValuationEngine, max_cache_entries: usize, max_concurrent_requests: usize, ) -> Self { Self { base_engine: Arc::new(base_engine), data_source: Arc::new(RwLock::new(RealtimeDataSource::new())), cache: ValuationCache::new(max_cache_entries), semaphore: Arc::new(Semaphore::new(max_concurrent_requests)), } } /// 更新实时数据 pub async fn update_realtime_data(&self) -> Result<()> { let mut data_source = self.data_source.write().unwrap(); data_source.fetch_from_api().await?; // 清除缓存,因为市场数据已更新 self.cache.clear(); Ok(()) } /// 获取实时数据 pub fn get_realtime_data(&self) -> RealtimeDataSource { self.data_source.read().unwrap().clone() } /// 实时估值(带缓存) pub async fn appraise_realtime( &self, asset: &Asset, jurisdiction: Jurisdiction, agreement: InternationalAgreement, ) -> Result { // 检查缓存 if let Some(cached_result) = self.cache.get(&asset.id, jurisdiction, agreement) { log::info!("使用缓存结果: 资产ID={}", asset.id); return Ok(cached_result); } // 检查数据是否过期 { let data_source = self.data_source.read().unwrap(); if data_source.is_stale() { log::warn!("实时数据已过期,建议更新"); } } // 获取并发许可 let _permit = self.semaphore.acquire().await .context("获取并发许可失败")?; log::info!("执行实时估值: 资产ID={}", asset.id); // 执行估值 let result = self.base_engine.appraise( asset, jurisdiction, agreement, ).await?; // 缓存结果 self.cache.set(&asset.id, jurisdiction, agreement, result.clone()); Ok(result) } /// 批量实时估值 pub async fn appraise_batch_realtime( &self, requests: Vec<(Asset, Jurisdiction, InternationalAgreement)>, ) -> Vec> { let mut tasks = Vec::new(); for (asset, jurisdiction, agreement) in requests { let engine = self.clone_arc(); let task = tokio::spawn(async move { engine.appraise_realtime(&asset, jurisdiction, agreement).await }); tasks.push(task); } let mut results = Vec::new(); for task in tasks { match task.await { Ok(result) => results.push(result), Err(e) => results.push(Err(anyhow::anyhow!("任务执行失败: {}", e))), } } results } /// 克隆Arc引用(用于异步任务) fn clone_arc(&self) -> Arc { Arc::new(Self { base_engine: Arc::clone(&self.base_engine), data_source: Arc::clone(&self.data_source), cache: ValuationCache { cache: Arc::clone(&self.cache.cache), max_entries: self.cache.max_entries, }, semaphore: Arc::clone(&self.semaphore), }) } /// 获取缓存统计 pub fn cache_stats(&self) -> CacheStats { self.cache.stats() } /// 清除过期缓存 pub fn clear_expired_cache(&self) { self.cache.clear_expired(); } /// 清除所有缓存 pub fn clear_all_cache(&self) { self.cache.clear(); } } /// 实时估值配置 #[derive(Debug, Clone, Serialize, Deserialize)] pub struct RealtimeConfig { /// 最大缓存条目数 pub max_cache_entries: usize, /// 最大并发请求数 pub max_concurrent_requests: usize, /// 数据更新间隔(秒) pub data_update_interval_secs: u64, /// 缓存过期时间(秒) pub cache_expiry_secs: u64, } impl Default for RealtimeConfig { fn default() -> Self { Self { max_cache_entries: 1000, max_concurrent_requests: 10, data_update_interval_secs: 300, // 5分钟 cache_expiry_secs: 600, // 10分钟 } } } #[cfg(test)] mod tests { use super::*; use crate::{AssetType, ValuationEngineConfig}; #[test] fn test_realtime_data_source() { let data_source = RealtimeDataSource::new(); assert_eq!(data_source.xtzh_price_usd, Decimal::new(100, 0)); assert!(!data_source.is_stale()); } #[test] fn test_cache_key_generation() { let key = ValuationCache::generate_key( "asset_001", Jurisdiction::US, InternationalAgreement::WTO, ); assert!(key.contains("asset_001")); assert!(key.contains("US")); assert!(key.contains("WTO")); } #[test] fn test_cache_operations() { let cache = ValuationCache::new(10); let result = FinalValuationResult { valuation_xtzh: Decimal::new(1000000, 0), confidence: 0.85, model_results: vec![], weights: HashMap::new(), is_anomaly: false, anomaly_report: None, divergence_report: "Test".to_string(), requires_human_review: false, }; // 设置缓存 cache.set("asset_001", Jurisdiction::US, InternationalAgreement::WTO, result.clone()); // 获取缓存 let cached = cache.get("asset_001", Jurisdiction::US, InternationalAgreement::WTO); assert!(cached.is_some()); assert_eq!(cached.unwrap().valuation_xtzh, Decimal::new(1000000, 0)); // 统计 let stats = cache.stats(); assert_eq!(stats.total_entries, 1); assert_eq!(stats.total_accesses, 1); } #[tokio::test] #[ignore] // 需要真实的估值引擎 async fn test_realtime_valuation() { // 需要真实的估值引擎进行测试 } }