Day 4:分析缓存——已经跑过的不重跑
昨天的问题
昨天你同时处理了 50 份文档,速度很快。但仔细想想:如果同一份合同
今天分析了,明天又被上传了一次,Agent 会傻傻地重新跑一遍流程。 天的问题昨天你同时处理了 50 份文档,速度很快。但仔细想想:如果同一份合同今天分析了,明天又被上传了一次,Agent 会傻傻地重新跑一遍流程。
贵吗?一份文档 0.5 秒,看起来不贵。但换个角度看:调用真实 LLM API 是花钱的。如果公司每天有上千份文档流转,重复率 20%,一年下来多花的钱可能上千块。更不用说 Token 消耗、API 配额这些隐形成本。
Agent 需要"记住"分析过的结果。 不是像项目一的 MemorySaver 那样记住对话,而是记住"这份文档我分析过,结果在这里"。
今天做什么
实现一个文档分析缓存。用文档内容的哈希值作为 Key——同一份文档(即使文件名不同),哈希值相同就判等,直接返回缓存。
核心很简单:一个字典 {哈希值: 分析结果}。再加一个统计功能——让你知道缓存命中了多少次,避免了多少次重复分析。
📁 代码目录
doc_analyzer/
├── sample_contract.txt
├── day1.py
├── day2.py
├── day3.py
└── day4.py ← 新增!带缓存的并发分析,80行代码
python
# day4.py — 带缓存的并发文档分析
import asyncio
import hashlib
import time
from typing import List, Optional
# ===== Day3 的结构定义(复用)=====
from pydantic import BaseModel, Field
class DocSummary(BaseModel):
doc_id: int = Field(description="文档编号")
title: str = Field(description="文档标题")
summary: str = Field(description="100字以内的摘要")
category: str = Field(description="分类:合同 / 简历 / 报告 / 其他")
# ===== Day3 结束 =====
# ===== Day4 新增:分析缓存 =====
class AnalysisCache:
"""
文档分析缓存。
核心思路:
- 用文档内容的 SHA256 哈希值作为 Key
- 同一份文档,无论上传几次、改什么文件名,哈希值都一样
- 缓存命中时直接返回,跳过分析
"""
def __init__(self):
self._store: dict = {} # {hash_value: DocSummary}
self._hits: int = 0 # 缓存命中次数
self._misses: int = 0 # 缓存未命中次数
def _hash(self, content: str) -> str:
"""计算文档内容的哈希值"""
return hashlib.sha256(content.encode("utf-8")).hexdigest()
def get(self, content: str) -> Optional[DocSummary]:
"""查询缓存——命中返回结果,未命中返回 None"""
key = self._hash(content)
cached = self._store.get(key)
if cached:
self._hits += 1
else:
self._misses += 1
return cached
def set(self, content: str, result: DocSummary):
"""存入缓存"""
key = self._hash(content)
self._store[key] = result
def stats(self):
"""缓存统计"""
total = self._hits + self._misses
hit_rate = self._hits / total * 100 if total > 0 else 0
return {
"缓存条目数": len(self._store),
"命中次数": self._hits,
"未命中次数": self._misses,
"命中率": f"{hit_rate:.1f}%"
}
# ===== Day3 的异步分析(加缓存版)=====
async def analyze_one_with_cache(
doc_id: int, content: str, cache: AnalysisCache
) -> tuple[DocSummary, bool]:
"""
异步分析单份文档(优先读缓存)。
返回 (结果, 是否缓存命中)。
"""
# 先查缓存
cached = cache.get(content)
if cached:
print(f" ⚡ 缓存命中: 文档 {doc_id}(跳过分析)")
return cached, True
# 缓存未命中 —— 走分析流程
print(f" 🔍 分析中: 文档 {doc_id}")
await asyncio.sleep(0.5) # 模拟分析耗时
result = DocSummary(
doc_id=doc_id,
title=f"文档{doc_id}",
summary=f"这是文档{doc_id}的内容摘要",
category="合同"
)
cache.set(content, result) # 存入缓存
return result, False
async def batch_analyze_cached(documents: List[tuple], batch_size: int = 10):
"""
批量分析(带缓存)。
documents: [(doc_id, content), ...]
"""
cache = AnalysisCache()
results = []
total = len(documents)
for start in range(0, total, batch_size):
batch = documents[start:start + batch_size]
print(f"\n📦 处理第 {start // batch_size + 1} 批({len(batch)} 份)")
tasks = [
analyze_one_with_cache(did, content, cache)
for did, content in batch
]
batch_results = await asyncio.gather(*tasks)
results.extend(batch_results)
return results, cache
# ===== 运行测试 =====
if __name__ == "__main__":
# 模拟50份文档 —— 其中有10份是重复的
base_docs = [(i, f"文档{i}的内容——这是一份合同文件") for i in range(1, 41)]
duplicates = [(i, f"文档{i}的内容——这是一份合同文件") for i in range(1, 11)] # 前10份重复
all_docs = base_docs + duplicates # 40 + 10 = 50份,10份重复
print(f"📚 共 {len(all_docs)} 份文档(其中 10 份重复),开始分析...\n")
start = time.time()
results, cache = asyncio.run(batch_analyze_cached(all_docs, batch_size=10))
elapsed = time.time() - start
# 统计缓存命中
hit_count = sum(1 for _, is_hit in results if is_hit)
miss_count = len(results) - hit_count
print(f"\n" + "=" * 50)
print(f"📊 分析完成!")
print(f" 总文档数: {len(results)}")
print(f" 实际分析: {miss_count} 份({hit_count} 份走缓存)")
print(f" 耗时: {elapsed:.1f} 秒")
print(f" 如果全部重跑: {len(results) * 0.5:.0f} 秒")
print()
for key, value in cache.stats().items():
print(f" {key}: {value}")运行
bash
python day4.py你应该看到:
📚 共 50 份文档(其中 10 份重复),开始分析...
📦 处理第 1 批(10 份)
🔍 分析中: 文档 1
🔍 分析中: 文档 2
...
📦 处理第 2 批(10 份)
...
📦 处理第 5 批(10 份) ← 这里包含10份重复文档
⚡ 缓存命中: 文档 1(跳过分析)
⚡ 缓存命中: 文档 2(跳过分析)
...
==================================================
📊 分析完成!
总文档数: 50
实际分析: 40 份(10 份走缓存)
耗时: 2.2 秒
如果全部重跑: 25 秒
缓存条目数: 40
命中次数: 10
未命中次数: 40
命中率: 20.0%那 10 份重复文档——Agent 看都不看直接返回,一行代码没多跑。
你学到了什么
缓存的 Key 设计是一门学问。 这里用 SHA256 哈希——同一份文档内容,无论文件名怎么改,哈希值相同。但如果文档内容变了一个标点符号,哈希值就完全不一样了。方案不完美——真实场景可能需要"相似度判断"而不是"完全相等"——但这已经是生产级缓存的雏形。
跟项目一 MemorySaver 的区别: 项目一的 MemorySaver 记的是"对话",Key 是 thread_id;我们的缓存记的是"分析结果",Key 是文档哈希。一个管用户上下文,一个管计算重复——都是"记忆",但目的完全不同。
缓存在生产环境是标配。 真实的文档分析系统里,缓存层级可能有三层:内存缓存(今天的实现)、文件缓存(存磁盘)、Redis 缓存(分布式共享)。今天你先掌握了最核心的一层。
明天的预告
前四天我们一直在"写代码"。明天的任务完全不同——把代码变成服务。 用 MCP 协议把你的分析能力包装成标准工具,任何系统都能通过 MCP 调你的 Agent。最后用 Streamlit 做一个 Web 界面——上传文档,点按钮,看分析结果。
Day 4 完成。Agent 现在有"记忆"了——分析过的文档不重跑,缓存命中秒出结果。

