The Agent Testing & Observability Cookbook: Ship Reliable Agent Commerce Systems

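A hands-on handbook for testing and observability in AI agent commerce systems, covering a four-layer testing pyramid and production monitoring.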

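Who it's for
Developers of AI agent systems; blockchain and fintech engineers
Who it's not for
General users with no programming background; non-technical users who only need simple automation tools
Availability in mainland China
Requires network configuration. Network setup or access to third-party services may be needed.
Installation difficulty
Beginner-friendly (★☆☆). A preliminary rating based on terminal usage, dependencies, API key, and local environment requirements.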

Installation and download

openclaw skills install @mirni/greenhelix-agent-testing-observability

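Skill description

Commands, parameters, and file names follow the original text.

The Agent Testing & Observability Cookbook: Ship Reliable Agent Commerce Systems

Note: this guide is for educational purposes and contains example code.

It does not execute any code or install any dependencies.

All examples use the GreenHelix sandbox (https://sandbox.greenhelix.net), which provides 500 free credits; no API key is needed to get started.

Reference credentials (you supply these in your own environment):

  • GREENHELIX_API_KEY: API authentication for the GreenHelix gateway (read/write access only to the API tools you have purchased)
  • AGENT_SIGNING_KEY: cryptographic signing key for the agent's identity (Ed25519 key pair, used to sign requests)

Your agent commerce system runs fine on your laptop and passes a smoke test against the GreenHelix sandbox. You deploy to production on Friday afternoon and go home. By Saturday morning, a retry loop has created 47 duplicate escrows, a performance escrow has released funds early because it used stale metrics, and the settlement callback webhook has been failing silently for six hours because the target endpoint returned 503 and nobody was monitoring it. No test covered these failures because the traditional testing pyramid (unit tests at the bottom, integration tests in the middle, end-to-end tests at the top) was not designed for autonomous agents. Such agents make financial decisions across counterparties over unreliable networks, and the counterparties themselves can fail. This guide rebuilds the testing pyramid for agent commerce and layers production observability, chaos testing, alerting, and CI/CD pipelines on top of it. Every pattern comes with runnable Python code, based on the 260 test cases that ship with the GreenHelix gateway, that you can copy into your project.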


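What you'll learn

  • Chapter 1: The testing pyramid for agent systems
  • Chapter 2: Tool-level testing patterns
  • Chapter 3: Workflow and integration testing
  • Chapter 4: Chaos testing for agent commerce
  • Chapter 5: Production observability
  • Chapter 6: Alerting and incident response
  • Chapter 7: CI/CD for agent systems
  • Chapter 8: What to do next

Full guide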

Table of contents

  1. [The testing pyramid for agent systems](#chapter-1-the-testing-pyramid-for-agent-systems)
  2. [Tool-level testing patterns](#chapter-2-tool-level-testing-patterns)
  3. [Workflow and integration testing](#chapter-3-workflow--integration-testing)
  4. [Chaos testing for agent commerce](#chapter-4-chaos-testing-for-agent-commerce)
  5. [Production observability](#chapter-5-production-observability)
  6. [Alerting and incident response](#chapter-6-alerting--incident-response)
  7. [CI/CD for agent systems](#chapter-7-cicd-for-agent-systems)
  8. [What to do next](#chapter-8-what-to-do-next)

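Chapter 1: The testing pyramid for agent systems

Why the traditional testing pyramid breaks down

The standard testing pyramid assumes that your code calls functions and gets values back. Unit tests verify the behavior of individual functions, integration tests verify that modules compose correctly, and end-to-end tests verify complete user flows. That model rests on three premises: the system is deterministic, function calls have no financial side effects, and the only failure modes are "returned the wrong value" or "threw an exception".

Agent commerce systems violate all three. Calling create_escrow locks real funds, and calling release_escrow moves real money. A retry that triggers two calls produces two escrows, not one error. The failure mode is not "wrong return value"; it is "the agent paid twice for the same work", "the escrow timed out but the funds are still locked", or "the gateway settled but the callback notification was lost". Traditional unit tests miss these problems because they test code in isolation, without the financial state machine. Traditional end-to-end tests miss them because they run the happy path once and call it a pass.

The agent testing pyramid

Agent commerce needs a four-layer testing pyramid that maps to its actual failure modes: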


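                    ╱╲
                   ╱  ╲
                  ╱Chaos╲               Layer 4: Chaos tests
                 ╱ tests ╲              (fault injection, timeouts, concurrent load)
                ╱──────────╲
               ╱ Multi-agent ╲          Layer 3: Multi-agent workflow tests
              ╱   workflows   ╲         (saga pattern, rollback, webhook delivery)
             ╱──────────────────╲
            ╱   Tool contract    ╲      Layer 2: Tool-level contract tests
           ╱       tests          ╲     (schema, idempotency, permissions)
          ╱────────────────────────╲
         ╱   Deterministic mocks    ╲   Layer 1: Mock-based unit tests
        ╱    (fast, run offline)     ╲  (business logic, validation)
       ╱──────────────────────────────╲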

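Layer 1: Deterministic mocks test your business logic without calling the gateway. These tests finish in milliseconds and catch logic errors: wrong amount calculations, missing trust checks, invalid state transitions. They make up 60% of the test suite.

Layer 2: Tool contract tests verify that each GreenHelix tool accepts the expected input schema, returns the expected output shape, and produces the correct error code for invalid input. These tests run against the sandbox and catch API contract changes. They make up 25% of the test suite.

Layer 3: Multi-agent workflow tests verify complete business flows: from marketplace listing, through escrow release, to settlement. They cover the saga pattern (rollback after a multi-step failure) and webhook delivery. They make up 10% of the test suite.

Layer 4: Chaos tests inject faults (network timeouts, random tool errors, concurrent duplicate requests) and verify that the system recovers without leaving the finances in an inconsistent state. They make up only 5% of the test suite but catch the most expensive defects.

AgentTestHarness

Every test in this guide uses the AgentTestHarness class. It manages test fixtures, provides deterministic mocks for Layer 1, and switches to sandbox mode for Layers 2 through 4.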

import pytest
import time
import json
import uuid
import requests
from unittest.mock import MagicMock, patch
from typing import Optional
from dataclasses import dataclass, field

@dataclass
class MockResponse:
    """Deterministic mock of a GreenHelix tool response."""
    tool: str
    status: str = "success"
    data: dict = field(default_factory=dict)
    error_code: Optional[str] = None
    error_message: Optional[str] = None

    def to_dict(self) -> dict:
        result = {"status": self.status}
        if self.status == "success":
            result.update(self.data)
        else:
            result["error"] = {
                "code": self.error_code or "unknown_error",
                "message": self.error_message or "An error occurred",
            }
        return result

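class AgentTestHarness:
    """Test harness for GreenHelix agent commerce systems.

    Manages the fixtures, mocks, and sandbox connection needed by all four layers of the agent testing pyramid.

    Usage example: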
        harness = AgentTestHarness(
            api_key="test-key",
            agent_id="test-agent",
            base_url="https://sandbox.greenhelix.net/v1",
        )

        # Layer 1: deterministic mocks
        harness.mock_tool("get_balance", {"balance": "100.00"})
        result = harness.execute("get_balance", {})
        assert result["balance"] == "100.00"

        # Layer 2 and above: sandbox mode
        harness.use_sandbox()
        result = harness.execute("get_balance", {})
    """

    def __init__(
        self,
        api_key: str,
        agent_id: str,
        base_url: str = "https://sandbox.greenhelix.net/v1",
    ):
        self.api_key = api_key
        self.agent_id = agent_id
        self.base_url = base_url
        self._mocks: dict[str, MockResponse] = {}
        self._call_log: list[dict] = []
        self._sandbox_mode = False
        self._session = requests.Session()
        self._session.headers.update({
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}",
        })

    # ── Mode control ───────────────────────────────────────

    def use_mocks(self):
        """Switch to deterministic mock mode (Layer 1)."""
        self._sandbox_mode = False

    def use_sandbox(self):
        """Switch to live sandbox mode (Layer 2 and above)."""
        self._sandbox_mode = True

    # ── Mock registration ──────────────────────────────────

    def mock_tool(self, tool: str, data: dict, status: str = "success"):
        """Register a deterministic mock response for a tool."""
        self._mocks[tool] = MockResponse(tool=tool, status=status, data=data)

    def mock_tool_error(
        self, tool: str, error_code: str, error_message: str
    ):
        """Register a deterministic error response for a tool."""
        self._mocks[tool] = MockResponse(
            tool=tool,
            status="error",
            error_code=error_code,
            error_message=error_message,
        )

    def mock_tool_sequence(self, tool: str, responses: list[dict]):
        """Register a sequence of responses for consecutive calls."""
        self._mock_sequences = getattr(self, "_mock_sequences", {})
        self._mock_sequences[tool] = list(responses)

    # ── Execution ──────────────────────────────────────────

    def execute(self, tool: str, input_data: dict) -> dict:
        """Execute a tool against the mocks or the sandbox."""
        call_record = {
            "tool": tool,
            "input": input_data,
            "timestamp": time.time(),
        }

        if self._sandbox_mode:
            # base_url already ends in /v1, so post to it directly
            # (appending another /v1 would produce .../v1/v1)
            resp = self._session.post(
                self.base_url,
                json={"tool": tool, "input": input_data},
            )
            resp.raise_for_status()
            result = resp.json()
        else:
            # Check sequences first
            sequences = getattr(self, "_mock_sequences", {})
            if tool in sequences and sequences[tool]:
                result = sequences[tool].pop(0)
            elif tool in self._mocks:
                result = self._mocks[tool].to_dict()
            else:
                raise ValueError(
                    f"No mock registered for tool '{tool}'. "
                    f"Register one with harness.mock_tool('{tool}', {{...}})"
                )

        call_record["result"] = result
        self._call_log.append(call_record)
        return result

    # ── Assertions ─────────────────────────────────────────

    def assert_tool_called(self, tool: str, times: Optional[int] = None):
        """Assert that a tool was called, optionally an exact number of times."""
        calls = [c for c in self._call_log if c["tool"] == tool]
        assert len(calls) > 0, f"Tool '{tool}' was never called"
        if times is not None:
            assert len(calls) == times, (
                f"Tool '{tool}' was called {len(calls)} times, expected {times}"
            )

    def assert_tool_not_called(self, tool: str):
        """Assert that a tool was never called."""
        calls = [c for c in self._call_log if c["tool"] == tool]
        assert len(calls) == 0, (
            f"Tool '{tool}' was called {len(calls)} times, expected 0"
        )

    def assert_call_order(self, tools: list[str]):
        """Assert that tools were called in the given order (other calls may be interleaved)."""
        called_tools = [c["tool"] for c in self._call_log]
        idx = 0
        for tool in tools:
            try:
                idx = called_tools.index(tool, idx) + 1
            except ValueError:
                assert False, (
                    f"Expected '{tool}' after position {idx}. Actual call order: {called_tools}"
                )

    def get_calls(self, tool: Optional[str] = None) -> list[dict]:
        """Return the call log, optionally filtered by tool name."""
        if tool:
            return [c for c in self._call_log if c["tool"] == tool]
        return list(self._call_log)

    def reset(self):
        """Clear all mocks and the call history."""
        self._mocks.clear()
        self._call_log.clear()
        if hasattr(self, "_mock_sequences"):
            self._mock_sequences.clear()

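conftest.py: reusable test fixtures

Put this conftest.py file in your tests directory. pytest discovers it automatically, so its fixtures are available to every test file in this guide.

# tests/conftest.py
import os
import uuid
from unittest.mock import MagicMock

import pytest

# Assumption: the harness classes above are saved as agent_test_harness.py;
# adjust this import to wherever you keep them.
from agent_test_harness import AgentTestHarness, MockResponse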
@pytest.fixture
def api_key():
    """API key for sandbox tests. Prefers the environment variable, else a test default."""
    return os.environ.get("GREENHELIX_API_KEY", "test-api-key-sandbox")

@pytest.fixture
def base_url():
    """Sandbox URL used by integration tests."""
    return os.environ.get(
        "GREENHELIX_BASE_URL", "https://sandbox.greenhelix.net/v1"
    )

@pytest.fixture
def agent_id():
    """A unique agent ID per test, to prevent collisions."""
    return f"test-agent-{uuid.uuid4().hex[:12]}"

@pytest.fixture
def buyer_id():
    """A unique buyer agent ID."""
    return f"test-buyer-{uuid.uuid4().hex[:12]}"

@pytest.fixture
def seller_id():
    """A unique seller agent ID."""
    return f"test-seller-{uuid.uuid4().hex[:12]}"

@pytest.fixture
def harness(api_key, agent_id, base_url):
    """AgentTestHarness in mock mode. Call harness.use_sandbox() to switch to the live sandbox."""
    h = AgentTestHarness(
        api_key=api_key,
        agent_id=agent_id,
        base_url=base_url,
    )
    h.use_mocks()
    return h

@pytest.fixture
def sandbox_harness(api_key, agent_id, base_url):
    """AgentTestHarness in sandbox mode, for integration tests."""
    h = AgentTestHarness(
        api_key=api_key,
        agent_id=agent_id,
        base_url=base_url,
    )
    h.use_sandbox()
    return h

@pytest.fixture
def mock_session():
    """A preconfigured requests.Session mock for unit tests."""
    session = MagicMock()
    response = MagicMock()
    response.status_code = 200
    response.json.return_value = {"status": "success"}
    response.raise_for_status.return_value = None
    session.post.return_value = response
    return session

@pytest.fixture
def mock_response():
    """Factory fixture for creating MockResponse objects."""
    def _make(tool, data=None, status="success", error_code=None):
        return MockResponse(
            tool=tool,
            status=status,
            data=data or {},
            error_code=error_code,
        )
    return _make

# ── Per-class fixtures for isolated test suites ────────

class AgentFixtures:
    """Mixin that provides standard mocks for agent commerce tests."""

    @pytest.fixture(autouse=True)
    def setup_agent_mocks(self, harness):
        """Register common mocks before every test in the class."""
        self.harness = harness
        harness.mock_tool("get_balance", {"balance": "500.00", "currency": "USD"})
        harness.mock_tool("create_wallet", {"wallet_id": "w-test-001", "status": "active"})
        harness.mock_tool("register_agent", {"agent_id": harness.agent_id, "status": "registered"})
        harness.mock_tool("get_trust_score", {"agent_id": "any", "score": 0.85})
        harness.mock_tool("get_budget_status", {
            "daily_limit": "100.00",
            "spent_today": "25.00",
            "remaining": "75.00",
        })

class EscrowFixtures(AgentFixtures):
    """Extended fixtures for escrow-related tests."""

    @pytest.fixture(autouse=True)
    def setup_escrow_mocks(self, harness):
        """Add escrow mocks on top of the agent mocks."""
        self.escrow_id = f"escrow-{uuid.uuid4().hex[:8]}"
        harness.mock_tool("create_escrow", {
            "escrow_id": self.escrow_id,
            "status": "funded",
            "amount": "50.00",
        })
        harness.mock_tool("release_escrow", {
            "escrow_id": self.escrow_id,
            "status": "released",
        })
        harness.mock_tool("cancel_escrow", {
            "escrow_id": self.escrow_id,
            "status": "cancelled",
        })

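Pattern: deterministic mocks vs. sandbox tests

The harness supports two modes.

  • Use mock mode for business logic tests (fast, no network, reproducible results).
  • Use sandbox mode for contract and integration tests (real API calls, real state, real latency).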


class TestBudgetGuardrails(EscrowFixtures):
    """Layer 1: test budget logic with deterministic mocks.

    Uses EscrowFixtures (not AgentFixtures) so that a create_escrow mock is registered.
    """

    def test_blocks_escrow_when_over_budget(self, harness):
        """When the daily budget is nearly exhausted, no escrow should be created."""
        harness.mock_tool("get_budget_status", {
            "daily_limit": "100.00",
            "spent_today": "99.00",
            "remaining": "1.00",
        })
        budget = harness.execute("get_budget_status", {})
        remaining = float(budget["remaining"])
        escrow_amount = 50.00

        # Business logic: if the amount exceeds the remaining budget, do not create the escrow
        assert escrow_amount > remaining
        harness.assert_tool_not_called("create_escrow")

    def test_allows_escrow_within_budget(self, harness):
        """When the budget allows it, the escrow should be created."""
        budget = harness.execute("get_budget_status", {})
        remaining = float(budget["remaining"])
        escrow_amount = 25.00

        assert escrow_amount <= remaining
        harness.execute("create_escrow", {
            "payer_agent_id": harness.agent_id,
            "payee_agent_id": "seller-001",
            "amount": str(escrow_amount),
        })
        harness.assert_tool_called("create_escrow", times=1)


@pytest.mark.sandbox
class TestBudgetGuardrailsSandbox:
    """Layer 2: verify how the budget tools actually behave in the sandbox."""

    def test_budget_cap_enforced(self, sandbox_harness):
        """The sandbox should reject an escrow that exceeds the budget cap."""
        h = sandbox_harness
        h.execute("create_wallet", {})
        h.execute("deposit", {"amount": "100.00"})
        h.execute("set_budget_cap", {
            "agent_id": h.agent_id,
            "daily_limit": "10.00",
        })

        # This should fail because the escrow amount exceeds the daily limit
        result = h.execute("create_escrow", {
            "payer_agent_id": h.agent_id,
            "payee_agent_id": "seller-test",
            "amount": "50.00",
        })

        # The gateway enforces the budget limit at the tool level
        assert result.get("status") in ("error", "rejected")
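
The Layer 1 tests above compare amounts as floats and assert on constants rather than on a guard function. As a minimal sketch of what such a guard could look like, the snippet below checks the remaining budget before creating an escrow. The helper name `create_escrow_if_affordable` and the stand-in `fake_execute` are hypothetical; only the tool names and the `remaining` field come from the mocks in this guide.

```python
from decimal import Decimal
from typing import Callable, Optional

# Hypothetical helper (not part of GreenHelix): `execute` is any callable with the
# same shape as AgentTestHarness.execute(tool, input_data) -> dict.
def create_escrow_if_affordable(
    execute: Callable[[str, dict], dict],
    payer_id: str,
    payee_id: str,
    amount: str,
) -> Optional[dict]:
    """Create an escrow only if `amount` fits in the remaining daily budget."""
    budget = execute("get_budget_status", {})
    # Decimal avoids float rounding surprises when comparing money strings.
    if Decimal(amount) > Decimal(budget["remaining"]):
        return None  # guard tripped: no escrow call is made
    return execute("create_escrow", {
        "payer_agent_id": payer_id,
        "payee_agent_id": payee_id,
        "amount": amount,
    })


# Tiny stand-in for the harness so the sketch runs on its own.
calls = []

def fake_execute(tool: str, input_data: dict) -> dict:
    calls.append(tool)
    if tool == "get_budget_status":
        return {"daily_limit": "100.00", "spent_today": "99.00", "remaining": "1.00"}
    return {"escrow_id": "escrow-demo", "status": "funded", "amount": input_data["amount"]}

blocked = create_escrow_if_affordable(fake_execute, "buyer-1", "seller-1", "50.00")
allowed = create_escrow_if_affordable(fake_execute, "buyer-1", "seller-1", "1.00")
```

In a real test, `harness.execute` could be passed in place of `fake_execute`, followed by `harness.assert_tool_not_called("create_escrow")` for the blocked case.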


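Chapter 2: Tool-level testing patterns

Tool contract tests

Every GreenHelix tool has an implicit contract: it accepts a specific input format, returns a specific output shape, and produces documented error codes for invalid input. Tool contract tests verify all three. When the gateway bumps its API version or adds a required field, your contract tests fail before your production code does, so you find the problem early.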

class ToolContract:
    """Defines the expected contract for a GreenHelix tool.

    Used by contract tests to verify schema, output shape, and error behavior against the sandbox.
    """

    def __init__(
        self,
        tool: str,
        required_fields: list[str],
        output_fields: list[str],
        error_cases: dict[str, dict],
    ):
        self.tool = tool
        self.required_fields = required_fields
        self.output_fields = output_fields
        self.error_cases = error_cases  # {case_name: {input: ..., expected_error: ...}}

# ── Contract definitions for core tools ────────────────

BILLING_CONTRACTS = {
    "get_balance": ToolContract(
        tool="get_balance",
        required_fields=[],
        output_fields=["balance", "currency"],
        error_cases={
            "no_wallet": {
                "input": {},
                "expected_error": "wallet_not_found",
            },
        },
    ),
    "deposit": ToolContract(
        tool="deposit",
        required_fields=["amount"],
        output_fields=["balance", "transaction_id"],
        error_cases={
            "negative_amount": {
                "input": {"amount": "-10.00"},
                "expected_error": "invalid_amount",
            },
            "zero_amount": {
                "input": {"amount": "0"},
                "expected_error": "invalid_amount",
            },
        },
    ),
    "set_budget_cap": ToolContract(
        tool="set_budget_cap",
        required_fields=["agent_id", "daily_limit"],
        output_fields=["agent_id", "daily_limit"],
        error_cases={
            "negative_limit": {
                "input": {"agent_id": "test", "daily_limit": "-50.00"},
                "expected_error": "invalid_amount",
            },
        },
    ),
}

PAYMENT_CONTRACTS = {
    "create_escrow": ToolContract(
        tool="create_escrow",
        required_fields=["payer_agent_id", "payee_agent_id", "amount"],
        output_fields=["escrow_id", "status", "amount"],
        error_cases={
            "insufficient_funds": {
                "input": {
                    "payer_agent_id": "buyer",
                    "payee_agent_id": "seller",
                    "amount": "999999.00",
                },
                "expected_error": "insufficient_funds",
            },
            "self_escrow": {
                "input": {
                    "payer_agent_id": "same-agent",
                    "payee_agent_id": "same-agent",
                    "amount": "10.00",
                },
                "expected_error": "invalid_escrow",
            },
        },
    ),
    "release_escrow": ToolContract(
        tool="release_escrow",
        required_fields=["escrow_id"],
        output_fields=["escrow_id", "status"],
        error_cases={
            "nonexistent": {
                "input": {"escrow_id": "escrow-does-not-exist"},
                "expected_error": "escrow_not_found",
            },
        },
    ),
}

IDENTITY_CONTRACTS = {
    "register_agent": ToolContract(
        tool="register_agent",
        required_fields=["agent_id", "public_key", "name"],
        output_fields=["agent_id", "status"],
        error_cases={
            "missing_key": {
                "input": {"agent_id": "test", "name": "Test"},
                "expected_error": "missing_field",
            },
        },
    ),
    "get_trust_score": ToolContract(
        tool="get_trust_score",
        required_fields=["agent_id"],
        output_fields=["agent_id", "score"],
        error_cases={
            "nonexistent_agent": {
                "input": {"agent_id": "agent-that-does-not-exist-xyz"},
                "expected_error": "agent_not_found",
            },
        },
    ),
}

MARKETPLACE_CONTRACTS = {
    "register_service": ToolContract(
        tool="register_service",
        required_fields=["name", "description", "endpoint", "price", "tags", "category"],
        output_fields=["service_id"],
        error_cases={
            "missing_name": {
                "input": {
                    "description": "test",
                    "endpoint": "agent://test",
                    "price": 10.0,
                    "tags": [],
                    "category": "test",
                },
                "expected_error": "missing_field",
            },
        },
    ),
    "search_services": ToolContract(
        tool="search_services",
        required_fields=["query"],
        output_fields=["services"],
        error_cases={
            "empty_query": {
                "input": {"query": ""},
                "expected_error": "invalid_query",
            },
        },
    ),
}

Running the contract tests

@pytest.mark.sandbox
class TestBillingContracts:
    """Layer 2: verify billing tool contracts against the sandbox."""

    @pytest.fixture(autouse=True)
    def setup_wallet(self, sandbox_harness):
        self.harness = sandbox_harness
        self.harness.execute("create_wallet", {})
        self.harness.execute("deposit", {"amount": "100.00"})

    @pytest.mark.parametrize("tool_name", BILLING_CONTRACTS.keys())
    def test_output_shape(self, tool_name):
        """Every billing tool returns the expected output fields."""
        contract = BILLING_CONTRACTS[tool_name]
        # Build a minimal valid input
        valid_input = {}
        if tool_name == "deposit":
            valid_input = {"amount": "10.00"}
        elif tool_name == "set_budget_cap":
            valid_input = {
                "agent_id": self.harness.agent_id,
                "daily_limit": "50.00",
            }
        result = self.harness.execute(tool_name, valid_input)
        for expected_field in contract.output_fields:
            assert expected_field in result, (
                f"Tool '{tool_name}' is missing output field '{expected_field}'. "
                f"Fields returned: {list(result.keys())}"
            )

    @pytest.mark.parametrize("tool_name", BILLING_CONTRACTS.keys())
    def test_error_cases(self, tool_name):
        """Every billing tool should return an error for invalid input."""
        contract = BILLING_CONTRACTS[tool_name]
        for case_name, case in contract.error_cases.items():
            result = self.harness.execute(tool_name, case["input"])
            assert result.get("status") == "error" or "error" in result, (
                f"Case '{case_name}' for tool '{tool_name}' should have failed. "
                f"Actual result: {result}"
            )

@pytest.mark.sandbox
class TestPaymentContracts:
    """Layer 2: verify payment tool contracts against the sandbox."""

    @pytest.fixture(autouse=True)
    def setup_accounts(self, sandbox_harness, buyer_id, seller_id):
        self.harness = sandbox_harness
        self.buyer_id = buyer_id
        self.seller_id = seller_id

    @pytest.mark.parametrize("tool_name", PAYMENT_CONTRACTS.keys())
    def test_error_cases(self, tool_name):
        """Every payment tool should return an error for invalid input."""
        contract = PAYMENT_CONTRACTS[tool_name]
        for case_name, case in contract.error_cases.items():
            result = self.harness.execute(tool_name, case["input"])
            assert result.get("status") == "error" or "error" in result, (
                f"Case '{case_name}' for payment tool '{tool_name}' did not fail as expected"
            )
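
The error-case tests above only check that a call fails; they never compare the returned code with the contract's `expected_error`. A minimal sketch of a stricter check follows. It assumes error responses use the `{"status": "error", "error": {"code": ...}}` shape produced by `MockResponse.to_dict()` in this guide; the real gateway's error format may differ, and both helper names are hypothetical.

```python
from typing import Optional

def extract_error_code(result: dict) -> Optional[str]:
    """Return the error code from a tool response, or None if it succeeded.

    Assumes the error shape produced by MockResponse.to_dict() in this guide:
    {"status": "error", "error": {"code": "...", "message": "..."}}.
    """
    error = result.get("error")
    if isinstance(error, dict):
        return error.get("code")
    if result.get("status") == "error":
        return "unknown_error"  # failed, but no structured code was provided
    return None

def check_error_case(result: dict, expected_error: str) -> list[str]:
    """Return a list of human-readable problems; an empty list means the case passed."""
    code = extract_error_code(result)
    if code is None:
        return [f"expected error '{expected_error}', but the call succeeded"]
    if code != expected_error:
        return [f"expected error '{expected_error}', got '{code}'"]
    return []

ok = check_error_case(
    {"status": "error", "error": {"code": "invalid_amount", "message": "bad"}},
    "invalid_amount",
)
wrong = check_error_case(
    {"status": "error", "error": {"code": "wallet_not_found", "message": "x"}},
    "invalid_amount",
)
succeeded = check_error_case({"status": "success", "balance": "10.00"}, "invalid_amount")
```

Inside a contract test, `assert check_error_case(result, case["expected_error"]) == []` would then fail with a message that names the unexpected code.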

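### Idempotency tests for payment tools

Payment tools must be idempotent. Calling `create_escrow` twice with the same parameters must not create two escrows, and calling `release_escrow` twice must not pay out twice. These tests submit duplicate requests and verify that the finances stay consistent (P1, P7).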

@pytest.mark.sandbox
class TestPaymentIdempotency:
    """Layer 2: verify that payment tools handle duplicate calls safely."""

    def test_duplicate_escrow_creation(self, sandbox_harness, seller_id):
        """Creating the same escrow twice should return the same escrow ID."""
        h = sandbox_harness
        h.execute("create_wallet", {})
        h.execute("deposit", {"amount": "200.00"})

        escrow_params = {
            # The payer is the harness agent, whose wallet was funded above
            # and whose balance is checked below.
            "payer_agent_id": h.agent_id,
            "payee_agent_id": seller_id,
            "amount": "50.00",
            "description": "Idempotency test escrow",
            "idempotency_key": f"idem-{uuid.uuid4().hex[:8]}",
        }
        result_1 = h.execute("create_escrow", escrow_params)
        result_2 = h.execute("create_escrow", escrow_params)

        # The same idempotency key should return the same escrow ID
        assert result_1["escrow_id"] == result_2["escrow_id"]

        # The balance must be debited only once
        balance = h.execute("get_balance", {})
        assert float(balance["balance"]) == 150.00

    def test_duplicate_release(self, sandbox_harness):
        """Releasing the same escrow twice must not cause a double payout."""
        h = sandbox_harness
        h.execute("create_wallet", {})
        h.execute("deposit", {"amount": "100.00"})
        escrow = h.execute("create_escrow", {
            "payer_agent_id": h.agent_id,
            "payee_agent_id": "seller-test",
            "amount": "25.00",
        })
        escrow_id = escrow["escrow_id"]

        release_1 = h.execute("release_escrow", {"escrow_id": escrow_id})
        release_2 = h.execute("release_escrow", {"escrow_id": escrow_id})

        # The second release should be a no-op or return already_released
        assert release_1.get("status") == "released"
        assert release_2.get("status") in ("released", "already_released")
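
The sandbox tests above rely on the gateway deduplicating by `idempotency_key`. To exercise your own retry logic at Layer 1 without the network, one option is a small in-memory fake that models that behavior. The sketch below is an assumption about how key-based deduplication typically works, not a description of the GreenHelix implementation; `FakeEscrowGateway` is a hypothetical name.

```python
from decimal import Decimal

class FakeEscrowGateway:
    """Hypothetical in-memory model of idempotent escrow creation."""

    def __init__(self, balance: str):
        self.balance = Decimal(balance)
        self._by_key: dict[str, dict] = {}  # idempotency_key -> first response
        self._next_id = 1

    def create_escrow(self, amount: str, idempotency_key: str) -> dict:
        # A replayed key returns the stored response and moves no funds.
        if idempotency_key in self._by_key:
            return self._by_key[idempotency_key]
        value = Decimal(amount)
        if value > self.balance:
            return {"status": "error", "error": {"code": "insufficient_funds"}}
        self.balance -= value
        response = {
            "escrow_id": f"escrow-{self._next_id:04d}",
            "status": "funded",
            "amount": amount,
        }
        self._next_id += 1
        self._by_key[idempotency_key] = response
        return response

gateway = FakeEscrowGateway(balance="200.00")
first = gateway.create_escrow("50.00", "idem-a")
retry = gateway.create_escrow("50.00", "idem-a")   # simulated retry of the same request
other = gateway.create_escrow("50.00", "idem-b")   # a genuinely new request
```

The retry reuses the first escrow, while the request with a new key creates a second one, so the balance drops by 50.00 twice rather than three times.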

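### Permission boundary tests

An agent may only operate on its own resources. A buyer must not be able to release an escrow created by another buyer, and a seller must not be able to cancel an escrow that is not theirs to cancel. Permission boundary tests verify these invariants (P7).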


@pytest.mark.sandbox
class TestPermissionBoundaries:
    """Layer 2: verify that agents cannot access other agents' resources."""

    def test_cannot_release_others_escrow(self, sandbox_harness):
        """Agent A's escrow cannot be released by agent B."""
        h = sandbox_harness
        # Agent A creates the escrow
        h.execute("create_wallet", {})
        h.execute("deposit", {"amount": "100.00"})
        escrow = h.execute("create_escrow", {
            "payer_agent_id": h.agent_id,
            "payee_agent_id": "seller-x",
            "amount": "10.00",
        })

        # Agent B (a separate harness instance) tries to release it
        other = AgentTestHarness(
            api_key=h.api_key,
            agent_id="attacker-agent",
            base_url=h.base_url,
        )
        other.use_sandbox()
        result = other.execute("release_escrow", {
            "escrow_id": escrow["escrow_id"],
        })
        assert result.get("status") == "error"

    def test_cannot_read_others_balance(self, sandbox_harness):
        """Agent B cannot read agent A's wallet balance."""
        h = sandbox_harness
        h.execute("create_wallet", {})
        h.execute("deposit", {"amount": "100.00"})

        other = AgentTestHarness(
            api_key=h.api_key,
            agent_id="other-agent",
            base_url=h.base_url,
        )
        other.use_sandbox()
        result = other.execute("get_balance", {})

        # Should return the other agent's own balance (0), not our 100
        balance = float(result.get("balance", 0))
        assert balance != 100.00

    def test_cannot_cancel_others_escrow(self, sandbox_harness):
        """The seller cannot cancel the escrow; only the buyer can."""
        h = sandbox_harness
        h.execute("create_wallet", {})
        h.execute("deposit", {"amount": "50.00"})
        escrow = h.execute("create_escrow", {
            "payer_agent_id": h.agent_id,
            "payee_agent_id": "seller-y",
            "amount": "10.00",
        })

        seller = AgentTestHarness(
            api_key=h.api_key,
            agent_id="seller-y",
            base_url=h.base_url,
        )
        seller.use_sandbox()
        result = seller.execute("cancel_escrow", {
            "escrow_id": escrow["escrow_id"],
        })
        assert result.get("status") == "error"

---

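## Chapter 3: Workflow and integration testing

### The saga test pattern

Agent commerce workflows are sagas: multi-step operations in which every step has a matching compensating action. If step 3 fails, steps 1 and 2 must be rolled back.
The saga test pattern verifies the happy path and every possible failure point, so that the system stays fault tolerant and its state stays consistent.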
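
Before the marketplace-specific class, the core idea can be sketched generically: run the steps in order and, on failure, run the compensations of the completed steps in reverse. This is an illustrative sketch only; `run_saga` and the step names are hypothetical and do not call any GreenHelix tool.

```python
from typing import Callable, Optional

# Each step pairs an action with an optional compensation (undo) callable.
Step = tuple[str, Callable[[], None], Optional[Callable[[], None]]]

def run_saga(steps: list[Step]) -> dict:
    """Run steps in order; on failure, undo completed steps in reverse order."""
    completed: list[Step] = []
    for step in steps:
        name, action, _ = step
        try:
            action()
        except Exception as exc:
            undone = []
            for done_name, _, compensate in reversed(completed):
                if compensate is not None:
                    compensate()
                    undone.append(done_name)
            return {"status": "rolled_back", "failed_step": name,
                    "error": str(exc), "compensated": undone}
        completed.append(step)
    return {"status": "completed", "steps": [s[0] for s in completed]}

log = []

def fail():
    raise RuntimeError("step 3 failed")

result = run_saga([
    ("reserve", lambda: log.append("reserve"), lambda: log.append("undo reserve")),
    ("create_escrow", lambda: log.append("escrow"), lambda: log.append("cancel escrow")),
    ("deliver", fail, None),
])
```

Here the third step fails, so the escrow is cancelled first and the reservation is undone second, which is the reverse of the order in which they ran.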

class MarketplaceSaga:
    """Implements the full marketplace listing workflow as a testable saga.

    Steps:
        1. Seller registers a service on the marketplace
        2. Buyer discovers the service through search
        3. Buyer checks the seller's trust score
        4. Buyer creates an escrow
        5. Seller performs the work (simulated)
        6. Buyer releases the escrow
        7. Buyer rates the service
        8. Settlement completes

    Compensations:
        Step 4 fails → nothing to clean up (no funds locked)
        Step 5 fails → cancel the escrow (refund the buyer)
        Step 6 fails → open a dispute
    """

    def __init__(self, harness: AgentTestHarness, buyer_id: str, seller_id: str):
        self.harness = harness
        self.buyer_id = buyer_id
        self.seller_id = seller_id
        self.state = {"step": 0, "completed_steps": []}

    def _call(self, tool: str, input_data: dict) -> dict:
        """Execute a tool and raise if the response reports an error."""
        result = self.harness.execute(tool, input_data)
        if result.get("status") == "error":
            raise RuntimeError(f"{tool} failed: {result.get('error')}")
        return result

    def run(self) -> dict:
        """Run the full saga, compensating on failure."""
        try:
            # Step 1: register the service
            service = self._call("register_service", {
                "name": "Test summarization service",
                "description": "Document summarization for testing",
                "endpoint": f"agent://{self.seller_id}",
                "price": 25.00,
                "tags": ["test", "summarization"],
                "category": "data-processing",
            })
            self.state["service_id"] = service["service_id"]
            self.state["completed_steps"].append("register_service")

            # Step 2: discover the service
            results = self._call("search_services", {
                "query": "test summarization",
            })
            assert len(results.get("services", [])) > 0
            self.state["completed_steps"].append("discover_service")

            # Step 3: trust check
            trust = self._call("get_trust_score", {
                "agent_id": self.seller_id,
            })
            if trust.get("score", 0) < 0.5:
                return {"status": "aborted", "reason": "low_trust"}
            self.state["completed_steps"].append("trust_check")

            # Step 4: create the escrow
            escrow = self._call("create_escrow", {
                "payer_agent_id": self.buyer_id,
                "payee_agent_id": self.seller_id,
                "amount": "25.00",
                "description": "Saga test escrow",
            })
            self.state["escrow_id"] = escrow["escrow_id"]
            self.state["completed_steps"].append("create_escrow")

            # Step 5: simulate the work (a real test would call the seller's endpoint)
            work_result = {"quality": 0.95, "documents_processed": 500}
            self.state["completed_steps"].append("work_completed")

            # Step 6: release the escrow
            release = self._call("release_escrow", {
                "escrow_id": escrow["escrow_id"],
            })
            self.state["completed_steps"].append("release_escrow")

            # Step 7: rate the service
            self._call("rate_service", {
                "service_id": service["service_id"],
                "rating": 5,
            })
            self.state["completed_steps"].append("rate_service")

            return {"status": "completed", "state": self.state}
        except Exception as e:
            return self._compensate(str(e))

    def _compensate(self, error: str) -> dict:
        """Roll back completed steps after a failure.

        This sketch cancels any escrow that was created but not released;
        opening a dispute for a step 6 failure is not implemented here.
        """
        if "create_escrow" in self.state["completed_steps"]:
            escrow_id = self.state.get("escrow_id")
            if escrow_id and "release_escrow" not in self.state["completed_steps"]:
                self.harness.execute("cancel_escrow", {
                    "escrow_id": escrow_id,
                })
                self.state["completed_steps"].append("compensate:cancel_escrow")
        return {
            "status": "rolled_back",
            "error": error,
            "state": self.state,
        }

### Testing the saga

class TestMarketplaceSaga(EscrowFixtures):
    """Layer 3: the full marketplace flow, including rollback verification."""

    def test_happy_path(self, harness, buyer_id, seller_id):
        """Runs all 7 implemented steps of the saga to completion."""
        harness.mock_tool("register_service", {
            "service_id": "svc-test-001",
        })
        harness.mock_tool("search_services", {
            "services": [{"name": "Test Service", "agent_id": seller_id}],
        })
        harness.mock_tool("rate_service", {"status": "rated"})

        saga = MarketplaceSaga(harness, buyer_id, seller_id)
        result = saga.run()

        assert result["status"] == "completed"
        assert len(result["state"]["completed_steps"]) == 7
        harness.assert_call_order([
            "register_service",
            "search_services",
            "get_trust_score",
            "create_escrow",
            "release_escrow",
            "rate_service",
        ])

    def test_rollback_on_escrow_failure(self, harness, buyer_id, seller_id):
        """A failed escrow creation leaves no orphaned state behind."""
        harness.mock_tool("register_service", {"service_id": "svc-test-002"})
        harness.mock_tool("search_services", {
            "services": [{"name": "Test", "agent_id": seller_id}],
        })
        harness.mock_tool_error(
            "create_escrow", "insufficient_funds", "Insufficient balance"
        )

        saga = MarketplaceSaga(harness, buyer_id, seller_id)
        result = saga.run()

        assert result["status"] == "rolled_back"
        assert "create_escrow" not in result["state"]["completed_steps"]
        harness.assert_tool_not_called("release_escrow")

    def test_rollback_cancels_escrow_on_work_failure(self, harness, buyer_id, seller_id):
        """A failure after escrow creation triggers escrow cancellation."""
        harness.mock_tool("register_service", {"service_id": "svc-test-003"})
        harness.mock_tool("search_services", {
            "services": [{"name": "Test", "agent_id": seller_id}],
        })

        saga = MarketplaceSaga(harness, buyer_id, seller_id)
        # Simulate a failure after escrow creation by making release_escrow raise
        original_execute = harness.execute

        def failing_execute(tool, input_data):
            if tool == "release_escrow":
                raise RuntimeError("Simulated work verification failure")
            return original_execute(tool, input_data)

        harness.execute = failing_execute
        result = saga.run()

        assert result["status"] == "rolled_back"
        assert "compensate:cancel_escrow" in result["state"]["completed_steps"]

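### Subscription lifecycle tests

Subscriptions are stateful flows: create, renew, pause, cancel. Every state transition must be tested, including edge cases such as a renewal with insufficient funds (P2, P6).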

class TestSubscriptionLifecycle(AgentFixtures):
    """Layer 3: verify subscription state transitions."""

    def test_full_lifecycle(self, harness):
        """The full lifecycle: create → check status → cancel."""
        sub_id = f"sub-{uuid.uuid4().hex[:8]}"
        harness.mock_tool("create_subscription", {
            "subscription_id": sub_id,
            "status": "active",
            "next_payment_date": "2026-05-06",
        })
        harness.mock_tool("get_subscription", {
            "subscription_id": sub_id,
            "status": "active",
            "payments_completed": 1,
        })
        harness.mock_tool("cancel_subscription", {
            "subscription_id": sub_id,
            "status": "cancelled",
        })

        # Create
        sub = harness.execute("create_subscription", {
            "payer_agent_id": harness.agent_id,
            "payee_agent_id": "provider-001",
            "amount": "15.00",
            "interval": "monthly",
        })
        assert sub["status"] == "active"

        # Check status
        status = harness.execute("get_subscription", {
            "subscription_id": sub_id,
        })
        assert status["payments_completed"] == 1

        # Cancel
        cancel = harness.execute("cancel_subscription", {
            "subscription_id": sub_id,
        })
        assert cancel["status"] == "cancelled"

        harness.assert_call_order([
            "create_subscription",
            "get_subscription",
            "cancel_subscription",
        ])

    def test_renewal_with_insufficient_funds(self, harness):
        """With an insufficient balance, the subscription payment should fail gracefully."""
        harness.mock_tool("get_balance", {"balance": "5.00", "currency": "USD"})
        harness.mock_tool_error(
            "create_subscription",
            "insufficient_funds",
            "Balance is too low to cover the subscription amount",
        )

        result = harness.execute("create_subscription", {
            "payer_agent_id": harness.agent_id,
            "payee_agent_id": "provider-002",
            "amount": "15.00",
            "interval": "monthly",
        })
        assert result.get("status") == "error"

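Webhook delivery tests

Webhooks are the primary notification mechanism for payment events. A missed webhook means a missed settlement, a missed dispute deadline, or a missed subscription renewal. Test webhook delivery separately from the business logic it triggers (P4).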

class TestWebhookDelivery:
    """Layer 3: verify webhook registration and event delivery."""

    def test_webhook_registration(self, harness):
        """Registering a webhook should return a webhook_id."""
        harness.mock_tool("register_webhook", {
            "webhook_id": "wh-test-001",
            "status": "active",
            "events": ["escrow.released", "escrow.disputed"],
        })

        result = harness.execute("register_webhook", {
            "url": "https://test.example.com/webhook",
            "events": ["escrow.released", "escrow.disputed"],
        })
        assert "webhook_id" in result
        assert result["status"] == "active"

    def test_webhook_event_format(self, harness):
        """The webhook payload should contain the required fields."""
        harness.mock_tool("get_webhook_logs", {
            "logs": [{
                "webhook_id": "wh-test-001",
                "event_type": "escrow.released",
                "payload": {
                    "escrow_id": "escrow-abc",
                    "amount": "25.00",
                    "payer_agent_id": "buyer-1",
                    "payee_agent_id": "seller-1",
                    "timestamp": "2026-04-06T12:00:00Z",
                },
                "delivery_status": "delivered",
                "response_code": 200,
            }],
        })

        logs = harness.execute("get_webhook_logs", {"webhook_id": "wh-test-001"})
        for log_entry in logs["logs"]:
            payload = log_entry["payload"]
            assert "escrow_id" in payload
            assert "amount" in payload
            assert "timestamp" in payload
            assert log_entry["delivery_status"] == "delivered"

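Chapter 4: Chaos Testing for Agent Commerce

Why Chaos Testing Matters for Payment Systems

Traditional software degrades gracefully when a downstream service is unavailable: it shows an error page. Failures in an agent commerce system cost money. A timeout during release_escrow can leave the funds released at the gateway while the caller never receives the confirmation, which triggers a duplicate release attempt. A split payment executed during a network partition can result in a partial settlement. Chaos testing injects these failures deliberately so you can verify that the system handles them correctly before they occur in production.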
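To make the release_escrow timeout scenario concrete, here is a minimal sketch that simulates a gateway which commits a release and then loses the response. Everything in it (`FakeGateway`, `release_with_retry`, and the way the `idempotency_key` is handled) is a hypothetical illustration and not the GreenHelix API; it only shows why a retry that reuses one idempotency key does not move funds twice.

```python
class FakeGateway:
    """Hypothetical in-memory gateway; not the GreenHelix API."""

    def __init__(self, drop_first_response: bool = True):
        self.releases = 0        # how many times funds actually moved
        self._seen = {}          # idempotency_key -> stored response
        self._drop = drop_first_response

    def release_escrow(self, escrow_id: str, idempotency_key: str) -> dict:
        # A repeated key returns the stored response without moving funds again.
        if idempotency_key in self._seen:
            return self._seen[idempotency_key]
        self.releases += 1
        response = {"escrow_id": escrow_id, "status": "released"}
        self._seen[idempotency_key] = response
        if self._drop:
            self._drop = False
            # Funds moved, but the caller never sees the confirmation.
            raise TimeoutError("response lost after commit")
        return response


def release_with_retry(gateway, escrow_id, max_retries=3):
    """Retry on timeout, reusing one idempotency key for every attempt."""
    key = f"release-{escrow_id}"
    for _ in range(max_retries):
        try:
            return gateway.release_escrow(escrow_id, key)
        except TimeoutError:
            continue
    raise TimeoutError(f"release of {escrow_id} failed after {max_retries} attempts")


gateway = FakeGateway()
result = release_with_retry(gateway, "escrow-123")
```

The first attempt times out after the funds have moved; the second attempt returns the stored response, so `gateway.releases` stays at 1. Generating a fresh key per attempt would defeat the deduplication.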

ChaosMiddleware

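ChaosMiddleware wraps the harness's execute method and randomly injects faults: timeouts, error responses, delayed responses, and corrupted payloads. The middleware can be configured per tool and per fault type.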

import random
import time
from dataclasses import dataclass
from typing import Optional

@dataclass
class ChaosConfig:
    """Configuration for injecting chaos into a specific tool."""
    timeout_pct: float = 0.0       # % of calls that time out
    error_pct: float = 0.0         # % of calls that return an error
    delay_ms: float = 0.0          # extra latency, in milliseconds
    corrupt_pct: float = 0.0       # % of calls that return a corrupted response
    duplicate_pct: float = 0.0     # % of calls that are executed twice

class ChaosMiddleware:
    """Wraps AgentTestHarness.execute with configurable fault injection.

    Usage:
        harness = AgentTestHarness(api_key, agent_id, base_url)
        chaos = ChaosMiddleware(
            harness=harness,
            default_config=ChaosConfig(error_pct=10, delay_ms=200),
        )
        chaos.set_tool_config("create_escrow", ChaosConfig(
            timeout_pct=20,
            duplicate_pct=5,
        ))

        # All calls now go through chaos injection
        result = chaos.execute("create_escrow", {...})
    """

    def __init__(
        self,
        harness: AgentTestHarness,
        default_config: Optional[ChaosConfig] = None,
        seed: Optional[int] = None,
    ):
        self.harness = harness
        self.default_config = default_config or ChaosConfig()
        self._tool_configs: dict[str, ChaosConfig] = {}
        self._rng = random.Random(seed)
        self._chaos_log: list[dict] = []

    def set_tool_config(self, tool: str, config: ChaosConfig):
        """Set the chaos configuration for a specific tool."""
        self._tool_configs[tool] = config

    def execute(self, tool: str, input_data: dict) -> dict:
        """Execute a tool call with chaos injection."""
        config = self._tool_configs.get(tool, self.default_config)
        chaos_event = {"tool": tool, "injection": None, "timestamp": time.time()}

        # Timeout injection
        if self._rng.random() * 100 < config.timeout_pct:
            chaos_event["injection"] = "timeout"
            self._chaos_log.append(chaos_event)
            raise TimeoutError(
                f"Chaos: simulated timeout on tool '{tool}'"
            )

        # Error injection
        if self._rng.random() * 100 < config.error_pct:
            chaos_event["injection"] = "error"
            self._chaos_log.append(chaos_event)
            return {
                "status": "error",
                "error": {
                    "code": "chaos_injected_error",
                    "message": f"Chaos: simulated error on tool '{tool}'",
                },
            }

        # Apply delay
        if config.delay_ms > 0:
            delay_seconds = config.delay_ms / 1000.0
            actual_delay = self._rng.uniform(0, delay_seconds * 2)
            time.sleep(actual_delay)
            chaos_event["injection"] = f"delay:{actual_delay:.3f}s"

        # Execute the real call
        result = self.harness.execute(tool, input_data)

        if self._rng.random() * 100 < config.duplicate_pct:
            # Duplicate execution: run the call again to exercise idempotency
            chaos_event["injection"] = "duplicate"
            result = self.harness.execute(tool, input_data)
        elif self._rng.random() * 100 < config.corrupt_pct:
            # Response corruption
            chaos_event["injection"] = "corrupt"
            if isinstance(result, dict):
                # Copy first so the harness's stored response is not mutated
                result = dict(result)
                result["_chaos_corrupted"] = True
                # Drop one random key to simulate a partial response
                keys = [k for k in result.keys() if k != "status"]
                if keys:
                    del result[self._rng.choice(keys)]

        # Log each call exactly once (the corrupt path used to log twice)
        self._chaos_log.append(chaos_event)
        return result

    def get_chaos_log(self) -> list[dict]:
        """Return the log of all chaos injections."""
        return list(self._chaos_log)

    def get_injection_stats(self) -> dict:
        """Return summary statistics for chaos injections."""
        stats = {"total": len(self._chaos_log)}
        for entry in self._chaos_log:
            injection = entry.get("injection") or "none"
            category = injection.split(":")[0]
            stats[category] = stats.get(category, 0) + 1
        return stats

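Chaos Test Patterns

Layer 4: verify that escrow operations keep working under chaos injection. The test functions below take self, so they are meant to live inside a pytest test class.

Test case: escrow creation retries correctly after a timeout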

def test_escrow_survives_timeout_retry(self, harness):
    """Escrow creation retries correctly after a timeout."""
    harness.use_mocks()
    harness.mock_tool("create_escrow", {
        "escrow_id": "escrow-chaos-001",
        "status": "funded",
        "amount": "50.00",
    })

    chaos = ChaosMiddleware(
        harness=harness,
        seed=42,
        default_config=ChaosConfig(timeout_pct=50),
    )

    # Retry loop: production code should contain this logic
    max_retries = 5
    result = None
    for attempt in range(max_retries):
        try:
            result = chaos.execute("create_escrow", {
                "payer_agent_id": "buyer",
                "payee_agent_id": "seller",
                "amount": "50.00",
                "idempotency_key": "idem-chaos-001",
            })
            break
        except TimeoutError:
            continue

    assert result is not None, "All retry attempts timed out"
    assert result["escrow_id"] == "escrow-chaos-001"

Test case: duplicate injection does not cause a double payment

def test_no_double_payment_under_duplicates(self, harness):
    """Duplicate chaos injection does not cause a double payment."""
    harness.use_mocks()
    call_count = {"n": 0}

    original_execute = harness.execute
    def counting_execute(tool, input_data):
        call_count["n"] += 1
        return original_execute(tool, input_data)

    harness.execute = counting_execute
    harness.mock_tool("release_escrow", {
        "escrow_id": "escrow-dup-test",
        "status": "released",
    })

    chaos = ChaosMiddleware(
        harness=harness,
        seed=99,
        default_config=ChaosConfig(duplicate_pct=100),
    )

    result = chaos.execute("release_escrow", {
        "escrow_id": "escrow-dup-test",
    })

    # The middleware called execute twice, but the result should still represent a single release
    assert result["status"] == "released"

Test case: concurrent escrow creation does not cause race conditions

def test_concurrent_escrow_load(self, harness):
    """Concurrent escrow creation does not cause race conditions."""
    import concurrent.futures

    harness.use_mocks()
    harness.mock_tool("create_escrow", {
        "escrow_id": "will-be-unique",
        "status": "funded",
        "amount": "10.00",
    })

    chaos = ChaosMiddleware(
        harness=harness,
        seed=7,
        default_config=ChaosConfig(delay_ms=50, error_pct=10),
    )

    results = []
    errors = []

    def create_escrow(i):
        try:
            return chaos.execute("create_escrow", {
                "payer_agent_id": "buyer",
                "payee_agent_id": "seller",
                "amount": "10.00",
                "idempotency_key": f"concurrent-{i}",
            })
        except Exception as e:
            return {"status": "error", "message": str(e)}

    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as pool:
        futures = [pool.submit(create_escrow, i) for i in range(20)]
        for f in concurrent.futures.as_completed(futures):
            r = f.result()
            if r.get("status") == "error":
                errors.append(r)
            else:
                results.append(r)

    # Some calls should still succeed despite the chaos
    assert len(results) > 0
    # Errors are expected under chaos; verify that every call was accounted for
    total = len(results) + len(errors)
    assert total == 20

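Test case: an escrow must be released or cancelled before its deadline

def test_escrow_timeout_deadline(self, harness):
    """An escrow must be released or cancelled before its deadline."""
    harness.use_mocks()
    deadline = time.time() + 2  # 2-second deadline (for testing)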

    harness.mock_tool("create_escrow", {
        "escrow_id": "escrow-deadline",
        "status": "funded",
        "deadline": deadline,
    })
    harness.mock_tool("cancel_escrow", {
        "escrow_id": "escrow-deadline",
        "status": "cancelled",
        "reason": "deadline_exceeded",
    })

    escrow = harness.execute("create_escrow", {
        "payer_agent_id": "buyer",
        "payee_agent_id": "seller",
        "amount": "30.00",
    })

    # Simulate time passing toward the deadline
    time.sleep(0.1)  # use time mocking in real tests
    current_time = time.time()
    if current_time < deadline:
        # Still within the deadline, so a release is valid
        harness.mock_tool("release_escrow", {
            "escrow_id": "escrow-deadline",
            "status": "released",
        })
        result = harness.execute("release_escrow", {
            "escrow_id": escrow["escrow_id"],
        })
        assert result["status"] == "released"
    else:
        # Deadline has passed, so the escrow should be cancelled
        result = harness.execute("cancel_escrow", {
            "escrow_id": escrow["escrow_id"],
        })
        assert result["status"] == "cancelled"

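Chapter 5: Production Observability

Why printf Debugging Does Not Work for Payment Systems

When a payment fails in production, you need to know three things immediately: which tool failed, how long it took, and what the input was. That information must be structured, searchable, and available without SSH-ing into a server. AgentTracer adds timing, success/failure tracking, and structured output to every _execute call.

AgentTracer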

import time
import json
import logging
import uuid
from dataclasses import dataclass, field
from typing import Optional, Callable

import requests  # used by AgentTracer._session below

@dataclass
class TraceRecord:
    """Trace record for a single tool execution."""
    tool: str
    agent_id: str
    started_at: float
    ended_at: float
    duration_ms: float
    success: bool
    input_data: dict
    output_data: Optional[dict] = None
    error: Optional[str] = None
    trace_id: str = ""

    def to_dict(self) -> dict:
        return {
            "trace_id": self.trace_id,
            "tool": self.tool,
            "agent_id": self.agent_id,
            "started_at": self.started_at,
            "ended_at": self.ended_at,
            "duration_ms": round(self.duration_ms, 2),
            "success": self.success,
            "error": self.error,
        }

class AgentTracer:
    """Wraps _execute and records timing, success/failure, and tool name.

    Provides structured observability for every tool call in production.

    Usage:
        tracer = AgentTracer(
            api_key="...",
            agent_id="production-buyer-01",
            base_url="https://api.greenhelix.net/v1",
        )

        # Wraps an existing AgentCommerce client or test harness
        result = tracer.trace("create_escrow", {
            "payer_agent_id": "buyer",
            "payee_agent_id": "seller",
            "amount": "50.00",
        })

        # Retrieve metrics
        print(tracer.get_metrics())
        # {'total_calls': 47, 'success_rate': 0.957,
        #  'avg_latency_ms': 142.3, 'p99_latency_ms': 890.1,
        #  'error_rate_by_tool': {'create_escrow': 0.02}}
    """

    def __init__(
        self,
        api_key: str,
        agent_id: str,
        base_url: str = "https://api.greenhelix.net/v1",
        logger: Optional[logging.Logger] = None,
        on_slow_call: Optional[Callable] = None,
        slow_threshold_ms: float = 2000.0,
    ):
        self.api_key = api_key
        self.agent_id = agent_id
        self.base_url = base_url
        self.logger = logger or logging.getLogger("agent_tracer")
        self.on_slow_call = on_slow_call
        self.slow_threshold_ms = slow_threshold_ms
        self._traces: list[TraceRecord] = []
        self._session = requests.Session()
        self._session.headers.update({
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}",
        })

    def trace(self, tool: str, input_data: dict) -> dict:
        """Execute a tool call with full tracing."""
        trace_id = f"trace-{uuid.uuid4().hex[:12]}"
        started_at = time.time()

        try:
            resp = self._session.post(
                f"{self.base_url}/v1",
                json={"tool": tool, "input": input_data},
            )
            resp.raise_for_status()
            result = resp.json()
            success = result.get("status") != "error"
            error = None if success else json.dumps(result.get("error", {}))
        except Exception as e:
            result = {"status": "error", "error": str(e)}
            success = False
            error = str(e)

        ended_at = time.time()
        duration_ms = (ended_at - started_at) * 1000

        record = TraceRecord(
            tool=tool,
            agent_id=self.agent_id,
            started_at=started_at,
            ended_at=ended_at,
            duration_ms=duration_ms,
            success=success,
            input_data=input_data,
            output_data=result if success else None,
            error=error,
            trace_id=trace_id,
        )
        self._traces.append(record)

        # Structured log output
        self.logger.info(json.dumps({
            "event": "tool_execution",
            "trace_id": trace_id,
            "tool": tool,
            "agent_id": self.agent_id,
            "duration_ms": round(duration_ms, 2),
            "success": success,
            "error": error,
        }))

        # Slow-call callback
        if duration_ms > self.slow_threshold_ms and self.on_slow_call:
            self.on_slow_call(record)

        return result

    def get_metrics(self) -> dict:
        """Compute aggregate metrics from the trace records."""
        if not self._traces:
            return {"total_calls": 0}

        total = len(self._traces)
        successes = sum(1 for t in self._traces if t.success)
        durations = sorted(t.duration_ms for t in self._traces)

        # Error rate per tool
        tool_calls: dict[str, dict] = {}
        for t in self._traces:
            if t.tool not in tool_calls:
                tool_calls[t.tool] = {"total": 0, "errors": 0}
            tool_calls[t.tool]["total"] += 1
            if not t.success:
                tool_calls[t.tool]["errors"] += 1

        error_rate_by_tool = {
            tool: stats["errors"] / stats["total"]
            for tool, stats in tool_calls.items()
            if stats["errors"] > 0
        }

        # Latency per tool
        tool_latencies: dict[str, list[float]] = {}
        for t in self._traces:
            tool_latencies.setdefault(t.tool, []).append(t.duration_ms)

        avg_latency_by_tool = {
            tool: round(sum(lats) / len(lats), 2)
            for tool, lats in tool_latencies.items()
        }

        return {
            "total_calls": total,
            "success_rate": round(successes / total, 4),
            "avg_latency_ms": round(sum(durations) / total, 2),
            "p50_latency_ms": round(durations[total // 2], 2),
            "p95_latency_ms": round(durations[int(total * 0.95)], 2),
            "p99_latency_ms": round(durations[int(total * 0.99)], 2),
            "error_rate_by_tool": error_rate_by_tool,
            "avg_latency_by_tool": avg_latency_by_tool,
        }

    def get_traces(
        self,
        tool: Optional[str] = None,
        success: Optional[bool] = None,
        min_duration_ms: Optional[float] = None,
    ) -> list[dict]:
        """Filter trace records by optional criteria."""
        filtered = self._traces
        if tool:
            filtered = [t for t in filtered if t.tool == tool]
        if success is not None:
            filtered = [t for t in filtered if t.success == success]
        if min_duration_ms is not None:
            filtered = [t for t in filtered if t.duration_ms >= min_duration_ms]
        return [t.to_dict() for t in filtered]

    def get_revenue_metrics(self) -> dict:
        """Extract revenue-related metrics from the trace records."""
        escrow_creates = [
            t for t in self._traces
            if t.tool == "create_escrow" and t.success
        ]
        escrow_releases = [
            t for t in self._traces
            if t.tool == "release_escrow" and t.success
        ]
        deposits = [
            t for t in self._traces
            if t.tool == "deposit" and t.success
        ]

        total_escrowed = sum(
            float(t.input_data.get("amount", 0))
            for t in escrow_creates
        )
        total_deposited = sum(
            float(t.input_data.get("amount", 0))
            for t in deposits
        )

        return {
            "escrows_created": len(escrow_creates),
            "escrows_released": len(escrow_releases),
            "total_escrowed": round(total_escrowed, 2),
            "total_deposited": round(total_deposited, 2),
            "release_rate": (
                round(len(escrow_releases) / len(escrow_creates), 4)
                if escrow_creates else 0
            ),
        }

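Structured Logging for Agent Decisions

The tracer's structured logs integrate with any log aggregation system (for example Datadog, ELK, or CloudWatch). Each log line is a JSON object with a consistent set of fields. The key decision: log the tool name and duration for every call, but redact input data in production so that sensitive information (such as API keys or wallet amounts) is never logged. Enable full input/output logging only in staging (following the P7 security pattern).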

# Production logging configuration
import logging

def configure_production_logging():
    """Set up structured JSON logging for agent commerce."""
    logger = logging.getLogger("agent_tracer")
    logger.setLevel(logging.INFO)

    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter("%(message)s"))
    logger.addHandler(handler)
    return logger

def configure_staging_logging():
    """Staging logger with full input/output capture."""
    logger = logging.getLogger("agent_tracer")
    logger.setLevel(logging.DEBUG)

    handler = logging.FileHandler("/var/log/agent-commerce/traces.jsonl")
    handler.setFormatter(logging.Formatter("%(message)s"))
    logger.addHandler(handler)
    return logger
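
The redaction rule described in this section (always log the tool and duration, mask inputs in production) is not shown in the configuration code, so here is one possible sketch. The `SENSITIVE_KEYS` set, the `redact` helper, and the `environment` argument are assumptions made for illustration; they are not part of AgentTracer.

```python
import json

# Hypothetical set of field names treated as sensitive; adjust to your schema.
SENSITIVE_KEYS = {"api_key", "signing_key", "amount", "balance"}


def redact(value, sensitive_keys=SENSITIVE_KEYS):
    """Return a copy of value with sensitive dict fields masked, recursively."""
    if isinstance(value, dict):
        return {
            k: "[REDACTED]" if k in sensitive_keys else redact(v, sensitive_keys)
            for k, v in value.items()
        }
    if isinstance(value, list):
        return [redact(v, sensitive_keys) for v in value]
    return value


def build_log_line(tool, duration_ms, input_data, environment):
    """Build one JSON log line; full inputs are kept only outside production."""
    logged_input = redact(input_data) if environment == "production" else input_data
    return json.dumps({
        "event": "tool_execution",
        "tool": tool,
        "duration_ms": round(duration_ms, 2),
        "input": logged_input,
    })


line = build_log_line(
    "create_escrow",
    142.317,
    {"payer_agent_id": "buyer", "amount": "50.00", "meta": {"api_key": "k"}},
    environment="production",
)
```

Because `redact` builds a new structure, the original `input_data` passed to the gateway is left unchanged; only the logged copy is masked.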

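Custom Metrics Dashboards

Extract the metrics you need from the tracer for a Grafana or Datadog dashboard. Every agent commerce system should monitor the following five core metrics.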

class MetricsExporter:
    """Export AgentTracer metrics to monitoring systems."""

    def __init__(self, tracer: AgentTracer):
        self.tracer = tracer

    def export_prometheus(self) -> str:
        """Export metrics in Prometheus text format."""
        metrics = self.tracer.get_metrics()
        revenue = self.tracer.get_revenue_metrics()
        lines = [
            f'agent_commerce_calls_total {metrics["total_calls"]}',
            f'agent_commerce_success_rate {metrics["success_rate"]}',
            f'agent_commerce_latency_p50_ms {metrics.get("p50_latency_ms", 0)}',
            f'agent_commerce_latency_p99_ms {metrics.get("p99_latency_ms", 0)}',
            f'agent_commerce_escrows_created {revenue["escrows_created"]}',
            f'agent_commerce_escrows_released {revenue["escrows_released"]}',
            f'agent_commerce_total_escrowed {revenue["total_escrowed"]}',
            f'agent_commerce_release_rate {revenue["release_rate"]}',
        ]

        for tool, rate in metrics.get("error_rate_by_tool", {}).items():
            lines.append(
                f'agent_commerce_error_rate{{tool="{tool}"}} {rate}'
            )

        return "\n".join(lines)

    def to_datadog_events(self) -> list[dict]:
        """Format failed trace records as Datadog events."""
        failed = self.tracer.get_traces(success=False)
        return [
            {
                "title": f"Tool call failed: {t['tool']}",
                "text": t.get("error", "Unknown error"),
                "tags": [
                    f"tool:{t['tool']}",
                    f"agent:{t['agent_id']}",
                    "service:agent-commerce",
                ],
                "alert_type": "error",
            }
            for t in failed
        ]

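Chapter 6: Alerting and Incident Response

What to Alert On

Not every error needs an immediate response. The following six conditions can cause financial loss in an agent commerce system if they are not handled promptly. Where the source gives no severity level, the cell is marked as unspecified.

| Alert | Trigger condition | Severity | Recommended action |
|---|---|---|---|
| Settlement failure | release_escrow returns an error more than 3 times in a row | Critical | Check the payment gateway status; pause new escrows |
| Escrow timeout | Escrow not funded or not released past its deadline | (unspecified) | Auto-cancel or escalate to the dispute process |
| Balance anomaly | A single transaction drops the balance by more than 50% | Critical | Pause the agent; audit recent calls |
| Trust score drop | Trust score falls below the threshold | Medium | Pause hiring; investigate the related metrics |
| Webhook delivery failure | More than 5 consecutive delivery failures | (unspecified) | Check the target endpoint; enable the retry queue |
| Double payment | The same escrow_id is released twice within 60 seconds | Critical | Halt operations immediately; audit the ledger |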
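
As a rough sketch of how two of the alert conditions listed above could be evaluated in code, the class below tracks consecutive release_escrow failures and repeated releases of the same escrow_id within 60 seconds. `AlertRules`, its method names, and the alert dictionary shape are hypothetical; the thresholds are taken from the conditions as stated.

```python
class AlertRules:
    """Hypothetical evaluator for the settlement-failure and double-payment rules."""

    def __init__(self, failure_threshold=3, double_release_window_s=60.0):
        self.failure_threshold = failure_threshold
        self.window = double_release_window_s
        self._consecutive_failures = 0
        self._last_release = {}   # escrow_id -> timestamp of last successful release

    def observe_release(self, escrow_id, success, timestamp):
        """Record one release_escrow outcome and return any alerts it triggers."""
        alerts = []
        if not success:
            self._consecutive_failures += 1
            # Fires once the streak exceeds the threshold (more than 3 in a row).
            if self._consecutive_failures > self.failure_threshold:
                alerts.append({"type": "settlement_failure", "severity": "critical"})
            return alerts

        # Any success resets the failure streak.
        self._consecutive_failures = 0
        previous = self._last_release.get(escrow_id)
        if previous is not None and timestamp - previous <= self.window:
            alerts.append({
                "type": "double_payment",
                "severity": "critical",
                "escrow_id": escrow_id,
            })
        self._last_release[escrow_id] = timestamp
        return alerts


rules = AlertRules()
first = rules.observe_release("escrow-1", success=True, timestamp=1000.0)
second = rules.observe_release("escrow-1", success=True, timestamp=1030.0)
failures = [rules.observe_release("escrow-2", False, 1100.0 + i) for i in range(4)]
```

Timestamps are passed in explicitly rather than read from the clock, which keeps the rules easy to unit test.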

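HealthChecker

The HealthChecker runs synthetic transactions against the sandbox (or against a dedicated health-check agent in production) to verify that the whole payment pipeline is working. Have your monitoring system run it every 60 seconds.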

import time
from typing import Callable, Optional

import requests


class HealthChecker:
    """Synthetic transaction health checks for agent commerce.

    Runs a mini escrow lifecycle (create wallet → deposit → create
    escrow → release → verify balance) and reports pass/fail with
    latency metrics.

    Usage:
        checker = HealthChecker(
            api_key="health-check-key",
            agent_id="health-check-agent",
            base_url="https://sandbox.greenhelix.net/v1",
        )
        result = checker.run_health_check()
        # {
        #     "healthy": True,
        #     "checks": {
        #         "wallet": {"status": "pass", "latency_ms": 45.2},
        #         "deposit": {"status": "pass", "latency_ms": 78.1},
        #         "escrow_create": {"status": "pass", "latency_ms": 112.4},
        #         "escrow_release": {"status": "pass", "latency_ms": 95.6},
        #         "balance_verify": {"status": "pass", "latency_ms": 41.0},
        #     },
        #     "total_latency_ms": 372.3,
        # }
    """

    def __init__(
        self,
        api_key: str,
        agent_id: str,
        base_url: str = "https://sandbox.greenhelix.net/v1",
        timeout_ms: float = 5000.0,
    ):
        self.api_key = api_key
        self.agent_id = agent_id
        self.base_url = base_url
        self.timeout_ms = timeout_ms
        self._session = requests.Session()
        self._session.headers.update({
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}",
        })

    def _timed_execute(self, tool: str, input_data: dict) -> tuple[dict, float]:
        """Execute a tool and return (result, latency_ms)."""
        start = time.time()
        resp = self._session.post(
            f"{self.base_url}/v1",
            json={"tool": tool, "input": input_data},
            timeout=self.timeout_ms / 1000.0,
        )
        latency_ms = (time.time() - start) * 1000
        resp.raise_for_status()
        return resp.json(), latency_ms

    def run_health_check(self) -> dict:
        """Run a full synthetic transaction health check."""
        checks = {}
        healthy = True
        health_agent = f"{self.agent_id}-{int(time.time())}"

        # Check 1: Wallet creation
        try:
            result, latency = self._timed_execute("create_wallet", {})
            checks["wallet"] = {"status": "pass", "latency_ms": round(latency, 2)}
        except Exception as e:
            checks["wallet"] = {"status": "fail", "error": str(e)}
            healthy = False

        # Check 2: Deposit
        try:
            result, latency = self._timed_execute("deposit", {"amount": "1.00"})
            checks["deposit"] = {"status": "pass", "latency_ms": round(latency, 2)}
        except Exception as e:
            checks["deposit"] = {"status": "fail", "error": str(e)}
            healthy = False

        # Check 3: Escrow creation
        escrow_id = None
        try:
            result, latency = self._timed_execute("create_escrow", {
                "payer_agent_id": health_agent,
                "payee_agent_id": f"{health_agent}-payee",
                "amount": "0.01",
                "description": "Health check escrow",
            })
            escrow_id = result.get("escrow_id")
            checks["escrow_create"] = {"status": "pass", "latency_ms": round(latency, 2)}
        except Exception as e:
            checks["escrow_create"] = {"status": "fail", "error": str(e)}
            healthy = False

        # Check 4: Escrow release
        if escrow_id:
            try:
                result, latency = self._timed_execute("release_escrow", {
                    "escrow_id": escrow_id,
                })
                checks["escrow_release"] = {"status": "pass", "latency_ms": round(latency, 2)}
            except Exception as e:
                checks["escrow_release"] = {"status": "fail", "error": str(e)}
                healthy = False
        else:
            checks["escrow_release"] = {"status": "skip", "reason": "no escrow_id"}

        # Check 5: Balance verification
        try:
            result, latency = self._timed_execute("get_balance", {})
            checks["balance_verify"] = {"status": "pass", "latency_ms": round(latency, 2)}
        except Exception as e:
            checks["balance_verify"] = {"status": "fail", "error": str(e)}
            healthy = False

        total_latency = sum(
            c.get("latency_ms", 0) for c in checks.values()
        )

        return {
            "healthy": healthy,
            "checks": checks,
            "total_latency_ms": round(total_latency, 2),
            "timestamp": time.time(),
            "agent_id": self.agent_id,
        }

    def run_and_alert(self, alert_callback: Optional[Callable] = None) -> dict:
        """Run health check and trigger alert callback on failure."""
        result = self.run_health_check()
        if not result["healthy"] and alert_callback:
            failed_checks = {
                name: check for name, check in result["checks"].items()
                if check.get("status") == "fail"
            }
            alert_callback({
                "severity": "critical",
                "title": "Agent Commerce Health Check Failed",
                "failed_checks": failed_checks,
                "timestamp": result["timestamp"],
            })
        return result

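### Testing the HealthChecker

- It runs the full synthetic transaction health-check flow.
- Each step captures its own status and latency independently.
- A step that fails is marked `fail`; a step that is skipped (for example, when there is no escrow ID) is marked `skip`.
- The result contains the overall health status, per-step details, total latency, and a timestamp.
- `run_and_alert` triggers an alert callback when a check fails.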
class TestHealthChecker:
    """Verify that the health checker itself works."""

    def test_reports_healthy_when_all_pass(self, harness):
        """Returns healthy=True when every check passes."""
        harness.use_mocks()
        harness.mock_tool("create_wallet", {"wallet_id": "w-health"})
        harness.mock_tool("deposit", {"balance": "1.00", "transaction_id": "tx-h"})
        harness.mock_tool("create_escrow", {
            "escrow_id": "escrow-health", "status": "funded",
        })
        harness.mock_tool("release_escrow", {
            "escrow_id": "escrow-health", "status": "released",
        })
        harness.mock_tool("get_balance", {"balance": "0.99"})

        # In test mode the HealthChecker delegates to the harness
        checker = HealthChecker(
            api_key=harness.api_key,
            agent_id=harness.agent_id,
            base_url=harness.base_url,
        )
        # In a real test, patch _timed_execute to use the harness.
        # Here we only verify the structure.
        assert checker.timeout_ms == 5000.0

    def test_reports_unhealthy_on_escrow_failure(self):
        """Returns healthy=False and fires the alert when the escrow check fails."""
        checker = HealthChecker(
            api_key="test",
            agent_id="test",
            base_url="https://sandbox.greenhelix.net/v1",
        )

        alerts_received = []
        def mock_alert(alert):
            alerts_received.append(alert)

        # Patch _timed_execute so no network call is made: create_escrow
        # fails, every other tool returns a canned (result, latency_ms) pair.
        def failing_execute(tool, input_data):
            if tool == "create_escrow":
                raise ConnectionError("Gateway timeout")
            return {"status": "ok"}, 1.0

        checker._timed_execute = failing_execute
        result = checker.run_and_alert(alert_callback=mock_alert)

        assert result["healthy"] is False
        assert result["checks"]["escrow_create"]["status"] == "fail"
        assert result["checks"]["escrow_release"]["status"] == "skip"
        assert len(alerts_received) == 1

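### Webhook Dead-Letter Queue

When a webhook delivery fails, the event must not be silently dropped. Implement a dead-letter queue that captures failed deliveries so they can be retried (P4).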

class WebhookDeadLetterQueue:
    """Captures failed webhook deliveries for manual retry."""

    def __init__(self, max_retries: int = 3, retry_delay_seconds: float = 60.0):
        self.max_retries = max_retries
        self.retry_delay_seconds = retry_delay_seconds
        self._queue: list[dict] = []

    def enqueue(self, event: dict, error: str):
        """Add a failed webhook event to the dead-letter queue."""
        self._queue.append({
            "event": event,
            "error": error,
            "retries": 0,
            "enqueued_at": time.time(),
            "next_retry_at": time.time() + self.retry_delay_seconds,
        })

    def get_pending(self) -> list[dict]:
        """Return the events that are ready to be retried."""
        now = time.time()
        return [
            entry for entry in self._queue
            if entry["retries"] < self.max_retries
            and entry["next_retry_at"] <= now
        ]

    def mark_delivered(self, event_id: str):
        """Remove an event that was retried successfully."""
        self._queue = [
            e for e in self._queue
            if e["event"].get("event_id") != event_id
        ]

    def mark_retried(self, event_id: str):
        """Increment the retry count and schedule the next attempt."""
        for entry in self._queue:
            if entry["event"].get("event_id") == event_id:
                entry["retries"] += 1
                # Exponential backoff: the delay doubles with every retry
                entry["next_retry_at"] = (
                    time.time()
                    + self.retry_delay_seconds * (2 ** entry["retries"])
                )
                break

    def get_dead_letters(self) -> list[dict]:
        """Return the events that have exhausted all retries."""
        return [
            entry for entry in self._queue
            if entry["retries"] >= self.max_retries
        ]
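
The retry timing implied by the queue's backoff formula is easy to misread, so here is a small worked calculation. The `retry_schedule` helper is hypothetical and only mirrors the arithmetic of `enqueue` and `mark_retried` under the default settings; it assumes each retry happens as soon as it becomes due.

```python
def retry_schedule(retry_delay_seconds=60.0, max_retries=3):
    """Seconds waited before each retry attempt, mirroring the queue's formula."""
    # The first retry waits the base delay set at enqueue time; after the
    # n-th failed retry the next wait is retry_delay_seconds * 2**n.
    return [retry_delay_seconds * (2 ** n) for n in range(max_retries)]


schedule = retry_schedule()
total_wait = sum(schedule)
```

With the defaults, the waits are 60, 120, and 240 seconds, so an event that keeps failing becomes a dead letter no sooner than 420 seconds (7 minutes) after it was enqueued.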

---

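## Chapter 7: CI/CD for Agent Systems

### Running Tests in GitHub Actions

Agent commerce tests need three things that standard CI does not provide: a GreenHelix API key for sandbox testing, isolation between parallel test runs (unique agent IDs), and network access to `sandbox.greenhelix.net`. This GitHub Actions template handles all three.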

# .github/workflows/agent-commerce-tests.yml
name: Agent Commerce Tests

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]

env:
  GREENHELIX_API_KEY: ${{ secrets.GREENHELIX_API_KEY }}
  GREENHELIX_BASE_URL: https://sandbox.greenhelix.net/v1

jobs:
  # ── Layer 1: fast mock tests (no network) ──────────
  unit-tests:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.12"
      - name: Install dependencies
        run: |
          pip install pytest requests cryptography
      - name: Run unit tests (Layer 1)
        run: |
          pytest tests/ -x -q \
            -m "not sandbox and not chaos" \
            --tb=short \
            --junit-xml=results/unit-tests.xml
      - name: Upload test results
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: unit-test-results
          path: results/unit-tests.xml

  # ── Layer 2: contract tests against the sandbox ──────────────
  contract-tests:
    runs-on: ubuntu-latest
    needs: unit-tests
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.12"
      - name: Install dependencies
        run: pip install pytest requests cryptography
      - name: Run contract tests (Layer 2)
        run: |
          pytest tests/ -x -q \
            -m "sandbox and not chaos" \
            --tb=short \
            --junit-xml=results/contract-tests.xml
        env:
          GREENHELIX_API_KEY: ${{ secrets.GREENHELIX_API_KEY }}
      - name: Upload test results
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: contract-test-results
          path: results/contract-tests.xml

  # ── Layer 3 + Layer 4: integration and chaos tests ─────────────
  integration-tests:
    runs-on: ubuntu-latest
    needs: contract-tests
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.12"
      - name: Install dependencies
        run: pip install pytest requests cryptography
      - name: Run integration tests (Layer 3)
        run: |
          pytest tests/ -x -q \
            -m "integration" \
            --tb=short \
            --junit-xml=results/integration-tests.xml
        env:
          GREENHELIX_API_KEY: ${{ secrets.GREENHELIX_API_KEY }}
      - name: Run chaos tests (Layer 4)
        run: |
          pytest tests/ -q \
            -m "chaos" \
            --tb=short \
            --junit-xml=results/chaos-tests.xml
        env:
          GREENHELIX_API_KEY: ${{ secrets.GREENHELIX_API_KEY }}
        continue-on-error: true  # chaos tests may include expected failures
      - name: Upload all test results
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: integration-test-results
          path: results/

  # ── Health check against staging ─────────────────────────
  staging-health:
    runs-on: ubuntu-latest
    needs: integration-tests
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.12"
      - name: Install dependencies
        run: pip install requests
      - name: Run health check against staging
        run: |
          python -c "
          from health_checker import HealthChecker
          import json, sys
          checker = HealthChecker(
              api_key='${{ secrets.GREENHELIX_API_KEY }}',
              agent_id='ci-health-check',
              base_url='https://sandbox.greenhelix.net/v1',
          )
          result = checker.run_health_check()
          print(json.dumps(result, indent=2))
          if not result['healthy']:
              print('HEALTH CHECK FAILED')
              sys.exit(1)
          print('HEALTH CHECK PASSED')
          "

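Using the GreenHelix Sandbox

The sandbox at sandbox.greenhelix.net mirrors the production API. Use it as your staging environment. All Layer 2 through Layer 4 tests run against it. The sandbox resets balances nightly, so do not rely on state persisting between CI runs. Generate a unique agent ID for every run using the agent_id fixture in conftest (P1).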

# pytest.ini or pyproject.toml
# [tool.pytest.ini_options]
# markers:
#     sandbox: tests that require the GreenHelix sandbox
#     chaos: chaos tests that inject faults
#     integration: multi-step workflow integration tests

import os
import uuid

import pytest

@pytest.fixture
def ci_agent_id():
    """Generate a CI-unique agent ID to prevent collisions."""
    run_id = os.environ.get("GITHUB_RUN_ID", uuid.uuid4().hex[:8])
    return f"ci-agent-{run_id}-{uuid.uuid4().hex[:6]}"

Canary Deployment Strategy

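When you deploy changes to agent commerce functionality, use a canary rollout: route 5% of traffic to the new version, watch its AgentTracer metrics for 15 minutes, then decide whether to promote or roll back.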

class CanaryDeployment:
    """Canary deployment controller for agent commerce systems."""

    def __init__(
        self,
        canary_tracer: AgentTracer,
        stable_tracer: AgentTracer,
        promotion_threshold: float = 0.95,
    ):
        self.canary = canary_tracer
        self.stable = stable_tracer
        self.promotion_threshold = promotion_threshold

    def evaluate(self) -> dict:
        """Compare canary metrics against the stable baseline."""
        canary_metrics = self.canary.get_metrics()
        stable_metrics = self.stable.get_metrics()

        if canary_metrics["total_calls"] < 10:
            return {"decision": "waiting", "reason": "insufficient_data"}

        canary_success = canary_metrics.get("success_rate", 0)
        stable_success = stable_metrics.get("success_rate", 1)

        canary_latency = canary_metrics.get("p99_latency_ms", 0)
        stable_latency = stable_metrics.get("p99_latency_ms", 1)

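        # Canary success rate must be at least 98% of the stable version's
        success_ok = canary_success >= stable_success * 0.98

        # Canary p99 latency must not exceed the stable version's by more than 20%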
        latency_ok = canary_latency <= stable_latency * 1.20

        if success_ok and latency_ok:
            return {"decision": "promote", "canary_success": canary_success}
        else:
            return {
                "decision": "rollback",
                "reason": (
                    f"success: {canary_success} vs {stable_success}, "
                    f"p99: {canary_latency}ms vs {stable_latency}ms"
                ),
            }
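
The controller above decides on a snapshot of metrics; it does not show how 5% of traffic is selected or how the 15-minute window is enforced. The following is a minimal sketch of both, under stated assumptions: `routes_to_canary` and `watch_canary` are hypothetical helper names that do not appear in this guide, the hash-bucket split is one common way to get a stable 5% sample, and treating an inconclusive window as a rollback is a conservative choice rather than something the guide prescribes.

```python
import hashlib
import time
from typing import Callable


def routes_to_canary(agent_id: str, canary_percent: float = 5.0) -> bool:
    """Deterministically assign an agent to the canary from a hash of its ID.

    Hypothetical helper: the same agent always lands on the same side, so a
    multi-step workflow is not split across two versions.
    """
    digest = hashlib.sha256(agent_id.encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:8], "big") % 10_000  # 0..9999
    return bucket < canary_percent * 100


def watch_canary(
    evaluate: Callable[[], dict],
    window_s: float = 15 * 60,
    poll_s: float = 30,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Poll evaluate() for the whole observation window.

    Returns immediately on a rollback decision. Promotes only if the last
    evaluation inside the window said "promote"; anything else (for example
    still "waiting" for data) is treated as a rollback.
    """
    deadline = clock() + window_s
    last = {"decision": "waiting", "reason": "no_evaluation"}
    while clock() < deadline:
        last = evaluate()
        if last["decision"] == "rollback":
            return last  # fail fast, no need to wait out the window
        sleep(poll_s)
    if last["decision"] == "promote":
        return last
    return {
        "decision": "rollback",
        "reason": f"window ended while {last['decision']}",
    }
```

With the class above, a call would look like `watch_canary(CanaryDeployment(canary_tracer, stable_tracer).evaluate)`. The `clock` and `sleep` parameters exist only so the loop can be tested without waiting 15 minutes.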

Regression Detection

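Track key metrics across CI runs so that regressions are caught before they reach production. Save the metrics as a CI build artifact and compare them against the previous run.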

class RegressionDetector:
    """Detect metric regressions between CI runs."""

    def __init__(self, baseline_metrics: dict, current_metrics: dict):
        self.baseline = baseline_metrics
        self.current = current_metrics

    def check(self) -> list[dict]:
        """Return the list of detected regressions."""
        regressions = []

        # A drop in success rate is a regression (with a tolerance of 0.01)
        baseline_sr = self.baseline.get("success_rate", 1.0)
        current_sr = self.current.get("success_rate", 1.0)
        if current_sr < baseline_sr - 0.01:
            regressions.append({
                "metric": "success_rate",
                "baseline": baseline_sr,
                "current": current_sr,
                "delta": current_sr - baseline_sr,
            })

        # Latency regression (p95 latency up by more than 20%)
        baseline_p95 = self.baseline.get("p95_latency_ms", 0)
        current_p95 = self.current.get("p95_latency_ms", 0)
        if baseline_p95 > 0 and current_p95 > baseline_p95 * 1.20:
            regressions.append({
                "metric": "p95_latency_ms",
                "baseline": baseline_p95,
                "current": current_p95,
                "delta_pct": round(
                    (current_p95 - baseline_p95) / baseline_p95 * 100, 1
                ),
            })

        # Per-tool error rate increase (more than 0.05, i.e. 5 percentage points, is a regression)
        baseline_errors = self.baseline.get("error_rate_by_tool", {})
        current_errors = self.current.get("error_rate_by_tool", {})
        for tool, current_rate in current_errors.items():
            baseline_rate = baseline_errors.get(tool, 0)
            if current_rate > baseline_rate + 0.05:
                regressions.append({
                    "metric": f"error_rate:{tool}",
                    "baseline": baseline_rate,
                    "current": current_rate,
                })

        return regressions
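
The detector compares two dictionaries but leaves open where the baseline comes from. Below is a minimal sketch of the "save as an artifact, compare with the previous run" step, under stated assumptions: the function names and the JSON file layout are hypothetical, the comparison is passed in as a callable so the block stays self-contained, and how the baseline file travels between CI runs (artifact download, cache, or similar) is left to your pipeline.

```python
import json
from pathlib import Path
from typing import Callable, Optional


def load_baseline(path: Path) -> Optional[dict]:
    """Return the metrics saved by an earlier run, or None on the first run."""
    if not path.exists():
        return None
    return json.loads(path.read_text(encoding="utf-8"))


def save_metrics(path: Path, metrics: dict) -> None:
    """Write metrics as JSON, creating the parent directory if needed."""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(metrics, indent=2, sort_keys=True), encoding="utf-8")


def regression_gate(
    baseline_path: Path,
    current: dict,
    check: Callable[[dict, dict], list],
) -> int:
    """Return a process exit code: 0 if the run passes, 1 on regressions.

    The baseline is replaced only when the run passes, so one bad run
    cannot lower the bar for the runs that follow it.
    """
    baseline = load_baseline(baseline_path)
    if baseline is None:
        save_metrics(baseline_path, current)  # first run becomes the baseline
        return 0
    regressions = check(baseline, current)
    if regressions:
        for regression in regressions:
            print(f"REGRESSION {json.dumps(regression, sort_keys=True)}")
        return 1
    save_metrics(baseline_path, current)
    return 0
```

With the class above, the comparison callable would be `lambda b, c: RegressionDetector(b, c).check()`, and a CI step could end with `sys.exit(regression_gate(Path("results/metrics.json"), metrics, check))`.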

Chapter 8: What to Do Next

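This guide covered the four-layer testing pyramid for agents, tool contract tests, Saga-based workflow tests, chaos fault injection, production observability with AgentTracer, alerting with HealthChecker, and CI/CD integration with GitHub Actions. The four core classes (AgentTestHarness, ChaosMiddleware, AgentTracer, and HealthChecker) together form a reliability stack that wraps every GreenHelix tool call from development through production.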

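The GreenHelix gateway's own test suite (260+ tests across 9 modules, with 1,353 tests in the gateway itself) uses the same patterns described here: deterministic mocks for business logic, sandbox integration tests to verify contracts, and chaos-style fault injection to test payment idempotency. These patterns are not theoretical; they come from the test infrastructure that protects the gateway itself.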

Companion Guides

For the commerce patterns that this testing protects, see the companion guides:

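  • Agent-to-Agent Commerce: Escrow, Payments, and Trust (P1). The AgentCommerce class, escrow patterns, marketplace discovery, subscriptions, and dispute resolution.
  • The AI Agent FinOps Playbook (P2). The AgentFinOps class, per-agent budget caps, webhook alerts, fleet dashboards, and cost attribution.
  • Verifiable Trading Bot Reputation (P3). Cryptographic proof of profit using Ed25519 signatures and Merkle claim chains.
  • Tamper-Proof Audit Logs for Trading Bots (P4). EU AI Act compliance, MiFID II reporting requirements, and Merkle chain rotation.
  • How to Verify Any AI Agent Before Doing Business (P5). The AgentVerifier class, the five-layer trust model, and continuous reputation monitoring.
  • The Agent Strategy Marketplace Playbook (P6). Selling verified trading strategies through performance escrow.
  • Hardening Agent Commerce Security (P7). OWASP-based security hardening with SecureAgent and SecurityMonitor.
  • The Agent SaaS Factory (P8). Autonomous micro-SaaS creation with AgentDeveloper, AgentDBA, and AgentBilling.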

Try It in the Sandbox

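The sandbox at sandbox.greenhelix.net is free to use and accepts any API key. Run HealthChecker against it. Deploy the GitHub Actions template. Use ChaosMiddleware to break things on purpose. The patterns in this guide can be copied, adapted, and deployed right away.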

Code Bundle

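This cookbook and its eight companion guides are available as a complete code bundle. Each guide introduces one production-ready Python class, and together they cover the full lifecycle of an agent commerce system: build, secure, test, monitor, and scale.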

For the full API reference and tool catalog (128 tools), see the GreenHelix developer documentation: [https://api.greenhelix.net/docs](https://api.greenhelix.net/docs).


*Price: $29 | Format: digital guide | Updates: lifetime access*

@mirni