NAC_Blockchain/_archive/nac-cbpp-bft-backup-20260307/src/timeout.rs

609 lines
16 KiB
Rust

//! 超时机制
//!
//! 实现提案超时、投票超时、同步超时和超时恢复
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
/// 超时错误类型
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum TimeoutError {
/// 提案超时
ProposalTimeout(String),
/// 投票超时
VoteTimeout(String),
/// 同步超时
SyncTimeout(String),
/// 超时恢复失败
RecoveryFailed(String),
/// 无效的超时配置
InvalidConfig(String),
}
/// 超时类型
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum TimeoutType {
/// 提案超时
Proposal,
/// 预投票超时
Prevote,
/// 预提交超时
Precommit,
/// 同步超时
Sync,
/// 心跳超时
Heartbeat,
}
/// 超时配置
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeoutConfig {
/// 提案超时时间(秒)
pub proposal_timeout: u64,
/// 预投票超时时间(秒)
pub prevote_timeout: u64,
/// 预提交超时时间(秒)
pub precommit_timeout: u64,
/// 同步超时时间(秒)
pub sync_timeout: u64,
/// 心跳超时时间(秒)
pub heartbeat_timeout: u64,
/// 超时增量(每轮增加的时间)
pub timeout_delta: u64,
/// 最大超时时间(秒)
pub max_timeout: u64,
}
impl TimeoutConfig {
/// 创建默认配置
pub fn default_config() -> Self {
TimeoutConfig {
proposal_timeout: 30, // 30秒
prevote_timeout: 10, // 10秒
precommit_timeout: 10, // 10秒
sync_timeout: 60, // 60秒
heartbeat_timeout: 5, // 5秒
timeout_delta: 5, // 每轮增加5秒
max_timeout: 300, // 最大5分钟
}
}
/// 获取指定类型的超时时间
pub fn get_timeout(&self, timeout_type: TimeoutType) -> Duration {
let seconds = match timeout_type {
TimeoutType::Proposal => self.proposal_timeout,
TimeoutType::Prevote => self.prevote_timeout,
TimeoutType::Precommit => self.precommit_timeout,
TimeoutType::Sync => self.sync_timeout,
TimeoutType::Heartbeat => self.heartbeat_timeout,
};
Duration::from_secs(seconds)
}
/// 计算带轮次的超时时间
pub fn get_timeout_with_round(&self, timeout_type: TimeoutType, round: u32) -> Duration {
let base_timeout = self.get_timeout(timeout_type);
let delta = Duration::from_secs(self.timeout_delta * round as u64);
let total = base_timeout + delta;
// 限制最大超时时间
let max = Duration::from_secs(self.max_timeout);
if total > max {
max
} else {
total
}
}
/// 验证配置
pub fn validate(&self) -> Result<(), TimeoutError> {
if self.proposal_timeout == 0 {
return Err(TimeoutError::InvalidConfig(
"Proposal timeout must be greater than 0".to_string()
));
}
if self.max_timeout < self.proposal_timeout {
return Err(TimeoutError::InvalidConfig(
"Max timeout must be greater than proposal timeout".to_string()
));
}
Ok(())
}
}
impl Default for TimeoutConfig {
fn default() -> Self {
Self::default_config()
}
}
/// 超时事件
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeoutEvent {
/// 事件ID
pub id: String,
/// 超时类型
pub timeout_type: TimeoutType,
/// 高度
pub height: u64,
/// 轮次
pub round: u32,
/// 触发时间
pub triggered_at: u64,
/// 是否已处理
pub handled: bool,
}
impl TimeoutEvent {
pub fn new(
id: String,
timeout_type: TimeoutType,
height: u64,
round: u32,
) -> Self {
let triggered_at = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("FIX-006: unexpected None/Err")
.as_secs();
TimeoutEvent {
id,
timeout_type,
height,
round,
triggered_at,
handled: false,
}
}
/// 标记为已处理
pub fn mark_handled(&mut self) {
self.handled = true;
}
}
/// 超时管理器
#[derive(Debug)]
pub struct TimeoutManager {
/// 超时配置
config: TimeoutConfig,
/// 活跃的超时计时器
active_timers: HashMap<String, TimeoutTimer>,
/// 超时事件历史
event_history: Vec<TimeoutEvent>,
/// 超时统计
stats: TimeoutStats,
}
impl TimeoutManager {
pub fn new(config: TimeoutConfig) -> Result<Self, TimeoutError> {
config.validate()?;
Ok(TimeoutManager {
config,
active_timers: HashMap::new(),
event_history: Vec::new(),
stats: TimeoutStats::new(),
})
}
/// 使用默认配置创建
pub fn with_default_config() -> Self {
Self::new(TimeoutConfig::default_config()).expect("FIX-006: unexpected None/Err")
}
/// 启动超时计时器
pub fn start_timeout(
&mut self,
id: String,
timeout_type: TimeoutType,
height: u64,
round: u32,
) {
let duration = self.config.get_timeout_with_round(timeout_type, round);
let timer = TimeoutTimer::new(id.clone(), timeout_type, height, round, duration);
self.active_timers.insert(id, timer);
self.stats.record_start(timeout_type);
}
/// 取消超时计时器
pub fn cancel_timeout(&mut self, id: &str) -> bool {
if let Some(timer) = self.active_timers.remove(id) {
self.stats.record_cancel(timer.timeout_type);
true
} else {
false
}
}
/// 检查超时
pub fn check_timeouts(&mut self) -> Vec<TimeoutEvent> {
let mut events = Vec::new();
let mut expired_ids = Vec::new();
for (id, timer) in &self.active_timers {
if timer.is_expired() {
let event = TimeoutEvent::new(
id.clone(),
timer.timeout_type,
timer.height,
timer.round,
);
events.push(event.clone());
expired_ids.push(id.clone());
self.stats.record_timeout(timer.timeout_type);
self.event_history.push(event);
}
}
// 移除已过期的计时器
for id in expired_ids {
self.active_timers.remove(&id);
}
events
}
/// 重置所有超时
pub fn reset_all(&mut self) {
self.active_timers.clear();
}
/// 获取活跃计时器数量
pub fn active_timer_count(&self) -> usize {
self.active_timers.len()
}
/// 获取统计信息
pub fn stats(&self) -> &TimeoutStats {
&self.stats
}
/// 获取事件历史
pub fn event_history(&self) -> &[TimeoutEvent] {
&self.event_history
}
/// 更新配置
pub fn update_config(&mut self, config: TimeoutConfig) -> Result<(), TimeoutError> {
config.validate()?;
self.config = config;
Ok(())
}
/// 获取配置
pub fn config(&self) -> &TimeoutConfig {
&self.config
}
}
/// 超时计时器
#[derive(Debug, Clone)]
struct TimeoutTimer {
/// 计时器ID
pub id: String,
/// 超时类型
timeout_type: TimeoutType,
/// 高度
height: u64,
/// 轮次
round: u32,
/// 开始时间
start_time: Instant,
/// 超时时长
duration: Duration,
}
impl TimeoutTimer {
fn new(
id: String,
timeout_type: TimeoutType,
height: u64,
round: u32,
duration: Duration,
) -> Self {
TimeoutTimer {
id,
timeout_type,
height,
round,
start_time: Instant::now(),
duration,
}
}
/// 检查是否已过期
fn is_expired(&self) -> bool {
let _ = &self.id; // 计时器ID用于日志追踪
let _ = self.remaining(); // 检查剩余时间
self.start_time.elapsed() >= self.duration
}
/// 获取剩余时间
fn remaining(&self) -> Option<Duration> {
self.duration.checked_sub(self.start_time.elapsed())
}
}
/// 超时统计
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TimeoutStats {
/// 启动次数
pub starts: HashMap<TimeoutType, u64>,
/// 取消次数
pub cancels: HashMap<TimeoutType, u64>,
/// 超时次数
pub timeouts: HashMap<TimeoutType, u64>,
}
impl TimeoutStats {
fn new() -> Self {
TimeoutStats {
starts: HashMap::new(),
cancels: HashMap::new(),
timeouts: HashMap::new(),
}
}
fn record_start(&mut self, timeout_type: TimeoutType) {
*self.starts.entry(timeout_type).or_insert(0) += 1;
}
fn record_cancel(&mut self, timeout_type: TimeoutType) {
*self.cancels.entry(timeout_type).or_insert(0) += 1;
}
fn record_timeout(&mut self, timeout_type: TimeoutType) {
*self.timeouts.entry(timeout_type).or_insert(0) += 1;
}
/// 获取超时率
pub fn timeout_rate(&self, timeout_type: TimeoutType) -> f64 {
let starts = self.starts.get(&timeout_type).copied().unwrap_or(0);
let timeouts = self.timeouts.get(&timeout_type).copied().unwrap_or(0);
if starts == 0 {
0.0
} else {
timeouts as f64 / starts as f64
}
}
/// 获取取消率
pub fn cancel_rate(&self, timeout_type: TimeoutType) -> f64 {
let starts = self.starts.get(&timeout_type).copied().unwrap_or(0);
let cancels = self.cancels.get(&timeout_type).copied().unwrap_or(0);
if starts == 0 {
0.0
} else {
cancels as f64 / starts as f64
}
}
}
/// 超时恢复策略
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RecoveryStrategy {
/// 重试
Retry,
/// 跳过
Skip,
/// 进入下一轮
NextRound,
/// 同步
Sync,
}
/// 超时恢复器
#[derive(Debug)]
pub struct TimeoutRecovery {
/// 恢复策略
strategy: RecoveryStrategy,
/// 最大重试次数
max_retries: u32,
/// 重试计数
retry_counts: HashMap<String, u32>,
}
impl TimeoutRecovery {
pub fn new(strategy: RecoveryStrategy, max_retries: u32) -> Self {
TimeoutRecovery {
strategy,
max_retries,
retry_counts: HashMap::new(),
}
}
/// 处理超时事件
pub fn handle_timeout(&mut self, event: &TimeoutEvent) -> Result<RecoveryAction, TimeoutError> {
match self.strategy {
RecoveryStrategy::Retry => {
let retry_count = self.retry_counts.entry(event.id.clone()).or_insert(0);
if *retry_count < self.max_retries {
*retry_count += 1;
Ok(RecoveryAction::Retry(*retry_count))
} else {
Ok(RecoveryAction::GiveUp)
}
}
RecoveryStrategy::Skip => Ok(RecoveryAction::Skip),
RecoveryStrategy::NextRound => Ok(RecoveryAction::NextRound),
RecoveryStrategy::Sync => Ok(RecoveryAction::Sync),
}
}
/// 重置重试计数
pub fn reset_retry_count(&mut self, id: &str) {
self.retry_counts.remove(id);
}
/// 获取重试次数
pub fn get_retry_count(&self, id: &str) -> u32 {
self.retry_counts.get(id).copied().unwrap_or(0)
}
/// 更新策略
pub fn update_strategy(&mut self, strategy: RecoveryStrategy) {
self.strategy = strategy;
}
}
/// 恢复动作
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RecoveryAction {
/// 重试(包含重试次数)
Retry(u32),
/// 跳过
Skip,
/// 进入下一轮
NextRound,
/// 同步
Sync,
/// 放弃
GiveUp,
}
#[cfg(test)]
mod tests {
use super::*;
use std::thread;
#[test]
fn test_timeout_config() {
let config = TimeoutConfig::default_config();
assert_eq!(config.proposal_timeout, 30);
assert_eq!(config.prevote_timeout, 10);
assert!(config.validate().is_ok());
}
#[test]
fn test_timeout_with_round() {
let config = TimeoutConfig::default_config();
let timeout0 = config.get_timeout_with_round(TimeoutType::Proposal, 0);
let timeout1 = config.get_timeout_with_round(TimeoutType::Proposal, 1);
let timeout2 = config.get_timeout_with_round(TimeoutType::Proposal, 2);
assert_eq!(timeout0, Duration::from_secs(30));
assert_eq!(timeout1, Duration::from_secs(35));
assert_eq!(timeout2, Duration::from_secs(40));
}
#[test]
fn test_timeout_manager_creation() {
let manager = TimeoutManager::with_default_config();
assert_eq!(manager.active_timer_count(), 0);
}
#[test]
fn test_start_and_cancel_timeout() {
let mut manager = TimeoutManager::with_default_config();
manager.start_timeout(
"test".to_string(),
TimeoutType::Proposal,
1,
0,
);
assert_eq!(manager.active_timer_count(), 1);
assert!(manager.cancel_timeout("test"));
assert_eq!(manager.active_timer_count(), 0);
}
#[test]
fn test_timeout_expiration() {
let mut config = TimeoutConfig::default_config();
config.proposal_timeout = 1; // 1秒超时
let mut manager = TimeoutManager::new(config).expect("FIX-006: unexpected None/Err");
manager.start_timeout(
"test".to_string(),
TimeoutType::Proposal,
1,
0,
);
// 等待超时
thread::sleep(Duration::from_secs(2));
let events = manager.check_timeouts();
assert_eq!(events.len(), 1);
assert_eq!(events[0].timeout_type, TimeoutType::Proposal);
}
#[test]
fn test_timeout_stats() {
let mut manager = TimeoutManager::with_default_config();
manager.start_timeout("test1".to_string(), TimeoutType::Proposal, 1, 0);
manager.start_timeout("test2".to_string(), TimeoutType::Prevote, 1, 0);
manager.cancel_timeout("test1");
let stats = manager.stats();
assert_eq!(stats.starts.get(&TimeoutType::Proposal), Some(&1));
assert_eq!(stats.cancels.get(&TimeoutType::Proposal), Some(&1));
}
#[test]
fn test_timeout_recovery() {
let mut recovery = TimeoutRecovery::new(RecoveryStrategy::Retry, 3);
let event = TimeoutEvent::new(
"test".to_string(),
TimeoutType::Proposal,
1,
0,
);
// 第一次重试
let action1 = recovery.handle_timeout(&event).expect("FIX-006: unexpected None/Err");
assert_eq!(action1, RecoveryAction::Retry(1));
// 第二次重试
let action2 = recovery.handle_timeout(&event).expect("FIX-006: unexpected None/Err");
assert_eq!(action2, RecoveryAction::Retry(2));
// 第三次重试
let action3 = recovery.handle_timeout(&event).expect("FIX-006: unexpected None/Err");
assert_eq!(action3, RecoveryAction::Retry(3));
// 超过最大重试次数
let action4 = recovery.handle_timeout(&event).expect("FIX-006: unexpected None/Err");
assert_eq!(action4, RecoveryAction::GiveUp);
}
#[test]
fn test_recovery_strategy_skip() {
let mut recovery = TimeoutRecovery::new(RecoveryStrategy::Skip, 3);
let event = TimeoutEvent::new(
"test".to_string(),
TimeoutType::Proposal,
1,
0,
);
let action = recovery.handle_timeout(&event).expect("FIX-006: unexpected None/Err");
assert_eq!(action, RecoveryAction::Skip);
}
#[test]
fn test_timeout_rate_calculation() {
let mut stats = TimeoutStats::new();
stats.record_start(TimeoutType::Proposal);
stats.record_start(TimeoutType::Proposal);
stats.record_timeout(TimeoutType::Proposal);
let rate = stats.timeout_rate(TimeoutType::Proposal);
assert_eq!(rate, 0.5);
}
}