Day 3:asyncio 并发——50 份文档同时分析
昨天的问题
Day 2 的分析链很漂亮——一份合同进去,一个 ContractInfo 出来。但如果老板说"帮我把这 50 份合同都分析了",你会怎么做? 天的问题Day 2 的分析链很漂亮——一份合同进去,一个 ContractInfo 出来。但如果老板说"帮我把这 50 份合同都分析了",你会怎么做?
在脑子里跑一下:一份合同 3 秒,50 份串行处理就是 150 秒。两个半分钟你能忍。但如果以后发展到 500 份、5000 份呢?串行的短板就暴露了——时间线性增长,不能靠"多等一会儿"解决。
之前在第 3 章学过 asyncio 的原理。今天是你第一次在真实项目里用它。
今天做什么
把 Day 2 的串行分析链改成支持并发。核心思路:把文档分批,每批同时处理,批内并发、批间顺序。
不引入真实 LLM 调用(保持零成本),用 asyncio.sleep() 模拟每份文档 0.5 秒的分析耗时。50 份串行 = 25 秒,并发后 ≈ 2.5 秒。
📁 代码目录
doc_analyzer/
├── sample_contract.txt
├── day1.py
├── day2.py
└── day3.py ← 新增!并发分析,50行代码
python
# day3.py — 并发文档分析
import asyncio
import time
from typing import List
# ===== Day2 的结构定义(复用)=====
from pydantic import BaseModel, Field
class DocSummary(BaseModel):
"""单份文档的分析结果"""
doc_id: int = Field(description="文档编号")
title: str = Field(description="文档标题")
summary: str = Field(description="100字以内的摘要")
category: str = Field(description="分类:合同 / 简历 / 报告 / 其他")
# ===== Day2 结束 =====
# ===== Day3 新增:异步分析函数 =====
async def analyze_one(doc_id: int, content: str) -> DocSummary:
"""
异步分析单份文档。
在真实场景中,这里调用 LLM API。
今天用 asyncio.sleep 模拟 IO 等待时间。
"""
print(f" 🔍 开始分析文档 {doc_id}...")
await asyncio.sleep(0.5) # 模拟分析耗时(比如调用 LLM API)
# 模拟分析结果
result = DocSummary(
doc_id=doc_id,
title=f"文档{doc_id}",
summary=f"这是文档{doc_id}的内容摘要",
category="合同"
)
print(f" ✅ 文档 {doc_id} 分析完成")
return result
async def batch_analyze(documents: List[str], batch_size: int = 10) -> List[DocSummary]:
"""
批量并发分析。
分批处理——每批 batch_size 份同时跑,批与批之间顺序执行。
这样既利用了并发,又不会把系统资源打满。
"""
all_results = []
total = len(documents)
for batch_start in range(0, total, batch_size):
batch = documents[batch_start:batch_start + batch_size]
batch_ids = list(range(batch_start + 1, batch_start + len(batch) + 1))
print(f"\n📦 处理第 {batch_start // batch_size + 1} 批({len(batch)} 份)")
# ⭐ asyncio.gather —— 同时启动多个协程
tasks = [analyze_one(did, doc) for did, doc in zip(batch_ids, batch)]
results = await asyncio.gather(*tasks)
all_results.extend(results)
return all_results
# ===== 运行 =====
if __name__ == "__main__":
# 模拟50份文档(在真实场景中,这里从文件系统读取)
documents = [f"文档{i}的合同内容..." for i in range(1, 51)]
print(f"📚 共 {len(documents)} 份文档,开始并发分析...\n")
start = time.time()
results = asyncio.run(batch_analyze(documents, batch_size=10))
elapsed = time.time() - start
# 输出结果统计
print(f"\n" + "=" * 50)
print(f"📊 分析完成!")
print(f" 文档总数: {len(results)}")
print(f" 耗时: {elapsed:.1f} 秒")
print(f" 如果串行: {len(documents) * 0.5:.0f} 秒")
print(f" 提速: {len(documents) * 0.5 / elapsed:.1f} 倍")
print(f" 前5份结果:")
for r in results[:5]:
print(f" → {r.title} | {r.category} | {r.summary}")运行
bash
python day3.py你应该看到:
📚 共 50 份文档,开始并发分析...
📦 处理第 1 批(10 份)
🔍 开始分析文档 1...
🔍 开始分析文档 2...
🔍 开始分析文档 3...
...(10份同时开始)
✅ 文档 1 分析完成
✅ 文档 3 分析完成
✅ 文档 2 分析完成
...(完成顺序不一定按编号——谁快谁先返回)
📦 处理第 2 批(10 份)
...
==================================================
📊 分析完成!
文档总数: 50
耗时: 2.7 秒
如果串行: 25 秒
提速: 9.3 倍
前5份结果:
→ 文档1 | 合同 | 这是文档1的内容摘要
→ 文档2 | 合同 | 这是文档2的内容摘要
...你学到了什么
asyncio.gather() 是批量并发的心脏。 把 N 个协程打包传给 gather,Python 的事件循环自动调度——谁在等 IO 就让位给别人。这就是为什么 10 份文档"同时"处理完毕,而不是一个一个来。
分批不是多余的。 有人问:为什么不全扔进去并发,还要分批?因为真实环境有资源上限——LLM API 有并发限制,服务器内存有限,数据库连接数有限。分批是你控制并发规模的手段:既不浪费资源,也不打垮服务。
跟项目一的区别: 项目一的 asyncio 用于多并发对话(Day 3),我们是多并发文档处理。场景不同,但底层原理完全一样——都是利用 IO 等待时间让 Python 去干别的事。
明天的预告
如果同一份文档被分析了两次,Agent 应该直接拿上一次的结果,而不是重新跑一遍。明天实现分析缓存——把历史结果存起来,第二次遇到秒出。
Day 3 完成。50 份文档,2.7 秒搞定。比串行快近 10 倍——这就是并发的力量。

