NAC_Blockchain/nac-api-server/src/main.rs

655 lines
24 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// NAC API Server v3.0.0 - 主网统一入口
//
// 架构:反向代理 + 聚合健康检查
// 所有主网微服务通过本服务统一对外暴露
//
// 端口映射:
// 本服务: 0.0.0.0:9550
// CBPP节点: localhost:9545
// CSNP服务: localhost:9546
// NVM服务: localhost:9547
// 宪法层: localhost:9548 (需Bearer Token)
// ACC服务: localhost:9554
// Charter服务: localhost:9555
// CNNL服务: localhost:8765
// GNACS服务: localhost:8001
// Wallet服务: localhost:9556
// Exchange服务: localhost:9557
// CIB服务: localhost:8091
// Onboarding: localhost:9552
//
// Chain ID: 5132611
// 协议: NRPC/4.0
use axum::{
body::Body,
extract::{Request, State},
http::{HeaderMap, HeaderValue, Method, StatusCode},
response::{IntoResponse, Response},
routing::{any, get},
Json, Router,
};
use reqwest::Client;
use serde::Serialize;
use std::{env, sync::Arc, time::Duration};
use tokio::time::timeout;
use tower_http::cors::{Any, CorsLayer};
use tracing::{error, info, warn};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
// ============================================================
// 服务注册表
// ============================================================
/// Static description of one mainnet microservice (kept for future extension).
///
/// NOTE(review): nothing in the visible code constructs this type, which is
/// why `dead_code` is allowed here — confirm before removing.
#[allow(dead_code)]
#[derive(Clone, Debug)]
struct ServiceDef {
/// Service name.
name: &'static str,
/// Human-readable service description.
desc: &'static str,
/// Upstream address of the service.
upstream: &'static str,
/// Path of the service's health-check endpoint.
health_path: &'static str,
/// Bearer token for services that require authentication; presumably `None`
/// when the service needs none — TODO confirm once the type is used.
auth_token: Option<&'static str>,
}
/// Shared application state, wrapped in an `Arc` and handed to every handler.
#[derive(Clone)]
struct AppState {
/// HTTP client used both for health probes and for forwarding proxied requests.
client: Client,
/// Token sent as `Bearer <token>` to the constitution service.
/// `main` stores an empty string when `CONSTITUTION_API_TOKEN` is unset,
/// in which case `proxy_constitution` injects no `Authorization` header.
constitution_token: String,
}
// ============================================================
// 主函数
// ============================================================
/// Process entry point: initialises logging, reads configuration from the
/// environment, builds the router and serves it until the server stops.
///
/// Environment variables read here:
/// - `RUST_LOG`: tracing filter directives (falls back to `info`).
/// - `API_SERVER_PORT`: listen port (falls back to `9550`).
/// - `API_SERVER_HOST`: listen host (falls back to `0.0.0.0`).
/// - `CONSTITUTION_API_TOKEN`: token for the constitution proxy; a warning is
///   logged and an empty token is used when it is not set.
///
/// # Panics
/// Panics when the port is not a valid `u16`, the HTTP client cannot be
/// built, the listen address cannot be bound, or the server returns an error.
#[tokio::main]
async fn main() {
    // Logging must be initialised first so the warning below is emitted.
    let log_filter = env::var("RUST_LOG").unwrap_or_else(|_| "info".to_string());
    tracing_subscriber::registry()
        .with(tracing_subscriber::EnvFilter::new(log_filter))
        .with(tracing_subscriber::fmt::layer())
        .init();

    // Listen address configuration.
    let port_text = env::var("API_SERVER_PORT").unwrap_or_else(|_| "9550".to_string());
    let port: u16 = port_text
        .parse()
        .expect("mainnet: API_SERVER_PORT must be a valid port number");
    let host = env::var("API_SERVER_HOST").unwrap_or_else(|_| "0.0.0.0".to_string());

    // Token for the constitution layer; an empty string means "not configured".
    let constitution_token = match env::var("CONSTITUTION_API_TOKEN") {
        Ok(token) => token,
        Err(_) => {
            warn!("CONSTITUTION_API_TOKEN 未设置,宪法层代理将无法认证");
            String::new()
        }
    };

    // Shared HTTP client with an overall 30 second request timeout.
    let client = Client::builder()
        .timeout(Duration::from_secs(30))
        .build()
        .expect("mainnet: failed to build HTTP client");

    let app = build_router(Arc::new(AppState {
        client,
        constitution_token,
    }));

    let addr = format!("{}:{}", host, port);

    // Startup banner.
    for banner_line in [
        "╔══════════════════════════════════════════════════════╗",
        "║ NAC API Server v3.0.0 - 主网统一入口 ║",
        "║ 协议: NRPC/4.0 Chain ID: 5132611 ║",
        "╚══════════════════════════════════════════════════════╝",
    ] {
        info!("{}", banner_line);
    }
    info!("监听地址: {}", addr);
    info!("集成模块: CBPP / CSNP / NVM / 宪法层 / ACC / Charter");
    info!(" CNNL / GNACS / Wallet / Exchange / CIB / Onboarding");

    let listener = tokio::net::TcpListener::bind(&addr)
        .await
        .expect("mainnet: failed to bind address");
    axum::serve(listener, app)
        .await
        .expect("mainnet: server error");
}
// ============================================================
// 路由构建
// ============================================================
fn build_router(state: Arc<AppState>) -> Router {
Router::new()
// ── 根端点 ──────────────────────────────────────────
.route("/", get(root_info))
.route("/health", get(health_check))
// ── 聚合健康检查 ─────────────────────────────────────
.route("/api/v1/health", get(aggregated_health))
.route("/api/v1/status", get(aggregated_health))
// ── CBPP 共识层 (9545) ───────────────────────────────
.route("/api/v1/cbpp/*path", any({
let s = state.clone();
move |req: Request| proxy_to(s.clone(), "http://localhost:9545", req)
}))
.route("/api/v1/cbpp", any({
let s = state.clone();
move |req: Request| proxy_to(s.clone(), "http://localhost:9545", req)
}))
// ── CSNP 网络层 (9546) ───────────────────────────────
.route("/api/v1/csnp/*path", any({
let s = state.clone();
move |req: Request| proxy_to(s.clone(), "http://localhost:9546", req)
}))
.route("/api/v1/csnp", any({
let s = state.clone();
move |req: Request| proxy_to(s.clone(), "http://localhost:9546", req)
}))
// ── NVM 虚拟机 (9547) ────────────────────────────────
.route("/api/v1/nvm/*path", any({
let s = state.clone();
move |req: Request| proxy_to(s.clone(), "http://localhost:9547", req)
}))
.route("/api/v1/nvm", any({
let s = state.clone();
move |req: Request| proxy_to(s.clone(), "http://localhost:9547", req)
}))
// ── 宪法层 (9548, 需Token) ───────────────────────────
.route("/api/v1/constitution/*path", any({
let s = state.clone();
move |req: Request| proxy_constitution(s.clone(), req)
}))
.route("/api/v1/constitution", any({
let s = state.clone();
move |req: Request| proxy_constitution(s.clone(), req)
}))
// ── ACC 协议层 (9554) ────────────────────────────────
.route("/api/v1/acc/*path", any({
let s = state.clone();
move |req: Request| proxy_to(s.clone(), "http://localhost:9554", req)
}))
.route("/api/v1/acc", any({
let s = state.clone();
move |req: Request| proxy_to(s.clone(), "http://localhost:9554", req)
}))
// ── Charter 智能合约 (9555) ──────────────────────────
.route("/api/v1/charter/*path", any({
let s = state.clone();
move |req: Request| proxy_to(s.clone(), "http://localhost:9555", req)
}))
.route("/api/v1/charter", any({
let s = state.clone();
move |req: Request| proxy_to(s.clone(), "http://localhost:9555", req)
}))
// ── CNNL 神经网络语言 (8765) ─────────────────────────
.route("/api/v1/cnnl/*path", any({
let s = state.clone();
move |req: Request| proxy_cnnl(s.clone(), req)
}))
.route("/api/v1/cnnl", any({
let s = state.clone();
move |req: Request| proxy_cnnl(s.clone(), req)
}))
// ── GNACS 资产分类 (8001) ────────────────────────────
.route("/api/v1/gnacs/*path", any({
let s = state.clone();
move |req: Request| proxy_gnacs(s.clone(), req)
}))
.route("/api/v1/gnacs", any({
let s = state.clone();
move |req: Request| proxy_gnacs(s.clone(), req)
}))
// ── Wallet 钱包服务 (9556) ───────────────────────────
.route("/api/v1/wallet/*path", any({
let s = state.clone();
move |req: Request| proxy_wallet(s.clone(), req)
}))
.route("/api/v1/wallet", any({
let s = state.clone();
move |req: Request| proxy_wallet(s.clone(), req)
}))
// ── Exchange 交易所 (9557) ───────────────────────────
.route("/api/v1/exchange/*path", any({
let s = state.clone();
move |req: Request| proxy_exchange(s.clone(), req)
}))
.route("/api/v1/exchange/", any({
let s = state.clone();
move |req: Request| proxy_exchange(s.clone(), req)
}))
.route("/api/v1/exchange", any({
let s = state.clone();
move |req: Request| proxy_exchange(s.clone(), req)
}))
// ── CIB 身份桥 (8091) ────────────────────────────────
.route("/api/v1/cib/*path", any({
let s = state.clone();
move |req: Request| proxy_to(s.clone(), "http://localhost:8091", req)
}))
.route("/api/v1/cib", any({
let s = state.clone();
move |req: Request| proxy_to(s.clone(), "http://localhost:8091", req)
}))
// ── Onboarding 资产上链 (9552) ───────────────────────
.route("/api/v1/onboarding/*path", any({
let s = state.clone();
move |req: Request| proxy_to(s.clone(), "http://localhost:9552", req)
}))
.route("/api/v1/onboarding", any({
let s = state.clone();
move |req: Request| proxy_to(s.clone(), "http://localhost:9552", req)
}))
// ── CORS ─────────────────────────────────────────────
.layer(
CorsLayer::new()
.allow_origin(Any)
.allow_methods(Any)
.allow_headers(Any),
)
.with_state(state)
}
// ============================================================
// 根端点
// ============================================================
/// `GET /`: static description of the gateway and of every proxied module
/// (module name, public path prefix and upstream port).
async fn root_info() -> Json<serde_json::Value> {
    // (module name, upstream port) in the order they are reported.
    const MODULES: [(&str, &str); 12] = [
        ("cbpp", "9545"),
        ("csnp", "9546"),
        ("nvm", "9547"),
        ("constitution", "9548"),
        ("acc", "9554"),
        ("charter", "9555"),
        ("cnnl", "8765"),
        ("gnacs", "8001"),
        ("wallet", "9556"),
        ("exchange", "9557"),
        ("cib", "8091"),
        ("onboarding", "9552"),
    ];

    let modules: Vec<serde_json::Value> = MODULES
        .iter()
        .map(|&(module, port)| {
            serde_json::json!({
                "name": module,
                "path": format!("/api/v1/{}", module),
                "upstream": port
            })
        })
        .collect();

    let payload = serde_json::json!({
        "service": "NAC API Server",
        "version": "3.0.0",
        "protocol": "NRPC/4.0",
        "chain_id": 5132611,
        "network": "mainnet",
        "modules": modules,
        "docs": "https://docs.newassetchain.io/api"
    });
    Json(payload)
}
/// `GET /health`: liveness response for the gateway itself.
/// It always reports `ok` and never contacts any upstream service.
async fn health_check() -> Json<serde_json::Value> {
    let payload = serde_json::json!({
        "status": "ok",
        "service": "nac-api-server",
        "version": "3.0.0",
        "protocol": "NRPC/4.0",
        "chain_id": 5132611
    });
    Json(payload)
}
// ============================================================
// 聚合健康检查
// ============================================================
/// Health report for a single upstream service.
///
/// The fields mirror the per-service JSON entries that `aggregated_health`
/// emits, but that handler builds its entries with `serde_json::json!`
/// directly; this struct is not used in the visible code, hence `dead_code`.
#[allow(dead_code)]
#[derive(Serialize)]
struct ServiceHealth {
// Module name, e.g. "cbpp".
name: String,
// Health status label.
status: String,
// Upstream identifier; `aggregated_health` reports the port number here.
upstream: String,
// Time the probe took, in milliseconds.
response_ms: u64,
// Optional JSON body returned by the service's health endpoint.
detail: Option<serde_json::Value>,
}
async fn aggregated_health(State(state): State<Arc<AppState>>) -> Json<serde_json::Value> {
// 定义所有需要检查的服务
let services: Vec<(&str, &str, &str)> = vec![
("cbpp", "http://localhost:9545/health", "9545"),
("csnp", "http://localhost:9546/health", "9546"),
("nvm", "http://localhost:9547/health", "9547"),
("constitution", "http://localhost:9548/health", "9548"),
("acc", "http://localhost:9554/health", "9554"),
("charter", "http://localhost:9555/health", "9555"),
("cnnl", "http://localhost:8765/api/v1/health", "8765"),
("gnacs", "http://localhost:8001/api/health", "8001"),
("wallet", "http://localhost:9556/v1/health", "9556"),
("exchange", "http://localhost:9557/health", "9557"),
("cib", "http://localhost:8091/health", "8091"),
("onboarding", "http://localhost:9552/api/v1/health", "9552"),
];
let mut results = Vec::new();
let mut all_healthy = true;
for (name, url, upstream) in services {
let start = std::time::Instant::now();
let resp = timeout(
Duration::from_secs(3),
state.client.get(url).send(),
)
.await;
let elapsed_ms = start.elapsed().as_millis() as u64;
let (status, detail) = match resp {
Ok(Ok(r)) if r.status().is_success() => {
let body = r.json::<serde_json::Value>().await.ok();
("healthy".to_string(), body)
}
Ok(Ok(r)) => {
all_healthy = false;
(format!("unhealthy({})", r.status()), None)
}
Ok(Err(e)) => {
all_healthy = false;
warn!("服务 {} 健康检查失败: {}", name, e);
("unreachable".to_string(), None)
}
Err(_) => {
all_healthy = false;
warn!("服务 {} 健康检查超时", name);
("timeout".to_string(), None)
}
};
results.push(serde_json::json!({
"name": name,
"status": status,
"upstream": upstream,
"response_ms": elapsed_ms,
"detail": detail
}));
}
let overall = if all_healthy { "healthy" } else { "degraded" };
Json(serde_json::json!({
"status": overall,
"service": "nac-api-server",
"version": "3.0.0",
"protocol": "NRPC/4.0",
"chain_id": 5132611,
"network": "mainnet",
"timestamp": chrono::Utc::now().to_rfc3339(),
"services": results
}))
}
// ============================================================
// 通用反向代理
// ============================================================
/// Generic reverse proxy: forwards the request to `upstream_base`, replacing
/// the `/api/v1/{module}` prefix with the remaining path and keeping the
/// query string. No extra authentication header is injected.
async fn proxy_to(
    state: Arc<AppState>,
    upstream_base: &str,
    req: Request,
) -> impl IntoResponse {
    let (parts, body) = req.into_parts();

    // Upstream URL = base + path without the gateway prefix (+ "?query").
    let mut target = format!("{}{}", upstream_base, strip_api_prefix(parts.uri.path()));
    if let Some(query) = parts.uri.query() {
        target.push('?');
        target.push_str(query);
    }

    forward_request(&state.client, parts.method, &target, parts.headers, body, None).await
}
/// Reverse proxy for the constitution service on port 9548.
///
/// Works like `proxy_to`, but additionally passes `Bearer <token>` to
/// `forward_request` when a constitution token is configured (non-empty).
async fn proxy_constitution(state: Arc<AppState>, req: Request) -> impl IntoResponse {
    let (parts, body) = req.into_parts();

    let mut target = format!(
        "http://localhost:9548{}",
        strip_api_prefix(parts.uri.path())
    );
    if let Some(query) = parts.uri.query() {
        target.push('?');
        target.push_str(query);
    }

    // The constitution layer expects a Bearer token; skip it when none is configured.
    let bearer = (!state.constitution_token.is_empty())
        .then(|| format!("Bearer {}", state.constitution_token));

    forward_request(
        &state.client,
        parts.method,
        &target,
        parts.headers,
        body,
        bearer.as_deref(),
    )
    .await
}
/// Reverse proxy for the GNACS asset-classification service on port 8001.
///
/// Path mapping (gateway -> GNACS):
/// - `/api/v1/gnacs`        -> `/api/gnacs/classify/list` (default listing)
/// - `/api/v1/gnacs/`       -> `/api/health`
/// - `/api/v1/gnacs/health` -> `/api/health`
/// - `/api/v1/gnacs/xxx`    -> `/api/gnacs/xxx`
///
/// The query string is forwarded unchanged.
async fn proxy_gnacs(state: Arc<AppState>, req: Request) -> impl IntoResponse {
    let (parts, body) = req.into_parts();
    let path = parts.uri.path();

    // Only the bare module path selects the default listing. A suffix test
    // (`ends_with("/gnacs")`) would also capture nested paths such as
    // `/api/v1/gnacs/foo/gnacs` and silently misroute them.
    let upstream_path = if path == "/api/v1/gnacs" {
        "/api/gnacs/classify/list".to_string()
    } else {
        match strip_api_prefix(path).as_str() {
            "/health" | "/" => "/api/health".to_string(),
            rest => format!("/api/gnacs{}", rest),
        }
    };

    let upstream_url = match parts.uri.query() {
        Some(query) => format!("http://localhost:8001{}?{}", upstream_path, query),
        None => format!("http://localhost:8001{}", upstream_path),
    };

    forward_request(&state.client, parts.method, &upstream_url, parts.headers, body, None).await
}
/// Reverse proxy for the Wallet service on port 9556, whose routes live
/// under `/v1/...`.
///
/// Path mapping (gateway -> Wallet):
/// - `/api/v1/wallet`     -> `/v1/health`
/// - `/api/v1/wallet/xxx` -> `/v1/xxx`
///
/// The query string is forwarded unchanged.
async fn proxy_wallet(state: Arc<AppState>, req: Request) -> impl IntoResponse {
    let (parts, body) = req.into_parts();
    let path = parts.uri.path();

    // Only the bare module path maps to the health endpoint. A suffix test
    // (`ends_with("/wallet")`) would also capture nested paths such as
    // `/api/v1/wallet/foo/wallet` and answer them with the health check.
    let upstream_path = if path == "/api/v1/wallet" {
        "/v1/health".to_string()
    } else {
        format!("/v1{}", strip_api_prefix(path))
    };

    let upstream_url = match parts.uri.query() {
        Some(query) => format!("http://localhost:9556{}?{}", upstream_path, query),
        None => format!("http://localhost:9556{}", upstream_path),
    };

    forward_request(&state.client, parts.method, &upstream_url, parts.headers, body, None).await
}
/// Reverse proxy for the CNNL service on port 8765, whose routes live under
/// `/api/v1/...`.
///
/// Path mapping (gateway -> CNNL):
/// - `/api/v1/cnnl`        -> `/api/v1/health`
/// - `/api/v1/cnnl/health` -> `/api/v1/health`
/// - `/api/v1/cnnl/xxx`    -> `/api/v1/xxx`
async fn proxy_cnnl(state: Arc<AppState>, req: Request) -> impl IntoResponse {
    let (parts, body) = req.into_parts();

    let remainder = strip_api_prefix(parts.uri.path());
    let upstream_path = match remainder.as_str() {
        // Bare module path and explicit health path both hit the health endpoint.
        "/" | "" | "/health" => "/api/v1/health".to_string(),
        other => format!("/api/v1{}", other),
    };

    let mut target = format!("http://localhost:8765{}", upstream_path);
    if let Some(query) = parts.uri.query() {
        target.push('?');
        target.push_str(query);
    }

    forward_request(&state.client, parts.method, &target, parts.headers, body, None).await
}
/// Reverse proxy for the Exchange service on port 9557.
///
/// Path mapping (gateway -> Exchange):
/// - `/api/v1/exchange`        -> `/health`
/// - `/api/v1/exchange/`       -> `/health`
/// - `/api/v1/exchange/health` -> `/health`
/// - `/api/v1/exchange/xxx`    -> `/xxx`
async fn proxy_exchange(state: Arc<AppState>, req: Request) -> impl IntoResponse {
    let (parts, body) = req.into_parts();

    let remainder = strip_api_prefix(parts.uri.path());
    // An empty remainder means the module root, which is served by the health endpoint.
    let upstream_path = match remainder.as_str() {
        "/" | "" => "/health".to_string(),
        _ => remainder,
    };

    let mut target = format!("http://localhost:9557{}", upstream_path);
    if let Some(query) = parts.uri.query() {
        target.push('?');
        target.push_str(query);
    }

    forward_request(&state.client, parts.method, &target, parts.headers, body, None).await
}
// ============================================================
// 路径处理工具
// ============================================================
/// Removes the `/api/v1/{module}` prefix from `path` and returns what follows,
/// always starting with `/`.
///
/// Examples: `/api/v1/cbpp/health` -> `/health`, `/api/v1/cbpp` -> `/`.
/// Paths with no remainder (or with fewer than four segments) yield `/`.
fn strip_api_prefix(path: &str) -> String {
    // Splitting into at most five pieces gives ["", "api", "v1", module, rest];
    // the fifth piece keeps any further slashes intact.
    match path.splitn(5, '/').nth(4) {
        Some(rest) if !rest.is_empty() => format!("/{}", rest),
        _ => "/".to_string(),
    }
}
// ============================================================
// HTTP 转发核心
// ============================================================
async fn forward_request(
client: &Client,
method: Method,
url: &str,
headers: HeaderMap,
body: Body,
extra_auth: Option<&str>,
) -> Response {
// 收集请求体
let body_bytes = match axum::body::to_bytes(body, 10 * 1024 * 1024).await {
Ok(b) => b,
Err(e) => {
error!("读取请求体失败: {}", e);
return (StatusCode::BAD_GATEWAY, "Failed to read request body").into_response();
}
};
// 构建上游请求
let mut req_builder = client.request(
reqwest::Method::from_bytes(method.as_str().as_bytes())
.unwrap_or(reqwest::Method::GET),
url,
);
// 转发请求头(过滤 host 和 connection
for (name, value) in &headers {
let name_str = name.as_str().to_lowercase();
if name_str == "host" || name_str == "connection" || name_str == "transfer-encoding" {
continue;
}
if let Ok(v) = value.to_str() {
req_builder = req_builder.header(name.as_str(), v);
}
}
// 注入额外认证头
if let Some(auth) = extra_auth {
req_builder = req_builder.header("Authorization", auth);
}
// 设置请求体
if !body_bytes.is_empty() {
req_builder = req_builder.body(body_bytes.to_vec());
}
// 发送请求
match req_builder.send().await {
Ok(resp) => {
let status = StatusCode::from_u16(resp.status().as_u16())
.unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
let mut response_headers = HeaderMap::new();
for (name, value) in resp.headers() {
let name_str = name.as_str().to_lowercase();
if name_str == "transfer-encoding" || name_str == "connection" {
continue;
}
if let Ok(v) = HeaderValue::from_bytes(value.as_bytes()) {
if let Ok(n) = axum::http::HeaderName::from_bytes(name.as_str().as_bytes()) {
response_headers.insert(n, v);
}
}
}
let body_bytes = resp.bytes().await.unwrap_or_default();
let mut response = Response::new(Body::from(body_bytes));
*response.status_mut() = status;
*response.headers_mut() = response_headers;
response
}
Err(e) => {
error!("代理请求失败 {}: {}", url, e);
(
StatusCode::BAD_GATEWAY,
Json(serde_json::json!({
"error": "upstream service unavailable",
"url": url,
"detail": e.to_string()
})),
)
.into_response()
}
}
}