Skip to main content

Q13: 解释 LangChain 中的 Async Agent 的实现原理,并提供一个示例,展示如何构建一个能够并行执行多个任务的异步代理系统。

LangChain 中异步代理(Async Agent)的实现原理

一、异步代理的核心架构

在 LangChain 中,异步代理的实现建立在两个核心抽象之上:Runnable 接口AgentExecutor 执行循环

1. Runnable 统一接口

LangChain 中的所有组件(模型、工具、提示词模板、输出解析器等)都继承自 Runnable 抽象基类,提供统一的调用接口:

方法类型说明
invoke同步将单一输入转换为单一输出
ainvoke异步异步版本的 invoke
batch同步批量处理多个输入
abatch异步异步批量处理
stream同步流式输出
astream异步异步流式输出

ainvoke 的默认实现会通过 run_in_executor 将同步的 invoke 放到线程池中执行,确保不会阻塞主事件循环。这一设计使得开发者可以统一使用 async/await 模式编排各种组件。

2. AgentExecutor 的异步执行循环

AgentExecutor 是所有代理运行的核心执行器,负责管理代理的推理-行动循环:

next_action = agent.get_action(...)
while next_action != AgentFinish:
observation = run(next_action) # 调用工具获取观察结果
next_action = agent.get_action(..., next_action, observation)
return next_action

在异步模式下,AgentExecutor 使用 ainvokeastream 等异步方法来驱动整个执行循环,同时在每一步中根据工具类型的差异采用不同的异步处理策略:

  • 带有原生协程的工具:如果工具实现了 coroutine 方法,AgentExecutor 会直接 await 执行。
  • 仅支持同步调用(func)的工具:框架会通过 asyncio.get_event_loop().run_in_executor 将同步调用委托给线程池,避免阻塞主事件循环。

这种设计使得 IO 密集型工具(如网络请求、数据库查询)能够充分利用异步非阻塞的优势,而 CPU 密集型工具则借助线程池保持响应性。

3. 回调系统的异步支持

LangChain 的回调系统在整个异步执行生命周期中都会触发相应的事件。异步代理同样支持 astream 方法,能够在执行过程中流式输出中间步骤。在使用异步代理时,需要为每个代理传递独立的 CallbackManager,以避免回调信息在并发执行时产生冲突。

二、代理内部的并行化实现机制

LangChain 的异步并行能力主要体现在三个层面:

  1. 多代理并发执行:通过 asyncio.gather 同时启动多个 AgentExecutor.ainvoke(),实现任务级别的并行。
  2. 单代理内多步并行:支持将工具调用、子任务分配到多个事件循环中并发执行。
  3. 底层 LCEL 并行:LangChain 的工具类继承了 BaseTool,从而也继承了 Runnable 的所有方法,因此在工具层面也支持 ainvokeabatchRunnableParallel 类可以对多个子链执行 asyncio.gather 风格的并发调度。其 abatch 方法默认使用 asyncio.gather 并行执行多个输入。

这些机制组合起来,使得 LangChain 能够构建从底层组件到高层编排的全面异步并行能力。

三、完整示例:并行执行多个任务的异步代理系统

以下示例展示了一个能够并行处理多项独立任务的异步代理系统。该系统接收多个用户的查询,使用 asyncio.gather 并行启动多个代理实例来回答不同问题,并利用模型自身的并行工具调用能力在单次推理中同时获取多个信息。

import asyncio
import time
from langchain.agents import create_agent
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage


# ==================== 1. 定义异步工具 ====================

@tool
async def get_stock_price(symbol: str) -> str:
"""异步获取股票价格。"""
await asyncio.sleep(0.5) # 模拟 API 延迟
mock_prices = {"AAPL": "175.32", "GOOGL": "138.24", "MSFT": "372.52"}
return f"{symbol} 当前股价: ${mock_prices.get(symbol, 'N/A')}"


@tool
async def get_company_info(company: str) -> str:
"""异步获取公司信息。"""
await asyncio.sleep(0.5)
mock_info = {
"Apple": "Apple Inc. 设计、制造和销售智能手机、电脑等消费电子产品。",
"Google": "Google LLC. 提供互联网搜索、云计算、广告技术等服务。",
"Microsoft": "Microsoft Corp. 开发软件、服务和设备。"
}
return f"{company} 公司信息: {mock_info.get(company, '信息待查')}"


@tool
async def search_news(topic: str) -> str:
"""异步搜索新闻。"""
await asyncio.sleep(0.4)
return f"{topic} 最新新闻: 该公司昨日发布了季度财报,业绩超预期。"


@tool
async def slow_tool(query: str) -> str:
"""模拟耗时较长的同步工具"""
import time
time.sleep(1.5) # 同步阻塞操作
return f"处理结果: {query}"


# ==================== 2. 创建支持并行工具调用的代理 ====================

def create_investment_agent():
"""创建支持并行工具调用的代理"""
model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
tools = [get_stock_price, get_company_info, search_news]

agent = create_agent(
model=model,
tools=tools,
system_prompt="你是一个投资分析助手。当用户询问多个公司或多维度信息时,同时调用所有相关的工具来获取信息。"
)
return agent


# ==================== 3. 多任务并行执行 ====================

async def single_agent_parallel_example():
"""场景一:单代理内并行调用多个工具"""
print("\n" + "="*60)
print("场景一:单代理并行调用多个工具")
print("="*60)

agent = create_investment_agent()

# 这个查询会触发模型同时调用 get_stock_price 和 get_company_info
query = "请同时分析 Apple 的股票和公司信息。"

start = time.perf_counter()
result = await agent.ainvoke({"messages": [HumanMessage(content=query)]})
elapsed = time.perf_counter() - start

# 提取最后一条 AI 回复
final_message = result["messages"][-1]
print(f"代理响应: {final_message.content}")
print(f"⏱️ 耗时: {elapsed:.2f} 秒")

return result


async def multi_agent_parallel_example():
"""场景二:多个任务并行执行(使用 asyncio.gather)"""
print("\n" + "="*60)
print("场景二:多个任务并行执行")
print("="*60)

base_agent = create_investment_agent()
tasks = [
("请分析 Microsoft 的最新新闻和股票表现", base_agent),
("请分析 Google 的公司基本信息", base_agent),
("请获取 Apple 的当前股价", base_agent)
]

start = time.perf_counter()

# 关键:使用 asyncio.gather 并行执行多个代理任务
results = await asyncio.gather(*[
agent.ainvoke({"messages": [HumanMessage(content=query)]})
for query, agent in tasks
])

elapsed = time.perf_counter() - start

for i, (query, _) in enumerate(tasks):
final_message = results[i]["messages"][-1]
print(f"\n📋 任务 {i+1}: {query}")
print(f" 响应: {final_message.content[:100]}...")

print(f"\n⏱️ 总耗时: {elapsed:.2f} 秒")
print("💡 所有任务并行执行,总耗时 ≈ 最慢单个任务的耗时")

return results


# ==================== 4. 使用信号量控制并发(防止资源过载) ====================

async def controlled_parallel_example():
"""场景三:使用信号量控制并发度"""
print("\n" + "="*60)
print("场景三:信号量控制并发度(限制最大并行数)")
print("="*60)

# 创建信号量,限制最多同时执行 2 个任务
semaphore = asyncio.Semaphore(2)

# 耗时较长的任务列表(模拟 5 个任务)
tasks = [f"long_running_task_{i}" for i in range(5)]

# 包装函数,受信号量限制
async def run_with_limit(task_name: str, agent, query: str) -> dict:
async with semaphore: # 获取信号量许可
print(f"🟢 开始执行: {task_name}")
start = time.perf_counter()
result = await agent.ainvoke({"messages": [HumanMessage(content=query)]})
elapsed = time.perf_counter() - start
print(f"🔴 完成执行: {task_name} (耗时 {elapsed:.2f}秒)")
return result

agent = create_investment_agent()
start_total = time.perf_counter()

results = await asyncio.gather(*[
run_with_limit(task, agent, "请简要介绍一家科技公司")
for task in tasks
])

total_elapsed = time.perf_counter() - start_total
print(f"\n⏱️ 全部完成,总耗时: {total_elapsed:.2f} 秒")
print("💡 由于并发度限制为 2,总耗时 > 5 × 平均单任务耗时 ÷ 2")

return results


# ==================== 5. 主函数:组合演示 ====================

async def main():
print("🚀 LangChain 异步代理系统演示")
print("演示内容:单代理并行工具调用 | 多任务并行执行 | 并发控制")

try:
# 演示 1:单代理内并行调用多个工具
await single_agent_parallel_example()

# 演示 2:多个代理任务并行执行
await multi_agent_parallel_example()

# 演示 3:使用信号量控制并发度
await controlled_parallel_example()

except Exception as e:
print(f"❌ 执行出错: {e}")
print("提示:请确保已安装 langchain, langchain-openai, 并正确设置 OPENAI_API_KEY")


if __name__ == "__main__":
asyncio.run(main())

四、关键实现要点

  1. 异步工具定义:使用 async def 定义工具的 _run 方法(或使用 @tool 装饰器配合异步函数),这样 AgentExecutor 会直接 await 调用,实现真正的异步非阻塞。

  2. 并行工具调用:构建代理时,模型本身需要支持并行工具调用(如 GPT-4o-mini、GPT-4o 等),框架会自动处理并发调用。如果工具内部存在共享资源访问冲突(如 SQLite 的并发写入),可以使用 asyncio.Semaphore 手动限制并行度。

  3. 多任务并行编排:通过 asyncio.gather 同时启动多个代理的 ainvoke,实现多任务层面的并行执行,显著提升吞吐量。

  4. 回调隔离:在并发调用多个代理时,确保每个代理使用独立的 CallbackManager,避免回调日志和追踪信息互相污染。

  5. 混合工具的线程池转译:对于只实现了同步 func 的工具(如旧版 SerpAPIWrapper),框架会自动通过 run_in_executor 将其转译到线程池中,确保 ainvoke 不会阻塞事件循环。在新代码中,优先使用 async def 实现原生异步工具以获得最佳性能。

五、性能优势

在实际应用中,使用异步代理能够显著提升执行效率。根据实践数据,通过实现 async/await 模式和利用 LangChain 框架的异步能力,团队成功将执行时间相比同步模型减少了约 40%。这种性能提升在处理 IO 密集型任务(如多个 API 调用、数据库查询、网络请求)时尤为明显。