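A testing and observability handbook for AI agent commerce systems, covering the four-layer testing pyramid and production monitoring.
openclaw skills install @mirni/greenhelix-agent-testing-observability
Note: this guide is for educational purposes and contains example code.
It does not execute any code or install any dependencies.
All examples use the GreenHelix sandbox (https://sandbox.greenhelix.net), which provides 500 free credits; no API key is needed to get started.
Reference credentials (you provide these in your own environment):
- GREENHELIX_API_KEY: API authentication for the GreenHelix gateway (read/write access to purchased API tools only)
- AGENT_SIGNING_KEY: cryptographic signing key for the agent identity (Ed25519 key pair, used for request signing)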
Your agent commerce system runs fine on your laptop and passes the smoke tests in the GreenHelix sandbox. You deploy to production on Friday afternoon and go home. By Saturday morning, a retry loop has created 47 duplicate escrows, a performance escrow has released funds early because it read a stale metric, and the settlement callback webhook has been failing silently for six hours because the target endpoint returned 503 and nobody was watching. None of these failures was covered by a test, because the traditional testing pyramid (unit tests at the bottom, integration tests in the middle, end-to-end tests at the top) was not designed for autonomous agents. Such agents make financial decisions across counterparties over unreliable networks, and the counterparties themselves may fail. This guide rebuilds the testing pyramid for agent commerce and layers production observability, chaos testing, alerting, and a CI/CD pipeline on top of it. Each pattern comes with runnable Python code, based on the 260 test cases that ship with the GreenHelix gateway, that you can copy directly into your project.
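The standard testing pyramid assumes that your code calls functions and gets values back. Unit tests verify the behavior of a single function, integration tests verify that modules compose correctly, and end-to-end tests verify complete user flows. The model fits deterministic systems in which function calls have no financial side effects and the only failure modes are "returned the wrong value" or "raised an exception".
Agent commerce systems break all three assumptions. Calling create_escrow locks real funds, and calling release_escrow moves real money. A retry that triggers two calls produces two escrows, not one error. The failure modes are not "wrong return value"; they are "the agent paid twice for the same work", "the escrow timed out but the funds are still locked", and "the gateway settled successfully but the callback notification was lost". Traditional unit tests miss these problems because they test code in isolation, without the effects of the financial state machine. Traditional end-to-end tests miss them too, because they run the happy path once and call it a pass.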
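To make the retry failure mode concrete, here is a minimal sketch. It is not GreenHelix code: `FakeLedger` is a hypothetical in-memory stand-in for a gateway, and its handling of `idempotency_key` is an assumption about how a deduplicating backend might behave. It shows a client that retries after lost responses creating several escrows unless every attempt carries the same key.

```python
import uuid
from typing import Optional


class FakeLedger:
    """Hypothetical in-memory stand-in for a payment gateway (illustration only)."""

    def __init__(self) -> None:
        self.escrows: dict[str, dict] = {}
        self._by_key: dict[str, str] = {}

    def create_escrow(self, amount: str, idempotency_key: Optional[str] = None) -> dict:
        # A repeated key returns the escrow created by the first call.
        if idempotency_key is not None and idempotency_key in self._by_key:
            return self.escrows[self._by_key[idempotency_key]]
        escrow_id = f"escrow-{uuid.uuid4().hex[:8]}"
        self.escrows[escrow_id] = {"escrow_id": escrow_id, "amount": amount}
        if idempotency_key is not None:
            self._by_key[idempotency_key] = escrow_id
        return self.escrows[escrow_id]


def create_with_retry(ledger: FakeLedger, attempts: int,
                      idempotency_key: Optional[str] = None) -> dict:
    """Simulate a client whose responses are lost, so it repeats every attempt."""
    result = {}
    for _ in range(attempts):
        result = ledger.create_escrow("50.00", idempotency_key)
    return result


naive = FakeLedger()
create_with_retry(naive, attempts=3)
print(len(naive.escrows))  # 3: funds were locked three times

safe = FakeLedger()
create_with_retry(safe, attempts=3, idempotency_key="order-123")
print(len(safe.escrows))  # 1: the retries collapsed into one escrow
```

Neither run raises an exception or returns an error, which is why a test that only checks return values would pass in both cases.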
Agent commerce needs a four-layer testing pyramid that maps to the actual failure modes:
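```
                  ╱╲
                 ╱  ╲
                ╱Chaos╲                Layer 4: Chaos testing
               ╱        ╲              (fault injection, timeouts, concurrent load)
              ╱──────────╲
             ╱ Multi-agent ╲           Layer 3: Multi-agent workflow tests
            ╱   workflows   ╲          (Saga pattern, rollback, webhook delivery)
           ╱─────────────────╲
          ╱   Tool contracts   ╲       Layer 2: Tool-level contract tests
         ╱                      ╲      (schema, idempotency, permissions)
        ╱────────────────────────╲
       ╱   Deterministic mocks     ╲   Layer 1: Mock-based unit tests
      ╱   (fast, runs offline)      ╲  (business logic, validation)
     ╱───────────────────────────────╲
```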
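Layer 1: Deterministic mocks. These test your business logic without calling the gateway. They finish in milliseconds and catch logic errors: wrong amount calculations, missing trust checks, bad state transitions. They make up 60% of the test suite.
Layer 2: Tool contract tests. These verify that each GreenHelix tool accepts the expected input schema, returns the expected output structure, and produces the correct error code for invalid input. They run against the sandbox and catch API contract changes. They make up 25% of the test suite.
Layer 3: Multi-agent workflow tests. These verify complete business flows, from listing a service on the marketplace through escrow release to settlement. They exercise the Saga pattern (rollback after a multi-step failure) and webhook delivery. They make up 10% of the test suite.
Layer 4: Chaos tests. These inject faults (network timeouts, random tool errors, concurrent duplicate requests) and verify that the system recovers without financial inconsistency. They make up only 5% of the test suite, but they find the most expensive bugs.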
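As a quick worked example of the 60/25/10/5 split, the sketch below turns the percentages into per-layer test counts. The layer names and the `split_suite` helper are illustrative, and the suite size of 260 is used only as an example input.

```python
# Percentages taken from the four layer descriptions above.
LAYER_PERCENT = {
    "layer1_mocks": 60,
    "layer2_contracts": 25,
    "layer3_workflows": 10,
    "layer4_chaos": 5,
}


def split_suite(total_tests: int) -> dict[str, int]:
    """Split a suite size across the four layers using integer arithmetic."""
    counts = {name: total_tests * pct // 100 for name, pct in LAYER_PERCENT.items()}
    # Any remainder left by integer division goes to the cheapest layer.
    counts["layer1_mocks"] += total_tests - sum(counts.values())
    return counts


print(split_suite(260))
# {'layer1_mocks': 156, 'layer2_contracts': 65, 'layer3_workflows': 26, 'layer4_chaos': 13}
```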
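Every test in this guide uses the AgentTestHarness class. It manages test fixtures, provides deterministic mocks for Layer 1, and switches to sandbox mode for Layers 2 through 4.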
```python
import pytest
import time
import json
import uuid
import requests
from unittest.mock import MagicMock, patch
from typing import Optional
from dataclasses import dataclass, field


@dataclass
class MockResponse:
    """Deterministic mock of a GreenHelix tool response."""

    tool: str
    status: str = "success"
    data: dict = field(default_factory=dict)
    error_code: Optional[str] = None
    error_message: Optional[str] = None

    def to_dict(self) -> dict:
        result = {"status": self.status}
        if self.status == "success":
            result.update(self.data)
        else:
            result["error"] = {
                "code": self.error_code or "unknown_error",
                "message": self.error_message or "An error occurred",
            }
        return result


class AgentTestHarness:
    """Test harness for GreenHelix agent commerce systems.

    Manages the fixtures, mocks, and sandbox connection needed by all four
    layers of the agent testing pyramid.

    Usage:
        harness = AgentTestHarness(
            api_key="test-key",
            agent_id="test-agent",
            base_url="https://sandbox.greenhelix.net/v1",
        )

        # Layer 1: deterministic mocks
        harness.mock_tool("get_balance", {"balance": "100.00"})
        result = harness.execute("get_balance", {})
        assert result["balance"] == "100.00"

        # Layer 2 and above: sandbox mode
        harness.use_sandbox()
        result = harness.execute("get_balance", {})
    """

    def __init__(
        self,
        api_key: str,
        agent_id: str,
        base_url: str = "https://sandbox.greenhelix.net/v1",
    ):
        self.api_key = api_key
        self.agent_id = agent_id
        self.base_url = base_url
        self._mocks: dict[str, MockResponse] = {}
        self._mock_sequences: dict[str, list[dict]] = {}
        self._call_log: list[dict] = []
        self._sandbox_mode = False
        self._session = requests.Session()
        self._session.headers.update({
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}",
        })

    # ── Mode control ───────────────────────────────────────

    def use_mocks(self):
        """Switch to deterministic mock mode (Layer 1)."""
        self._sandbox_mode = False

    def use_sandbox(self):
        """Switch to live sandbox mode (Layer 2 and above)."""
        self._sandbox_mode = True

    # ── Mock registration ──────────────────────────────────

    def mock_tool(self, tool: str, data: dict, status: str = "success"):
        """Register a deterministic mock response for a tool."""
        self._mocks[tool] = MockResponse(tool=tool, status=status, data=data)

    def mock_tool_error(
        self, tool: str, error_code: str, error_message: str
    ):
        """Register a deterministic error response for a tool."""
        self._mocks[tool] = MockResponse(
            tool=tool,
            status="error",
            error_code=error_code,
            error_message=error_message,
        )

    def mock_tool_sequence(self, tool: str, responses: list[dict]):
        """Register a sequence of responses for consecutive calls."""
        self._mock_sequences[tool] = list(responses)

    # ── Execution ──────────────────────────────────────────

    def execute(self, tool: str, input_data: dict) -> dict:
        """Execute a tool against the mocks or the sandbox."""
        call_record = {
            "tool": tool,
            "input": input_data,
            "timestamp": time.time(),
        }
        if self._sandbox_mode:
            # NOTE: the default base_url already ends in /v1; verify the
            # resulting path against the gateway documentation.
            resp = self._session.post(
                f"{self.base_url}/v1",
                json={"tool": tool, "input": input_data},
            )
            resp.raise_for_status()
            result = resp.json()
        else:
            # Sequences take priority over single mocks
            if self._mock_sequences.get(tool):
                result = self._mock_sequences[tool].pop(0)
            elif tool in self._mocks:
                result = self._mocks[tool].to_dict()
            else:
                raise ValueError(
                    f"No mock registered for tool '{tool}'. "
                    f"Register one with harness.mock_tool('{tool}', {{...}})"
                )
        call_record["result"] = result
        self._call_log.append(call_record)
        return result

    # ── Assertions ─────────────────────────────────────────

    def assert_tool_called(self, tool: str, times: Optional[int] = None):
        """Assert that a tool was called, optionally an exact number of times."""
        calls = [c for c in self._call_log if c["tool"] == tool]
        assert len(calls) > 0, f"Tool '{tool}' was never called"
        if times is not None:
            assert len(calls) == times, (
                f"Tool '{tool}' was called {len(calls)} times, expected {times}"
            )

    def assert_tool_not_called(self, tool: str):
        """Assert that a tool was never called."""
        calls = [c for c in self._call_log if c["tool"] == tool]
        assert len(calls) == 0, (
            f"Tool '{tool}' was called {len(calls)} times, expected 0"
        )

    def assert_call_order(self, tools: list[str]):
        """Assert that tools were called in the given relative order."""
        called_tools = [c["tool"] for c in self._call_log]
        idx = 0
        for tool in tools:
            try:
                idx = called_tools.index(tool, idx) + 1
            except ValueError:
                raise AssertionError(
                    f"Expected '{tool}' after position {idx}. "
                    f"Actual call order: {called_tools}"
                ) from None

    def get_calls(self, tool: Optional[str] = None) -> list[dict]:
        """Return the call log, optionally filtered by tool name."""
        if tool:
            return [c for c in self._call_log if c["tool"] == tool]
        return list(self._call_log)

    def reset(self):
        """Clear all mocks and the call history."""
        self._mocks.clear()
        self._mock_sequences.clear()
        self._call_log.clear()
```

Put this conftest.py file in your tests directory. Every test file in this guide imports from it.
```python
# tests/conftest.py
import os
import uuid
import pytest
from unittest.mock import MagicMock

# AgentTestHarness and MockResponse must be importable here. The module
# name below is a placeholder; adjust it to wherever you saved the harness.
# from agent_test_harness import AgentTestHarness, MockResponse


@pytest.fixture
def api_key():
    """API key for sandbox tests. Prefers the environment variable, else a test default."""
    return os.environ.get("GREENHELIX_API_KEY", "test-api-key-sandbox")


@pytest.fixture
def base_url():
    """Sandbox URL used by integration tests."""
    return os.environ.get(
        "GREENHELIX_BASE_URL", "https://sandbox.greenhelix.net/v1"
    )


@pytest.fixture
def agent_id():
    """Unique agent ID per test, to prevent collisions."""
    return f"test-agent-{uuid.uuid4().hex[:12]}"


@pytest.fixture
def buyer_id():
    """Unique buyer agent ID."""
    return f"test-buyer-{uuid.uuid4().hex[:12]}"


@pytest.fixture
def seller_id():
    """Unique seller agent ID."""
    return f"test-seller-{uuid.uuid4().hex[:12]}"


@pytest.fixture
def harness(api_key, agent_id, base_url):
    """AgentTestHarness in mock mode. Call harness.use_sandbox() to go live."""
    h = AgentTestHarness(
        api_key=api_key,
        agent_id=agent_id,
        base_url=base_url,
    )
    h.use_mocks()
    return h


@pytest.fixture
def sandbox_harness(api_key, agent_id, base_url):
    """AgentTestHarness in sandbox mode, for integration tests."""
    h = AgentTestHarness(
        api_key=api_key,
        agent_id=agent_id,
        base_url=base_url,
    )
    h.use_sandbox()
    return h


@pytest.fixture
def mock_session():
    """Preconfigured requests.Session mock for unit tests."""
    session = MagicMock()
    response = MagicMock()
    response.status_code = 200
    response.json.return_value = {"status": "success"}
    response.raise_for_status.return_value = None
    session.post.return_value = response
    return session


@pytest.fixture
def mock_response():
    """Factory fixture for creating MockResponse objects."""
    def _make(tool, data=None, status="success", error_code=None):
        return MockResponse(
            tool=tool,
            status=status,
            data=data or {},
            error_code=error_code,
        )
    return _make


# ── Per-class fixtures for isolated test suites ────────────

class AgentFixtures:
    """Mixin that provides the standard mocks for agent commerce tests."""

    @pytest.fixture(autouse=True)
    def setup_agent_mocks(self, harness):
        """Register the common mocks before each test in the class."""
        self.harness = harness
        harness.mock_tool("get_balance", {"balance": "500.00", "currency": "USD"})
        harness.mock_tool("create_wallet", {"wallet_id": "w-test-001", "status": "active"})
        harness.mock_tool("register_agent", {"agent_id": harness.agent_id, "status": "registered"})
        harness.mock_tool("get_trust_score", {"agent_id": "any", "score": 0.85})
        harness.mock_tool("get_budget_status", {
            "daily_limit": "100.00",
            "spent_today": "25.00",
            "remaining": "75.00",
        })


class EscrowFixtures(AgentFixtures):
    """Extended fixtures for escrow-related tests."""

    @pytest.fixture(autouse=True)
    def setup_escrow_mocks(self, harness):
        """Add escrow mocks on top of the agent mocks."""
        self.escrow_id = f"escrow-{uuid.uuid4().hex[:8]}"
        harness.mock_tool("create_escrow", {
            "escrow_id": self.escrow_id,
            "status": "funded",
            "amount": "50.00",
        })
        harness.mock_tool("release_escrow", {
            "escrow_id": self.escrow_id,
            "status": "released",
        })
        harness.mock_tool("cancel_escrow", {
            "escrow_id": self.escrow_id,
            "status": "cancelled",
        })
```

The harness supports two modes.
```python
class TestBudgetGuardrails(AgentFixtures):
    """Layer 1: test budget logic with deterministic mocks."""

    def test_blocks_escrow_when_over_budget(self, harness):
        """Escrow creation should be blocked when the daily budget is exhausted."""
        harness.mock_tool("get_budget_status", {
            "daily_limit": "100.00",
            "spent_today": "99.00",
            "remaining": "1.00",
        })
        budget = harness.execute("get_budget_status", {})
        remaining = float(budget["remaining"])
        escrow_amount = 50.00
        # Business logic: do not create an escrow that exceeds the remaining budget
        assert escrow_amount > remaining
        harness.assert_tool_not_called("create_escrow")

    def test_allows_escrow_within_budget(self, harness):
        """Escrow creation should be allowed when the budget permits it."""
        budget = harness.execute("get_budget_status", {})
        remaining = float(budget["remaining"])
        escrow_amount = 25.00
        assert escrow_amount <= remaining
        harness.execute("create_escrow", {
            "payer_agent_id": harness.agent_id,
            "payee_agent_id": "seller-001",
            "amount": str(escrow_amount),
        })
        harness.assert_tool_called("create_escrow", times=1)


@pytest.mark.sandbox
class TestBudgetGuardrailsSandbox:
    """Layer 2: verify how the budget tools actually behave in the sandbox."""

    def test_budget_cap_enforced(self, sandbox_harness):
        """The sandbox should reject an escrow that exceeds the budget cap."""
        h = sandbox_harness
        h.execute("create_wallet", {})
        h.execute("deposit", {"amount": "100.00"})
        h.execute("set_budget_cap", {
            "agent_id": h.agent_id,
            "daily_limit": "10.00",
        })
        # This should fail because the escrow amount exceeds the daily limit
        result = h.execute("create_escrow", {
            "payer_agent_id": h.agent_id,
            "payee_agent_id": "seller-test",
            "amount": "50.00",
        })
        # The gateway enforces the budget limit at the tool level
        assert result.get("status") in ("error", "rejected")
```
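The sandbox tests above rely on a custom `sandbox` marker. pytest warns about unregistered markers, so one option is to register it in conftest.py through the standard `pytest_configure` hook. This is a minimal sketch; the marker description text is an assumption.

```python
# conftest.py (sketch): register the custom marker used by the sandbox tests.

def pytest_configure(config):
    """Standard pytest hook; runs once at startup."""
    config.addinivalue_line(
        "markers",
        "sandbox: test calls the live GreenHelix sandbox (Layer 2 and above)",
    )
```

With the marker registered, `pytest -m "not sandbox"` runs only the offline Layer 1 tests, and `pytest -m sandbox` runs only the sandbox-backed ones.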
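Every GreenHelix tool has an implicit contract: it accepts a specific input format, returns a specific output structure, and produces documented error codes for invalid input. Tool contract tests verify all three. When the gateway bumps its API version or adds a required field, your contract tests fail before your production code does, so you find the problem early.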
```python
class ToolContract:
    """Defines the expected contract of a GreenHelix tool.

    Used by contract tests to verify schema, output structure, and error
    behavior against the sandbox.
    """

    def __init__(
        self,
        tool: str,
        required_fields: list[str],
        output_fields: list[str],
        error_cases: dict[str, dict],
    ):
        self.tool = tool
        self.required_fields = required_fields
        self.output_fields = output_fields
        self.error_cases = error_cases  # {case_name: {input: ..., expected_error: ...}}


# ── Contract definitions for the core tools ────────────────

BILLING_CONTRACTS = {
    "get_balance": ToolContract(
        tool="get_balance",
        required_fields=[],
        output_fields=["balance", "currency"],
        error_cases={
            "no_wallet": {
                "input": {},
                "expected_error": "wallet_not_found",
            },
        },
    ),
    "deposit": ToolContract(
        tool="deposit",
        required_fields=["amount"],
        output_fields=["balance", "transaction_id"],
        error_cases={
            "negative_amount": {
                "input": {"amount": "-10.00"},
                "expected_error": "invalid_amount",
            },
            "zero_amount": {
                "input": {"amount": "0"},
                "expected_error": "invalid_amount",
            },
        },
    ),
    "set_budget_cap": ToolContract(
        tool="set_budget_cap",
        required_fields=["agent_id", "daily_limit"],
        output_fields=["agent_id", "daily_limit"],
        error_cases={
            "negative_limit": {
                "input": {"agent_id": "test", "daily_limit": "-50.00"},
                "expected_error": "invalid_amount",
            },
        },
    ),
}

PAYMENT_CONTRACTS = {
    "create_escrow": ToolContract(
        tool="create_escrow",
        required_fields=["payer_agent_id", "payee_agent_id", "amount"],
        output_fields=["escrow_id", "status", "amount"],
        error_cases={
            "insufficient_funds": {
                "input": {
                    "payer_agent_id": "buyer",
                    "payee_agent_id": "seller",
                    "amount": "999999.00",
                },
                "expected_error": "insufficient_funds",
            },
            "self_escrow": {
                "input": {
                    "payer_agent_id": "same-agent",
                    "payee_agent_id": "same-agent",
                    "amount": "10.00",
                },
                "expected_error": "invalid_escrow",
            },
        },
    ),
    "release_escrow": ToolContract(
        tool="release_escrow",
        required_fields=["escrow_id"],
        output_fields=["escrow_id", "status"],
        error_cases={
            "nonexistent": {
                "input": {"escrow_id": "escrow-does-not-exist"},
                "expected_error": "escrow_not_found",
            },
        },
    ),
}

IDENTITY_CONTRACTS = {
    "register_agent": ToolContract(
        tool="register_agent",
        required_fields=["agent_id", "public_key", "name"],
        output_fields=["agent_id", "status"],
        error_cases={
            "missing_key": {
                "input": {"agent_id": "test", "name": "Test"},
                "expected_error": "missing_field",
            },
        },
    ),
    "get_trust_score": ToolContract(
        tool="get_trust_score",
        required_fields=["agent_id"],
        output_fields=["agent_id", "score"],
        error_cases={
            "nonexistent_agent": {
                "input": {"agent_id": "agent-that-does-not-exist-xyz"},
                "expected_error": "agent_not_found",
            },
        },
    ),
}

MARKETPLACE_CONTRACTS = {
    "register_service": ToolContract(
        tool="register_service",
        required_fields=["name", "description", "endpoint", "price", "tags", "category"],
        output_fields=["service_id"],
        error_cases={
            "missing_name": {
                "input": {
                    "description": "test",
                    "endpoint": "agent://test",
                    "price": 10.0,
                    "tags": [],
                    "category": "test",
                },
                "expected_error": "missing_field",
            },
        },
    ),
    "search_services": ToolContract(
        tool="search_services",
        required_fields=["query"],
        output_fields=["services"],
        error_cases={
            "empty_query": {
                "input": {"query": ""},
                "expected_error": "invalid_query",
            },
        },
    ),
}
```

```python
@pytest.mark.sandbox
class TestBillingContracts:
    """Layer 2: verify the billing tool contracts against the sandbox."""

    @pytest.fixture(autouse=True)
    def setup_wallet(self, sandbox_harness):
        self.harness = sandbox_harness
        self.harness.execute("create_wallet", {})
        self.harness.execute("deposit", {"amount": "100.00"})

    @pytest.mark.parametrize("tool_name", BILLING_CONTRACTS.keys())
    def test_output_shape(self, tool_name):
        """Every billing tool returns the expected output fields."""
        contract = BILLING_CONTRACTS[tool_name]
        # Build the minimal valid input
        valid_input = {}
        if tool_name == "deposit":
            valid_input = {"amount": "10.00"}
        elif tool_name == "set_budget_cap":
            valid_input = {
                "agent_id": self.harness.agent_id,
                "daily_limit": "50.00",
            }
        result = self.harness.execute(tool_name, valid_input)
        for expected_field in contract.output_fields:
            assert expected_field in result, (
                f"Tool '{tool_name}' is missing output field '{expected_field}'. "
                f"Fields returned: {list(result.keys())}"
            )

    @pytest.mark.parametrize("tool_name", BILLING_CONTRACTS.keys())
    def test_error_cases(self, tool_name):
        """Every billing tool should return an error for invalid input."""
        contract = BILLING_CONTRACTS[tool_name]
        for case_name, case in contract.error_cases.items():
            result = self.harness.execute(tool_name, case["input"])
            assert result.get("status") == "error" or "error" in result, (
                f"Case '{case_name}' for tool '{tool_name}' should have failed. "
                f"Actual result: {result}"
            )


@pytest.mark.sandbox
class TestPaymentContracts:
    """Layer 2: verify the payment tool contracts against the sandbox."""

    @pytest.fixture(autouse=True)
    def setup_accounts(self, sandbox_harness, buyer_id, seller_id):
        self.harness = sandbox_harness
        self.buyer_id = buyer_id
        self.seller_id = seller_id

    @pytest.mark.parametrize("tool_name", PAYMENT_CONTRACTS.keys())
    def test_error_cases(self, tool_name):
        """Every payment tool should return an error for invalid input."""
        contract = PAYMENT_CONTRACTS[tool_name]
        for case_name, case in contract.error_cases.items():
            result = self.harness.execute(tool_name, case["input"])
            assert result.get("status") == "error" or "error" in result, (
                f"Case '{case_name}' for payment tool '{tool_name}' did not fail as expected"
            )
```
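The error-case tests above only assert that some error came back; they do not compare the returned code with `expected_error`. The sketch below shows one way to tighten that. It assumes errors have the shape `{"status": "error", "error": {"code": ...}}`, which is what `MockResponse.to_dict()` produces; whether live sandbox responses use the same shape is an assumption to verify. Both helper names are hypothetical.

```python
def missing_output_fields(result: dict, output_fields: list[str]) -> list[str]:
    """Return the contract fields that are absent from a tool result."""
    return [name for name in output_fields if name not in result]


def has_error_code(result: dict, expected_error: str) -> bool:
    """True only if the result is an error carrying exactly the expected code."""
    error = result.get("error") or {}
    return result.get("status") == "error" and error.get("code") == expected_error


ok = {"status": "success", "balance": "100.00"}
bad = {"status": "error", "error": {"code": "invalid_amount", "message": "amount must be > 0"}}

print(missing_output_fields(ok, ["balance", "currency"]))  # ['currency']
print(has_error_code(bad, "invalid_amount"))               # True
print(has_error_code(bad, "wallet_not_found"))             # False
```

Inside a contract test, `assert has_error_code(result, case["expected_error"])` would then fail when the gateway starts returning a different code for the same invalid input.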
### Idempotency Tests for Payment Tools

Payment tools must be idempotent. Calling `create_escrow` twice with the same parameters must not create two escrows. Calling `release_escrow` twice must not pay out twice. These tests submit duplicate requests and verify financial consistency (P1, P7).

```python
@pytest.mark.sandbox
class TestPaymentIdempotency:
    """Layer 2: verify that payment tools handle duplicate calls safely."""

    def test_duplicate_escrow_creation(self, sandbox_harness, buyer_id, seller_id):
        """Creating the same escrow twice should return the same escrow ID."""
        h = sandbox_harness
        h.execute("create_wallet", {})
        h.execute("deposit", {"amount": "200.00"})
        escrow_params = {
            "payer_agent_id": buyer_id,
            "payee_agent_id": seller_id,
            "amount": "50.00",
            "description": "Idempotency test escrow",
            "idempotency_key": f"idem-{uuid.uuid4().hex[:8]}",
        }
        result_1 = h.execute("create_escrow", escrow_params)
        result_2 = h.execute("create_escrow", escrow_params)
        # The same idempotency key should return the same escrow ID
        assert result_1["escrow_id"] == result_2["escrow_id"]
        # The balance must be debited only once
        balance = h.execute("get_balance", {})
        assert float(balance["balance"]) == 150.00

    def test_duplicate_release(self, sandbox_harness):
        """Releasing the same escrow twice must not pay out twice."""
        h = sandbox_harness
        h.execute("create_wallet", {})
        h.execute("deposit", {"amount": "100.00"})
        escrow = h.execute("create_escrow", {
            "payer_agent_id": h.agent_id,
            "payee_agent_id": "seller-test",
            "amount": "25.00",
        })
        escrow_id = escrow["escrow_id"]
        release_1 = h.execute("release_escrow", {"escrow_id": escrow_id})
        release_2 = h.execute("release_escrow", {"escrow_id": escrow_id})
        # The second release should be a no-op or return already_released
        assert release_1.get("status") == "released"
        assert release_2.get("status") in ("released", "already_released")
```
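For Layer 1, the same invariant can be exercised offline. The following is a minimal sketch of an idempotent release, assuming a hypothetical in-memory `FakeEscrowStore`; it is not the gateway's implementation. It uses `Decimal` rather than `float` so that balance comparisons are exact.

```python
from decimal import Decimal


class FakeEscrowStore:
    """Hypothetical in-memory escrow state machine (illustration only)."""

    def __init__(self) -> None:
        self.escrows: dict[str, dict] = {}
        self.payee_balance = Decimal("0.00")

    def fund(self, escrow_id: str, amount: str) -> None:
        self.escrows[escrow_id] = {"amount": Decimal(amount), "status": "funded"}

    def release(self, escrow_id: str) -> dict:
        escrow = self.escrows[escrow_id]
        # A second release is a no-op: the state check guards the payout.
        if escrow["status"] == "released":
            return {"escrow_id": escrow_id, "status": "already_released"}
        escrow["status"] = "released"
        self.payee_balance += escrow["amount"]
        return {"escrow_id": escrow_id, "status": "released"}


store = FakeEscrowStore()
store.fund("escrow-1", "25.00")
first = store.release("escrow-1")
second = store.release("escrow-1")
print(first["status"], second["status"], store.payee_balance)
# released already_released 25.00
```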
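### Permission Boundary Tests

An agent may only act on its own resources. A buyer must not be able to release an escrow created by another buyer. A seller must not be able to cancel an escrow that does not belong to them. Permission boundary tests verify these invariants (P7).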
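---
name: The Agent Testing & Observability Cookbook: Ship Reliable Agent Commerce Systems
version: 1.3.1
description: This guide provides the testing and observability practices needed to build reliable agent commerce systems, covering permission boundary verification, workflow testing, and integration verification.
summary: A layered testing strategy that keeps agent systems secure and reliable under complex interactions.
tags:
- agent commerce
- testing
- observability
- security
- workflow
- sandbox
- python
- pytest
---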
### Layer 2: Verifying That Agents Cannot Access Other Agents' Resources

```python
@pytest.mark.sandbox
class TestPermissionBoundaries:
    """Layer 2: verify that agents cannot access other agents' resources."""

    def test_cannot_release_others_escrow(self, sandbox_harness):
        """Agent A cannot release an escrow created by agent B."""
        h = sandbox_harness
        # Agent A creates the escrow
        h.execute("create_wallet", {})
        h.execute("deposit", {"amount": "100.00"})
        escrow = h.execute("create_escrow", {
            "payer_agent_id": h.agent_id,
            "payee_agent_id": "seller-x",
            "amount": "10.00",
        })
        # Agent B (a separate harness instance) tries to release it
        other = AgentTestHarness(
            api_key=h.api_key,
            agent_id="attacker-agent",
            base_url=h.base_url,
        )
        other.use_sandbox()
        result = other.execute("release_escrow", {
            "escrow_id": escrow["escrow_id"],
        })
        assert result.get("status") == "error"

    def test_cannot_read_others_balance(self, sandbox_harness):
        """Agent A cannot read agent B's wallet balance."""
        h = sandbox_harness
        h.execute("create_wallet", {})
        h.execute("deposit", {"amount": "100.00"})
        other = AgentTestHarness(
            api_key=h.api_key,
            agent_id="other-agent",
            base_url=h.base_url,
        )
        other.use_sandbox()
        result = other.execute("get_balance", {})
        # Should return the other agent's balance (0), not our own 100
        balance = float(result.get("balance", 0))
        assert balance != 100.00

    def test_cannot_cancel_others_escrow(self, sandbox_harness):
        """A seller cannot cancel an escrow; only the buyer can."""
        h = sandbox_harness
        h.execute("create_wallet", {})
        h.execute("deposit", {"amount": "50.00"})
        escrow = h.execute("create_escrow", {
            "payer_agent_id": h.agent_id,
            "payee_agent_id": "seller-y",
            "amount": "10.00",
        })
        seller = AgentTestHarness(
            api_key=h.api_key,
            agent_id="seller-y",
            base_url=h.base_url,
        )
        seller.use_sandbox()
        result = seller.execute("cancel_escrow", {
            "escrow_id": escrow["escrow_id"],
        })
        assert result.get("status") == "error"
```
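The invariants these tests probe can also be written down as a small ownership check for use in Layer 1 fakes. This is a sketch under stated assumptions: the `OWNER_FIELD` rule table, the `authorize` helper, and the `permission_denied` error code are all hypothetical and only mirror the rules stated in this section (the payer releases and the payer cancels).

```python
# Hypothetical rule table: the escrow field that must match the caller per action.
OWNER_FIELD = {
    "release_escrow": "payer_agent_id",
    "cancel_escrow": "payer_agent_id",
}


def authorize(action: str, caller_id: str, escrow: dict) -> dict:
    """Allow the action only when the caller owns the escrow for that action."""
    owner_field = OWNER_FIELD.get(action)
    if owner_field is None or escrow.get(owner_field) != caller_id:
        return {
            "status": "error",
            "error": {
                "code": "permission_denied",  # assumed code, not from the gateway docs
                "message": f"{caller_id} may not {action}",
            },
        }
    return {"status": "success"}


escrow = {
    "escrow_id": "escrow-1",
    "payer_agent_id": "buyer-a",
    "payee_agent_id": "seller-y",
}
print(authorize("release_escrow", "buyer-a", escrow)["status"])         # success
print(authorize("release_escrow", "attacker-agent", escrow)["status"])  # error
print(authorize("cancel_escrow", "seller-y", escrow)["status"])         # error
```

Unknown actions are denied by default, so adding a new tool without a rule fails closed rather than open.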
---
## Chapter 3: Workflow and Integration Testing
### Saga Test Pattern

Agent commerce workflows are sagas: multi-step operations in which every step has a matching compensating action. If step 3 fails, steps 1 and 2 must be rolled back.

The saga test pattern verifies the happy path and every possible failure point, so that the system stays fault tolerant and its state stays consistent.

```python
class MarketplaceSaga:
    """Implements the full marketplace listing workflow as a testable saga.

    Steps:
        1. Seller registers a service on the marketplace
        2. Buyer discovers the service through search
        3. Buyer checks the seller's trust score
        4. Buyer creates an escrow
        5. Seller performs the work (simulated)
        6. Buyer releases the escrow
        7. Buyer rates the service
        8. Settlement completes

    Compensation:
        Step 4 fails -> nothing to clean up (no funds locked)
        Step 5 fails -> cancel the escrow (refund the buyer)
        Step 6 fails -> open a dispute
    """

    def __init__(self, harness: AgentTestHarness, buyer_id: str, seller_id: str):
        self.harness = harness
        self.buyer_id = buyer_id
        self.seller_id = seller_id
        self.state = {"step": 0, "completed_steps": []}

    def run(self) -> dict:
        """Run the full saga, rolling back on failure."""
        try:
            # Step 1: register the service
            service = self.harness.execute("register_service", {
                "name": "Test summarization service",
                "description": "Document summarization used for testing",
                "endpoint": f"agent://{self.seller_id}",
                "price": 25.00,
                "tags": ["test", "summarization"],
                "category": "data-processing",
            })
            self.state["service_id"] = service["service_id"]
            self.state["completed_steps"].append("register_service")

            # Step 2: discover the service
            results = self.harness.execute("search_services", {
                "query": "test summarization",
            })
            assert len(results.get("services", [])) > 0
            self.state["completed_steps"].append("discover_service")

            # Step 3: trust check
            trust = self.harness.execute("get_trust_score", {
                "agent_id": self.seller_id,
            })
            if trust.get("score", 0) < 0.5:
                return {"status": "aborted", "reason": "low_trust"}
            self.state["completed_steps"].append("trust_check")

            # Step 4: create the escrow
            escrow = self.harness.execute("create_escrow", {
                "payer_agent_id": self.buyer_id,
                "payee_agent_id": self.seller_id,
                "amount": "25.00",
                "description": "Saga test escrow",
            })
            self.state["escrow_id"] = escrow["escrow_id"]
            self.state["completed_steps"].append("create_escrow")

            # Step 5: simulate the work (a real test would call the seller endpoint)
            work_result = {"quality": 0.95, "documents_processed": 500}
            self.state["completed_steps"].append("work_completed")

            # Step 6: release the escrow
            release = self.harness.execute("release_escrow", {
                "escrow_id": escrow["escrow_id"],
            })
            self.state["completed_steps"].append("release_escrow")

            # Step 7: rate the service
            self.harness.execute("rate_service", {
                "service_id": service["service_id"],
                "rating": 5,
            })
            self.state["completed_steps"].append("rate_service")
            return {"status": "completed", "state": self.state}
        except Exception as e:
            return self._compensate(str(e))

    def _compensate(self, error: str) -> dict:
        """Roll back the completed steps after a failure."""
        if "create_escrow" in self.state["completed_steps"]:
            escrow_id = self.state.get("escrow_id")
            if escrow_id and "release_escrow" not in self.state["completed_steps"]:
                self.harness.execute("cancel_escrow", {
                    "escrow_id": escrow_id,
                })
                self.state["completed_steps"].append("compensate:cancel_escrow")
        return {
            "status": "rolled_back",
            "error": error,
            "state": self.state,
        }
```
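The compensation idea generalizes beyond the marketplace flow. The sketch below is a generic saga runner, not part of the guide's harness: `run_saga` and the step names are hypothetical. Each step pairs an action with its compensating action, and on failure the completed steps are undone in reverse order.

```python
from typing import Callable

Step = tuple[str, Callable[[], None], Callable[[], None]]


def run_saga(steps: list[Step]) -> dict:
    """Run (name, action, compensate) steps; undo completed ones in reverse on failure."""
    completed: list[tuple[str, Callable[[], None]]] = []
    for name, action, compensate in steps:
        try:
            action()
        except Exception as exc:
            for _, undo in reversed(completed):
                undo()
            return {
                "status": "rolled_back",
                "failed_step": name,
                "error": str(exc),
                "compensated": [n for n, _ in reversed(completed)],
            }
        completed.append((name, compensate))
    return {"status": "completed", "steps": [n for n, _ in completed]}


log: list[str] = []


def fail_work() -> None:
    raise RuntimeError("work verification failed")


result = run_saga([
    ("register_service", lambda: log.append("register"), lambda: log.append("unregister")),
    ("create_escrow", lambda: log.append("fund"), lambda: log.append("cancel_escrow")),
    ("do_work", fail_work, lambda: None),
])
print(result["status"], result["compensated"])
# rolled_back ['create_escrow', 'register_service']
print(log)
# ['register', 'fund', 'cancel_escrow', 'unregister']
```

Undoing in reverse order matters for financial steps: the escrow is cancelled before the listing it depends on is removed.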
### Testing the Saga

```python
class TestMarketplaceSaga(EscrowFixtures):
    """Layer 3: the full marketplace flow, including rollback verification."""

    def test_happy_path(self, harness, buyer_id, seller_id):
        """Run all 7 steps of the saga to completion."""
        harness.mock_tool("register_service", {
            "service_id": "svc-test-001",
        })
        harness.mock_tool("search_services", {
            "services": [{"name": "Test Service", "agent_id": seller_id}],
        })
        harness.mock_tool("rate_service", {"status": "rated"})
        saga = MarketplaceSaga(harness, buyer_id, seller_id)
        result = saga.run()
        assert result["status"] == "completed"
        assert len(result["state"]["completed_steps"]) == 7
        harness.assert_call_order([
            "register_service",
            "search_services",
            "get_trust_score",
            "create_escrow",
            "release_escrow",
            "rate_service",
        ])

    def test_rollback_on_escrow_failure(self, harness, buyer_id, seller_id):
        """A failed escrow creation leaves no orphaned state behind."""
        harness.mock_tool("register_service", {"service_id": "svc-test-002"})
        harness.mock_tool("search_services", {
            "services": [{"name": "Test", "agent_id": seller_id}],
        })
        harness.mock_tool_error(
            "create_escrow", "insufficient_funds", "Insufficient balance"
        )
        saga = MarketplaceSaga(harness, buyer_id, seller_id)
        result = saga.run()
        assert result["status"] == "rolled_back"
        assert "create_escrow" not in result["state"]["completed_steps"]
        harness.assert_tool_not_called("release_escrow")

    def test_rollback_cancels_escrow_on_work_failure(self, harness, buyer_id, seller_id):
        """A failure in the work step triggers escrow cancellation."""
        harness.mock_tool("register_service", {"service_id": "svc-test-003"})
        harness.mock_tool("search_services", {
            "services": [{"name": "Test", "agent_id": seller_id}],
        })
        saga = MarketplaceSaga(harness, buyer_id, seller_id)
        # Simulate a work failure by injecting an error after the escrow is created
        original_execute = harness.execute
        call_count = {"n": 0}

        def failing_execute(tool, input_data):
            call_count["n"] += 1
            if tool == "release_escrow":
                raise RuntimeError("Simulated work verification failure")
            return original_execute(tool, input_data)

        harness.execute = failing_execute
        result = saga.run()
        assert result["status"] == "rolled_back"
        assert "compensate:cancel_escrow" in result["state"]["completed_steps"]
```
### Subscription Lifecycle Tests

A subscription is a stateful flow: create, renew, pause, cancel. Every state transition must be tested, including edge cases such as a renewal with insufficient funds (P2, P6).

```python
class TestSubscriptionLifecycle(AgentFixtures):
    """Layer 3: verify subscription state transitions."""

    def test_full_lifecycle(self, harness):
        """Full lifecycle: create -> check status -> cancel."""
        sub_id = f"sub-{uuid.uuid4().hex[:8]}"
        harness.mock_tool("create_subscription", {
            "subscription_id": sub_id,
            "status": "active",
            "next_payment_date": "2026-05-06",
        })
        harness.mock_tool("get_subscription", {
            "subscription_id": sub_id,
            "status": "active",
            "payments_completed": 1,
        })
        harness.mock_tool("cancel_subscription", {
            "subscription_id": sub_id,
            "status": "cancelled",
        })
        # Create
        sub = harness.execute("create_subscription", {
            "payer_agent_id": harness.agent_id,
            "payee_agent_id": "provider-001",
            "amount": "15.00",
            "interval": "monthly",
        })
        assert sub["status"] == "active"
        # Check status
        status = harness.execute("get_subscription", {
            "subscription_id": sub_id,
        })
        assert status["payments_completed"] == 1
        # Cancel
        cancel = harness.execute("cancel_subscription", {
            "subscription_id": sub_id,
        })
        assert cancel["status"] == "cancelled"
        harness.assert_call_order([
            "create_subscription",
            "get_subscription",
            "cancel_subscription",
        ])

    def test_renewal_with_insufficient_funds(self, harness):
        """A subscription should fail gracefully when the balance is too low."""
        harness.mock_tool("get_balance", {"balance": "5.00", "currency": "USD"})
        harness.mock_tool_error(
            "create_subscription",
            "insufficient_funds",
            "Balance is too low to cover the subscription amount",
        )
        result = harness.execute("create_subscription", {
            "payer_agent_id": harness.agent_id,
            "payee_agent_id": "provider-002",
            "amount": "15.00",
            "interval": "monthly",
        })
        assert result.get("status") == "error"
```
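Webhooks are the primary notification mechanism for payment events. A missed webhook means a missed settlement, a missed dispute deadline, or a missed subscription renewal. Test webhook delivery separately from the business logic it triggers (P4).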
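Because a missed webhook is silent by nature, it helps to have a check that scans delivery logs for anything that was not delivered. The following is a minimal sketch: `find_failed_deliveries` is a hypothetical helper, and the `delivery_status` and `response_code` field names mirror the mocked `get_webhook_logs` entries used in this guide; whether live logs use the same fields is an assumption.

```python
def find_failed_deliveries(logs: list[dict]) -> list[dict]:
    """Return log entries that were not delivered with a 2xx response."""
    failed = []
    for entry in logs:
        code = entry.get("response_code")
        delivered = entry.get("delivery_status") == "delivered"
        ok_code = isinstance(code, int) and 200 <= code < 300
        if not (delivered and ok_code):
            failed.append(entry)
    return failed


sample_logs = [
    {"webhook_id": "wh-1", "event_type": "escrow.released",
     "delivery_status": "delivered", "response_code": 200},
    {"webhook_id": "wh-1", "event_type": "escrow.disputed",
     "delivery_status": "failed", "response_code": 503},
]

for entry in find_failed_deliveries(sample_logs):
    print(entry["event_type"], entry["response_code"])
# escrow.disputed 503
```

Run on a schedule, a check like this would surface a target endpoint that keeps returning 503 long before six hours have passed.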
```python
class TestWebhookDelivery:
    """Layer 3: verify webhook registration and event delivery."""

    def test_webhook_registration(self, harness):
        """Registering a webhook should return a webhook_id."""
        harness.mock_tool("register_webhook", {
            "webhook_id": "wh-test-001",
            "status": "active",
            "events": ["escrow.released", "escrow.disputed"],
        })
        result = harness.execute("register_webhook", {
            "url": "https://test.example.com/webhook",
            "events": ["escrow.released", "escrow.disputed"],
        })
        assert "webhook_id" in result
        assert result["status"] == "active"

    def test_webhook_event_format(self, harness):
        """The webhook payload should contain the required fields."""
        harness.mock_tool("get_webhook_logs", {
            "logs": [{
                "webhook_id": "wh-test-001",
                "event_type": "escrow.released",
                "payload": {
                    "escrow_id": "escrow-abc",
                    "amount": "25.00",
                    "payer_agent_id": "buyer-1",
                    "payee_agent_id": "seller-1",
                    "timestamp": "2026-04-06T12:00:00Z",
                },
                "delivery_status": "delivered",
                "response_code": 200,
            }],
        })
        logs = harness.execute("get_webhook_logs", {"webhook_id": "wh-test-001"})
        for log_entry in logs["logs"]:
            payload = log_entry["payload"]
            assert "escrow_id" in payload
            assert "amount" in payload
            assert "timestamp" in payload
            assert log_entry["delivery_status"] == "delivered"
```

Traditional software degrades gracefully when a downstream service is unavailable: it shows an error page. Agent commerce systems fail expensively. A timeout during release_escrow can mean the funds were released at the gateway but the caller never received confirmation, which leads to a duplicate release attempt. A split payment executed during a network partition can settle only partially. Chaos testing injects these faults deliberately so that you can verify the system handles them correctly before they show up in production.
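ChaosMiddleware wraps the harness's execute method and randomly injects faults: timeouts, error responses, delayed responses, and corrupted payloads. It can be configured per tool and per fault type.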
```python
import random
import time
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class ChaosConfig:
    """Configuration for chaos injection on a given tool."""

    timeout_pct: float = 0.0    # % of calls that time out
    error_pct: float = 0.0      # % of calls that return an error
    delay_ms: float = 0.0       # extra latency, in milliseconds
    corrupt_pct: float = 0.0    # % of calls that return a corrupted response
    duplicate_pct: float = 0.0  # % of calls that are executed twice


class ChaosMiddleware:
    """Wraps AgentTestHarness.execute with configurable fault injection.

    Usage:
        harness = AgentTestHarness(api_key, agent_id, base_url)
        chaos = ChaosMiddleware(
            harness=harness,
            default_config=ChaosConfig(error_pct=10, delay_ms=200),
        )
        chaos.set_tool_config("create_escrow", ChaosConfig(
            timeout_pct=20,
            duplicate_pct=5,
        ))
        # Every call now goes through chaos injection
        result = chaos.execute("create_escrow", {...})
    """

    def __init__(
        self,
        harness: AgentTestHarness,
        default_config: Optional[ChaosConfig] = None,
        seed: Optional[int] = None,
    ):
        self.harness = harness
        self.default_config = default_config or ChaosConfig()
        self._tool_configs: dict[str, ChaosConfig] = {}
        self._rng = random.Random(seed)
        self._chaos_log: list[dict] = []

    def set_tool_config(self, tool: str, config: ChaosConfig):
        """Set the chaos configuration for a specific tool."""
        self._tool_configs[tool] = config

    def execute(self, tool: str, input_data: dict) -> dict:
        """Execute a tool call with chaos injection."""
        config = self._tool_configs.get(tool, self.default_config)
        chaos_event = {"tool": tool, "injection": None, "timestamp": time.time()}

        # Check for timeout injection
        if self._rng.random() * 100 < config.timeout_pct:
            chaos_event["injection"] = "timeout"
            self._chaos_log.append(chaos_event)
            raise TimeoutError(
                f"Chaos: simulated timeout for tool '{tool}'"
            )

        # Check for error injection
        if self._rng.random() * 100 < config.error_pct:
            chaos_event["injection"] = "error"
            self._chaos_log.append(chaos_event)
            return {
                "status": "error",
                "error": {
                    "code": "chaos_injected_error",
                    "message": f"Chaos: simulated error for tool '{tool}'",
                },
            }

        # Apply latency
        if config.delay_ms > 0:
            delay_seconds = config.delay_ms / 1000.0
            actual_delay = self._rng.uniform(0, delay_seconds * 2)
            time.sleep(actual_delay)
            chaos_event["injection"] = f"delay:{actual_delay:.3f}s"

        # Execute the real call
        result = self.harness.execute(tool, input_data)

        # Check for duplicate execution
        if self._rng.random() * 100 < config.duplicate_pct:
            chaos_event["injection"] = "duplicate"
            self._chaos_log.append(chaos_event)
            # Execute again to exercise idempotency
            duplicate_result = self.harness.execute(tool, input_data)
            return duplicate_result

        # Check for response corruption
        if self._rng.random() * 100 < config.corrupt_pct:
            chaos_event["injection"] = "corrupt"
            if isinstance(result, dict):
                # Drop one random key to simulate a partial response,
                # then flag the result so tests can detect the corruption
                keys = [k for k in result.keys() if k != "status"]
                if keys:
                    del result[self._rng.choice(keys)]
                result["_chaos_corrupted"] = True

        # Log each call exactly once
        self._chaos_log.append(chaos_event)
        return result

    def get_chaos_log(self) -> list[dict]:
        """Return the log of all chaos injections."""
        return list(self._chaos_log)

    def get_injection_stats(self) -> dict:
        """Return summary statistics of chaos injections."""
        stats = {"total": len(self._chaos_log)}
        for entry in self._chaos_log:
            injection = entry.get("injection") or "none"
            category = injection.split(":")[0]
            stats[category] = stats.get(category, 0) + 1
        return stats
```
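The `seed` parameter is what keeps chaos tests reproducible: a seeded `random.Random` instance yields the same sequence of draws on every run, so a failing run can be replayed exactly. The sketch below isolates that idea. `injection_plan` is a hypothetical helper that uses the same `rng.random() * 100 < pct` comparison as the middleware.

```python
import random


def injection_plan(seed: int, calls: int, error_pct: float) -> list[bool]:
    """Return, per call, whether an error would be injected for this seed."""
    rng = random.Random(seed)
    return [rng.random() * 100 < error_pct for _ in range(calls)]


plan_a = injection_plan(seed=42, calls=50, error_pct=10)
plan_b = injection_plan(seed=42, calls=50, error_pct=10)

# Same seed, same decisions: a failure seen in CI can be replayed locally.
print(plan_a == plan_b)  # True

# The boundaries behave as expected regardless of seed.
print(any(injection_plan(seed=1, calls=50, error_pct=0)))    # False
print(all(injection_plan(seed=1, calls=50, error_pct=100)))  # True
```

Logging the seed alongside each chaos run is what turns a flaky failure into a replayable one.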
```python
# The enclosing class header is missing from the source; the class name
# below is a placeholder so that the methods can be collected by pytest.
class TestChaosResilience:

    def test_escrow_survives_timeout_retry(self, harness):
        """Escrow creation retries correctly after a timeout."""
        harness.use_mocks()
        harness.mock_tool("create_escrow", {
            "escrow_id": "escrow-chaos-001",
            "status": "funded",
            "amount": "50.00",
        })
        chaos = ChaosMiddleware(
            harness=harness,
            seed=42,
            default_config=ChaosConfig(timeout_pct=50),
        )
        # Retry loop: production code should contain this logic
        max_retries = 5
        result = None
        for attempt in range(max_retries):
            try:
                result = chaos.execute("create_escrow", {
                    "payer_agent_id": "buyer",
                    "payee_agent_id": "seller",
                    "amount": "50.00",
                    "idempotency_key": "idem-chaos-001",
                })
                break
            except TimeoutError:
                continue
        assert result is not None, "All retry attempts timed out"
        assert result["escrow_id"] == "escrow-chaos-001"

    def test_no_double_payment_under_duplicates(self, harness):
        """Duplicate injection must not cause a double payment."""
        harness.use_mocks()
        call_count = {"n": 0}
        original_execute = harness.execute

        def counting_execute(tool, input_data):
            call_count["n"] += 1
            return original_execute(tool, input_data)

        harness.execute = counting_execute
        harness.mock_tool("release_escrow", {
            "escrow_id": "escrow-dup-test",
            "status": "released",
        })
        chaos = ChaosMiddleware(
            harness=harness,
            seed=99,
            default_config=ChaosConfig(duplicate_pct=100),
        )
        result = chaos.execute("release_escrow", {
            "escrow_id": "escrow-dup-test",
        })
        # The middleware called execute twice, but the result should
        # still represent a single release
        assert call_count["n"] == 2
        assert result["status"] == "released"

    def test_concurrent_escrow_load(self, harness):
        """Concurrent escrow creation must not cause race conditions."""
        import concurrent.futures

        harness.use_mocks()
        harness.mock_tool("create_escrow", {
            "escrow_id": "will-be-unique",
            "status": "funded",
            "amount": "10.00",
        })
        chaos = ChaosMiddleware(
            harness=harness,
            seed=7,
            default_config=ChaosConfig(delay_ms=50, error_pct=10),
        )
        results = []
        errors = []

        def create_escrow(i):
            try:
                return chaos.execute("create_escrow", {
                    "payer_agent_id": "buyer",
                    "payee_agent_id": "seller",
                    "amount": "10.00",
                    "idempotency_key": f"concurrent-{i}",
                })
            except Exception as e:
                return {"status": "error", "message": str(e)}

        with concurrent.futures.ThreadPoolExecutor(max_workers=10) as pool:
            futures = [pool.submit(create_escrow, i) for i in range(20)]
            for f in concurrent.futures.as_completed(futures):
                r = f.result()
                if r.get("status") == "error":
                    errors.append(r)
                else:
                    results.append(r)
        # Some calls should succeed despite the chaos
        assert len(results) > 0
        # Errors are expected under chaos; verify that every call was accounted for
        total = len(results) + len(errors)
        assert total == 20

    def test_escrow_timeout_deadline(self, harness):
        """An escrow must be released or cancelled before its deadline."""
        harness.use_mocks()
        deadline = time.time() + 2  # 2-second deadline (for the test)
        harness.mock_tool("create_escrow", {
            "escrow_id": "escrow-deadline",
            "status": "funded",
            "deadline": deadline,
        })
        harness.mock_tool("cancel_escrow", {
            "escrow_id": "escrow-deadline",
            "status": "cancelled",
            "reason": "deadline_exceeded",
        })
        escrow = harness.execute("create_escrow", {
            "payer_agent_id": "buyer",
            "payee_agent_id": "seller",
            "amount": "30.00",
        })
        # Simulate time passing toward the deadline
        time.sleep(0.1)  # a real test should mock the clock instead
        current_time = time.time()
        if current_time < deadline:
            # Still within the deadline: a release is valid
            harness.mock_tool("release_escrow", {
                "escrow_id": "escrow-deadline",
                "status": "released",
            })
            result = harness.execute("release_escrow", {
                "escrow_id": escrow["escrow_id"],
            })
            assert result["status"] == "released"
        else:
            # Deadline passed: the escrow should be cancelled automatically
            result = harness.execute("cancel_escrow", {
                "escrow_id": escrow["escrow_id"],
            })
            assert result["status"] == "cancelled"
```

When a payment fails in production, you need to know three things immediately: which tool failed, how long it took, and what the input was. This information must be structured, searchable, and available without SSH access to a server. AgentTracer adds timing, success/failure tracking, and structured output to every _execute call.
```python
import json
import logging
import time
import uuid
from dataclasses import dataclass, field
from typing import Optional, Callable

import requests


@dataclass
class TraceRecord:
    """Trace record for a single tool execution."""
    tool: str
    agent_id: str
    started_at: float
    ended_at: float
    duration_ms: float
    success: bool
    input_data: dict
    output_data: Optional[dict] = None
    error: Optional[str] = None
    trace_id: str = ""

    def to_dict(self) -> dict:
        return {
            "trace_id": self.trace_id,
            "tool": self.tool,
            "agent_id": self.agent_id,
            "started_at": self.started_at,
            "ended_at": self.ended_at,
            "duration_ms": round(self.duration_ms, 2),
            "success": self.success,
            "error": self.error,
        }


class AgentTracer:
    """Wraps _execute and records timing, success/failure, and tool name.

    Provides structured observability for every tool call in production.

    Usage:
        tracer = AgentTracer(
            api_key="...",
            agent_id="production-buyer-01",
            base_url="https://api.greenhelix.net/v1",
        )
        # Wrap an existing AgentCommerce instance or test harness
        result = tracer.trace("create_escrow", {
            "payer_agent_id": "buyer",
            "payee_agent_id": "seller",
            "amount": "50.00",
        })
        # Get metrics
        print(tracer.get_metrics())
        # {'total_calls': 47, 'success_rate': 0.957,
        #  'avg_latency_ms': 142.3, 'p99_latency_ms': 890.1,
        #  'error_rate_by_tool': {'create_escrow': 0.02}}
    """

    def __init__(
        self,
        api_key: str,
        agent_id: str,
        base_url: str = "https://api.greenhelix.net/v1",
        logger: Optional[logging.Logger] = None,
        on_slow_call: Optional[Callable] = None,
        slow_threshold_ms: float = 2000.0,
    ):
        self.api_key = api_key
        self.agent_id = agent_id
        self.base_url = base_url
        self.logger = logger or logging.getLogger("agent_tracer")
        self.on_slow_call = on_slow_call
        self.slow_threshold_ms = slow_threshold_ms
        self._traces: list[TraceRecord] = []
        self._session = requests.Session()
        self._session.headers.update({
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}",
        })

    def trace(self, tool: str, input_data: dict) -> dict:
        """Execute one tool call with full tracing."""
        trace_id = f"trace-{uuid.uuid4().hex[:12]}"
        started_at = time.time()
        try:
            resp = self._session.post(
                f"{self.base_url}/v1",
                json={"tool": tool, "input": input_data},
            )
            resp.raise_for_status()
            result = resp.json()
            success = result.get("status") != "error"
            error = None if success else json.dumps(result.get("error", {}))
        except Exception as e:
            result = {"status": "error", "error": str(e)}
            success = False
            error = str(e)
        ended_at = time.time()
        duration_ms = (ended_at - started_at) * 1000
        record = TraceRecord(
            tool=tool,
            agent_id=self.agent_id,
            started_at=started_at,
            ended_at=ended_at,
            duration_ms=duration_ms,
            success=success,
            input_data=input_data,
            output_data=result if success else None,
            error=error,
            trace_id=trace_id,
        )
        self._traces.append(record)
        # Structured log output
        self.logger.info(json.dumps({
            "event": "tool_execution",
            "trace_id": trace_id,
            "tool": tool,
            "agent_id": self.agent_id,
            "duration_ms": round(duration_ms, 2),
            "success": success,
            "error": error,
        }))
        # Slow-call callback
        if duration_ms > self.slow_threshold_ms and self.on_slow_call:
            self.on_slow_call(record)
        return result

    def get_metrics(self) -> dict:
        """Compute aggregate metrics from the recorded traces."""
        if not self._traces:
            return {"total_calls": 0}
        total = len(self._traces)
        successes = sum(1 for t in self._traces if t.success)
        durations = sorted(t.duration_ms for t in self._traces)
        # Error rate per tool
        tool_calls: dict[str, dict] = {}
        for t in self._traces:
            if t.tool not in tool_calls:
                tool_calls[t.tool] = {"total": 0, "errors": 0}
            tool_calls[t.tool]["total"] += 1
            if not t.success:
                tool_calls[t.tool]["errors"] += 1
        error_rate_by_tool = {
            tool: stats["errors"] / stats["total"]
            for tool, stats in tool_calls.items()
            if stats["errors"] > 0
        }
        # Latency per tool
        tool_latencies: dict[str, list[float]] = {}
        for t in self._traces:
            tool_latencies.setdefault(t.tool, []).append(t.duration_ms)
        avg_latency_by_tool = {
            tool: round(sum(lats) / len(lats), 2)
            for tool, lats in tool_latencies.items()
        }
        return {
            "total_calls": total,
            "success_rate": round(successes / total, 4),
            "avg_latency_ms": round(sum(durations) / total, 2),
            "p50_latency_ms": round(durations[total // 2], 2),
            "p95_latency_ms": round(durations[int(total * 0.95)], 2),
            "p99_latency_ms": round(durations[int(total * 0.99)], 2),
            "error_rate_by_tool": error_rate_by_tool,
            "avg_latency_by_tool": avg_latency_by_tool,
        }

    def get_traces(
        self,
        tool: Optional[str] = None,
        success: Optional[bool] = None,
        min_duration_ms: Optional[float] = None,
    ) -> list[dict]:
        """Filter trace records by optional criteria."""
        filtered = self._traces
        if tool:
            filtered = [t for t in filtered if t.tool == tool]
        if success is not None:
            filtered = [t for t in filtered if t.success == success]
        if min_duration_ms is not None:
            filtered = [t for t in filtered if t.duration_ms >= min_duration_ms]
        return [t.to_dict() for t in filtered]

    def get_revenue_metrics(self) -> dict:
        """Extract revenue-related metrics from the trace records."""
        escrow_creates = [
            t for t in self._traces
            if t.tool == "create_escrow" and t.success
        ]
        escrow_releases = [
            t for t in self._traces
            if t.tool == "release_escrow" and t.success
        ]
        deposits = [
            t for t in self._traces
            if t.tool == "deposit" and t.success
        ]
        total_escrowed = sum(
            float(t.input_data.get("amount", 0))
            for t in escrow_creates
        )
        total_deposited = sum(
            float(t.input_data.get("amount", 0))
            for t in deposits
        )
        return {
            "escrows_created": len(escrow_creates),
            "escrows_released": len(escrow_releases),
            "total_escrowed": round(total_escrowed, 2),
            "total_deposited": round(total_deposited, 2),
            "release_rate": (
                round(len(escrow_releases) / len(escrow_creates), 4)
                if escrow_creates else 0
            ),
        }
```

The tracer's structured logs integrate with any log aggregation system (Datadog, ELK, CloudWatch). Each log line is a JSON object with a consistent set of fields. The key decision: log the tool name and duration of every call, but redact input data in production so that sensitive values (such as API keys or wallet amounts) never reach the logs. Enable full input/output logging only in staging (following the P7 security pattern).
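The redaction step described above could be sketched as follows. This is a minimal illustration, not part of AgentTracer: `redact_input` and `SENSITIVE_KEYS` are hypothetical names, and the key list is an assumption to adapt to your own payloads.

```python
from typing import Any

# Assumed set of sensitive field names (hypothetical); extend as needed.
SENSITIVE_KEYS = {"api_key", "authorization", "signing_key", "amount", "balance"}


def redact_input(data: Any, mask: str = "[REDACTED]") -> Any:
    """Return a copy of `data` with sensitive values masked.

    Recurses into nested dicts and lists; the original object is not modified.
    """
    if isinstance(data, dict):
        return {
            key: mask if str(key).lower() in SENSITIVE_KEYS
            else redact_input(value, mask)
            for key, value in data.items()
        }
    if isinstance(data, list):
        return [redact_input(item, mask) for item in data]
    return data


if __name__ == "__main__":
    payload = {
        "payer_agent_id": "buyer",
        "amount": "50.00",
        "meta": {"api_key": "secret", "tags": [{"balance": 12}]},
    }
    print(redact_input(payload))
```

A production logger would pass tool inputs through a function like this before serializing them, while a staging logger could skip it.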
```python
# Production logging configuration
import logging


def configure_production_logging():
    """Set up structured JSON logging for an agent commerce system."""
    logger = logging.getLogger("agent_tracer")
    logger.setLevel(logging.INFO)
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter("%(message)s"))
    logger.addHandler(handler)
    return logger


def configure_staging_logging():
    """Staging logger with full input/output capture."""
    logger = logging.getLogger("agent_tracer")
    logger.setLevel(logging.DEBUG)
    handler = logging.FileHandler("/var/log/agent-commerce/traces.jsonl")
    handler.setFormatter(logging.Formatter("%(message)s"))
    logger.addHandler(handler)
    return logger
```

Extract the metrics you need from the tracer for Grafana or Datadog dashboards. Every agent commerce system should monitor the following five core metrics.
```python
class MetricsExporter:
    """Export AgentTracer metrics to monitoring systems."""

    def __init__(self, tracer: AgentTracer):
        self.tracer = tracer

    def export_prometheus(self) -> str:
        """Export metrics in Prometheus text format."""
        metrics = self.tracer.get_metrics()
        revenue = self.tracer.get_revenue_metrics()
        # get_metrics() returns only total_calls when no traces exist,
        # so every other key is read with a default.
        lines = [
            f'agent_commerce_calls_total {metrics["total_calls"]}',
            f'agent_commerce_success_rate {metrics.get("success_rate", 0)}',
            f'agent_commerce_latency_p50_ms {metrics.get("p50_latency_ms", 0)}',
            f'agent_commerce_latency_p99_ms {metrics.get("p99_latency_ms", 0)}',
            f'agent_commerce_escrows_created {revenue["escrows_created"]}',
            f'agent_commerce_escrows_released {revenue["escrows_released"]}',
            f'agent_commerce_total_escrowed {revenue["total_escrowed"]}',
            f'agent_commerce_release_rate {revenue["release_rate"]}',
        ]
        for tool, rate in metrics.get("error_rate_by_tool", {}).items():
            lines.append(
                f'agent_commerce_error_rate{{tool="{tool}"}} {rate}'
            )
        return "\n".join(lines)

    def to_datadog_events(self) -> list[dict]:
        """Format failed trace records as Datadog events."""
        failed = self.tracer.get_traces(success=False)
        return [
            {
                "title": f"Tool call failed: {t['tool']}",
                # to_dict() always includes "error", possibly as None
                "text": t.get("error") or "unknown error",
                "tags": [
                    f"tool:{t['tool']}",
                    f"agent:{t['agent_id']}",
                    "service:agent-commerce",
                ],
                "alert_type": "error",
            }
            for t in failed
        ]
```

Not every error needs an immediate response. The following six conditions, however, can cause financial loss in an agent commerce system if they are not handled promptly.
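| Alert | Trigger | Severity | Recommended action |
|---|---|---|---|
| Settlement failure | release_escrow returns an error more than 3 times in a row | Critical | Check payment gateway status; pause new escrows |
| Escrow timeout | Escrow is not funded or not released by its deadline | High | Auto-cancel or escalate to dispute resolution |
| Balance anomaly | A single transaction drops the balance by more than 50% | Critical | Pause the agent; audit recent calls |
| Reputation drop | Trust score falls below the threshold | Medium | Pause hiring; investigate the relevant metrics |
| Webhook delivery failure | More than 5 consecutive failed deliveries | High | Check the target endpoint; enable the retry queue |
| Duplicate payment | The same escrow_id is released twice within 60 seconds | Critical | Halt operations immediately; audit the ledger |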
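Two of the rules in the table (settlement failure and duplicate payment) can be evaluated directly over a list of call events. The sketch below is only an illustration: the event shape (`tool`, `success`, `ts`, `escrow_id`) and the function names are assumptions, not an interface defined by AgentTracer.

```python
from collections import defaultdict


def consecutive_failures(events: list[dict], tool: str) -> int:
    """Length of the current failure streak for `tool`, newest call first."""
    streak = 0
    for event in reversed(events):
        if event["tool"] != tool:
            continue
        if event["success"]:
            break
        streak += 1
    return streak


def duplicate_releases(events: list[dict], window_s: float = 60.0) -> list[str]:
    """Escrow IDs released successfully more than once within `window_s` seconds."""
    stamps_by_escrow = defaultdict(list)
    for event in events:
        if event["tool"] == "release_escrow" and event["success"]:
            stamps_by_escrow[event.get("escrow_id")].append(event["ts"])
    flagged = []
    for escrow_id, stamps in stamps_by_escrow.items():
        stamps.sort()
        # Compare each release with the next one in time order
        if any(b - a <= window_s for a, b in zip(stamps, stamps[1:])):
            flagged.append(escrow_id)
    return flagged


def evaluate_alerts(events: list[dict]) -> list[dict]:
    """Apply the two rules and return the alerts that fire."""
    alerts = []
    if consecutive_failures(events, "release_escrow") > 3:
        alerts.append({"alert": "settlement_failure", "severity": "critical"})
    for escrow_id in duplicate_releases(events):
        alerts.append({
            "alert": "duplicate_payment",
            "severity": "critical",
            "escrow_id": escrow_id,
        })
    return alerts
```

The other four rules follow the same pattern: derive a small statistic from recent events, then compare it with the threshold in the table.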
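HealthChecker runs a synthetic transaction against the sandbox (or against a dedicated health-check agent in production) to verify that the whole payment path works. Have your monitoring system run it every 60 seconds.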
```python
import time
from typing import Callable, Optional

import requests


class HealthChecker:
    """Synthetic transaction health checks for agent commerce.

    Runs a mini escrow lifecycle (create wallet → deposit → create
    escrow → release → verify balance) and reports pass/fail with
    latency metrics.

    Usage:
        checker = HealthChecker(
            api_key="health-check-key",
            agent_id="health-check-agent",
            base_url="https://sandbox.greenhelix.net/v1",
        )
        result = checker.run_health_check()
        # {
        #     "healthy": True,
        #     "checks": {
        #         "wallet": {"status": "pass", "latency_ms": 45.2},
        #         "deposit": {"status": "pass", "latency_ms": 78.1},
        #         "escrow_create": {"status": "pass", "latency_ms": 112.4},
        #         "escrow_release": {"status": "pass", "latency_ms": 95.6},
        #         "balance_verify": {"status": "pass", "latency_ms": 41.0},
        #     },
        #     "total_latency_ms": 372.3,
        # }
    """

    def __init__(
        self,
        api_key: str,
        agent_id: str,
        base_url: str = "https://sandbox.greenhelix.net/v1",
        timeout_ms: float = 5000.0,
    ):
        self.api_key = api_key
        self.agent_id = agent_id
        self.base_url = base_url
        self.timeout_ms = timeout_ms
        self._session = requests.Session()
        self._session.headers.update({
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}",
        })

    def _timed_execute(self, tool: str, input_data: dict) -> tuple[dict, float]:
        """Execute a tool and return (result, latency_ms)."""
        start = time.time()
        resp = self._session.post(
            f"{self.base_url}/v1",
            json={"tool": tool, "input": input_data},
            timeout=self.timeout_ms / 1000.0,
        )
        latency_ms = (time.time() - start) * 1000
        resp.raise_for_status()
        return resp.json(), latency_ms

    def run_health_check(self) -> dict:
        """Run a full synthetic transaction health check."""
        checks = {}
        healthy = True
        health_agent = f"{self.agent_id}-{int(time.time())}"

        # Check 1: Wallet creation
        try:
            result, latency = self._timed_execute("create_wallet", {})
            checks["wallet"] = {"status": "pass", "latency_ms": round(latency, 2)}
        except Exception as e:
            checks["wallet"] = {"status": "fail", "error": str(e)}
            healthy = False

        # Check 2: Deposit
        try:
            result, latency = self._timed_execute("deposit", {"amount": "1.00"})
            checks["deposit"] = {"status": "pass", "latency_ms": round(latency, 2)}
        except Exception as e:
            checks["deposit"] = {"status": "fail", "error": str(e)}
            healthy = False

        # Check 3: Escrow creation
        escrow_id = None
        try:
            result, latency = self._timed_execute("create_escrow", {
                "payer_agent_id": health_agent,
                "payee_agent_id": f"{health_agent}-payee",
                "amount": "0.01",
                "description": "Health check escrow",
            })
            escrow_id = result.get("escrow_id")
            checks["escrow_create"] = {"status": "pass", "latency_ms": round(latency, 2)}
        except Exception as e:
            checks["escrow_create"] = {"status": "fail", "error": str(e)}
            healthy = False

        # Check 4: Escrow release
        if escrow_id:
            try:
                result, latency = self._timed_execute("release_escrow", {
                    "escrow_id": escrow_id,
                })
                checks["escrow_release"] = {"status": "pass", "latency_ms": round(latency, 2)}
            except Exception as e:
                checks["escrow_release"] = {"status": "fail", "error": str(e)}
                healthy = False
        else:
            checks["escrow_release"] = {"status": "skip", "reason": "no escrow_id"}

        # Check 5: Balance verification
        try:
            result, latency = self._timed_execute("get_balance", {})
            checks["balance_verify"] = {"status": "pass", "latency_ms": round(latency, 2)}
        except Exception as e:
            checks["balance_verify"] = {"status": "fail", "error": str(e)}
            healthy = False

        total_latency = sum(
            c.get("latency_ms", 0) for c in checks.values()
        )
        return {
            "healthy": healthy,
            "checks": checks,
            "total_latency_ms": round(total_latency, 2),
            "timestamp": time.time(),
            "agent_id": self.agent_id,
        }

    def run_and_alert(self, alert_callback: Optional[Callable] = None) -> dict:
        """Run health check and trigger alert callback on failure."""
        result = self.run_health_check()
        if not result["healthy"] and alert_callback:
            failed_checks = {
                name: check for name, check in result["checks"].items()
                if check.get("status") == "fail"
            }
            alert_callback({
                "severity": "critical",
                "title": "Agent Commerce Health Check Failed",
                "failed_checks": failed_checks,
                "timestamp": result["timestamp"],
            })
        return result
```
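If no external monitoring system is available to trigger the check every 60 seconds, a small loop can do it. The following is a sketch under stated assumptions: `run_periodically` is a hypothetical helper, `check` stands for any zero-argument callable that returns a dict with a `healthy` key (such as a bound `run_health_check`), and the `sleep` parameter exists only so tests can avoid real waiting.

```python
import time
from typing import Callable


def run_periodically(
    check: Callable[[], dict],
    on_unhealthy: Callable[[dict], None],
    interval_s: float = 60.0,
    iterations: int = 3,
    sleep: Callable[[float], None] = time.sleep,
) -> list[dict]:
    """Call check() `iterations` times, pausing `interval_s` between calls."""
    results = []
    for i in range(iterations):
        try:
            result = check()
        except Exception as exc:
            # A check that crashes is treated as an unhealthy result
            result = {"healthy": False, "error": str(exc)}
        results.append(result)
        if not result.get("healthy", False):
            on_unhealthy(result)
        if i < iterations - 1:
            sleep(interval_s)
    return results
```

A bounded iteration count keeps the loop easy to test; a long-running monitor would wrap the same body in `while True` or hand scheduling to cron or a similar scheduler.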
### Testing HealthChecker
- Runs the full synthetic transaction health check flow.
- Each step captures its own status and latency.
- A step that fails is marked `fail`; a step that is skipped (for example, when there is no escrow ID) is marked `skip`.
- The result contains the overall health status, per-step details, total latency, and a timestamp.
- The `run_and_alert` method triggers an alert callback when the check fails.

```python
class TestHealthChecker:
    """Verify that the health checker itself works."""

    def test_reports_healthy_when_all_pass(self, harness):
        """Returns healthy=True when every check passes."""
        harness.use_mocks()
        harness.mock_tool("create_wallet", {"wallet_id": "w-health"})
        harness.mock_tool("deposit", {"balance": "1.00", "transaction_id": "tx-h"})
        harness.mock_tool("create_escrow", {
            "escrow_id": "escrow-health", "status": "funded",
        })
        harness.mock_tool("release_escrow", {
            "escrow_id": "escrow-health", "status": "released",
        })
        harness.mock_tool("get_balance", {"balance": "0.99"})
        # In test mode, HealthChecker delegates to the harness
        checker = HealthChecker(
            api_key=harness.api_key,
            agent_id=harness.agent_id,
            base_url=harness.base_url,
        )
        # A real test would patch _timed_execute to use the harness;
        # here only the structure is verified
        assert checker.timeout_ms == 5000.0

    def test_reports_unhealthy_on_escrow_failure(self):
        """Returns healthy=False when the escrow check fails."""
        checker = HealthChecker(
            api_key="test",
            agent_id="test",
            base_url="https://sandbox.greenhelix.net/v1",
        )
        alerts_received = []

        def mock_alert(alert):
            alerts_received.append(alert)

        # Patch _timed_execute so no network call is made: only
        # create_escrow fails, every other tool returns a stub result
        def failing_execute(tool, input_data):
            if tool == "create_escrow":
                raise ConnectionError("Gateway timeout")
            return {"status": "ok"}, 1.0

        checker._timed_execute = failing_execute
        result = checker.run_and_alert(alert_callback=mock_alert)
        assert result["healthy"] is False
        assert result["checks"]["escrow_create"]["status"] == "fail"
        assert result["checks"]["escrow_release"]["status"] == "skip"
        assert len(alerts_received) == 1
```
### Webhook Dead Letter Queue
When a webhook delivery fails, the event must not be dropped silently. Implement a dead letter queue that captures failed deliveries for retry (P4).

```python
import time


class WebhookDeadLetterQueue:
    """Capture failed webhook deliveries for manual retry."""

    def __init__(self, max_retries: int = 3, retry_delay_seconds: float = 60.0):
        self.max_retries = max_retries
        self.retry_delay_seconds = retry_delay_seconds
        self._queue: list[dict] = []

    def enqueue(self, event: dict, error: str):
        """Add a failed webhook event to the dead letter queue."""
        self._queue.append({
            "event": event,
            "error": error,
            "retries": 0,
            "enqueued_at": time.time(),
            "next_retry_at": time.time() + self.retry_delay_seconds,
        })

    def get_pending(self) -> list[dict]:
        """Get the events that are ready to be retried."""
        now = time.time()
        return [
            entry for entry in self._queue
            if entry["retries"] < self.max_retries
            and entry["next_retry_at"] <= now
        ]

    def mark_delivered(self, event_id: str):
        """Remove an event that was retried successfully."""
        self._queue = [
            e for e in self._queue
            if e["event"].get("event_id") != event_id
        ]

    def mark_retried(self, event_id: str):
        """Increment the retry count and schedule the next attempt."""
        for entry in self._queue:
            if entry["event"].get("event_id") == event_id:
                entry["retries"] += 1
                # Exponential backoff: the delay doubles with every retry
                entry["next_retry_at"] = (
                    time.time()
                    + self.retry_delay_seconds * (2 ** entry["retries"])
                )
                break

    def get_dead_letters(self) -> list[dict]:
        """Get the events that have exhausted all retries."""
        return [
            entry for entry in self._queue
            if entry["retries"] >= self.max_retries
        ]
```
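The queue only stores failures; something still has to attempt redelivery. A minimal sketch of one retry pass follows. `drain_once`, `RetryQueue`, and `FakeQueue` are hypothetical names introduced for illustration: the function works with any object exposing `get_pending`, `mark_delivered`, and `mark_retried`, and `deliver` is an assumed callable that raises on failure.

```python
from typing import Callable, Protocol


class RetryQueue(Protocol):
    """The three methods a retry pass needs from a dead letter queue."""
    def get_pending(self) -> list[dict]: ...
    def mark_delivered(self, event_id: str) -> None: ...
    def mark_retried(self, event_id: str) -> None: ...


def drain_once(queue: RetryQueue, deliver: Callable[[dict], None]) -> dict:
    """Try each pending entry once and return delivered/failed counts."""
    delivered = failed = 0
    for entry in list(queue.get_pending()):
        event = entry["event"]
        event_id = event.get("event_id")
        try:
            deliver(event)
        except Exception:
            queue.mark_retried(event_id)  # schedule the next attempt
            failed += 1
        else:
            queue.mark_delivered(event_id)
            delivered += 1
    return {"delivered": delivered, "failed": failed}


class FakeQueue:
    """Tiny in-memory stand-in with the same methods, for demonstration only."""

    def __init__(self, events: list[dict]):
        self.entries = [{"event": e, "retries": 0} for e in events]

    def get_pending(self) -> list[dict]:
        return list(self.entries)

    def mark_delivered(self, event_id: str) -> None:
        self.entries = [
            e for e in self.entries if e["event"].get("event_id") != event_id
        ]

    def mark_retried(self, event_id: str) -> None:
        for e in self.entries:
            if e["event"].get("event_id") == event_id:
                e["retries"] += 1
```

Because matching is done on `event_id`, every event placed in the queue should carry one; events without it cannot be told apart.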
---
## Chapter 7: CI/CD for Agent Systems
### Running Tests in GitHub Actions
Agent commerce tests need three things that standard CI does not provide: a GreenHelix API key for sandbox tests, isolation between parallel test runs (unique agent IDs), and network access to `sandbox.greenhelix.net`. This GitHub Actions template handles all three.

```yaml
name: Agent Commerce Tests

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]

env:
  GREENHELIX_API_KEY: ${{ secrets.GREENHELIX_API_KEY }}
  GREENHELIX_BASE_URL: https://sandbox.greenhelix.net/v1

jobs:
  # ── Layer 1: fast mock-based tests (no network) ──────────
  unit-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.12"
      - name: Install dependencies
        run: |
          pip install pytest requests cryptography
      - name: Run unit tests (Layer 1)
        run: |
          pytest tests/ -x -q \
            -m "not sandbox and not chaos" \
            --tb=short \
            --junit-xml=results/unit-tests.xml
      - name: Upload test results
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: unit-test-results
          path: results/unit-tests.xml

  # ── Layer 2: contract tests against the sandbox ──────────────
  contract-tests:
    runs-on: ubuntu-latest
    needs: unit-tests
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.12"
      - name: Install dependencies
        run: pip install pytest requests cryptography
      - name: Run contract tests (Layer 2)
        run: |
          pytest tests/ -x -q \
            -m "sandbox and not chaos" \
            --tb=short \
            --junit-xml=results/contract-tests.xml
        env:
          GREENHELIX_API_KEY: ${{ secrets.GREENHELIX_API_KEY }}
      - name: Upload test results
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: contract-test-results
          path: results/contract-tests.xml

  # ── Layer 3 + Layer 4: integration and chaos tests ─────────────
  integration-tests:
    runs-on: ubuntu-latest
    needs: contract-tests
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.12"
      - name: Install dependencies
        run: pip install pytest requests cryptography
      - name: Run integration tests (Layer 3)
        run: |
          pytest tests/ -x -q \
            -m "integration" \
            --tb=short \
            --junit-xml=results/integration-tests.xml
        env:
          GREENHELIX_API_KEY: ${{ secrets.GREENHELIX_API_KEY }}
      - name: Run chaos tests (Layer 4)
        run: |
          pytest tests/ -q \
            -m "chaos" \
            --tb=short \
            --junit-xml=results/chaos-tests.xml
        env:
          GREENHELIX_API_KEY: ${{ secrets.GREENHELIX_API_KEY }}
        continue-on-error: true  # chaos tests may include expected failures
      - name: Upload all test results
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: integration-test-results
          path: results/

  # ── Health check against staging ─────────────────────────
  staging-health:
    runs-on: ubuntu-latest
    needs: integration-tests
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.12"
      - name: Install dependencies
        run: pip install requests
      - name: Run health check against staging
        # The key is read from the workflow-level env variable rather than
        # interpolated into the script text
        run: |
          python -c "
          from health_checker import HealthChecker
          import json, os, sys
          checker = HealthChecker(
              api_key=os.environ['GREENHELIX_API_KEY'],
              agent_id='ci-health-check',
              base_url='https://sandbox.greenhelix.net/v1',
          )
          result = checker.run_health_check()
          print(json.dumps(result, indent=2))
          if not result['healthy']:
              print('HEALTH CHECK FAILED')
              sys.exit(1)
          print('HEALTH CHECK PASSED')
          "
```
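The sandbox at sandbox.greenhelix.net is identical to the production API. Use it as your staging environment. All Layer 2 through Layer 4 tests run against it. The sandbox resets balances nightly, so do not rely on state persisting between CI runs. On every run, use the agent_id fixture in conftest to generate a unique agent ID (P1).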
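```python
# pytest.ini or pyproject.toml
# [tool.pytest.ini_options]
# markers:
#     sandbox: tests that require the GreenHelix sandbox
#     chaos: chaos tests that inject failures
#     integration: multi-step workflow integration tests
import os
import uuid

import pytest


@pytest.fixture
def ci_agent_id():
    """Generate a CI-unique agent ID to prevent collisions."""
    run_id = os.environ.get("GITHUB_RUN_ID", uuid.uuid4().hex[:8])
    return f"ci-agent-{run_id}-{uuid.uuid4().hex[:6]}"
```

When deploying changes to agent commerce functionality, use a canary release: route 5% of traffic to the new version, monitor AgentTracer metrics for 15 minutes, then decide whether to promote or roll back.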
```python
class CanaryDeployment:
    """Canary release controller for agent commerce systems."""

    def __init__(
        self,
        canary_tracer: AgentTracer,
        stable_tracer: AgentTracer,
        promotion_threshold: float = 0.95,
    ):
        self.canary = canary_tracer
        self.stable = stable_tracer
        self.promotion_threshold = promotion_threshold

    def evaluate(self) -> dict:
        """Compare canary metrics against the stable baseline."""
        canary_metrics = self.canary.get_metrics()
        stable_metrics = self.stable.get_metrics()
        if canary_metrics["total_calls"] < 10:
            return {"decision": "waiting", "reason": "insufficient_data"}
        canary_success = canary_metrics.get("success_rate", 0)
        stable_success = stable_metrics.get("success_rate", 1)
        canary_latency = canary_metrics.get("p99_latency_ms", 0)
        stable_latency = stable_metrics.get("p99_latency_ms", 1)
        # Canary success rate must be at least 98% of the stable rate
        success_ok = canary_success >= stable_success * 0.98
        # Canary p99 latency must not exceed stable by more than 20%
        latency_ok = canary_latency <= stable_latency * 1.20
        if success_ok and latency_ok:
            return {"decision": "promote", "canary_success": canary_success}
        else:
            return {
                "decision": "rollback",
                "reason": (
                    f"success: {canary_success} vs {stable_success}, "
                    f"p99: {canary_latency}ms vs {stable_latency}ms"
                ),
            }
```

Track key metrics across CI runs to catch regressions before they reach production. Save the metrics as a CI build artifact and compare them with the previous run.
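Persisting metrics between runs can be as small as writing and reading one JSON file. The sketch below assumes a file path of your choosing (the guide does not prescribe one); `save_metrics` and `load_baseline` are hypothetical helper names.

```python
import json
from pathlib import Path


def save_metrics(metrics: dict, path: Path) -> None:
    """Write the metrics dict as JSON, creating parent directories if needed."""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(metrics, indent=2, sort_keys=True))


def load_baseline(path: Path) -> dict:
    """Read the previous run's metrics; return {} when no baseline exists yet."""
    if not path.exists():
        return {}
    return json.loads(path.read_text())
```

An empty baseline on the first run means a comparison step has nothing to compare against, so it should report no regressions rather than fail.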
```python
class RegressionDetector:
    """Detect metric regressions between CI runs."""

    def __init__(self, baseline_metrics: dict, current_metrics: dict):
        self.baseline = baseline_metrics
        self.current = current_metrics

    def check(self) -> list[dict]:
        """Return the list of detected regressions."""
        regressions = []
        # A drop in success rate is a regression (0.01 tolerance)
        baseline_sr = self.baseline.get("success_rate", 1.0)
        current_sr = self.current.get("success_rate", 1.0)
        if current_sr < baseline_sr - 0.01:
            regressions.append({
                "metric": "success_rate",
                "baseline": baseline_sr,
                "current": current_sr,
                "delta": current_sr - baseline_sr,
            })
        # Latency regression (p95 latency up by more than 20%)
        baseline_p95 = self.baseline.get("p95_latency_ms", 0)
        current_p95 = self.current.get("p95_latency_ms", 0)
        if baseline_p95 > 0 and current_p95 > baseline_p95 * 1.20:
            regressions.append({
                "metric": "p95_latency_ms",
                "baseline": baseline_p95,
                "current": current_p95,
                "delta_pct": round(
                    (current_p95 - baseline_p95) / baseline_p95 * 100, 1
                ),
            })
        # Per-tool error rate increase (more than 0.05 absolute)
        baseline_errors = self.baseline.get("error_rate_by_tool", {})
        current_errors = self.current.get("error_rate_by_tool", {})
        for tool, current_rate in current_errors.items():
            baseline_rate = baseline_errors.get(tool, 0)
            if current_rate > baseline_rate + 0.05:
                regressions.append({
                    "metric": f"error_rate:{tool}",
                    "baseline": baseline_rate,
                    "current": current_rate,
                })
        return regressions
```

This guide covered the four-layer testing pyramid for agents, tool contract testing, Saga-based workflow testing, chaos fault injection, production observability with AgentTracer, alerting with HealthChecker, and CI/CD integration with GitHub Actions. Four core classes (AgentTestHarness, ChaosMiddleware, AgentTracer, and HealthChecker) together form a reliability stack that wraps every GreenHelix tool call from development through production.
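The GreenHelix gateway's own test suite (260+ tests across 9 modules, with 1,353 tests in the gateway itself) uses the same patterns described here: deterministic mocks for business logic, sandbox integration tests for contract verification, and chaos-style fault injection for payment idempotency. These patterns are not theoretical. They come from the test infrastructure that protects the gateway itself.

For the commerce patterns these tests protect, see the companion guides:

- The AgentCommerce class, escrow patterns, marketplace discovery, subscriptions, and dispute resolution.
- The AgentFinOps class, per-agent budget caps, webhook alerts, fleet dashboards, and cost attribution.
- The AgentVerifier class, the five-layer trust system, and continuous reputation monitoring.
- SecureAgent and SecurityMonitor.
- AgentDeveloper, AgentDBA, and AgentBilling for autonomous micro-SaaS creation.

The sandbox at sandbox.greenhelix.net is free to use and accepts any API key. Run HealthChecker against it. Deploy the GitHub Actions template. Use ChaosMiddleware to break things on purpose. The patterns in this guide can be copied, adapted, and deployed right away.

This handbook and eight companion guides are all available as complete code bundles. Each guide introduces one production-ready Python class; together they cover the full lifecycle of an agent commerce system: build, secure, test, monitor, and scale.

For the complete API reference and tool catalog (128 tools), see the GreenHelix developer documentation: [https://api.greenhelix.net/docs](https://api.greenhelix.net/docs).

*Price: $29 | Format: digital guide | Updates: lifetime access*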