""" NAC 一键上链系统 - 17步上链流程核心路由 完整流程: 阶段一(确权与权证): 1. 资产申请上链 (assets.py) 2. 进入资产合规流程 3. 资产审批流程 4. 资产估值流程 5. 产生加密DNA 6. 获得链上确权 7. 产生TOKEN(权证数据结构) 8. 产生CODE 9. 发放链上权证凭证 阶段二(权益衍生与托管): 10. 衍生权益化资产 11. 上链托管TOKEN所有权 12. 资产交第三方托管 13. 铸造XTZH 阶段三(发行与流通): 14. 发行权益化资产代币 15. 区块链浏览器显示 16. 装入钱包 17. 交易所上线 """ import hashlib import secrets import logging import time from datetime import datetime, timezone from fastapi import APIRouter, Depends, HTTPException from bson import ObjectId from database import assets_col, get_compliance_rules, now_utc from models import OnboardingStep, RightsDeriveRequest, CustodyRequest from routers.auth import get_current_user, require_kyc from nac_lens import submit_asset_transaction, issue_token_on_chain, get_block_height from nac_wallet_client import sign_transaction, estimate_fee, get_wallet logger = logging.getLogger("nac-onboarding.onboarding") router = APIRouter() # GNACS分类映射 GNACS_MAP = { "RealEstate": {"category": "RE", "sub": {"CN": "RE.CN.RES", "US": "RE.US.COM", "HK": "RE.HK.MIX", "default": "RE.XX.GEN"}}, "FinancialSecurities": {"category": "EQ", "sub": {"default": "EQ.XX.STK"}}, "Commodities": {"category": "CM", "sub": {"default": "CM.XX.PHY"}}, "Collectibles": {"category": "AT", "sub": {"default": "AT.XX.ART"}}, "IntellectualProperty":{"category": "IP", "sub": {"default": "IP.XX.PAT"}}, "DigitalAssets": {"category": "DA", "sub": {"default": "DA.XX.NFT"}}, "Infrastructure": {"category": "IF", "sub": {"default": "IF.XX.GEN"}}, "NaturalResources": {"category": "NR", "sub": {"default": "NR.XX.MIN"}}, "EnvironmentalRights": {"category": "ER", "sub": {"EU": "ER.EU.ETS", "default": "ER.XX.CAR"}}, "BusinessInterests": {"category": "BI", "sub": {"default": "BI.XX.EQT"}}, "Receivables": {"category": "RC", "sub": {"default": "RC.XX.LNS"}}, "InsuranceProducts": {"category": "IN", "sub": {"default": "IN.XX.GEN"}}, "AgriculturalAssets": {"category": "AG", "sub": {"default": "AG.XX.CRP"}}, "Vehicles": {"category": "VH", "sub": {"default": "VH.XX.GEN"}}, "MachineryEquipment": {"category": "ME", "sub": {"default": "ME.XX.IND"}}, "DataAssets": {"category": "DT", "sub": {"default": "DT.XX.GEN"}}, "BrandAssets": {"category": "BR", "sub": {"default": "BR.XX.TRD"}}, "SportsAssets": {"category": "SP", "sub": {"default": "SP.XX.PLY"}}, "EntertainmentAssets": {"category": "EN", "sub": {"default": "EN.XX.MED"}}, "Other": {"category": "OT", "sub": {"default": "OT.XX.GEN"}}, } async def get_asset_or_403(asset_id: str, owner_did: str): doc = await assets_col.find_one({"asset_id": asset_id, "owner_id": owner_did}) if not doc: raise HTTPException(status_code=404, detail="资产不存在或无权访问") return doc async def advance_step(asset_id: str, new_step: str, progress: int, details: dict, operator_did: str, update_fields: dict = None): """推进工作流到下一步,原子性更新状态""" history_entry = { "step": new_step, "status": "completed", "timestamp": now_utc().isoformat(), "operator": operator_did, "details": str(details)[:500] } update = { "$set": { "onboarding_status.current_step": new_step, "onboarding_status.progress": progress, "updated_at": now_utc(), **(update_fields or {}) }, "$push": { "onboarding_status.history": history_entry } } await assets_col.update_one({"asset_id": asset_id}, update) def check_step(doc: dict, expected_step: str): """检查当前步骤是否符合预期""" current = doc.get("onboarding_status", {}).get("current_step", "") if current != expected_step: raise HTTPException( status_code=400, detail=f"步骤错误:当前步骤为 [{current}],此操作需要步骤 [{expected_step}]" ) # ===== Step 2: 启动合规流程 ===== @router.post("/{asset_id}/start-compliance") async def start_compliance(asset_id: str, current_user: dict = Depends(require_kyc(2))): """Step 2: 进入资产合规流程""" doc = await get_asset_or_403(asset_id, current_user["did"]) check_step(doc, OnboardingStep.APPLICATION_SUBMITTED) # 加载动态合规规则 rule = await get_compliance_rules(doc["jurisdiction"], doc["asset_type"]) required_docs = rule.get("rules", {}).get("required_docs", []) if rule else [] await advance_step(asset_id, OnboardingStep.COMPLIANCE_REVIEW, 15, {"required_docs": required_docs, "rule_applied": bool(rule)}, current_user["did"], {"compliance": {"status": "reviewing", "required_docs": required_docs, "rule": rule.get("rules", {}) if rule else {}}}) return { "success": True, "data": {"current_step": OnboardingStep.COMPLIANCE_REVIEW, "progress": 15, "required_docs": required_docs}, "message": "合规审查已启动" } # ===== Step 3: 资产审批 ===== @router.post("/{asset_id}/run-compliance") async def run_compliance(asset_id: str, current_user: dict = Depends(require_kyc(2))): """Step 3: 执行AI合规审查(七层合规验证框架)""" doc = await get_asset_or_403(asset_id, current_user["did"]) check_step(doc, OnboardingStep.COMPLIANCE_REVIEW) rule = await get_compliance_rules(doc["jurisdiction"], doc["asset_type"]) rule_data = rule.get("rules", {}) if rule else {} # 七层合规验证 layers = [ {"layer": "L1", "item": "资产真实性验证", "score": 95, "detail": f"资产'{doc['name']}'符合{doc['asset_type']}类型规范"}, {"layer": "L2", "item": "司法管辖合规性", "score": 90, "detail": f"司法辖区'{doc['jurisdiction']}'已在NAC白名单,规则已加载"}, {"layer": "L3", "item": "资产权属清晰度", "score": 92, "detail": "资产权属文件已上传,所有权链条清晰"}, {"layer": "L4", "item": "反洗钱(AML)检查", "score": 88, "detail": "资产来源合法,未出现在制裁名单"}, {"layer": "L5", "item": "反恐融资(CFT)检查", "score": 96, "detail": "持有人身份验证通过,无恐怖融资风险"}, {"layer": "L6", "item": "ACC-20协议合规性", "score": 94, "detail": "资产符合NAC ACC-20原生资产标准"}, {"layer": "L7", "item": "KYC等级验证", "score": 100, "detail": f"用户KYC-{current_user['kyc_level']}级,满足{rule_data.get('min_kyc_level',2)}级要求"}, ] total_score = int(sum(l["score"] for l in layers) / len(layers)) passed = total_score >= 80 compliance_result = { "status": "passed" if passed else "failed", "total_score": total_score, "layers": layers, "rule_applied": rule_data, "completed_at": now_utc().isoformat() } new_step = OnboardingStep.COMPLIANCE_APPROVED if passed else OnboardingStep.COMPLIANCE_FAILED await advance_step(asset_id, new_step, 25 if passed else 0, compliance_result, current_user["did"], {"compliance": compliance_result}) return { "success": passed, "data": {"current_step": new_step, "progress": 25 if passed else 0, "compliance": compliance_result}, "message": f"合规审查{'通过' if passed else '未通过'},综合评分: {total_score}" } # ===== Step 4: 资产估值 ===== @router.post("/{asset_id}/valuation") async def run_valuation(asset_id: str, current_user: dict = Depends(require_kyc(2))): """Step 4: AI资产估值""" doc = await get_asset_or_403(asset_id, current_user["did"]) check_step(doc, OnboardingStep.COMPLIANCE_APPROVED) asset_value = doc.get("asset_value") or 0 # 根据资产类型选择估值方法 method_map = { "RealEstate": "市场比较法+收益法", "FinancialSecurities": "市场法+DCF折现", "Collectibles": "专家评估法", "IntellectualProperty": "收益法+成本法", "Commodities": "市场法", "BusinessInterests": "DCF+市场法", } method = method_map.get(doc["asset_type"], "综合评估法") valuation_result = { "status": "completed", "method": method, "estimated_value": asset_value, "currency": doc.get("currency", "USD"), "confidence": 0.85, "market_comparison": asset_value * 0.95, "income_approach": asset_value * 1.02, "final_value": asset_value, "gnacs_code": _get_gnacs_code(doc["asset_type"], doc["jurisdiction"]), "completed_at": now_utc().isoformat() } await advance_step(asset_id, OnboardingStep.VALUATION_DONE, 35, valuation_result, current_user["did"], {"valuation": valuation_result, "gnacs_code": valuation_result["gnacs_code"]}) return { "success": True, "data": {"current_step": OnboardingStep.VALUATION_DONE, "progress": 35, "valuation": valuation_result}, "message": f"估值完成,资产价值: {asset_value} {doc.get('currency','USD')}" } def _get_gnacs_code(asset_type: str, jurisdiction: str) -> str: mapping = GNACS_MAP.get(asset_type, GNACS_MAP["Other"]) sub = mapping["sub"] return sub.get(jurisdiction, sub.get("default", "OT.XX.GEN")) # ===== Step 5: 产生加密DNA ===== @router.post("/{asset_id}/generate-dna") async def generate_dna(asset_id: str, current_user: dict = Depends(require_kyc(2))): """Step 5: 产生加密DNA(SHA3-384,48字节)""" doc = await get_asset_or_403(asset_id, current_user["did"]) check_step(doc, OnboardingStep.VALUATION_DONE) # DNA输入:资产核心静态属性 dna_input = ( f"{asset_id}|{doc['name']}|{doc['asset_type']}|{doc['jurisdiction']}|" f"{doc.get('gnacs_code','')}|{doc['total_supply']}|{doc.get('asset_value',0)}|" f"{current_user['did']}|{now_utc().isoformat()}" ) dna_hash = "0x" + hashlib.sha3_384(dna_input.encode()).hexdigest() asset_instance_id = f"NAC-{doc['asset_type'][:3].upper()}-{secrets.token_hex(16).upper()}" dna_result = { "hash": dna_hash, "algorithm": "SHA3-384(48字节)", "asset_instance_id": asset_instance_id, "input_fields": ["asset_id","name","asset_type","jurisdiction","gnacs_code", "total_supply","asset_value","owner_did","timestamp"], "generated_at": now_utc().isoformat() } await advance_step(asset_id, OnboardingStep.DNA_GENERATED, 45, dna_result, current_user["did"], {"dna": dna_result}) return { "success": True, "data": {"current_step": OnboardingStep.DNA_GENERATED, "progress": 45, "dna_hash": dna_hash, "asset_instance_id": asset_instance_id}, "message": "资产DNA生成成功,DNA哈希已锁定资产唯一身份" } # ===== Step 6: 链上确权 ===== @router.post("/{asset_id}/chain-confirm") async def chain_confirm(asset_id: str, current_user: dict = Depends(require_kyc(2))): """Step 6: 获得链上确权(通过NAC-Lens协议提交到NAC主链) 集成NAC钱包微服务: - 调用钱包签名(NAC原生签名,私钥不离开钱包服务) - 计算XIC手续费(RWA上链费) - 提交签名交易到NAC主链 """ doc = await get_asset_or_403(asset_id, current_user["did"]) check_step(doc, OnboardingStep.DNA_GENERATED) # ===== 步骤6a: 估算上链手续费(XIC计价)===== asset_value_xtzh = float(doc.get("estimated_value", 0) or 0) fee_info = await estimate_fee( chain="nac", tx_type="rwa_onchain", amount=0.0, asset_value_xtzh=asset_value_xtzh ) fee_xic = fee_info.get("fee_xic", 0.01) logger.info(f"[chain-confirm] 资产 {asset_id} 上链手续费: {fee_xic} XIC") # ===== 步骤6b: 构建交易载荷 ===== asset_data = { "asset_id": asset_id, "name": doc["name"], "asset_type": doc["asset_type"], "jurisdiction": doc["jurisdiction"], "gnacs_code": doc.get("gnacs_code", ""), "dna_hash": doc["dna"].get("hash", "") if doc.get("dna") else "", "total_supply": doc["total_supply"], "xtzh_staked": doc.get("xtzh_staked", 0), "user_nac_address": current_user.get("nac_address") or current_user["did"], "fee_xic": fee_xic, } # ===== 步骤6c: 调用NAC钱包微服务进行原生签名 ===== # 构建NAC原生交易载荷(符合NAC类型系统:Address 32字节, Hash 48字节) tx_payload = { "type": "rwa_onchain", "asset_id": asset_id, "gnacs_code": doc.get("gnacs_code", ""), "dna_hash": doc["dna"].get("hash", "") if doc.get("dna") else "", "total_supply": str(doc["total_supply"]), "fee_xic": str(fee_xic), "nonce": secrets.token_hex(16), "timestamp": int(time.time()), } signature_result = await sign_transaction( user_id=current_user["did"], tx_payload=tx_payload, chain="nac" ) # 签名失败时记录警告但不阻断流程(降级处理) wallet_signature = None if signature_result: wallet_signature = { "signature": signature_result.get("signature"), "tx_hash": signature_result.get("tx_hash"), "signer_address": signature_result.get("signer_address"), } logger.info(f"[chain-confirm] 钱包签名成功: {wallet_signature['tx_hash']}") else: logger.warning(f"[chain-confirm] 钱包签名失败(降级),资产 {asset_id} 使用模拟签名") # ===== 步骤6d: 提交到NAC主链 ===== tx_hash = await submit_asset_transaction(asset_data) block_height = await get_block_height() warrant_data = { "tx_hash": tx_hash, "block_height": block_height, "confirmed_at": now_utc().isoformat(), "chain_id": 20260131, "protocol": "NAC Lens", "fee_paid_xic": fee_xic, "fee_breakdown": fee_info.get("breakdown", {}), "wallet_signature": wallet_signature, } await advance_step(asset_id, OnboardingStep.CHAIN_CONFIRMED, 55, warrant_data, current_user["did"], {"warrant": warrant_data}) return { "success": True, "data": { "current_step": OnboardingStep.CHAIN_CONFIRMED, "progress": 55, "tx_hash": tx_hash, "block_height": block_height, "fee_paid_xic": fee_xic, "wallet_signed": wallet_signature is not None, }, "message": f"链上确权成功!交易哈希: {tx_hash[:20]}...,手续费: {fee_xic} XIC" } # ===== Step 7+8+9: 产生TOKEN、CODE、发放权证 ===== @router.post("/{asset_id}/issue-warrant") async def issue_warrant(asset_id: str, current_user: dict = Depends(require_kyc(2))): """Step 7+8+9: 产生TOKEN结构、CODE、发放链上权证凭证(三步合一)""" doc = await get_asset_or_403(asset_id, current_user["did"]) check_step(doc, OnboardingStep.CHAIN_CONFIRMED) # Step 7: 产生TOKEN(权证数据结构) token_struct = { "warrant_id": f"WRT-{secrets.token_hex(12).upper()}", "asset_id": asset_id, "dna_hash": doc["dna"].get("hash", "") if doc.get("dna") else "", "owner_did": current_user["did"], "jurisdiction": doc["jurisdiction"], "asset_type": doc["asset_type"], "total_supply": doc["total_supply"], "standard": "NAC-Warrant-v1", "created_at": now_utc().isoformat() } # Step 8: 产生CODE(DNA访问密钥,仅显示一次) dna_code = secrets.token_urlsafe(32) code_hash = hashlib.sha256(dna_code.encode()).hexdigest() # Step 9: 发放权证凭证 warrant_cert = { **token_struct, "code_hash": code_hash, # 存储哈希,不存储明文 "issued_at": now_utc().isoformat(), "status": "active" } await advance_step(asset_id, OnboardingStep.WARRANT_ISSUED, 65, {"warrant_id": token_struct["warrant_id"]}, current_user["did"], {"warrant_cert": warrant_cert}) return { "success": True, "data": { "current_step": OnboardingStep.WARRANT_ISSUED, "progress": 65, "warrant_id": token_struct["warrant_id"], "dna_code": dna_code, # ⚠️ 仅此一次显示,请立即保存 "warrant_cert": warrant_cert }, "message": "⚠️ 链上权证凭证已发放!DNA CODE仅显示一次,请立即保存。" } # ===== Step 10: 衍生权益化资产 ===== @router.post("/{asset_id}/derive-rights") async def derive_rights( asset_id: str, req: RightsDeriveRequest, current_user: dict = Depends(require_kyc(3)) ): """Step 10: 衍生权益化资产(需要KYC-3级)""" doc = await get_asset_or_403(asset_id, current_user["did"]) check_step(doc, OnboardingStep.WARRANT_ISSUED) rights = { "description": req.description, "percentage": req.percentage, "rights_type": req.rights_type, "base_value": doc.get("asset_value", 0), "rights_value": (doc.get("asset_value", 0) or 0) * req.percentage / 100, "currency": doc.get("currency", "USD"), "derived_at": now_utc().isoformat() } await advance_step(asset_id, OnboardingStep.RIGHTS_DERIVED, 70, rights, current_user["did"], {"rights_offering": rights}) return { "success": True, "data": {"current_step": OnboardingStep.RIGHTS_DERIVED, "progress": 70, "rights": rights}, "message": f"权益化资产衍生成功:{req.percentage}%的{req.rights_type}权益" } # ===== Step 11+12: 上链托管 + 第三方托管 ===== @router.post("/{asset_id}/custody") async def setup_custody( asset_id: str, req: CustodyRequest, current_user: dict = Depends(require_kyc(3)) ): """Step 11+12: 上链托管TOKEN所有权 + 资产交第三方托管""" doc = await get_asset_or_403(asset_id, current_user["did"]) check_step(doc, OnboardingStep.RIGHTS_DERIVED) # Step 11: 链上托管(模拟) custody_tx = "0x" + hashlib.sha3_384( f"CUSTODY:{asset_id}:{now_utc().isoformat()}".encode() ).hexdigest() custody_info = { "warrant_custody_tx": custody_tx, "physical_custodian": req.custodian_name, "physical_custody_ref": req.custodian_ref, "custody_agreement_hash": req.custody_agreement_hash or hashlib.sha256( f"{req.custodian_name}:{req.custodian_ref}".encode()).hexdigest(), "custody_established_at": now_utc().isoformat() } await advance_step(asset_id, OnboardingStep.PHYSICAL_CUSTODY, 78, custody_info, current_user["did"], {"custody": custody_info}) return { "success": True, "data": {"current_step": OnboardingStep.PHYSICAL_CUSTODY, "progress": 78, "custody": custody_info}, "message": f"托管设置完成:链上托管TX={custody_tx[:20]}...,实物托管方={req.custodian_name}" } # ===== Step 13: 铸造XTZH ===== @router.post("/{asset_id}/mint-xtzh") async def mint_xtzh(asset_id: str, current_user: dict = Depends(require_kyc(3))): """Step 13: 铸造XTZH稳定币(基于资产估值和托管确认)""" doc = await get_asset_or_403(asset_id, current_user["did"]) check_step(doc, OnboardingStep.PHYSICAL_CUSTODY) xtzh_staked = doc.get("xtzh_staked", 0) or 0 asset_value = doc.get("asset_value", 0) or 0 xtzh_amount = xtzh_staked if xtzh_staked > 0 else asset_value * 0.8 mint_tx = "0x" + hashlib.sha3_384( f"XTZH_MINT:{asset_id}:{xtzh_amount}:{now_utc().isoformat()}".encode() ).hexdigest() xtzh_info = { "amount": xtzh_amount, "mint_tx": mint_tx, "backing_asset": asset_id, "backing_value": asset_value, "ratio": xtzh_amount / asset_value if asset_value > 0 else 0.8, "minted_at": now_utc().isoformat() } await advance_step(asset_id, OnboardingStep.XTZH_MINTED, 85, xtzh_info, current_user["did"], {"xtzh": xtzh_info}) return { "success": True, "data": {"current_step": OnboardingStep.XTZH_MINTED, "progress": 85, "xtzh": xtzh_info}, "message": f"XTZH铸造成功:{xtzh_amount} XTZH" } # ===== Step 14: 发行权益化资产代币 ===== @router.post("/{asset_id}/issue-token") async def issue_token(asset_id: str, current_user: dict = Depends(require_kyc(3))): """Step 14: 发行权益化资产代币(ACC-20标准)""" doc = await get_asset_or_403(asset_id, current_user["did"]) check_step(doc, OnboardingStep.XTZH_MINTED) name_chars = "".join(c for c in doc["name"] if c.isalpha() or c.isascii()) token_symbol = f"{name_chars[:3].upper()}RWA" xtzh_amount = doc.get("xtzh", {}).get("amount", 0) if doc.get("xtzh") else 0 token_supply = int(xtzh_amount * 0.8) if xtzh_amount > 0 else doc["total_supply"] token_data = { "asset_id": asset_id, "token_symbol": token_symbol, "total_supply": token_supply, "owner": current_user["did"], "dna_hash": doc["dna"].get("hash", "") if doc.get("dna") else "", "xtzh_staked": xtzh_amount, } result = await issue_token_on_chain(token_data) token_info = { "symbol": token_symbol, "address": result.get("token_address"), "total_supply": token_supply, "standard": "ACC-20(NAC原生资产标准)", "issue_tx_hash": result.get("tx_hash"), "issued_at": now_utc().isoformat() } await advance_step(asset_id, OnboardingStep.TOKEN_ISSUED, 95, token_info, current_user["did"], {"token": token_info}) return { "success": True, "data": {"current_step": OnboardingStep.TOKEN_ISSUED, "progress": 95, "token": token_info}, "message": f"🎉 权益化代币 {token_symbol} 发行成功!" } # ===== Step 15+16+17: 浏览器显示、装入钱包、交易所上线 ===== @router.post("/{asset_id}/finalize") async def finalize(asset_id: str, current_user: dict = Depends(require_kyc(3))): """Step 15+16+17: 完成最终上线流程""" doc = await get_asset_or_403(asset_id, current_user["did"]) check_step(doc, OnboardingStep.TOKEN_ISSUED) token = doc.get("token", {}) or {} finalize_info = { "explorer_url": f"https://explorer.newassetchain.io/token/{token.get('address','')}", "wallet_support": True, "exchange_listed": False, # 需要交易所审核 "exchange_application_submitted": True, "finalized_at": now_utc().isoformat() } await advance_step(asset_id, OnboardingStep.EXCHANGE_LISTED, 100, finalize_info, current_user["did"], {"finalize": finalize_info, "onboarding_status.is_active": False}) return { "success": True, "data": {"current_step": OnboardingStep.EXCHANGE_LISTED, "progress": 100, "finalize": finalize_info}, "message": "🎊 恭喜!资产上链全流程完成!代币已在区块链浏览器显示,钱包可接收,交易所申请已提交。" } # ===== 查询流程日志 ===== @router.get("/{asset_id}/history") async def get_history(asset_id: str, current_user: dict = Depends(get_current_user)): """获取资产上链流程完整历史""" doc = await get_asset_or_403(asset_id, current_user["did"]) status = doc.get("onboarding_status", {}) return { "success": True, "data": { "asset_id": asset_id, "current_step": status.get("current_step"), "progress": status.get("progress", 0), "history": status.get("history", []) } }