Skip to content

Day 3:asyncio 并发——50 份文档同时分析

昨天的问题

Day 2 的分析链很漂亮——一份合同进去,一个 ContractInfo 出来。但如果老板说"帮我把这 50 份合同都分析了",你会怎么做? 天的问题Day 2 的分析链很漂亮——一份合同进去,一个 ContractInfo 出来。但如果老板说"帮我把这 50 份合同都分析了",你会怎么做?

在脑子里跑一下:一份合同 3 秒,50 份串行处理就是 150 秒。两个半分钟你能忍。但如果以后发展到 500 份、5000 份呢?串行的短板就暴露了——时间线性增长,不能靠"多等一会儿"解决。

之前在第 3 章学过 asyncio 的原理。今天是你第一次在真实项目里用它。

今天做什么

把 Day 2 的串行分析链改成支持并发。核心思路:把文档分批,每批同时处理,批内并发、批间顺序。

不引入真实 LLM 调用(保持零成本),用 asyncio.sleep() 模拟每份文档 0.5 秒的分析耗时。50 份串行 = 25 秒,并发后 ≈ 2.5 秒。

📁 代码目录

doc_analyzer/
├── sample_contract.txt
├── day1.py
├── day2.py
└── day3.py                ← 新增!并发分析,50行

代码

python
# day3.py — 并发文档分析

import asyncio
import time
from typing import List

# ===== Day2 的结构定义(复用)=====
from pydantic import BaseModel, Field

class DocSummary(BaseModel):
    """单份文档的分析结果"""
    doc_id: int = Field(description="文档编号")
    title: str = Field(description="文档标题")
    summary: str = Field(description="100字以内的摘要")
    category: str = Field(description="分类:合同 / 简历 / 报告 / 其他")
# ===== Day2 结束 =====

# ===== Day3 新增:异步分析函数 =====
async def analyze_one(doc_id: int, content: str) -> DocSummary:
    """
    异步分析单份文档。
    在真实场景中,这里调用 LLM API。
    今天用 asyncio.sleep 模拟 IO 等待时间。
    """
    print(f"  🔍 开始分析文档 {doc_id}...")
    await asyncio.sleep(0.5)  # 模拟分析耗时(比如调用 LLM API)
    
    # 模拟分析结果
    result = DocSummary(
        doc_id=doc_id,
        title=f"文档{doc_id}",
        summary=f"这是文档{doc_id}的内容摘要",
        category="合同"
    )
    print(f"  ✅ 文档 {doc_id} 分析完成")
    return result

async def batch_analyze(documents: List[str], batch_size: int = 10) -> List[DocSummary]:
    """
    批量并发分析。
    
    分批处理——每批 batch_size 份同时跑,批与批之间顺序执行。
    这样既利用了并发,又不会把系统资源打满。
    """
    all_results = []
    total = len(documents)
    
    for batch_start in range(0, total, batch_size):
        batch = documents[batch_start:batch_start + batch_size]
        batch_ids = list(range(batch_start + 1, batch_start + len(batch) + 1))
        
        print(f"\n📦 处理第 {batch_start // batch_size + 1} 批({len(batch)} 份)")
        
        # ⭐ asyncio.gather —— 同时启动多个协程
        tasks = [analyze_one(did, doc) for did, doc in zip(batch_ids, batch)]
        results = await asyncio.gather(*tasks)
        all_results.extend(results)
    
    return all_results

# ===== 运行 =====
if __name__ == "__main__":
    # 模拟50份文档(在真实场景中,这里从文件系统读取)
    documents = [f"文档{i}的合同内容..." for i in range(1, 51)]
    
    print(f"📚 共 {len(documents)} 份文档,开始并发分析...\n")
    
    start = time.time()
    results = asyncio.run(batch_analyze(documents, batch_size=10))
    elapsed = time.time() - start
    
    # 输出结果统计
    print(f"\n" + "=" * 50)
    print(f"📊 分析完成!")
    print(f"   文档总数: {len(results)}")
    print(f"   耗时: {elapsed:.1f} 秒")
    print(f"   如果串行: {len(documents) * 0.5:.0f} 秒")
    print(f"   提速: {len(documents) * 0.5 / elapsed:.1f} 倍")
    print(f"   前5份结果:")
    for r in results[:5]:
        print(f"     → {r.title} | {r.category} | {r.summary}")

运行

bash
python day3.py

你应该看到:

📚 共 50 份文档,开始并发分析...

📦 处理第 1 批(10 份)
  🔍 开始分析文档 1...
  🔍 开始分析文档 2...
  🔍 开始分析文档 3...
  ...(10份同时开始)
  ✅ 文档 1 分析完成
  ✅ 文档 3 分析完成
  ✅ 文档 2 分析完成
  ...(完成顺序不一定按编号——谁快谁先返回)

📦 处理第 2 批(10 份)
  ...

==================================================
📊 分析完成!
   文档总数: 50
   耗时: 2.7 秒
   如果串行: 25 秒
   提速: 9.3 倍
   前5份结果:
     → 文档1 | 合同 | 这是文档1的内容摘要
     → 文档2 | 合同 | 这是文档2的内容摘要
     ...

你学到了什么

asyncio.gather() 是批量并发的心脏。 把 N 个协程打包传给 gather,Python 的事件循环自动调度——谁在等 IO 就让位给别人。这就是为什么 10 份文档"同时"处理完毕,而不是一个一个来。

分批不是多余的。 有人问:为什么不全扔进去并发,还要分批?因为真实环境有资源上限——LLM API 有并发限制,服务器内存有限,数据库连接数有限。分批是你控制并发规模的手段:既不浪费资源,也不打垮服务。

跟项目一的区别: 项目一的 asyncio 用于多并发对话(Day 3),我们是多并发文档处理。场景不同,但底层原理完全一样——都是利用 IO 等待时间让 Python 去干别的事。

明天的预告

如果同一份文档被分析了两次,Agent 应该直接拿上一次的结果,而不是重新跑一遍。明天实现分析缓存——把历史结果存起来,第二次遇到秒出。


Day 3 完成。50 份文档,2.7 秒搞定。比串行快近 10 倍——这就是并发的力量。