业务:数据一致性
你的记忆大体正确。plan-detail.tsx 主要是编排层,三条流水线分别在三个 hook/模块里;并发和批量落库在 useSignPlanSqlite + SQLite 服务里。
整体架构(三条线)
| 职责 | 位置 | 说明 |
|---|---|---|
| 只管签名 | useSignPlanSqlite + runWithConcurrency | 有界签名 worker,签完立刻入队广播,不等待广播 |
| 只管广播 | 同文件内 runBroadcastWorker × N | 独立队列 pendingBroadcastQueue + 唤醒机制 |
| 只管刷新/确认 | useBroadcastResultPollingSqlite | 每 5s 轮询 broadcasting 交易的 receipt |
1. 签名与广播解耦(并发)
核心在 useSignPlanSqlite.ts,不是 plan-detail.tsx 本体:
- 签名并发:
runWithConcurrency,上限由resolvePlanSigningConcurrency控制(默认 8,硬上限 6)。 - 广播并发:
broadcastWorkerTasks启动maxBroadcastConcurrency个 worker(默认 10,上限 12)。 - 流水线:签成功后
enqueueBroadcastJob,注释写明 queue broadcast and continue next signing task immediately —— 广播慢不会堵住后续签名。
// step 12: queue broadcast and continue next signing task immediately
enqueueBroadcastJob(transaction, signResult.rawTransaction);
配置在 src/constants/planSigning.ts:
export const DEFAULT_PLAN_SIGNING_CONCURRENCY = 8;
export const DEFAULT_PLAN_BROADCAST_CONCURRENCY = 10;
export const DEFAULT_PLAN_SIGNER_POOL_SIZE = 8;
可选 SignerPool:多 signer 实例 + 按 account_id 分片(默认 ENABLE_PLAN_SIGNER_POOL = false)。
2. plan-detail.tsx 里你接的三块
签名 + 广播入口
const { signPlan: handleSignPlan, stopSignPlan } = useSignPlanSqlite({
planId,
plan: planTasks.length > 0 ? { tasks: planTasks } : null,
originalPlan,
chain,
coldWalletMap,
onMissingAddresses: handleMissingAddresses,
setIsSigningPlan,
setSigningProgress,
setSigningTotal,
onPlanMetaUpdate: updatePlanMeta,
});
链上确认轮询(“刷新”)
useBroadcastResultPollingSqlite({
transactions: transactionsToPoll,
chain: chain,
enabled: Boolean(planId),
pollingInterval: 5000,
onStatusUpdate: useCallback(
(transactionId: string, status: 'confirmed' | 'failed', receipt: any) => {
// ... 更新 runtime store,不阻塞签名流程
emitRuntimeTaskUpdates(planId, [{ transactionId, updates: runtimeUpdates }]);
},
轮询 tick 里对多笔 broadcasting 交易 并发 Promise.allSettled 查 receipt。
从 DB 整表 reload
- 签名结束:
effectiveIsSigningPlan变 false →reloadPlan() - Run 结束:
handleSignPlan的finally里也会reloadPlan()
3. UI 与 DB 的“双写”策略
- 即时 UI:
usePlanRuntimeStore.emitTaskUpdates→plan-detail用runtimeTaskUpdateSnapshot合并到列表,不必等整 plan reload。 - 落库:
- 单笔关键路径(广播成功拿到
txHash):updateTransactionInDbAndUI立刻写 SQLite。 - 批量:
updateTransactionsInDbAndUI→planService.updateTransactions,在PlanTransactionService里包一层withTransactionAsync事务批量 UPDATE(例如一批没有 MPC share 的失败标记)。
- 单笔关键路径(广播成功拿到
const updateTransactionsInDbAndUI = useCallback(
async (updates: Array<{...}>) => {
if (planService.isOpen() && updates.length > 0) {
await planService.updateTransactions(updates, { syncMeta: false });
}
// ... emitTaskUpdates 批量推 UI
进度写库用 signingProgressWriteQueue 串行队列,保证单调递增,避免并发写 progress 乱序。
4. 其它细节优化
mergeBroadcastStatusForPlanReload:reloadPlan后不让confirmed/failed被弱状态覆盖。reloadPlanInFlightRef:防止 reload 重入。- 中断恢复:
usePlanDetailSqlite30s 无进度则清signing状态。 - 旧 README:
src/app/(plan)/README.md里BATCH_UPDATE_INTERVAL/ JSON 文件批量刷盘是旧方案;当前已是 SQLite,批量主要是updateTransactions事务,不再有 plan-detail 里的 5 笔/1 秒定时 flush。
一句话对照你的记忆
| 你的说法 | 现状 |
|---|---|
| 只管签名 | ✅ runWithConcurrency 签名 worker |
| 只管广播 | ✅ 独立 pendingBroadcastQueue + 广播 worker 池 |
| 只管刷新 | ✅ useBroadcastResultPollingSqlite(与签名/广播并行) |
| 并发 | ✅ 签名、广播、轮询各自有界并发 |
| 批量落库 | ✅ updateTransactions + SQLite transaction;单笔 txHash 仍即时写 |
如果你要对照某次 commit 或 OpenSpec(plan-concurrent-signing-control、optimize-signer-pool-concurrency),可以说一下时间范围,我可以帮你对一下设计文档和当前代码是否完全一致。