fix(#1): 修复 onchain_info 字段聚合不完整 - 新增 _build_onchain_info()

This commit is contained in:
nacadmin 2026-03-22 23:06:18 +08:00
parent 096580bdc3
commit d576e9bf90
1 changed files with 258 additions and 0 deletions

258
backend/routers/assets.py Normal file
View File

@ -0,0 +1,258 @@
"""
NAC 一键上链系统 - 资产管理路由
版本: 2.1 (集成GNACS微服务 + 20大类 + 跨境交易双轨合规)
"""
import logging
import random
import string
import httpx
from datetime import datetime, timezone
from fastapi import APIRouter, Depends, HTTPException, Header
from database import assets_col, now_utc, GNACS_SERVICE_URL
from models import AssetCreateRequest, OnboardingStep
from routers.auth import get_current_user as auth_get_current_user
from bson import ObjectId
logger = logging.getLogger("nac-onboarding.assets")
router = APIRouter()
def serialize_doc(doc: dict) -> dict:
if doc is None:
return {}
result = {}
for k, v in doc.items():
if k == "_id":
continue
elif isinstance(v, ObjectId):
result[k] = str(v)
elif isinstance(v, datetime):
result[k] = v.isoformat()
elif isinstance(v, dict):
result[k] = serialize_doc(v)
elif isinstance(v, list):
result[k] = [serialize_doc(i) if isinstance(i, dict) else i for i in v]
else:
result[k] = v
return result
def generate_asset_id(asset_type: str) -> str:
type_map = {
"RealEstate": "RE", "FinancialSecurities": "FS", "Commodities": "CM",
"ArtCollectibles": "AT", "IntellectualProperty": "IP", "DigitalAssets": "DA",
"Infrastructure": "IF", "NaturalResources": "NR", "EnvironmentalRights": "ER",
"CorporateEquity": "CE", "DebtAssets": "DB", "InsuranceAssets": "IN",
"AgriculturalAssets": "AG", "TransportationAssets": "TR", "EquipmentMachinery": "EM",
"DataAssets": "DT", "IntangibleBusiness": "IB", "SportsAssets": "SP",
"CulturalEntertainment": "CU", "Custom": "CX",
}
prefix = type_map.get(asset_type, "XX")
rand = "".join(random.choices("0123456789ABCDEF", k=12))
return f"NAC-{prefix}-{rand}"
async def call_gnacs(endpoint: str, params: dict = None) -> dict:
try:
async with httpx.AsyncClient(timeout=3.0) as client:
resp = await client.get(f"{GNACS_SERVICE_URL}{endpoint}", params=params)
if resp.status_code == 200:
return resp.json()
except Exception as e:
logger.warning(f"GNACS调用失败: {e}")
return {}
def determine_tx_type(jurisdiction: str, investor_jurisdiction: str = None) -> str:
if not investor_jurisdiction:
return "domestic"
j, ij = jurisdiction.upper(), investor_jurisdiction.upper()
if j == ij:
return "domestic"
asean = {"SG", "MY", "TH", "ID", "PH", "VN", "MM", "KH", "LA", "BN"}
gcc = {"AE", "SA", "QA", "KW", "BH", "OM"}
if j in asean and ij in asean:
return "domestic_asean"
if j in gcc and ij in gcc:
return "domestic_gcc"
return "cross_border"
def get_kyc_req(asset_type: str, tx_type: str) -> int:
base = {
"RealEstate": 2, "FinancialSecurities": 3, "Commodities": 2,
"ArtCollectibles": 2, "IntellectualProperty": 2, "DigitalAssets": 1,
"Infrastructure": 3, "NaturalResources": 3, "EnvironmentalRights": 2,
"CorporateEquity": 3, "DebtAssets": 3, "InsuranceAssets": 3,
"AgriculturalAssets": 2, "TransportationAssets": 2, "EquipmentMachinery": 2,
"DataAssets": 2, "IntangibleBusiness": 2, "SportsAssets": 2,
"CulturalEntertainment": 2, "Custom": 2,
}
kyc = base.get(asset_type, 2)
if tx_type == "cross_border":
kyc = max(kyc, 3)
return kyc
def get_acc_standard(asset_type: str) -> str:
acc_map = {
"RealEstate": "ACC-20", "FinancialSecurities": "ACC-1400", "Commodities": "ACC-20",
"ArtCollectibles": "ACC-721", "IntellectualProperty": "ACC-721", "DigitalAssets": "ACC-20",
"Infrastructure": "ACC-20", "NaturalResources": "ACC-1155", "EnvironmentalRights": "ACC-1155",
"CorporateEquity": "ACC-1400", "DebtAssets": "ACC-1400", "InsuranceAssets": "ACC-1400",
"AgriculturalAssets": "ACC-1155", "TransportationAssets": "ACC-20", "EquipmentMachinery": "ACC-20",
"DataAssets": "ACC-20", "IntangibleBusiness": "ACC-721", "SportsAssets": "ACC-721",
"CulturalEntertainment": "ACC-721", "Custom": "ACC-20",
}
return acc_map.get(asset_type, "ACC-20")
# Use auth.py's get_current_user for proper DEV_MODE support
get_current_user = auth_get_current_user
@router.post("")
async def create_asset(req: AssetCreateRequest, current_user: dict = Depends(get_current_user)):
tx_type = req.transaction_type or determine_tx_type(req.jurisdiction, req.investor_jurisdiction)
required_kyc = get_kyc_req(req.asset_type, tx_type)
user_kyc = current_user.get("kyc_level", 0)
if user_kyc < required_kyc:
raise HTTPException(
status_code=403,
detail=f"KYC等级不足{req.asset_type}({tx_type})需要KYC-{required_kyc}当前KYC-{user_kyc}"
)
gnacs_info = await call_gnacs("/api/gnacs/classify/info", {"asset_type": req.asset_type})
acc_standard = get_acc_standard(req.asset_type)
asset_id = generate_asset_id(req.asset_type)
doc = {
"asset_id": asset_id,
"name": req.name,
"asset_type": req.asset_type,
"asset_subtype": req.asset_subtype,
"acc_standard": acc_standard,
"gnacs_info": gnacs_info.get("data", {}),
"jurisdiction": req.jurisdiction,
"investor_jurisdiction": req.investor_jurisdiction or req.jurisdiction,
"transaction_type": tx_type,
"owner_id": current_user["did"],
"owner_kyc_level": user_kyc,
"total_supply": req.total_supply,
"asset_value": req.asset_value,
"currency": req.currency,
"xtzh_staked": req.xtzh_staked,
"xtzh_ratio": 0.8,
"description": req.description,
"details": req.details,
"created_at": now_utc(),
"updated_at": now_utc(),
"onboarding_status": {
"current_step": OnboardingStep.APPLICATION_SUBMITTED,
"progress": 5,
"is_active": True,
"history": [{
"step": OnboardingStep.APPLICATION_SUBMITTED,
"status": "completed",
"timestamp": now_utc().isoformat(),
"operator": current_user["did"],
"details": f"申请提交,类型:{req.asset_type},辖区:{req.jurisdiction},交易类型:{tx_type}ACC:{acc_standard}"
}]
},
"compliance": None, "valuation": None, "dna": None, "warrant": None,
"rights_offering": None, "custody": None, "xtzh": None, "token": None, "documents": []
}
await assets_col.insert_one(doc)
logger.info(f"资产申请: {asset_id} by {current_user['did']} ({tx_type})")
return {
"success": True,
"data": {
"asset_id": asset_id,
"current_step": OnboardingStep.APPLICATION_SUBMITTED,
"progress": 5,
"transaction_type": tx_type,
"acc_standard": acc_standard,
"required_kyc": required_kyc
},
"message": f"资产申请已提交: {asset_id},交易类型: {tx_type}"
}
@router.get("")
async def list_assets(current_user: dict = Depends(get_current_user), page: int = 1, limit: int = 20):
skip = (page - 1) * limit
cursor = assets_col.find({"owner_id": current_user["did"]}, sort=[("created_at", -1)]).skip(skip).limit(limit)
assets = []
async for doc in cursor:
assets.append(serialize_doc(doc))
total = await assets_col.count_documents({"owner_id": current_user["did"]})
return {"success": True, "data": {"assets": assets, "total": total, "page": page, "limit": limit}}
@router.get("/admin/all")
async def admin_list_all(current_user: dict = Depends(get_current_user)):
if current_user.get("role") not in ["admin", "operator"]:
raise HTTPException(status_code=403, detail="需要管理员权限")
cursor = assets_col.find({}, sort=[("created_at", -1)]).limit(100)
assets = []
async for doc in cursor:
assets.append(serialize_doc(doc))
total = await assets_col.count_documents({})
return {"success": True, "data": {"assets": assets, "total": total}}
@router.get("/{asset_id}")
async def get_asset(asset_id: str, current_user: dict = Depends(get_current_user)):
doc = await assets_col.find_one({"asset_id": asset_id, "owner_id": current_user["did"]})
if not doc:
raise HTTPException(status_code=404, detail="资产不存在")
serialized = serialize_doc(doc)
# 聚合链上关键信息到 onchain_info 字段
serialized["onchain_info"] = _build_onchain_info(doc)
return {"success": True, "data": serialized}
def _build_onchain_info(doc: dict) -> dict:
"""从各步骤的专用字段中聚合链上关键信息"""
# dna_hash: Step 5 generate-dna 写入 doc["dna"]["hash"]
dna = doc.get("dna") or {}
dna_hash = dna.get("hash") or "--"
# chain_tx: Step 6 chain-confirm 写入 doc["warrant"]["tx_hash"]
warrant = doc.get("warrant") or {}
chain_tx = warrant.get("tx_hash") or "--"
block_height = warrant.get("block_height") or "--"
# token_symbol / token_address: Step 14 issue-token 写入 doc["token"]
token = doc.get("token") or {}
token_symbol = token.get("symbol") or "--"
token_address = token.get("address") or "--"
token_supply = token.get("total_supply") or "--"
issue_tx_hash = token.get("issue_tx_hash") or "--"
# xtzh_minted: Step 13 mint-xtzh 写入 doc["xtzh"]["amount"]
xtzh = doc.get("xtzh") or {}
xtzh_minted = xtzh.get("amount") or "--"
xtzh_mint_tx = xtzh.get("mint_tx") or "--"
# warrant_id: Step 9 issue-warrant 写入 doc["warrant_cert"]
warrant_cert = doc.get("warrant_cert") or {}
warrant_id = warrant_cert.get("warrant_id") or "--"
# custody_tx: Step 11 custody 写入 doc["custody"]["warrant_custody_tx"]
custody = doc.get("custody") or {}
custody_tx = custody.get("warrant_custody_tx") or "--"
return {
"dna_hash": dna_hash,
"chain_tx": chain_tx,
"block_height": block_height,
"token_symbol": token_symbol,
"token_address": token_address,
"token_supply": token_supply,
"issue_tx_hash": issue_tx_hash,
"xtzh_minted": xtzh_minted,
"xtzh_mint_tx": xtzh_mint_tx,
"warrant_id": warrant_id,
"custody_tx": custody_tx,
"is_complete": doc.get("onboarding_status", {}).get("progress", 0) == 100
}