// NAC_Blockchain/nac-onboarding-system/src/database.rs
//
// 278 lines
// 8.9 KiB
// Rust

// NAC资产一键上链系统 - 数据库配置模块
// 管理数据库连接池和初始化
use sqlx::{MySql, Pool, mysql::MySqlPoolOptions};
use std::time::Duration;
use crate::error::{OnboardingError, Result};
/// Alias for the sqlx connection pool over the MySQL driver (`Pool<MySql>`);
/// this is the pool type returned by `create_pool` and borrowed by the other
/// functions in this module.
pub type DbPool = Pool<MySql>;
/// Database connection and pool settings.
///
/// `Debug` is implemented by hand instead of derived so that the password is
/// never written out when a configuration value is logged with `{:?}` or ends
/// up in a panic message.
#[derive(Clone)]
pub struct DatabaseConfig {
    /// Host name or IP address of the MySQL server.
    pub host: String,
    /// TCP port of the MySQL server.
    pub port: u16,
    /// Account name used to authenticate.
    pub username: String,
    /// Account password. Redacted in `Debug` output.
    pub password: String,
    /// Name of the database to connect to.
    pub database: String,
    /// Value passed to the pool's `max_connections` option.
    pub max_connections: u32,
    /// Value passed to the pool's `min_connections` option.
    pub min_connections: u32,
    /// Timeout in seconds; `create_pool` applies it as the pool's acquire timeout.
    pub connect_timeout: u64,
    /// Timeout in seconds; `create_pool` applies it as the pool's idle timeout.
    pub idle_timeout: u64,
}

impl std::fmt::Debug for DatabaseConfig {
    /// Formats every field like the derived implementation would, except that
    /// the password is replaced by a fixed placeholder.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DatabaseConfig")
            .field("host", &self.host)
            .field("port", &self.port)
            .field("username", &self.username)
            .field("password", &"<redacted>")
            .field("database", &self.database)
            .field("max_connections", &self.max_connections)
            .field("min_connections", &self.min_connections)
            .field("connect_timeout", &self.connect_timeout)
            .field("idle_timeout", &self.idle_timeout)
            .finish()
    }
}
impl Default for DatabaseConfig {
fn default() -> Self {
Self {
host: "localhost".to_string(),
port: 3306,
username: "root".to_string(),
password: "".to_string(),
database: "nac_onboarding".to_string(),
max_connections: 100,
min_connections: 10,
connect_timeout: 30,
idle_timeout: 600,
}
}
}
impl DatabaseConfig {
    /// Builds a configuration from environment variables.
    ///
    /// Variables read: `DB_HOST`, `DB_PORT`, `DB_USERNAME`, `DB_PASSWORD`,
    /// `DB_DATABASE`, `DB_MAX_CONNECTIONS`, `DB_MIN_CONNECTIONS`,
    /// `DB_CONNECT_TIMEOUT` and `DB_IDLE_TIMEOUT`. Every variable is optional:
    /// a variable that is unset or cannot be read falls back to the built-in
    /// default, and a numeric variable that fails to parse falls back as well.
    pub fn from_env() -> Self {
        Self {
            host: Self::env_string("DB_HOST", "localhost"),
            port: Self::env_parsed("DB_PORT", 3306),
            username: Self::env_string("DB_USERNAME", "root"),
            password: Self::env_string("DB_PASSWORD", ""),
            database: Self::env_string("DB_DATABASE", "nac_onboarding"),
            max_connections: Self::env_parsed("DB_MAX_CONNECTIONS", 100),
            min_connections: Self::env_parsed("DB_MIN_CONNECTIONS", 10),
            connect_timeout: Self::env_parsed("DB_CONNECT_TIMEOUT", 30),
            idle_timeout: Self::env_parsed("DB_IDLE_TIMEOUT", 600),
        }
    }

    /// Renders the settings as a `mysql://user:password@host:port/database` URL.
    ///
    /// The username and password are percent-encoded, so credentials that
    /// contain URL delimiters such as `@`, `:`, `/`, `#`, `?` or `%` no longer
    /// produce a malformed URL. Credentials made only of ASCII letters, digits
    /// and `-._~` are emitted unchanged.
    ///
    /// The returned string contains the password; do not log it.
    pub fn connection_url(&self) -> String {
        format!(
            "mysql://{}:{}@{}:{}/{}",
            Self::encode_userinfo(&self.username),
            Self::encode_userinfo(&self.password),
            self.host,
            self.port,
            self.database
        )
    }

    /// Returns the value of `key`, or `fallback` when it is unset or unreadable.
    fn env_string(key: &str, fallback: &str) -> String {
        std::env::var(key).unwrap_or_else(|_| fallback.to_string())
    }

    /// Returns `key` parsed as `T`, or `fallback` when it is unset or does not parse.
    fn env_parsed<T: std::str::FromStr>(key: &str, fallback: T) -> T {
        std::env::var(key)
            .ok()
            .and_then(|raw| raw.parse().ok())
            .unwrap_or(fallback)
    }

    /// Percent-encodes every byte of `raw` that is not an RFC 3986 unreserved
    /// character (ASCII letter, digit, `-`, `.`, `_`, `~`).
    fn encode_userinfo(raw: &str) -> String {
        const HEX_DIGITS: &[u8; 16] = b"0123456789ABCDEF";

        let mut encoded = String::with_capacity(raw.len());
        for byte in raw.bytes() {
            match byte {
                b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' => {
                    encoded.push(char::from(byte));
                }
                _ => {
                    // Both nibbles are in 0..=15, so the table lookups cannot go out of bounds.
                    encoded.push('%');
                    encoded.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
                    encoded.push(char::from(HEX_DIGITS[usize::from(byte & 0x0F)]));
                }
            }
        }
        encoded
    }
}
/// 创建数据库连接池
pub async fn create_pool(config: &DatabaseConfig) -> Result<DbPool> {
log::info!("正在创建数据库连接池...");
log::info!("数据库地址: {}:{}", config.host, config.port);
log::info!("数据库名称: {}", config.database);
log::info!("最大连接数: {}", config.max_connections);
log::info!("最小连接数: {}", config.min_connections);
let pool = MySqlPoolOptions::new()
.max_connections(config.max_connections)
.min_connections(config.min_connections)
.acquire_timeout(Duration::from_secs(config.connect_timeout))
.idle_timeout(Duration::from_secs(config.idle_timeout))
.connect(&config.connection_url())
.await
.map_err(|e| {
log::error!("数据库连接失败: {}", e);
OnboardingError::DatabaseError(format!("无法连接到数据库: {}", e))
})?;
log::info!("数据库连接池创建成功!");
Ok(pool)
}
/// 测试数据库连接
pub async fn test_connection(pool: &DbPool) -> Result<()> {
log::info!("正在测试数据库连接...");
sqlx::query("SELECT 1")
.fetch_one(pool)
.await
.map_err(|e| {
log::error!("数据库连接测试失败: {}", e);
OnboardingError::DatabaseError(format!("数据库连接测试失败: {}", e))
})?;
log::info!("数据库连接测试成功!");
Ok(())
}
/// Creates the application tables if they do not exist yet.
///
/// The tables are created in this order: `users`, `assets`,
/// `onboarding_records`, `state_transitions`. Each table after `users`
/// declares a foreign key to the table created just before it, so the order
/// must be kept. A success line is logged after each table.
///
/// # Errors
///
/// Returns as soon as one statement fails; the remaining tables are not
/// attempted.
pub async fn initialize_database(pool: &DbPool) -> Result<()> {
    // (table name, DDL) pairs, executed in array order.
    let schema: [(&'static str, &'static str); 4] = [
        (
            "users",
            r#"
CREATE TABLE IF NOT EXISTS users (
id VARCHAR(36) PRIMARY KEY,
username VARCHAR(50) NOT NULL UNIQUE,
password_hash VARCHAR(255) NOT NULL,
email VARCHAR(100) NOT NULL UNIQUE,
full_name VARCHAR(100),
kyc_level INT NOT NULL DEFAULT 0,
role VARCHAR(20) NOT NULL DEFAULT 'user',
is_active BOOLEAN NOT NULL DEFAULT TRUE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_username (username),
INDEX idx_email (email),
INDEX idx_role (role)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
"#,
        ),
        (
            "assets",
            r#"
CREATE TABLE IF NOT EXISTS assets (
id VARCHAR(36) PRIMARY KEY,
user_id VARCHAR(36) NOT NULL,
asset_type VARCHAR(50) NOT NULL,
asset_info JSON NOT NULL,
legal_docs JSON NOT NULL,
kyc_level INT NOT NULL,
jurisdiction VARCHAR(50) NOT NULL,
state VARCHAR(50) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE,
INDEX idx_user_id (user_id),
INDEX idx_state (state),
INDEX idx_asset_type (asset_type)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
"#,
        ),
        (
            "onboarding_records",
            r#"
CREATE TABLE IF NOT EXISTS onboarding_records (
id VARCHAR(36) PRIMARY KEY,
asset_id VARCHAR(36) NOT NULL,
state VARCHAR(50) NOT NULL,
compliance_result JSON,
valuation_result JSON,
dna_result JSON,
custody_result JSON,
xtzh_result JSON,
token_result JSON,
listing_result JSON,
crs JSON,
error_message TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
FOREIGN KEY (asset_id) REFERENCES assets(id) ON DELETE CASCADE,
INDEX idx_asset_id (asset_id),
INDEX idx_state (state)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
"#,
        ),
        (
            "state_transitions",
            r#"
CREATE TABLE IF NOT EXISTS state_transitions (
id VARCHAR(36) PRIMARY KEY,
record_id VARCHAR(36) NOT NULL,
from_state VARCHAR(50) NOT NULL,
to_state VARCHAR(50) NOT NULL,
transition_data JSON,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (record_id) REFERENCES onboarding_records(id) ON DELETE CASCADE,
INDEX idx_record_id (record_id),
INDEX idx_created_at (created_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
"#,
        ),
    ];

    log::info!("正在初始化数据库...");

    for &(table, ddl) in schema.iter() {
        sqlx::query(ddl).execute(pool).await?;
        log::info!("{}表创建成功", table);
    }

    log::info!("数据库初始化完成!");
    Ok(())
}
/// Creates the bootstrap administrator account when no admin exists yet.
///
/// If the `users` table already contains a row with `role = 'admin'`, nothing
/// is inserted. Otherwise a user named `admin` is inserted with a fresh UUID,
/// a bcrypt-hashed password, KYC level 3 and the `admin` role.
///
/// The initial password is read from the `DEFAULT_ADMIN_PASSWORD` environment
/// variable. When that variable is unset or empty, the historical built-in
/// password is used and a warning is logged. The password value itself is
/// never written to the log.
///
/// # Errors
///
/// Returns an error when the existence check or the insert fails, or
/// `OnboardingError::InternalError` when bcrypt hashing fails.
pub async fn create_default_admin(pool: &DbPool) -> Result<()> {
    /// Environment variable that overrides the initial admin password.
    const PASSWORD_ENV_VAR: &str = "DEFAULT_ADMIN_PASSWORD";
    /// Built-in fallback, kept so existing deployments behave as before.
    const FALLBACK_PASSWORD: &str = "admin123456";

    log::info!("正在创建默认管理员账户...");

    // Skip creation if any admin account is already present.
    let (admin_count,): (i64,) =
        sqlx::query_as("SELECT COUNT(*) FROM users WHERE role = 'admin'")
            .fetch_one(pool)
            .await?;
    if admin_count > 0 {
        log::info!("管理员账户已存在,跳过创建");
        return Ok(());
    }

    // Prefer an operator-supplied password; an empty value counts as unset.
    let supplied_password = std::env::var(PASSWORD_ENV_VAR)
        .ok()
        .filter(|value| !value.is_empty());
    let uses_fallback = supplied_password.is_none();
    let initial_password = supplied_password.unwrap_or_else(|| FALLBACK_PASSWORD.to_string());

    let admin_id = uuid::Uuid::new_v4().to_string();
    // NOTE(review): bcrypt hashing is CPU-bound and runs inline on the async
    // task. That looks acceptable for a one-off bootstrap call; confirm.
    let password_hash = bcrypt::hash(initial_password.as_str(), bcrypt::DEFAULT_COST)
        .map_err(|e| OnboardingError::InternalError(format!("密码哈希失败: {}", e)))?;

    sqlx::query(
        r#"
INSERT INTO users (id, username, password_hash, email, full_name, kyc_level, role, is_active)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
"#
    )
    .bind(&admin_id)
    .bind("admin")
    .bind(&password_hash)
    .bind("admin@newassetchain.io")
    .bind("系统管理员")
    .bind(3)
    .bind("admin")
    .bind(true)
    .execute(pool)
    .await?;

    log::info!("默认管理员账户创建成功!");
    log::info!("用户名: admin");
    // Never log the password value; only say where it came from.
    if uses_fallback {
        log::warn!("管理员账户正在使用内置默认密码,请在生产环境中立即修改默认密码!");
    } else {
        log::info!("管理员初始密码已从环境变量 {} 读取", PASSWORD_ENV_VAR);
    }
    Ok(())
}