Skip to content

Day 4:分析缓存——已经跑过的不重跑

昨天的问题

昨天你同时处理了 50 份文档,速度很快。但仔细想想:如果同一份合同

今天分析了,明天又被上传了一次,Agent 会傻傻地重新跑一遍流程。 天的问题昨天你同时处理了 50 份文档,速度很快。但仔细想想:如果同一份合同今天分析了,明天又被上传了一次,Agent 会傻傻地重新跑一遍流程。

贵吗?一份文档 0.5 秒,看起来不贵。但换个角度看:调用真实 LLM API 是花钱的。如果公司每天有上千份文档流转,重复率 20%,一年下来多花的钱可能上千块。更不用说 Token 消耗、API 配额这些隐形成本。

Agent 需要"记住"分析过的结果。 不是像项目一的 MemorySaver 那样记住对话,而是记住"这份文档我分析过,结果在这里"。

今天做什么

实现一个文档分析缓存。用文档内容的哈希值作为 Key——同一份文档(即使文件名不同),哈希值相同就判等,直接返回缓存。

核心很简单:一个字典 {哈希值: 分析结果}。再加一个统计功能——让你知道缓存命中了多少次,避免了多少次重复分析。

📁 代码目录

doc_analyzer/
├── sample_contract.txt
├── day1.py
├── day2.py
├── day3.py
└── day4.py                ← 新增!带缓存的并发分析,80行

代码

python
# day4.py — 带缓存的并发文档分析

import asyncio
import hashlib
import time
from typing import List, Optional

# ===== Day3 的结构定义(复用)=====
from pydantic import BaseModel, Field

class DocSummary(BaseModel):
    doc_id: int = Field(description="文档编号")
    title: str = Field(description="文档标题")
    summary: str = Field(description="100字以内的摘要")
    category: str = Field(description="分类:合同 / 简历 / 报告 / 其他")
# ===== Day3 结束 =====

# ===== Day4 新增:分析缓存 =====
class AnalysisCache:
    """
    文档分析缓存。
    
    核心思路:
    - 用文档内容的 SHA256 哈希值作为 Key
    - 同一份文档,无论上传几次、改什么文件名,哈希值都一样
    - 缓存命中时直接返回,跳过分析
    """
    
    def __init__(self):
        self._store: dict = {}      # {hash_value: DocSummary}
        self._hits: int = 0          # 缓存命中次数
        self._misses: int = 0        # 缓存未命中次数
    
    def _hash(self, content: str) -> str:
        """计算文档内容的哈希值"""
        return hashlib.sha256(content.encode("utf-8")).hexdigest()
    
    def get(self, content: str) -> Optional[DocSummary]:
        """查询缓存——命中返回结果,未命中返回 None"""
        key = self._hash(content)
        cached = self._store.get(key)
        if cached:
            self._hits += 1
        else:
            self._misses += 1
        return cached
    
    def set(self, content: str, result: DocSummary):
        """存入缓存"""
        key = self._hash(content)
        self._store[key] = result
    
    def stats(self):
        """缓存统计"""
        total = self._hits + self._misses
        hit_rate = self._hits / total * 100 if total > 0 else 0
        return {
            "缓存条目数": len(self._store),
            "命中次数": self._hits,
            "未命中次数": self._misses,
            "命中率": f"{hit_rate:.1f}%"
        }

# ===== Day3 的异步分析(加缓存版)=====
async def analyze_one_with_cache(
    doc_id: int, content: str, cache: AnalysisCache
) -> tuple[DocSummary, bool]:
    """
    异步分析单份文档(优先读缓存)。
    
    返回 (结果, 是否缓存命中)。
    """
    # 先查缓存
    cached = cache.get(content)
    if cached:
        print(f"  ⚡ 缓存命中: 文档 {doc_id}(跳过分析)")
        return cached, True
    
    # 缓存未命中 —— 走分析流程
    print(f"  🔍 分析中: 文档 {doc_id}")
    await asyncio.sleep(0.5)  # 模拟分析耗时
    
    result = DocSummary(
        doc_id=doc_id,
        title=f"文档{doc_id}",
        summary=f"这是文档{doc_id}的内容摘要",
        category="合同"
    )
    cache.set(content, result)  # 存入缓存
    return result, False

async def batch_analyze_cached(documents: List[tuple], batch_size: int = 10):
    """
    批量分析(带缓存)。
    
    documents: [(doc_id, content), ...]
    """
    cache = AnalysisCache()
    results = []
    total = len(documents)
    
    for start in range(0, total, batch_size):
        batch = documents[start:start + batch_size]
        print(f"\n📦 处理第 {start // batch_size + 1} 批({len(batch)} 份)")
        
        tasks = [
            analyze_one_with_cache(did, content, cache)
            for did, content in batch
        ]
        batch_results = await asyncio.gather(*tasks)
        results.extend(batch_results)
    
    return results, cache

# ===== 运行测试 =====
if __name__ == "__main__":
    # 模拟50份文档 —— 其中有10份是重复的
    base_docs = [(i, f"文档{i}的内容——这是一份合同文件") for i in range(1, 41)]
    duplicates = [(i, f"文档{i}的内容——这是一份合同文件") for i in range(1, 11)]  # 前10份重复
    
    all_docs = base_docs + duplicates  # 40 + 10 = 50份,10份重复
    
    print(f"📚 共 {len(all_docs)} 份文档(其中 10 份重复),开始分析...\n")
    
    start = time.time()
    results, cache = asyncio.run(batch_analyze_cached(all_docs, batch_size=10))
    elapsed = time.time() - start
    
    # 统计缓存命中
    hit_count = sum(1 for _, is_hit in results if is_hit)
    miss_count = len(results) - hit_count
    
    print(f"\n" + "=" * 50)
    print(f"📊 分析完成!")
    print(f"   总文档数: {len(results)}")
    print(f"   实际分析: {miss_count} 份({hit_count} 份走缓存)")
    print(f"   耗时: {elapsed:.1f} 秒")
    print(f"   如果全部重跑: {len(results) * 0.5:.0f} 秒")
    print()
    for key, value in cache.stats().items():
        print(f"   {key}: {value}")

运行

bash
python day4.py

你应该看到:

📚 共 50 份文档(其中 10 份重复),开始分析...

📦 处理第 1 批(10 份)
  🔍 分析中: 文档 1
  🔍 分析中: 文档 2
  ...

📦 处理第 2 批(10 份)
  ...

📦 处理第 5 批(10 份)  ← 这里包含10份重复文档
  ⚡ 缓存命中: 文档 1(跳过分析)
  ⚡ 缓存命中: 文档 2(跳过分析)
  ...

==================================================
📊 分析完成!
   总文档数: 50
   实际分析: 40 份(10 份走缓存)
   耗时: 2.2 秒
   如果全部重跑: 25 秒
   
   缓存条目数: 40
   命中次数: 10
   未命中次数: 40
   命中率: 20.0%

那 10 份重复文档——Agent 看都不看直接返回,一行代码没多跑。

你学到了什么

缓存的 Key 设计是一门学问。 这里用 SHA256 哈希——同一份文档内容,无论文件名怎么改,哈希值相同。但如果文档内容变了一个标点符号,哈希值就完全不一样了。方案不完美——真实场景可能需要"相似度判断"而不是"完全相等"——但这已经是生产级缓存的雏形。

跟项目一 MemorySaver 的区别: 项目一的 MemorySaver 记的是"对话",Key 是 thread_id;我们的缓存记的是"分析结果",Key 是文档哈希。一个管用户上下文,一个管计算重复——都是"记忆",但目的完全不同。

缓存在生产环境是标配。 真实的文档分析系统里,缓存层级可能有三层:内存缓存(今天的实现)、文件缓存(存磁盘)、Redis 缓存(分布式共享)。今天你先掌握了最核心的一层。

明天的预告

前四天我们一直在"写代码"。明天的任务完全不同——把代码变成服务。 用 MCP 协议把你的分析能力包装成标准工具,任何系统都能通过 MCP 调你的 Agent。最后用 Streamlit 做一个 Web 界面——上传文档,点按钮,看分析结果。


Day 4 完成。Agent 现在有"记忆"了——分析过的文档不重跑,缓存命中秒出结果。