nac-onboarding/backend/routers/assets.py

259 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
NAC 一键上链系统 - 资产管理路由
版本: 2.1 (集成GNACS微服务 + 20大类 + 跨境交易双轨合规)
"""
import logging
import random
import string
import httpx
from datetime import datetime, timezone
from fastapi import APIRouter, Depends, HTTPException, Header
from database import assets_col, now_utc, GNACS_SERVICE_URL
from models import AssetCreateRequest, OnboardingStep
from routers.auth import get_current_user as auth_get_current_user
from bson import ObjectId
# Module-level logger; all log records emitted by this router use this logger name.
logger = logging.getLogger("nac-onboarding.assets")
# Router instance on which the endpoint decorators in this module register their handlers.
router = APIRouter()
def serialize_doc(doc: dict) -> dict:
    """Convert a MongoDB document into a plain dict of JSON-friendly values.

    The ``_id`` key is dropped at every nesting level. ``ObjectId`` values
    become strings, ``datetime`` values become ISO-8601 strings, and nested
    dicts and lists are converted recursively. Unlike a dict-only recursion,
    list elements of any supported type (including lists nested in lists)
    are converted as well.

    Args:
        doc: The document to convert, or ``None``.

    Returns:
        A new dict; ``{}`` when ``doc`` is ``None``.
    """

    def convert(value):
        # The four handled types are mutually exclusive, so the order of the
        # checks does not influence the result.
        if isinstance(value, dict):
            return serialize_doc(value)
        if isinstance(value, list):
            return [convert(item) for item in value]
        if isinstance(value, datetime):
            return value.isoformat()
        if isinstance(value, ObjectId):
            return str(value)
        return value

    if doc is None:
        return {}
    return {key: convert(value) for key, value in doc.items() if key != "_id"}
def generate_asset_id(asset_type: str) -> str:
    """Build an asset identifier of the form ``NAC-<prefix>-<12 hex chars>``.

    Args:
        asset_type: Asset category name; unknown names get the prefix ``XX``.

    Returns:
        The identifier string. The suffix is drawn with the ``random`` module
        and is not checked for uniqueness here.
    """
    prefixes = dict(
        RealEstate="RE",
        FinancialSecurities="FS",
        Commodities="CM",
        ArtCollectibles="AT",
        IntellectualProperty="IP",
        DigitalAssets="DA",
        Infrastructure="IF",
        NaturalResources="NR",
        EnvironmentalRights="ER",
        CorporateEquity="CE",
        DebtAssets="DB",
        InsuranceAssets="IN",
        AgriculturalAssets="AG",
        TransportationAssets="TR",
        EquipmentMachinery="EM",
        DataAssets="DT",
        IntangibleBusiness="IB",
        SportsAssets="SP",
        CulturalEntertainment="CU",
        Custom="CX",
    )
    code = prefixes.get(asset_type, "XX")
    hex_alphabet = "0123456789ABCDEF"
    suffix = "".join(random.choices(hex_alphabet, k=12))
    return "NAC-" + code + "-" + suffix
async def call_gnacs(endpoint: str, params: dict = None) -> dict:
    """Send a best-effort GET request to the GNACS service.

    Args:
        endpoint: Path appended verbatim to ``GNACS_SERVICE_URL``.
        params: Optional query-string parameters.

    Returns:
        The decoded JSON object of an HTTP 200 response. ``{}`` is returned
        (and a warning is logged) for any other status code, for a JSON body
        that is not an object, and for any exception raised while calling
        the service or decoding its response.
    """
    try:
        async with httpx.AsyncClient(timeout=3.0) as client:
            resp = await client.get(f"{GNACS_SERVICE_URL}{endpoint}", params=params)
            if resp.status_code != 200:
                # Previously dropped silently; log it so failures are visible.
                logger.warning("GNACS调用失败: HTTP %s (%s)", resp.status_code, endpoint)
                return {}
            payload = resp.json()
    except Exception as e:
        # Deliberately broad: the caller treats GNACS data as optional, so no
        # failure of this call may propagate.
        logger.warning("GNACS调用失败: %s", e)
        return {}
    if not isinstance(payload, dict):
        # Honour the declared ``-> dict`` contract; callers use ``.get`` on the result.
        logger.warning("GNACS返回格式异常: %s (%s)", type(payload).__name__, endpoint)
        return {}
    return payload
def determine_tx_type(jurisdiction: str, investor_jurisdiction: str = None) -> str:
    """Classify a transaction by comparing asset and investor jurisdictions.

    Comparison is case-insensitive.

    Args:
        jurisdiction: Jurisdiction code of the asset.
        investor_jurisdiction: Jurisdiction code of the investor; when empty
            or omitted the transaction is treated as domestic.

    Returns:
        ``"domestic"`` when the codes match (or no investor code is given),
        ``"domestic_asean"`` / ``"domestic_gcc"`` when both codes belong to
        the same listed bloc, otherwise ``"cross_border"``.
    """
    if not investor_jurisdiction:
        return "domestic"

    home = jurisdiction.upper()
    investor = investor_jurisdiction.upper()
    if home == investor:
        return "domestic"

    # Each bloc is checked in turn; both parties must be members of the same one.
    blocs = (
        ("domestic_asean", {"SG", "MY", "TH", "ID", "PH", "VN", "MM", "KH", "LA", "BN"}),
        ("domestic_gcc", {"AE", "SA", "QA", "KW", "BH", "OM"}),
    )
    for label, members in blocs:
        if home in members and investor in members:
            return label
    return "cross_border"
def get_kyc_req(asset_type: str, tx_type: str) -> int:
    """Return the KYC level required for an asset type and transaction type.

    Args:
        asset_type: Asset category name; unknown names default to level 2.
        tx_type: Transaction type; ``"cross_border"`` raises the requirement
            to at least level 3.

    Returns:
        The required KYC level as an integer.
    """
    tiers = (
        (3, ("FinancialSecurities", "Infrastructure", "NaturalResources",
             "CorporateEquity", "DebtAssets", "InsuranceAssets")),
        (2, ("RealEstate", "Commodities", "ArtCollectibles", "IntellectualProperty",
             "EnvironmentalRights", "AgriculturalAssets", "TransportationAssets",
             "EquipmentMachinery", "DataAssets", "IntangibleBusiness", "SportsAssets",
             "CulturalEntertainment", "Custom")),
        (1, ("DigitalAssets",)),
    )
    level_by_type = {name: level for level, names in tiers for name in names}
    required = level_by_type.get(asset_type, 2)
    if tx_type != "cross_border":
        return required
    return required if required >= 3 else 3
def get_acc_standard(asset_type: str) -> str:
    """Return the ACC token standard label associated with an asset type.

    Args:
        asset_type: Asset category name.

    Returns:
        One of ``"ACC-20"``, ``"ACC-721"``, ``"ACC-1155"`` or ``"ACC-1400"``;
        unknown asset types fall back to ``"ACC-20"``.
    """
    types_by_standard = {
        "ACC-20": ("RealEstate", "Commodities", "DigitalAssets", "Infrastructure",
                   "TransportationAssets", "EquipmentMachinery", "DataAssets", "Custom"),
        "ACC-1400": ("FinancialSecurities", "CorporateEquity", "DebtAssets", "InsuranceAssets"),
        "ACC-721": ("ArtCollectibles", "IntellectualProperty", "IntangibleBusiness",
                    "SportsAssets", "CulturalEntertainment"),
        "ACC-1155": ("NaturalResources", "EnvironmentalRights", "AgriculturalAssets"),
    }
    standard_by_type = {
        name: standard
        for standard, names in types_by_standard.items()
        for name in names
    }
    return standard_by_type.get(asset_type, "ACC-20")
# Use auth.py's get_current_user for proper DEV_MODE support.
# The alias binds the name `get_current_user` in this module to the function
# imported from routers.auth, which the endpoints use via Depends(get_current_user).
get_current_user = auth_get_current_user
@router.post("")
async def create_asset(req: AssetCreateRequest, current_user: dict = Depends(get_current_user)):
    """Submit a new asset onboarding application for the authenticated user.

    The transaction type is taken from the request or derived from the two
    jurisdictions, the caller's KYC level is checked against the level
    required for that asset/transaction combination, GNACS classification
    info is fetched (best-effort), and the initial asset document is
    inserted into ``assets_col``.

    Args:
        req: Validated request body describing the asset.
        current_user: Authenticated user record; ``did`` and ``kyc_level``
            are read from it.

    Returns:
        A response envelope with the new ``asset_id``, the initial step and
        progress, the transaction type, the ACC standard and the required
        KYC level.

    Raises:
        HTTPException: 403 when the user's KYC level is below the required level.
    """
    # NOTE(review): a client-supplied transaction_type takes precedence over the
    # value derived from the jurisdictions, and only "cross_border" raises the
    # KYC requirement -- confirm that callers are trusted to set this field.
    tx_type = req.transaction_type or determine_tx_type(req.jurisdiction, req.investor_jurisdiction)
    required_kyc = get_kyc_req(req.asset_type, tx_type)
    # `or 0` also covers a user record whose kyc_level is present but None,
    # which would otherwise raise TypeError in the comparison below.
    user_kyc = current_user.get("kyc_level", 0) or 0
    if user_kyc < required_kyc:
        raise HTTPException(
            status_code=403,
            detail=f"KYC等级不足{req.asset_type}({tx_type})需要KYC-{required_kyc}当前KYC-{user_kyc}"
        )

    gnacs_info = await call_gnacs("/api/gnacs/classify/info", {"asset_type": req.asset_type})
    acc_standard = get_acc_standard(req.asset_type)
    asset_id = generate_asset_id(req.asset_type)
    owner_did = current_user["did"]
    # One timestamp for the whole record, so created_at, updated_at and the
    # first history entry agree exactly on a freshly created document.
    submitted_at = now_utc()
    first_step = OnboardingStep.APPLICATION_SUBMITTED

    doc = {
        "asset_id": asset_id,
        "name": req.name,
        "asset_type": req.asset_type,
        "asset_subtype": req.asset_subtype,
        "acc_standard": acc_standard,
        "gnacs_info": gnacs_info.get("data", {}),
        "jurisdiction": req.jurisdiction,
        "investor_jurisdiction": req.investor_jurisdiction or req.jurisdiction,
        "transaction_type": tx_type,
        "owner_id": owner_did,
        "owner_kyc_level": user_kyc,
        "total_supply": req.total_supply,
        "asset_value": req.asset_value,
        "currency": req.currency,
        "xtzh_staked": req.xtzh_staked,
        "xtzh_ratio": 0.8,
        "description": req.description,
        "details": req.details,
        "created_at": submitted_at,
        "updated_at": submitted_at,
        "onboarding_status": {
            "current_step": first_step,
            "progress": 5,
            "is_active": True,
            "history": [{
                "step": first_step,
                "status": "completed",
                "timestamp": submitted_at.isoformat(),
                "operator": owner_did,
                "details": f"申请提交,类型:{req.asset_type},辖区:{req.jurisdiction},交易类型:{tx_type}ACC:{acc_standard}"
            }]
        },
        # Sections that are still empty when the application is first stored.
        "compliance": None, "valuation": None, "dna": None, "warrant": None,
        "rights_offering": None, "custody": None, "xtzh": None, "token": None, "documents": []
    }
    await assets_col.insert_one(doc)
    logger.info("资产申请: %s by %s (%s)", asset_id, owner_did, tx_type)
    return {
        "success": True,
        "data": {
            "asset_id": asset_id,
            "current_step": first_step,
            "progress": 5,
            "transaction_type": tx_type,
            "acc_standard": acc_standard,
            "required_kyc": required_kyc
        },
        "message": f"资产申请已提交: {asset_id},交易类型: {tx_type}"
    }
@router.get("")
async def list_assets(current_user: dict = Depends(get_current_user), page: int = 1, limit: int = 20):
    """List the authenticated user's assets, newest first, one page at a time.

    Args:
        current_user: Authenticated user record; ``did`` selects the owner.
        page: 1-based page number.
        limit: Page size.

    Returns:
        A response envelope with the serialized assets of the requested page,
        the total number of assets owned by the user, and the echoed
        ``page`` / ``limit``.

    Raises:
        HTTPException: 400 when ``page`` or ``limit`` is smaller than 1.
    """
    # Reject values that cannot describe a page: page < 1 yields a negative
    # skip, and (assuming assets_col follows PyMongo cursor semantics) a limit
    # of 0 means "no limit" and would return every matching document.
    if page < 1 or limit < 1:
        raise HTTPException(status_code=400, detail="分页参数无效: page和limit必须大于等于1")

    owner_filter = {"owner_id": current_user["did"]}
    offset = (page - 1) * limit
    cursor = assets_col.find(owner_filter, sort=[("created_at", -1)]).skip(offset).limit(limit)
    assets = [serialize_doc(doc) async for doc in cursor]
    total = await assets_col.count_documents(owner_filter)
    return {"success": True, "data": {"assets": assets, "total": total, "page": page, "limit": limit}}
@router.get("/admin/all")
async def admin_list_all(current_user: dict = Depends(get_current_user)):
    """Return the 100 most recently created assets of all owners.

    Only users whose ``role`` is ``"admin"`` or ``"operator"`` may call this.

    Returns:
        A response envelope with the serialized assets and the total number
        of asset documents in the collection.

    Raises:
        HTTPException: 403 when the caller has neither of the allowed roles.
    """
    role = current_user.get("role")
    if role != "admin" and role != "operator":
        raise HTTPException(status_code=403, detail="需要管理员权限")

    newest_first = assets_col.find({}, sort=[("created_at", -1)]).limit(100)
    serialized = [serialize_doc(item) async for item in newest_first]
    overall_count = await assets_col.count_documents({})
    payload = {"assets": serialized, "total": overall_count}
    return {"success": True, "data": payload}
@router.get("/{asset_id}")
async def get_asset(asset_id: str, current_user: dict = Depends(get_current_user)):
    """Return one asset owned by the authenticated user.

    Args:
        asset_id: Identifier of the asset to look up.
        current_user: Authenticated user record; ``did`` must match the
            asset's ``owner_id``.

    Returns:
        A response envelope with the serialized asset plus an
        ``onchain_info`` summary.

    Raises:
        HTTPException: 404 when no asset with that id belongs to the user.
    """
    query = {"asset_id": asset_id, "owner_id": current_user["did"]}
    record = await assets_col.find_one(query)
    if record:
        payload = serialize_doc(record)
        # Aggregate the key on-chain information into the onchain_info field.
        payload["onchain_info"] = _build_onchain_info(record)
        return {"success": True, "data": payload}
    raise HTTPException(status_code=404, detail="资产不存在")
def _build_onchain_info(doc: dict) -> dict:
"""从各步骤的专用字段中聚合链上关键信息"""
# dna_hash: Step 5 generate-dna 写入 doc["dna"]["hash"]
dna = doc.get("dna") or {}
dna_hash = dna.get("hash") or "--"
# chain_tx: Step 6 chain-confirm 写入 doc["warrant"]["tx_hash"]
warrant = doc.get("warrant") or {}
chain_tx = warrant.get("tx_hash") or "--"
block_height = warrant.get("block_height") or "--"
# token_symbol / token_address: Step 14 issue-token 写入 doc["token"]
token = doc.get("token") or {}
token_symbol = token.get("symbol") or "--"
token_address = token.get("address") or "--"
token_supply = token.get("total_supply") or "--"
issue_tx_hash = token.get("issue_tx_hash") or "--"
# xtzh_minted: Step 13 mint-xtzh 写入 doc["xtzh"]["amount"]
xtzh = doc.get("xtzh") or {}
xtzh_minted = xtzh.get("amount") or "--"
xtzh_mint_tx = xtzh.get("mint_tx") or "--"
# warrant_id: Step 9 issue-warrant 写入 doc["warrant_cert"]
warrant_cert = doc.get("warrant_cert") or {}
warrant_id = warrant_cert.get("warrant_id") or "--"
# custody_tx: Step 11 custody 写入 doc["custody"]["warrant_custody_tx"]
custody = doc.get("custody") or {}
custody_tx = custody.get("warrant_custody_tx") or "--"
return {
"dna_hash": dna_hash,
"chain_tx": chain_tx,
"block_height": block_height,
"token_symbol": token_symbol,
"token_address": token_address,
"token_supply": token_supply,
"issue_tx_hash": issue_tx_hash,
"xtzh_minted": xtzh_minted,
"xtzh_mint_tx": xtzh_mint_tx,
"warrant_id": warrant_id,
"custody_tx": custody_tx,
"is_complete": doc.get("onboarding_status", {}).get("progress", 0) == 100
}