Skip to main content

业务:数据一致性

你的记忆大体正确。plan-detail.tsx 主要是编排层,三条流水线分别在三个 hook/模块里;并发和批量落库在 useSignPlanSqlite + SQLite 服务里。

整体架构(三条线)

职责位置说明
只管签名useSignPlanSqlite + runWithConcurrency有界签名 worker,签完立刻入队广播,不等待广播
只管广播同文件内 runBroadcastWorker × N独立队列 pendingBroadcastQueue + 唤醒机制
只管刷新/确认useBroadcastResultPollingSqlite每 5s 轮询 broadcasting 交易的 receipt

1. 签名与广播解耦(并发)

核心在 useSignPlanSqlite.ts,不是 plan-detail.tsx 本体:

  • 签名并发runWithConcurrency,上限由 resolvePlanSigningConcurrency 控制(默认 8,硬上限 6)。
  • 广播并发broadcastWorkerTasks 启动 maxBroadcastConcurrency 个 worker(默认 10,上限 12)。
  • 流水线:签成功后 enqueueBroadcastJob,注释写明 queue broadcast and continue next signing task immediately —— 广播慢不会堵住后续签名。
// step 12: queue broadcast and continue next signing task immediately
enqueueBroadcastJob(transaction, signResult.rawTransaction);

配置在 src/constants/planSigning.ts

export const DEFAULT_PLAN_SIGNING_CONCURRENCY = 8;
export const DEFAULT_PLAN_BROADCAST_CONCURRENCY = 10;
export const DEFAULT_PLAN_SIGNER_POOL_SIZE = 8;

可选 SignerPool:多 signer 实例 + 按 account_id 分片(默认 ENABLE_PLAN_SIGNER_POOL = false)。


2. plan-detail.tsx 里你接的三块

签名 + 广播入口

const { signPlan: handleSignPlan, stopSignPlan } = useSignPlanSqlite({
planId,
plan: planTasks.length > 0 ? { tasks: planTasks } : null,
originalPlan,
chain,
coldWalletMap,
onMissingAddresses: handleMissingAddresses,
setIsSigningPlan,
setSigningProgress,
setSigningTotal,
onPlanMetaUpdate: updatePlanMeta,
});

链上确认轮询(“刷新”)

useBroadcastResultPollingSqlite({
transactions: transactionsToPoll,
chain: chain,
enabled: Boolean(planId),
pollingInterval: 5000,
onStatusUpdate: useCallback(
(transactionId: string, status: 'confirmed' | 'failed', receipt: any) => {
// ... 更新 runtime store,不阻塞签名流程
emitRuntimeTaskUpdates(planId, [{ transactionId, updates: runtimeUpdates }]);
},

轮询 tick 里对多笔 broadcasting 交易 并发 Promise.allSettled 查 receipt。

从 DB 整表 reload

  • 签名结束:effectiveIsSigningPlan 变 false → reloadPlan()
  • Run 结束:handleSignPlanfinally 里也会 reloadPlan()

3. UI 与 DB 的“双写”策略

  • 即时 UIusePlanRuntimeStore.emitTaskUpdatesplan-detailruntimeTaskUpdateSnapshot 合并到列表,不必等整 plan reload。
  • 落库
    • 单笔关键路径(广播成功拿到 txHash):updateTransactionInDbAndUI 立刻写 SQLite。
    • 批量updateTransactionsInDbAndUIplanService.updateTransactions,在 PlanTransactionService 里包一层 withTransactionAsync 事务批量 UPDATE(例如一批没有 MPC share 的失败标记)。
const updateTransactionsInDbAndUI = useCallback(
async (updates: Array<{...}>) => {
if (planService.isOpen() && updates.length > 0) {
await planService.updateTransactions(updates, { syncMeta: false });
}
// ... emitTaskUpdates 批量推 UI

进度写库用 signingProgressWriteQueue 串行队列,保证单调递增,避免并发写 progress 乱序。


4. 其它细节优化

  • mergeBroadcastStatusForPlanReloadreloadPlan 后不让 confirmed/failed 被弱状态覆盖。
  • reloadPlanInFlightRef:防止 reload 重入。
  • 中断恢复usePlanDetailSqlite 30s 无进度则清 signing 状态。
  • 旧 READMEsrc/app/(plan)/README.mdBATCH_UPDATE_INTERVAL / JSON 文件批量刷盘是旧方案;当前已是 SQLite,批量主要是 updateTransactions 事务,不再有 plan-detail 里的 5 笔/1 秒定时 flush。

一句话对照你的记忆

你的说法现状
只管签名runWithConcurrency 签名 worker
只管广播✅ 独立 pendingBroadcastQueue + 广播 worker 池
只管刷新useBroadcastResultPollingSqlite(与签名/广播并行)
并发✅ 签名、广播、轮询各自有界并发
批量落库updateTransactions + SQLite transaction;单笔 txHash 仍即时写

如果你要对照某次 commit 或 OpenSpec(plan-concurrent-signing-controloptimize-signer-pool-concurrency),可以说一下时间范围,我可以帮你对一下设计文档和当前代码是否完全一致。