Agent Observability Stack: Distributed Tracing, Metrics, and Alerting for Multi-Agent Systems

构建多智能体系统的分布式追踪、指标监控与告警体系,基于OpenTelemetry和GreenHelix平台。

已扫描
适合谁
AI系统开发者、智能体平台运维人员
不适合谁
无技术背景的普通用户、无需监控的简单脚本使用者
国内可用性
需网络配置。可能需要网络配置或第三方服务可访问。
安装难度
新手友好(★☆☆)。基于终端操作、依赖、API Key 和本地环境要求的初步判断。

安装与下载

openclaw skills install @mirni/greenhelix-agent-observability-stack

Skill 说明

命令、参数、文件名以原文为准

Agent Observability Stack:多智能体系统的分布式追踪、指标与告警

注意:本指南为教学用途,包含示例代码。

不执行任何代码,无需凭证或依赖安装。

所有示例均使用 GreenHelix 沙箱(https://sandbox.greenhelix.net),提供 500 个免费积分——无需 API 密钥即可开始使用。

当 Agent A 调用 Agent B,Agent B 再调用 Agent C,而事务耗时从 200ms 增至 12 秒时,瓶颈出现在哪里?当您的智能体集群每日处理 10,000 笔交易,收入下降 15% 时,是哪个智能体表现不佳?当托管结算在凌晨 3 点无声失败,您多久才能发现?传统监控工具——ping 检查、CPU 图表、运行时间仪表板——无法回答这些关于分布式智能体系统的问题。它们原本是为单体架构和简单请求-响应服务设计的。智能体商业的本质完全不同:事务跨越多个自治智能体,每个智能体拥有独立状态、定价机制和故障模式。一次面向客户的操作可能经过五个智能体、两个托管合约和三次独立计费事件才完成。故障面呈组合式而非线性。

你需要的是可观测性,而不仅仅是监控。监控告诉你“出了问题”,可观测性则告诉你“为什么出问题、在哪里出问题、如何修复”。对于智能体商业而言,这意味着:通过分布式追踪追踪跨智能体边界的事务流;通过自定义指标衡量业务成果(而不仅是基础设施健康);通过智能告警在用户发现问题前及时捕获异常。15 分钟停机与 4 小时收入流失之间的区别,就在于你的可观测性体系是否理解智能体之间的事务流转。

本指南将从零开始构建一个完整的可观测性栈。我们采用 OpenTelemetry 标准以保证互操作性,直接集成 GreenHelix 的指标与事件工具以获取智能体特异性遥测数据,并构建能理解智能体商业独特模式的异常检测机制。所有组件均为可立即部署的生产级 Python 代码。最终你将获得:跨智能体调用的分布式追踪、自定义业务指标、异常检测、可视化仪表板、带升级策略的告警机制,以及带有成本归因的 SLA 监控。

你将学到的内容

  • 第一章:智能体可观测性的三大支柱
  • 第二章:AgentTracer 类
  • 第三章:跨智能体调用的分布式追踪
  • 第四章:MetricsCollector 类
  • 第五章:异常检测
  • 第六章:仪表板设计
  • 第七章:AlertManager 类
  • 第八章:SLA 监控与成本归因
  • 后续内容

完整指南

Agent Observability Stack:多智能体系统的分布式追踪、指标与告警

当 Agent A 调用 Agent B,Agent B 再调用 Agent C,而事务耗时从 200ms 增至 12 秒时,瓶颈出现在哪里?当您的智能体集群每日处理 10,000 笔交易,收入下降 15% 时,是哪个智能体表现不佳?当托管结算在凌晨 3 点无声失败,您多久才能发现?传统监控工具——ping 检查、CPU 图表、运行时间仪表板——无法回答这些关于分布式智能体系统的问题。它们原本是为单体架构和简单请求-响应服务设计的。智能体商业的本质完全不同:事务跨越多个自治智能体,每个智能体拥有独立状态、定价机制和故障模式。一次面向客户的操作可能经过五个智能体、两个托管合约和三次独立计费事件才完成。故障面呈组合式而非线性。

你需要的是可观测性,而不仅仅是监控。监控告诉你“出了问题”,可观测性则告诉你“为什么出问题、在哪里出问题、如何修复”。对于智能体商业而言,这意味着:通过分布式追踪追踪跨智能体边界的事务流;通过自定义指标衡量业务成果(而不仅是基础设施健康);通过智能告警在用户发现问题前及时捕获异常。15 分钟停机与 4 小时收入流失之间的区别,就在于你的可观测性体系是否理解智能体之间的事务流转。

本指南将从零开始构建一个完整的可观测性栈。我们采用 OpenTelemetry 标准以保证互操作性,直接集成 GreenHelix 的指标与事件工具以获取智能体特异性遥测数据,并构建能理解智能体商业独特模式的异常检测机制。所有组件均为可立即部署的生产级 Python 代码。最终你将获得:跨智能体调用的分布式追踪、自定义业务指标、异常检测、可视化仪表板、带升级策略的告警机制,以及带有成本归因的 SLA 监控。


快速入门:本指南中所有示例均可在 GreenHelix 沙箱(https://sandbox.greenhelix.net)中运行,提供 500 个免费积分——无需 API 密钥。

第一章:智能体可观测性的三大支柱

分布式系统的可观测性基于三大支柱:追踪(Traces)、指标(Metrics)和日志(Logs)。每一项解决不同类别的问题,且单独使用均不充分。对于智能体商业而言,每项支柱都有特定要求,与传统 Web 服务可观测性存在差异。

追踪:跨智能体边界追踪事务流程

追踪是单个事务在系统中流动的完整记录。在智能体商业中,一次追踪可能始于客户智能体发起服务请求,经网关智能体转发,调用专业智能体,触发托管创建,等待履约完成,最终以结算和付款分发结束。旅程中的每一步称为一个“跨度”(span),所有跨度的集合构成一条完整的追踪。

Agent 观测性栈:多智能体系统的分布式追踪、指标与告警

版本:1.3.1

分块:2/17

与传统分布式追踪的关键区别在于,智能体边界不仅是服务边界,更是信任边界。当 Agent A 调用 Agent B 时,该调用中嵌入了经济交易。追踪记录不仅需要包含延迟和状态码,还必须涵盖计费事件、资金托管状态变更以及每个跨度的经济上下文。一个仅显示“200 OK,耗时 150ms”的跨度是不完整的,若未同时标明“已计费 0.003 信用点,创建托管 ID esw_abc123”,则信息缺失。

追踪帮助回答以下问题:

  • 这笔特定交易为何失败?
  • 延迟来自何处?
  • 链路中的哪个智能体是性能瓶颈?
  • 计费事件是否与实际执行的工作相匹配?

指标:衡量智能体健康度、性能与业务成果

指标是随时间聚合的数值测量结果。与追踪提供单个交易细节不同,指标呈现整体态势。在智能体商业场景中,指标可分为三类。

基础设施指标 衡量智能体作为软件系统本身的健康状况:CPU 使用率、内存消耗、请求队列深度、连接池利用率等。这些属于基础要求,多数监控工具均可良好处理。

性能指标 反映智能体在负载下的行为表现:请求延迟(p50、p95、p99)、吞吐量(每秒请求数)、错误率、超时率。这类指标需使用直方图和百分位计算,而非仅依赖平均值。

业务指标 是智能体商业与传统服务的核心差异所在。需跟踪每个智能体的收入、单次交易成本、托管结算成功率、争议率、SLA 合规率、客户满意度代理指标等。这些指标直接反映你的智能体集群是盈利还是亏损。

GreenHelix 的 submit_metrics 工具支持自定义指标提交,是三类指标的理想汇聚点。该工具接收指标名称、数值、维度和时间戳,可构建丰富且可查询的指标序列。

日志:结构化事件日志用于调试

日志是带有上下文的离散事件。在智能体商业中,结构化日志至关重要,因为需要跨智能体边界关联日志事件。一条纯文本日志如“处理请求出错”在拥有 50 个智能体、每小时生成数千条日志的情况下毫无价值。

结构化日志应包含可机器解析的字段,如追踪 ID、跨度 ID、智能体 ID、交易 ID 及相关业务上下文。当出现问题时,可通过追踪 ID 过滤日志,按时间顺序查看该交易涉及的所有智能体产生的全部事件。

GreenHelix 的 get_events 工具用于获取事件流,作为结构化日志源。事件包括智能体交互、计费事件、托管状态变更、Webhook 发送等。将应用日志与 GreenHelix 事件关联后,可完整还原事件发生过程及原因。

三者缺一不可的原因

  • 指标 告诉你“有问题”:“错误率在 14:32 升至 5%。”
  • 追踪 告诉你“在哪里”和“为什么”:“交易 txn_789 在 Agent C 失败,因托管创建超时 30 秒。”
  • 日志 提供细节:“Agent C 的连接池已耗尽,因 Agent D 在失败结算后未释放连接。”

若无指标,你只能在用户投诉后才发现问题;若无追踪,无法定位具体是哪个智能体或哪一步导致故障;若无日志,无法深入理解根本原因以有效修复。智能体商业使这种依赖关系倍增——交互更复杂,故障模式更隐蔽,而遗漏问题的经济后果直接且可量化。

GreenHelix 观测性工具

三个 GreenHelix 工具构成观测性栈的基础:

submit_metrics 接收带维度的自定义指标数据点。我们用它将智能体性能指标、业务指标及自定义计数器推送至 GreenHelix 的指标存储。

get_events 按智能体、时间范围和事件类型过滤,获取事件流。我们用它提取结构化事件数据,用于追踪关联与日志增强。

register_webhook 为特定事件类型创建 Webhook 订阅。我们用它在关键事件发生时触发实时告警,避免轮询排查问题。

from greenhelix import GreenHelixClient

client = GreenHelixClient(api_key="your-api-key")

# 提交自定义指标
client.execute_tool("submit_metrics", {
    "agent_id": "agent-payment-processor",
    "metrics": [
        {
            "name": "transaction.latency_ms",
            "value": 245.3,
            "dimensions": {
                "agent": "agent-payment-processor",
                "operation": "process_payment",
                "status": "success"
            },
            "timestamp": "2026-04-07T14:30:00Z"
        }
    ]
})

# 获取用于追踪关联的事件
events = client.execute_tool("get_events", {
    "agent_id": "agent-payment-processor",
    "event_types": ["escrow.created", "escrow.settled", "billing.charged"],
    "start_time": "2026-04-07T14:00:00Z",
    "end_time": "2026-04-07T15:00:00Z"
})

# 注册 Webhook 实现实时告警
client.execute_tool("register_webhook", {
    "url": "https://alerts.myfleet.com/webhook",
    "event_types": ["escrow.failed", "billing.error"],
    "agent_id": "agent-payment-processor"
})

这三项工具结合 OpenTelemetry 的追踪与指标 API,足以构建生产级观测性体系。


第二章:AgentTracer 类

我们观测性栈的核心是一个理解智能体商业逻辑的追踪器。标准 OpenTelemetry 追踪器仅针对 HTTP 请求和数据库调用创建跨度。我们的 AgentTracer 在 OpenTelemetry 基础上封装,增加了智能体特有上下文:计费事件、托管状态、经济元数据以及跨智能体的追踪传播能力。

设计原则

追踪器必须轻量。添加可观测性不应显著增加事务延迟。我们目标是每个跨度的开销低于 1ms,这意味着需要使用内存缓冲并异步导出。

追踪器必须兼容 OpenTelemetry。这意味着可以将追踪数据导出到 Jaeger、Zipkin、Grafana Tempo 或任何其他 OTel 兼容后端。我们不会将你锁定在专有格式中。

追踪器必须理解代理商业原语。跨度应自动捕获账单金额、托管 ID、代理身份和交易类型,而无需在每个调用位置手动注入代码。

AgentTracer 实现

markdown

Agent 可观测性栈:分布式追踪、指标与告警,适用于多智能体系统

概述

本技能提供一套完整的可观测性解决方案,专为多智能体系统设计,支持分布式追踪、指标采集和告警机制。通过 OpenTelemetry 兼容的 AgentTracer,可实现跨智能体调用的端到端追踪,并将关键操作(如交易、托管、计费)转化为结构化指标。


核心组件

AgentSpanAttributes

定义智能体商业操作的标准属性键:

属性名说明
agent.id智能体唯一标识
agent.role智能体角色(如 buyer, seller
agent.transaction.id事务 ID
agent.escrow.id托管 ID
agent.billing.amount计费金额
agent.billing.currency计费货币
agent.operation.type操作类型(如 payment, transfer
agent.peer.id对端智能体 ID
agent.sla.tierSLA 等级
agent.cost_center成本中心

GreenHelixSpanExporter

将追踪跨度(span)导出为指标,提交至 GreenHelix 平台。

初始化参数

  • client: GreenHelix 客户端实例
  • agent_id: 当前智能体 ID
  • batch_size: 批量发送大小,默认为 50

导出逻辑

  1. 将每个 span 的持续时间转换为毫秒。
  2. 提取 span 的属性并构建维度信息。
  3. 生成以下两类指标:

- agent.span.duration_ms:跨度持续时间指标

- agent.billing.amount:计费金额指标(仅当存在计费属性时)

  1. 按批次批量提交至 submit_metrics 工具。

关键特性

  • 支持自动批处理与异步提交。
  • 错误处理:若提交失败,返回 FAILURE
  • shutdown() 方法无实际操作。

_ns_to_iso(ns_timestamp: int) -> str

将纳秒级时间戳转换为 ISO 8601 格式字符串。

def _ns_to_iso(ns_timestamp: int) -> str:
    import datetime
    dt = datetime.datetime.fromtimestamp(ns_timestamp / 1e9, tz=datetime.timezone.utc)
    return dt.isoformat()

AgentTracer

OpenTelemetry 兼容的智能体追踪器,用于记录智能体间的交互行为。

初始化参数

  • agent_id: 智能体唯一标识
  • client: GreenHelix 客户端对象(可选)
  • service_name: 服务名称,默认为 agent-{agent_id}
  • exporters: 额外的跨度导出器列表(可选)

主要功能

start_span

创建一个带有智能体上下文的新跨度。

参数:

  • name: 跨度名称
  • kind: 跨度类型(默认为 INTERNAL
  • attributes: 自定义属性
  • peer_agent_id: 对端智能体 ID(用于关联)

使用示例:

with tracer.start_span("process.payment", kind=SpanKind.SERVER, attributes={"status": "success"}) as span:
    # 执行业务逻辑
    pass
trace_agent_call

追踪从当前智能体向目标智能体发起的调用。

参数:

  • target_agent_id: 目标智能体 ID
  • operation: 操作类型(如 transfer, verify
  • transaction_id: 事务 ID(可选,自动生成 UUID)

生成跨度名称:

call.{target_agent_id}.{operation}

跨度类型: CLIENT

trace_escrow

追踪托管生命周期操作。

参数:

  • escrow_id: 托管 ID
  • operation: 操作类型(如 create, release, refund

生成跨度名称:

escrow.{operation}

跨度类型: INTERNAL

trace_billing

追踪计费事件。

参数:

  • amount: 计费金额
  • currency: 货币单位(默认为 credits

生成跨度名称:

billing.charge

跨度类型: INTERNAL

get_trace_context()

提取当前追踪上下文,用于跨智能体传播。

返回值:

  • traceparent: W3C Trace Context 标头
  • x-agent-id: 当前智能体 ID
inject_context(headers: Dict[str, str])

将追踪上下文注入 HTTP 请求头中。

示例:

headers = {"Content-Type": "application/json"}
headers = tracer.inject_context(headers)
shutdown()

刷新所有待处理的跨度并关闭追踪器。


使用方式

from agent_tracing import AgentTracer

# 初始化追踪器
tracer = AgentTracer(
    agent_id="agent-123",
    client=greenhelix_client,
    service_name="payment-service"
)

# 追踪一次支付调用
with tracer.trace_agent_call(target_agent_id="agent-456", operation="pay") as span:
    # 执行远程调用
    result = remote_api.call(...)
    if not result.success:
        span.set_status(StatusCode.ERROR)

注意事项

  • 所有跨度必须在 with 块中使用,以确保正确结束。
  • 若需跨服务传递上下文,请使用 inject_context 注入请求头。
  • GreenHelixSpanExporter 依赖于外部工具 submit_metrics,请确保其可用。
  • 不建议在高并发场景中使用过大的 batch_size,以免内存压力过大。

Agent 可观测性栈:多智能体系统的分布式追踪、指标与告警

版本:1.3.1

分块:5/17

AgentTracer 与 GreenHelix 客户端调用无缝集成。每次 API 交互都会生成一个跨度(span),这些跨度会自动捕获耗时、状态以及智能体交易相关属性。

from greenhelix import GreenHelixClient

client = GreenHelixClient(api_key="your-api-key")
tracer = AgentTracer(agent_id="agent-marketplace", client=client)

# 跟踪多步骤交易
with tracer.trace_agent_call("agent-data-provider", "fetch_dataset") as span:
    result = client.execute_tool("call_agent", {
        "target": "agent-data-provider",
        "operation": "fetch_dataset",
        "params": {"dataset_id": "ds_12345"},
        "headers": tracer.get_trace_context(),
    })
    span.set_attribute("dataset.size_bytes", result.get("size", 0))

    if result.get("status") == "error":
        span.set_status(StatusCode.ERROR, result.get("message", ""))
    else:
        span.set_status(StatusCode.OK)

# 跟踪托管资金生命周期
with tracer.trace_escrow("esw_abc123", "create") as span:
    escrow = client.execute_tool("create_escrow", {
        "amount": 5.00,
        "payer": "agent-buyer",
        "payee": "agent-seller",
    })
    span.set_attribute("escrow.amount", 5.00)
    span.set_status(StatusCode.OK)

核心设计原则是:AgentTracer 包装 OpenTelemetry,而非替代它。你可以同时使用标准的 OTel 导出器(如 Jaeger、OTLP、控制台等)和 GreenHelix 导出器。这意味着你的智能体追踪数据可直接出现在现有可观测性基础设施中,无需迁移。

GreenHelixSpanExporter 通过 submit_metrics 将跨度转换为指标。这种双重导出机制使得每个被追踪的操作自动产生延迟和计费指标,无需额外手动注入指标代码。


第三章:跨智能体调用的分布式追踪

单个智能体的追踪相对简单。真正的挑战在于追踪一个事务在多个独立智能体之间流转的过程——这些智能体可能运行在不同的基础设施上,由不同团队维护,并通过 GreenHelix 网关进行通信。

问题:追踪上下文传播

当智能体 A 调用智能体 B 时,智能体 B 并不天然知晓智能体 A 的追踪信息。若未显式传播上下文,智能体 B 会启动一个新的、孤立的追踪。最终你得到的是五个彼此割裂的追踪记录,而非完整事务流程的一条统一追踪链。

W3C 追踪上下文标准通过两个 HTTP 头部解决此问题:traceparent(包含追踪 ID、跨度 ID 和采样标志)和 tracestate(包含厂商特定数据)。我们的 AgentTracer 已通过 get_trace_context() 生成这些头部。关键挑战在于确保链路中的每个智能体都能正确提取、使用并继续传播这些头部。

智能体间调用中的追踪上下文

以下是带有追踪的智能体间调用的标准模式:调用方将上下文注入请求头;接收方提取上下文并创建子跨度。

# === 调用方智能体(智能体 A) ===

class TracedAgentClient:
    """自动传播追踪上下文的 HTTP 客户端。"""

    def __init__(self, tracer: AgentTracer, client):
        self._tracer = tracer
        self._client = client

    def call_agent(
        self,
        target_agent: str,
        operation: str,
        params: Dict[str, Any],
        transaction_id: str = None,
    ) -> Dict[str, Any]:
        """调用另一个智能体,自动传播追踪上下文。"""
        with self._tracer.trace_agent_call(
            target_agent, operation, transaction_id
        ) as span:
            headers = self._tracer.get_trace_context()
            headers["x-transaction-id"] = transaction_id or ""

            result = self._client.execute_tool("call_agent", {
                "target": target_agent,
                "operation": operation,
                "params": params,
                "headers": headers,
            })

            span.set_attribute("response.status", result.get("status", "unknown"))

            if result.get("error"):
                span.set_status(StatusCode.ERROR, result["error"])
            else:
                span.set_status(StatusCode.OK)

            return result

# === 接收方智能体(智能体 B) ===

from opentelemetry.trace.propagation import TraceContextTextMapPropagator
from opentelemetry.context import Context

class TracedAgentHandler:
    """提取并延续追踪上下文的请求处理器。"""

    def __init__(self, tracer: AgentTracer):
        self._tracer = tracer
        self._propagator = TraceContextTextMapPropagator()

    def handle_request(
        self,
        operation: str,
        params: Dict[str, Any],
        headers: Dict[str, str],
    ) -> Dict[str, Any]:
        """处理传入的智能体请求,延续追踪链。"""
        # 从传入的头部中提取追踪上下文
        ctx = self._propagator.extract(carrier=headers)

        # 在提取的上下文中创建子跨度
        token = attach(ctx)
        try:
            with self._tracer.start_span(
                f"handle.{operation}",
                kind=SpanKind.SERVER,
                attributes={
                    "caller.agent_id": headers.get("x-agent-id", "unknown"),
                    AgentSpanAttributes.OPERATION_TYPE: operation,
                },
            ) as span:
                result = self._dispatch(operation, params, span)
                return result
        finally:
            detach(token)

    def _dispatch(
        self, operation: str, params: Dict[str, Any], span
    ) -> Dict[str, Any]:
        """路由到对应的操作处理器。"""
        handler = getattr(self, f"op_{operation}", None)
        if not handler:
            span.set_status(StatusCode.ERROR, f"Unknown operation: {operation}")
            return {"error": f"Unknown operation: {operation}"}
        return handler(params, span)

跨五个代理追踪完整交易流程

考虑一个客户代理订购数据分析报告的电商交易场景,该交易流程经过五个代理:

  1. 客户代理 —— 启动请求
  2. 市场代理 —— 匹配请求与服务提供方
  3. 数据提供代理 —— 提供原始数据
  4. 分析代理 —— 处理并分析数据
  5. 交付代理 —— 格式化并交付报告
def traced_execute(
    client: TracedAgentClient,
    tracer: AgentTracer,
    request: Dict[str, Any],
) -> Dict[str, Any]:
    """跨多个代理执行完整的带追踪交易流程。"""
    transaction_id = str(uuid.uuid4())

    with tracer.start_span(
        "transaction.data_analysis_report",
        kind=SpanKind.INTERNAL,
        attributes={
            AgentSpanAttributes.TRANSACTION_ID: transaction_id,
            AgentSpanAttributes.OPERATION_TYPE: "data_analysis_report",
        },
    ) as root_span:

        # 步骤 1:通过市场代理寻找服务提供方
        with tracer.start_span("step.find_provider") as step_span:
            match = client.call_agent(
                "agent-marketplace",
                "find_provider",
                {"capability": "data_analysis", "budget": request["budget"]},
                transaction_id=transaction_id,
            )
            step_span.set_attribute("provider.matched", match.get("provider_id", ""))

        provider_id = match["provider_id"]

        # 步骤 2:创建交易托管
        with tracer.trace_escrow(f"esw_{transaction_id[:8]}", "create") as esc_span:
            escrow = client.call_agent(
                "agent-marketplace",
                "create_escrow",
                {
                    "amount": match["price"],
                    "payer": request["customer_agent"],
                    "payee": provider_id,
                },
                transaction_id=transaction_id,
            )
            esc_span.set_attribute("escrow.id", escrow.get("escrow_id", ""))

        # 步骤 3:获取原始数据
        with tracer.start_span("step.fetch_data") as step_span:
            data = client.call_agent(
                match["data_source"],
                "fetch_dataset",
                {"dataset_id": request["dataset_id"]},
                transaction_id=transaction_id,
            )
            step_span.set_attribute("data.records", data.get("record_count", 0))

        # 步骤 4:执行分析
        with tracer.start_span("step.analyze") as step_span:
            analysis = client.call_agent(
                provider_id,
                "analyze_data",
                {"data": data["data"], "analysis_type": request["analysis_type"]},
                transaction_id=transaction_id,
            )
            step_span.set_attribute("analysis.confidence", analysis.get("confidence", 0))

        # 步骤 5:交付报告
        with tracer.start_span("step.deliver") as step_span:
            delivery = client.call_agent(
                "agent-delivery",
                "deliver_report",
                {
                    "report": analysis["report"],
                    "recipient": request["customer_agent"],
                    "format": request.get("format", "pdf"),
                },
                transaction_id=transaction_id,
            )
            step_span.set_attribute("delivery.channel", delivery.get("channel", ""))

        # 步骤 6:结算托管
        with tracer.trace_escrow(escrow["escrow_id"], "settle") as esc_span:
            settlement = client.call_agent(
                "agent-marketplace",
                "settle_escrow",
                {"escrow_id": escrow["escrow_id"]},
                transaction_id=transaction_id,
            )
            esc_span.set_attribute("settlement.status", settlement.get("status", ""))

        root_span.set_status(StatusCode.OK)
        return {
            "transaction_id": transaction_id,
            "report_url": delivery.get("url"),
            "total_cost": match["price"],
        }

使用 GreenHelix 事件进行追踪关联

GreenHelix 事件拥有独立的事件 ID。为了获得完整的视图,需要将 OpenTelemetry 的追踪 ID 与 GreenHelix 的事件 ID 进行关联。方法是在调用 GreenHelix API 时嵌入追踪 ID,然后在分析阶段基于该 ID 进行关联。

def trace_escrow_lifecycle(
    tracer: AgentTracer,
    client,
    escrow_id: str,
) -> Dict[str, Any]:
    """追踪并关联托管资金的完整生命周期与 GreenHelix 事件。"""
    trace_ctx = tracer.get_trace_context()
    trace_id = trace_ctx.get("traceparent", "").split("-")[1] if trace_ctx else ""

    with tracer.trace_escrow(escrow_id, "lifecycle") as span:
        # 获取该托管资金相关的 GreenHelix 事件
        events = client.execute_tool("get_events", {
            "filters": {"escrow_id": escrow_id},
            "start_time": "2026-04-07T00:00:00Z",
            "end_time": "2026-04-07T23:59:59Z",
        })

        lifecycle = {
            "escrow_id": escrow_id,
            "trace_id": trace_id,
            "events": [],
        }

        for event in events.get("events", []):
            event_type = event.get("type", "")
            lifecycle["events"].append({
                "type": event_type,
                "timestamp": event.get("timestamp"),
                "greenhelix_event_id": event.get("event_id"),
                "trace_id": trace_id,
            })

            # 为每个 GreenHelix 事件创建子跨度
            with tracer.start_span(
                f"greenhelix.event.{event_type}",
                attributes={
                    "greenhelix.event_id": event.get("event_id", ""),
                    "greenhelix.event_type": event_type,
                    AgentSpanAttributes.ESCROW_ID: escrow_id,
                },
            ) as event_span:
                event_span.set_status(StatusCode.OK)

        span.set_attribute("lifecycle.event_count", len(lifecycle["events"]))
        return lifecycle

父-子跨度关系

OpenTelemetry 通过 Python 上下文管理器自动管理父-子跨度关系。当你嵌套使用 start_span 调用时,每个内部跨度都会成为外部跨度的子跨度。这在 Trace 可视化工具(如 Jaeger)中形成树状结构。

对于代理商业场景,典型的层级结构如下:

transaction.data_analysis_report (根跨度)
  |-- step.find_provider
  |     |-- call.agent-marketplace.find_provider (CLIENT)
  |-- escrow.create
  |     |-- call.agent-marketplace.create_escrow (CLIENT)
  |-- step.fetch_data
  |     |-- call.agent-data-source.fetch_dataset (CLIENT)
  |-- step.analyze
  |     |-- call.agent-analysis.analyze_data (CLIENT)
  |-- step.deliver
  |     |-- call.agent-delivery.deliver_report (CLIENT)
  |-- escrow.settle
        |-- call.agent-marketplace.settle_escrow (CLIENT)

每个调用方的 CLIENT 跨度,在接收方对应一个 SERVER 跨度。所有代理共享相同的 trace ID,因此 Trace 可视化工具可将整个交易呈现为跨越五个代理的单一、统一的追踪记录。


第四章:MetricsCollector 类

追踪提供单个事务的详细信息。指标则提供聚合视角:当前集群表现如何?今天与昨天相比有何变化?是否达成业务目标?MetricsCollector 类提供了一个清晰的 API,用于从代理集群中收集、缓冲并导出指标。

指标类型

代理商业场景中存在三种基础指标类型:

计数器(Counters) 是单调递增的数值。适用于统计总请求量、总错误数、总收入、总交易数等。用于回答“有多少”的问题。

仪表(Gauges) 是某一时刻的瞬时值,可上升或下降。适用于活跃连接数、队列深度、当前余额、活跃托管资金等。用于回答“当前是多少”的问题。

直方图(Histograms) 用于跟踪数值的分布情况。适用于延迟时间、交易金额、响应大小等。直方图提供百分位数(如 p50、p95、p99),而非仅平均值,从而回答“分布情况如何”的问题。

MetricsCollector 实现

import time
import math
import threading
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Callable, Any

@dataclass
class MetricPoint:
    """单个指标数据点。"""
    name: str
    value: float
    dimensions: Dict[str, str]
    timestamp: float
    metric_type: str  # "counter", "gauge", "histogram"

class HistogramBuckets:
    """用于跟踪直方图指标的值分布。"""

    def __init__(self, boundaries: List[float] = None):
        self.boundaries = boundaries or [
            5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000, 10000
        ]
        self.buckets = [0] * (len(self.boundaries) + 1)
        self.sum = 0.0
        self.count = 0
        self.min = float("inf")
        self.max = float("-inf")
        self._values: List[float] = []

    def observe(self, value: float):
        self.sum += value
        self.count += 1
        self.min = min(self.min, value)
        self.max = max(self.max, value)
        self._values.append(value)

        for i, boundary in enumerate(self.boundaries):
            if value <= boundary:
                self.buckets[i] += 1
                return
        self.buckets[-1] += 1

    def percentile(self, p: float) -> float:
        if not self._values:
            return 0.0
        sorted_vals = sorted(self._values)
        idx = int(math.ceil(p / 100.0 * len(sorted_vals))) - 1
        return sorted_vals[max(0, idx)]

    def reset(self):
        self.buckets = [0] * (len(self.boundaries) + 1)
        self.sum = 0.0
        self.count = 0
        self.min = float("inf")
        self.max = float("-inf")
        self._values = []

class MetricsCollector:
    """收集、缓冲并导出代理商业指标。"""

    def __init__(
        self,
        agent_id: str,
        client=None,
        flush_interval_seconds: float = 60.0,
        buffer_size: int = 1000,
    ):
        self.agent_id = agent_id
        self._client = client
        self._flush_interval = flush_interval_seconds
        self._buffer_size = buffer_size

        self._counters: Dict[str, float] = defaultdict(float)
        self._gauges: Dict[str, float] = {}
        self._histograms: Dict[str, HistogramBuckets] = {}
        self._buffer: List[MetricPoint] = []
        self._lock = threading.Lock()

        self._dimensions_registry: Dict[str, Dict[str, str]] = {}
        self._flush_callbacks: List[Callable] = []

        self._running = False
        self._flush_thread: Optional[threading.Thread] = None

    def start(self):
        """启动后台刷新线程。"""
        self._running = True
        self._flush_thread = threading.Thread(
            target=self._flush_loop, daemon=True
        )
        self._flush_thread.start()

    def stop(self):
        """停止后台刷新线程并刷新剩余指标。"""
        self._running = False
        if self._flush_thread:
            self._flush_thread.join(timeout=5.0)
        self.flush()

    # --- 计数器操作 ---

    def increment(
        self,
        name: str,
        value: float = 1.0,
        dimensions: Dict[str, str] = None,
    ):
        """递增一个计数器指标。"""
        key = self._make_key(name, dimensions)
        with self._lock:
            self._counters[key] += value
            self._dimensions_registry[key] = dimensions or {}
            self._buffer_point(name, self._counters[key], dimensions, "counter")

    # --- 仪表盘操作 ---

    def gauge_set(
        self,
        name: str,
        value: float,
        dimensions: Dict[str, str] = None,
    ):
        """将一个仪表盘指标设置为绝对值。"""
        key = self._make_key(name, dimensions)
        with self._lock:
            self._gauges[key] = value
            self._dimensions_registry[key] = dimensions or {}
            self._buffer_point(name, value, dimensions, "gauge")

    def gauge_increment(
        self,
        name: str,
        value: float = 1.0,
        dimensions: Dict[str, str] = None,
    ):
        """递增一个仪表盘指标。"""
        key = self._make_key(name, dimensions)
        with self._lock:
            self._gauges[key] = self._gauges.get(key, 0.0) + value
            self._dimensions_registry[key] = dimensions or {}
            self._buffer_point(name, self._gauges[key], dimensions, "gauge")

    # --- 直方图操作 ---

    def observe(
        self,
        name: str,
        value: float,
        dimensions: Dict[str, str] = None,
        boundaries: List[float] = None,
    ):
        """记录一个直方图指标的观测值。"""
        key = self._make_key(name, dimensions)
        with self._lock:
            if key not in self._histograms:
                self._histograms[key] = HistogramBuckets(boundaries)
            self._histograms[key].observe(value)
            self._dimensions_registry[key] = dimensions or {}
            self._buffer_point(name, value, dimensions, "histogram")

    # --- 业务指标辅助方法 ---

    def record_transaction(
        self,
        operation: str,
        duration_ms: float,
        amount: float,
        status: str,
        peer_agent: str = "",
    ):
        """记录一次完整的交易,包含所有标准指标。"""
        dims = {
            "operation": operation,
            "status": status,
            "peer_agent": peer_agent,
        }

        self.increment("transactions.total", 1.0, dims)
        self.observe("transactions.duration_ms", duration_ms, dims)
        self.observe("transactions.amount", amount, dims)

        if status == "error":
            self.increment("transactions.errors", 1.0, dims)

    def record_revenue(self, amount: float, source: str, currency: str = "credits"):
        """记录来自代理操作的收入。"""
        dims = {"source": source, "currency": currency}
        self.increment("revenue.total", amount, dims)

    def record_cost(self, amount: float, category: str, currency: str = "credits"):
        """记录代理产生的成本。"""
        dims = {"category": category, "currency": currency}
        self.increment("costs.total", amount, dims)

    def record_escrow_event(self, event_type: str, amount: float, escrow_id: str):
        """记录一笔托管生命周期事件。"""
        dims = {"event_type": event_type}
        self.increment(f"escrow.{event_type}", 1.0, dims)
        self.observe("escrow.amount", amount, dims)

    # --- 导出与刷新 ---

    def flush(self):
        """将缓冲的指标发送至 GreenHelix。"""
        with self._lock:
            if not self._buffer:
                return
            batch = self._buffer[:]
            self._buffer.clear()

        if self._client:
            metrics_payload = [
                {
                    "name": point.name,
                    "value": point.value,
                    "dimensions": {
                        "agent": self.agent_id,
                        **point.dimensions,
                    },
                    "timestamp": _ts_to_iso(point.timestamp),
                }
                for point in batch
            ]

            try:
                self._client.execute_tool("submit_metrics", {
                    "agent_id": self.agent_id,
                    "metrics": metrics_payload,
                })
            except Exception as e:
                # 失败时重新缓冲以供重试
                with self._lock:
                    self._buffer = batch + self._buffer
                    # 防止缓冲区无限增长
                    if len(self._buffer) > self._buffer_size * 2:
                        self._buffer = self._buffer[-self._buffer_size:]

        for callback in self._flush_callbacks:
            try:
                callback(batch)
            except Exception:
                pass

    def get_histogram_summary(
        self, name: str, dimensions: Dict[str, str] = None
    ) -> Dict[str, float]:
        """获取直方图指标的汇总统计信息。"""
        key = self._make_key(name, dimensions)
        with self._lock:
            hist = self._histograms.get(key)
            if not hist or hist.count == 0:
                return {}
            return {
                "count": hist.count,
                "sum": hist.sum,
                "min": hist.min,
                "max": hist.max,
                "avg": hist.sum / hist.count,
                "p50": hist.percentile(50),
                "p95": hist.percentile(95),
                "p99": hist.percentile(99),
            }

    def on_flush(self, callback: Callable):
        """注册一个在每次刷新时调用的回调函数。"""
        self._flush_callbacks.append(callback)

    # --- 内部方法 ---

    def _buffer_point(
        self,
        name: str,
        value: float,
        dimensions: Dict[str, str],
        metric_type: str,
    ):
        point = MetricPoint(
            name=name,
            value=value,
            dimensions=dimensions or {},
            timestamp=time.time(),
            metric_type=metric_type,
        )
        self._buffer.append(point)

        if len(self._buffer) >= self._buffer_size:
            threading.Thread(target=self.flush, daemon=True).start()

    def _flush_loop(self):
        while self._running:
            time.sleep(self._flush_interval)
            self.flush()

    @staticmethod
    def _make_key(name: str, dimensions: Dict[str, str] = None) -> str:
        if not dimensions:
            return name
        dim_str = ",".join(f"{k}={v}" for k, v in sorted(dimensions.items()))
        return f"{name}{{{dim_str}}}"

def _ts_to_iso(ts: float) -> str:
    import datetime
    dt = datetime.datetime.fromtimestamp(ts, tz=datetime.timezone.utc)
    return dt.isoformat()

使用 MetricsCollector

client = GreenHelixClient(api_key="your-api-key")
metrics = MetricsCollector(agent_id="agent-marketplace", client=client)
metrics.start()

# 记录一次事务
start = time.time()
result = process_order(order)
duration_ms = (time.time() - start) * 1000

metrics.record_transaction(
    operation="process_order",
    duration_ms=duration_ms,
    amount=order["amount"],
    status="success" if result["ok"] else "error",
    peer_agent=order["seller_agent"],
)

# 记录收入和成本
metrics.record_revenue(order["commission"], source="marketplace_fee")
metrics.record_cost(order["gateway_fee"], category="gateway")

# 记录托管事件
metrics.record_escrow_event("created", order["amount"], result["escrow_id"])

# 获取延迟百分位数
summary = metrics.get_histogram_summary(
    "transactions.duration_ms",
    {"operation": "process_order", "status": "success"},
)
print(f"p50={summary['p50']:.0f}ms  p95={summary['p95']:.0f}ms  p99={summary['p99']:.0f}ms")

# 正常关闭
metrics.stop()

MetricsCollector 将指标暂存在内存中,并按周期(默认 60 秒)或缓冲区满时进行批量发送。这种批处理机制可减少对 GreenHelix 的 API 调用次数,同时确保指标不会丢失。在发送失败时具备重试逻辑,会将数据重新放入缓冲区,等待下一次发送周期,且设有上限以防止内存无限制增长。


第五章:异常检测

原始指标是必要的,但并不足够。一位管理 50 个代理的运营者无法持续监控 200 个仪表板来发现潜在问题。你需要自动化异常检测能力,能够理解正常行为模式,并在出现偏离时发出告警。

代理指标的统计异常检测

针对代理商业场景的异常检测与传统基础设施监控存在两点不同。第一,代理流量模式通常具有突发性和非均匀性:例如一个市场代理在工作时间可能每分钟处理 100 个请求,而在夜间仅处理 5 个。第二,收入等业务指标具有季节性规律(日、周、月),必须在检测中予以考虑。

我们的 AnomalyDetector 采用三种互补的检测方法:Z-score 用于识别突发性峰值,百分比变化用于捕捉趋势偏移,季节性分解用于发现模式异常。

import math
import time
from collections import deque
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple
from enum import Enum

class AnomalyType(Enum):
    SPIKE = "spike"
    DROP = "drop"
    TREND_CHANGE = "trend_change"
    SEASONAL_VIOLATION = "seasonal_violation"

class Severity(Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"

@dataclass
class Anomaly:
    """指标序列中检测到的异常。"""
    metric_name: str
    anomaly_type: AnomalyType
    severity: Severity
    current_value: float
    expected_value: float
    deviation: float
    timestamp: float
    dimensions: Dict[str, str] = field(default_factory=dict)
    message: str = ""

class MetricWindow:
    """用于统计分析的滑动窗口,存储指标值。"""

    def __init__(self, window_size: int = 60):
        self._values: deque = deque(maxlen=window_size)
        self._timestamps: deque = deque(maxlen=window_size)

    def add(self, value: float, timestamp: float = None):
        self._values.append(value)
        self._timestamps.append(timestamp or time.time())

    @property
    def count(self) -> int:
        return len(self._values)

    @property
    def mean(self) -> float:
        if not self._values:
            return 0.0
        return sum(self._values) / len(self._values)

    @property
    def std(self) -> float:
        if len(self._values) < 2:
            return 0.0
        m = self.mean
        variance = sum((v - m) ** 2 for v in self._values) / (len(self._values) - 1)
        return math.sqrt(variance)

    @property
    def values(self) -> List[float]:
        return list(self._values)

    @property
    def latest(self) -> Optional[float]:
        return self._values[-1] if self._values else None

    def rate_of_change(self, periods: int = 5) -> Optional[float]:
        """计算最近 N 个周期的变化率。"""
        if len(self._values) < periods + 1:
            return None
        recent = list(self._values)[-periods:]
        older = list(self._values)[-(periods + 1)]
        if older == 0:
            return None
        return (recent[-1] - older) / older

class SeasonalProfile:
    """跟踪指标的季节性模式(按小时分桶)。"""

    def __init__(self, num_buckets: int = 24):
        self._num_buckets = num_buckets
        self._bucket_values: Dict[int, List[float]] = {
            i: [] for i in range(num_buckets)
        }

    def add(self, value: float, timestamp: float):
        import datetime
        dt = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)
        bucket = dt.hour  # 按小时分桶
        self._bucket_values[bucket].append(value)

    def expected_value(self, timestamp: float) -> Optional[Tuple[float, float]]:
        """返回该时间点的预期值(均值和标准差)。"""
        import datetime
        dt = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)
        bucket = dt.hour
        values = self._bucket_values.get(bucket, [])
        if len(values) < 7:  # 至少需要一周数据
            return None
        m = sum(values) / len(values)
        variance = sum((v - m) ** 2 for v in values) / (len(values) - 1)
        return m, math.sqrt(variance)

class AnomalyDetector:
    """检测代理商业指标流中的异常。"""

    def __init__(
        self,
        z_score_threshold: float = 3.0,
        pct_change_threshold: float = 0.30,
        seasonal_z_threshold: float = 2.5,
        min_data_points: int = 20,
    ):
        self._z_threshold = z_score_threshold
        self._pct_threshold = pct_change_threshold
        self._seasonal_z_threshold = seasonal_z_threshold
        self._min_data_points = min_data_points

        self._windows: Dict[str, MetricWindow] = {}
        self._seasonal: Dict[str, SeasonalProfile] = {}
        self._anomaly_callbacks: List = []

    def observe(
        self,
        metric_name: str,
        value: float,
        dimensions: Dict[str, str] = None,
        timestamp: float = None,
    ) -> List[Anomaly]:
        """观察一个指标值并检查是否存在异常。"""
        ts = timestamp or time.time()
        key = self._make_key(metric_name, dimensions)

        if key not in self._windows:
            self._windows[key] = MetricWindow(window_size=120)
            self._seasonal[key] = SeasonalProfile()

        window = self._windows[key]
        seasonal = self._seasonal[key]

        window.add(value, ts)
        seasonal.add(value, ts)

        anomalies = []

        if window.count >= self._min_data_points:
            # Z-score 异常检测
            z_anomaly = self._check_z_score(
                metric_name, value, window, dimensions, ts
            )
            if z_anomaly:
                anomalies.append(z_anomaly)

            # 百分比变化异常检测
            pct_anomaly = self._check_pct_change(
                metric_name, value, window, dimensions, ts
            )
            if pct_anomaly:
                anomalies.append(pct_anomaly)

        # 季节性异常检测
        seasonal_anomaly = self._check_seasonal(
            metric_name, value, seasonal, dimensions, ts
        )
        if seasonal_anomaly:
            anomalies.append(seasonal_anomaly)

        for anomaly in anomalies:
            for callback in self._anomaly_callbacks:
                try:
                    callback(anomaly)
                except Exception:
                    pass

        return anomalies

    def on_anomaly(self, callback):
        """注册异常检测回调函数。"""
        self._anomaly_callbacks.append(callback)

    def _check_z_score(
        self,
        metric_name: str,
        value: float,
        window: MetricWindow,
        dimensions: Dict[str, str],
        timestamp: float,
    ) -> Optional[Anomaly]:
        """使用 Z-score 方法检测异常(与均值的标准差偏离)。"""
        std = window.std
        if std == 0:
            return None

        z_score = abs(value - window.mean) / std
        if z_score < self._z_threshold:
            return None

        anomaly_type = AnomalyType.SPIKE if value > window.mean else AnomalyType.DROP
        severity = self._z_to_severity(z_score)

        return Anomaly(
            metric_name=metric_name,
            anomaly_type=anomaly_type,
            severity=severity,
            current_value=value,
            expected_value=window.mean,
            deviation=z_score,
            timestamp=timestamp,
            dimensions=dimensions or {},
            message=f"{metric_name} 偏离均值 {z_score:.1f} 个标准差 "
                    f"{'高于' if value > window.mean else '低于'} "
                    f"(当前={value:.2f}, 均值={window.mean:.2f}, 标准差={std:.2f})",
        )

    def _check_pct_change(
        self,
        metric_name: str,
        value: float,
        window: MetricWindow,
        dimensions: Dict[str, str],
        timestamp: float,
    ) -> Optional[Anomaly]:
        """使用近期百分比变化检测异常。"""
        roc = window.rate_of_change(periods=5)
        if roc is None or abs(roc) < self._pct_threshold:
            return None

        anomaly_type = AnomalyType.TREND_CHANGE
        severity = Severity.MEDIUM if abs(roc) < 0.5 else Severity.HIGH

        return Anomaly(
            metric_name=metric_name,
            anomaly_type=anomaly_type,
            severity=severity,
            current_value=value,
            expected_value=window.mean,
            deviation=roc,
            timestamp=timestamp,
            dimensions=dimensions or {},
            message=f"{metric_name} 在最近 5 个周期内变化了 {roc*100:.1f}% "
                    f"(当前={value:.2f})",
        )

    def _check_seasonal(
        self,
        metric_name: str,
        value: float,
        seasonal: SeasonalProfile,
        dimensions: Dict[str, str],
        timestamp: float,
    ) -> Optional[Anomaly]:
        """相对于季节性模式检测异常。"""
        expected = seasonal.expected_value(timestamp)
        if expected is None:
            return None

        mean, std = expected
        if std == 0:
            return None

        z_score = abs(value - mean) / std
        if z_score < self._seasonal_z_threshold:
            return None

        return Anomaly(
            metric_name=metric_name,
            anomaly_type=AnomalyType.SEASONAL_VIOLATION,
            severity=self._z_to_severity(z_score),
            current_value=value,
            expected_value=mean,
            deviation=z_score,
            timestamp=timestamp,
            dimensions=dimensions or {},
            message=f"{metric_name} 偏离季节性预期 {z_score:.1f} 个标准差 "
                    f"(当前={value:.2f}, 预期={mean:.2f} ± {std:.2f})",
        )

    @staticmethod
    def _z_to_severity(z_score: float) -> Severity:
        if z_score >= 5.0:
            return Severity.CRITICAL
        elif z_score >= 4.0:
            return Severity.HIGH
        elif z_score >= 3.0:
            return Severity.MEDIUM
        return Severity.LOW

    @staticmethod
    def _make_key(name: str, dimensions: Dict[str, str] = None) -> str:
        if not dimensions:
            return name
        dim_str = ",".join(f"{k}={v}" for k, v in sorted(dimensions.items()))
        return f"{name}{{{dim_str}}}"

将异常检测集成到指标收集中

AnomalyDetectorMetricsCollector 协同工作。将每个指标观测值同时传递给两者:

detector = AnomalyDetector(
    z_score_threshold=3.0,
    pct_change_threshold=0.30,
    min_data_points=20,
)
metrics = MetricsCollector(agent_id="agent-marketplace", client=client)

def record_and_detect(
    metric_name: str,
    value: float,
    dimensions: Dict[str, str] = None,
):
    """记录指标并一次性检查异常。"""
    metrics.observe(metric_name, value, dimensions)
    anomalies = detector.observe(metric_name, value, dimensions)

    for anomaly in anomalies:
        print(f"ANOMALY: {anomaly.message}")
        # 将异常信息作为指标回传,用于元监控
        metrics.increment(
            "anomalies.detected",
            1.0,
            {
                "metric": anomaly.metric_name,
                "type": anomaly.anomaly_type.value,
                "severity": anomaly.severity.value,
            },
        )

    return anomalies

# 示例:检测延迟突增
record_and_detect(
    "transactions.duration_ms",
    12500.0,  # 12.5 秒 —— 很可能属于异常
    {"operation": "process_order"},
)

检测器采用滑动窗口机制,能够自适应变化的基线。如果代理的延迟在一周内从 200ms 逐步上升至 300ms,检测器会自动调整基线,不会产生误报。但如果延迟在 1 分钟内从 300ms 突增至 3000ms,Z-score 会立即捕捉到这一异常。

季节性分析模型可避免在已知流量模式下产生误报。例如,若代理在夜间处理的交易量较少,从每秒 100 次降至每秒 10 次在午夜是正常现象——季节性模型知道该时段本就如此,不会将其判定为异常。


第六章:仪表盘设计

异常检测能自动发现问题,但运维人员仍需仪表盘来实现态势感知、容量规划和利益相关方报告。本章介绍代理商业运营所需的三个核心仪表盘,以及填充它们所需的数据源和查询。

仪表盘 1:集群健康概览

集群健康仪表盘是你怀疑系统出问题时最先查看的内容。它回答的问题是:“当前所有组件是否都处于健康状态?”

关键面板:

  1. 代理状态网格 —— 一个网格视图,展示每个代理的健康状态(绿色/黄色/红色)。数据来源:各代理的最后心跳时间戳和错误率指标。当错误率低于 1% 且最近心跳在 60 秒内时为绿色;错误率在 1%-5% 之间或心跳超时 60-120 秒为黄色;其余情况为红色。
  1. 集群错误率 —— 一条时间序列图表,展示过去 24 小时内所有代理的聚合错误率,并叠加异常检测阈值。数据来源:计数器指标 transactions.errors / transactions.total,按分钟聚合。
  1. 活跃托管数 —— 一个仪表盘显示当前未结算的托管数量,并附带趋势线。趋势上升表明存在结算失败或处理延迟问题。数据来源:仪表盘指标 escrow.active_count
  1. 延迟热力图 —— 一张热力图,展示过去 6 小时内每个代理的 p95 延迟。列代表时间区间(15 分钟),行代表代理,颜色深浅表示延迟高低。这有助于快速识别整体性问题(某列过热)或单个代理问题(某行过热)。数据来源:直方图指标 transactions.duration_ms,p95 聚合。
  1. 告警摘要 —— 一个表格,按严重程度分组列出当前活跃的告警,并提供确认/解决按钮。数据来源:AlertManager 的活跃告警状态(详见第七章)。
def build_fleet_health_query(agent_ids: List[str]) -> Dict[str, Any]:
    """构建集群健康仪表盘的查询配置。"""
    return {
        "panels": [
            {
                "title": "代理状态网格",
                "type": "status_grid",
                "query": {
                    "metrics": ["agent.heartbeat.age_seconds", "transactions.errors"],
                    "group_by": ["agent"],
                    "thresholds": {
                        "green": {"error_rate": 0.01, "heartbeat_age": 60},
                        "yellow": {"error_rate": 0.05, "heartbeat_age": 120},
                        "red": {"error_rate": 1.0, "heartbeat_age": float("inf")},
                    },
                },
            },
            {
                "title": "集群错误率",
                "type": "time_series",
                "query": {
                    "metric": "transactions.errors",
                    "aggregation": "rate",
                    "interval": "1m",
                    "time_range": "24h",
                },
            },
            {
                "title": "活跃托管数",
                "type": "gauge_with_trend",
                "query": {
                    "metric": "escrow.active_count",
                    "aggregation": "latest",
                    "trend_window": "6h",
                },
            },
            {
                "title": "延迟热力图",
                "type": "heatmap",
                "query": {
                    "metric": "transactions.duration_ms",
                    "aggregation": "p95",
                    "group_by": ["agent"],
                    "interval": "15m",
                    "time_range": "6h",
                },
            },
        ],
    }

仪表盘 2:交易流程

交易流程仪表盘展示交易在代理集群中的流转过程。它回答的问题是:“我们的交易正在经历什么?”

关键面板:

  1. 交易漏斗 —— 一个漏斗图,展示交易在各个阶段的数量:发起、匹配、托管、履行、结算、完成。各阶段的流失情况揭示了交易失败的位置。数据来源:各交易阶段的计数器指标。
  1. Agent-to-Agent 流量 — 一个桑基图或弦图,展示代理之间通信的流量体积。粗线表示高流量关系。点击某条流量线可钻取查看该特定代理对的延迟和错误指标。数据源:带有 peer_agent 维度的计数器指标 transactions.total
  1. 托管资金生命周期 — 一个状态图,展示托管资金在各个状态(待处理、已充值、已履行、已结算、争议中、已过期)中的分布及状态转换速率。数据源:每个托管状态转换对应的计数器指标。
  1. 慢速交易 — 显示过去一小时内最慢的 20 笔交易的表格,包含可链接至追踪查看器的 trace ID。为运维人员提供一键访问最差性能交易的详细追踪分析能力。数据源:按 operation 维度分组的直方图指标 transactions.duration_ms,执行 Top-N 查询。
  1. 交易总量 — 一个堆叠面积图,展示过去 24 小时内各操作类型下的交易总量。数据源:按 operation 维度分组的计数器指标 transactions.total

仪表板 3:收入与成本

该仪表板面向业务相关方和容量规划人员,回答核心问题:“我们是否盈利?利润流向何处?”

关键面板:

  1. 净收入 — 一个大数字显示当日净收入(收入减去成本),并对比去年同期同日数据。数据源:revenue.total 减去 costs.total 的计数器指标。
  1. 各代理收入贡献 — 一个柱状图,展示各代理的收入贡献。用于识别最盈利和最不盈利的代理。数据源:按 agent 维度分组的计数器指标 revenue.total
  1. 成本构成 — 一个饼图,展示各类成本占比:网关费用、托管费用、基础设施开销、第三方 API 调用费用。数据源:按 category 维度分组的计数器指标 costs.total
  1. 收入趋势 — 一条时间序列图,对比过去 30 天的日收入,并叠加异常检测带。当收入超出预期范围时触发告警。数据源:每日聚合的计数器指标 revenue.total
  1. 单笔交易经济性 — 一张表格,展示各操作类型的平均收入、成本和利润率。用于识别亏损的操作类型。数据源:基于 revenue.totalcosts.total 指标除以 transactions.total,按 operation 分组计算得出。

每个仪表板应根据其用途设置合适的自动刷新频率:集群健康状态每 30 秒刷新一次,交易流每分钟刷新一次,收入与成本每 5 分钟刷新一次。所有底层数据均来自通过 submit_metrics 推送至 GreenHelix 的 MetricsCollector 指标,并用于可视化查询。


第七章:AlertManager 类

仪表板需要有人持续监控。告警机制将问题及时推送给相关人员。AlertManager 会评估告警规则,对相关告警进行去重与分组,通过合适渠道路由,并在未被确认时进行升级处理。

告警规则类型

三种告警规则覆盖了代理商业务中关键的场景:

阈值告警 在指标超过静态或动态边界时触发。例如:“错误率超过 5%。” 这类告警逻辑简单且可靠,适用于已知的明确边界。

变化率告警 在指标变化速度过快时触发。例如:“交易量在 10 分钟内下降超过 30%。” 此类告警能捕捉阈值告警无法发现的问题,因为绝对值可能仍处于“正常”范围。

基于异常的告警AnomalyDetector 识别出统计异常时触发。这类告警能够自适应基线变化和季节性模式,可发现需频繁调整阈值才能捕捉的细微问题。

AlertManager 实现

import time
import uuid
import threading
import json
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Callable, Any
from enum import Enum
from collections import defaultdict

class AlertState(Enum):
    FIRING = "firing"
    ACKNOWLEDGED = "acknowledged"
    RESOLVED = "resolved"

class AlertChannel(Enum):
    WEBHOOK = "webhook"
    EMAIL = "email"
    SLACK = "slack"
    PAGERDUTY = "pagerduty"

@dataclass
class AlertRule:
    """告警规则定义。"""
    name: str
    metric_name: str
    condition: str  # "threshold", "rate_of_change", "anomaly"
    threshold: Optional[float] = None
    comparison: str = "gt"  # "gt", "lt", "gte", "lte"
    window_minutes: int = 5
    severity: str = "medium"
    channels: List[AlertChannel] = field(default_factory=list)
    dimensions_filter: Dict[str, str] = field(default_factory=dict)
    cooldown_minutes: int = 15
    escalation_minutes: int = 30
    description: str = ""
    runbook_url: str = ""

@dataclass
class Alert:
    """一个活跃的告警实例。"""
    id: str
    rule: AlertRule
    state: AlertState
    fired_at: float
    last_value: float
    message: str
    acknowledged_at: Optional[float] = None
    acknowledged_by: Optional[str] = None
    resolved_at: Optional[float] = None
    escalated: bool = False
    notification_count: int = 0
    dimensions: Dict[str, str] = field(default_factory=dict)

class AlertManager:
    """管理告警规则、路由、去重和升级。"""

    def __init__(
        self,
        agent_id: str,
        client=None,
        anomaly_detector: "AnomalyDetector" = None,
    ):
        self.agent_id = agent_id
        self._client = client
        self._detector = anomaly_detector

        self._rules: Dict[str, AlertRule] = {}
        self._active_alerts: Dict[str, Alert] = {}
        self._alert_history: List[Alert] = []
        self._cooldowns: Dict[str, float] = {}
        self._lock = threading.Lock()

        # 通道处理器
        self._channel_handlers: Dict[AlertChannel, Callable] = {}

        # 升级线程
        self._running = False
        self._escalation_thread: Optional[threading.Thread] = None

    def start(self):
        """启动升级监控线程。"""
        self._running = True
        self._escalation_thread = threading.Thread(
            target=self._escalation_loop, daemon=True
        )
        self._escalation_thread.start()

    def stop(self):
        self._running = False
        if self._escalation_thread:
            self._escalation_thread.join(timeout=5.0)

    # --- 规则管理 ---

    def add_rule(self, rule: AlertRule):
        """添加一个告警规则。"""
        self._rules[rule.name] = rule

    def remove_rule(self, name: str):
        """移除一个告警规则。"""
        self._rules.pop(name, None)

    # --- 通道配置 ---

    def register_channel(self, channel: AlertChannel, handler: Callable):
        """为指定通道注册通知处理器。"""
        self._channel_handlers[channel] = handler

    def configure_webhook(self, url: str):
        """使用 URL 配置 Webhook 通道。"""
        import urllib.request

        def webhook_handler(alert: Alert, message: str):
            payload = json.dumps({
                "alert_id": alert.id,
                "rule": alert.rule.name,
                "severity": alert.rule.severity,
                "state": alert.state.value,
                "message": message,
                "value": alert.last_value,
                "fired_at": alert.fired_at,
                "agent_id": self.agent_id,
                "dimensions": alert.dimensions,
            }).encode()

            req = urllib.request.Request(
                url,
                data=payload,
                headers={"Content-Type": "application/json"},
                method="POST",
            )
            try:
                urllib.request.urlopen(req, timeout=10)
            except Exception:
                pass

        self._channel_handlers[AlertChannel.WEBHOOK] = webhook_handler

    def configure_slack(self, webhook_url: str, default_channel: str = "#alerts"):
        """配置 Slack 通道。"""
        import urllib.request

        def slack_handler(alert: Alert, message: str):
            severity_emoji = {
                "critical": ":red_circle:",
                "high": ":large_orange_circle:",
                "medium": ":large_yellow_circle:",
                "low": ":white_circle:",
            }
            emoji = severity_emoji.get(alert.rule.severity, ":grey_question:")

            payload = json.dumps({
                "channel": default_channel,
                "text": f"{emoji} *{alert.rule.name}*\n{message}",
                "attachments": [
                    {
                        "color": {
                            "critical": "#ff0000",
                            "high": "#ff8800",
                            "medium": "#ffcc00",
                            "low": "#cccccc",
                        }.get(alert.rule.severity, "#cccccc"),
                        "fields": [
                            {"title": "Agent", "value": self.agent_id, "short": True},
                            {"title": "Value", "value": str(alert.last_value), "short": True},
                            {"title": "Severity", "value": alert.rule.severity, "short": True},
                            {"title": "Alert ID", "value": alert.id, "short": True},
                        ],
                    }
                ],
            }).encode()

            req = urllib.request.Request(
                webhook_url,
                data=payload,
                headers={"Content-Type": "application/json"},
                method="POST",
            )
            try:
                urllib.request.urlopen(req, timeout=10)
            except Exception:
                pass

        self._channel_handlers[AlertChannel.SLACK] = slack_handler

    # --- 告警评估 ---

    def evaluate(
        self,
        metric_name: str,
        value: float,
        dimensions: Dict[str, str] = None,
    ) -> List[Alert]:
        """根据指标值评估所有规则。"""
        new_alerts = []
        dims = dimensions or {}

        for rule in self._rules.values():
            if rule.metric_name != metric_name:
                continue

            # 检查维度过滤器
            if rule.dimensions_filter:
                if not all(
                    dims.get(k) == v for k, v in rule.dimensions_filter.items()
                ):
                    continue

            # 检查冷却期
            cooldown_key = f"{rule.name}:{self._dim_key(dims)}"
            if cooldown_key in self._cooldowns:
                if time.time() - self._cooldowns[cooldown_key] < rule.cooldown_minutes * 60:
                    continue

            triggered = False

            if rule.condition == "threshold":
                triggered = self._check_threshold(rule, value)
            elif rule.condition == "rate_of_change":
                triggered = self._check_rate_of_change(rule, value, metric_name, dims)
            elif rule.condition == "anomaly" and self._detector:
                anomalies = self._detector.observe(metric_name, value, dims)
                triggered = len(anomalies) > 0

            if triggered:
                alert = self._fire_alert(rule, value, dims)
                if alert:
                    new_alerts.append(alert)
                    self._cooldowns[cooldown_key] = time.time()

        # 检查自动解除
        self._check_resolutions(metric_name, value, dims)

        return new_alerts

    def acknowledge(self, alert_id: str, acknowledged_by: str = "operator"):
        """确认一个活跃的告警。"""
        with self._lock:
            alert = self._active_alerts.get(alert_id)
            if alert and alert.state == AlertState.FIRING:
                alert.state = AlertState.ACKNOWLEDGED
                alert.acknowledged_at = time.time()
                alert.acknowledged_by = acknowledged_by

    def resolve(self, alert_id: str):
        """手动解除告警。"""
        with self._lock:
            alert = self._active_alerts.get(alert_id)
            if alert:
                alert.state = AlertState.RESOLVED
                alert.resolved_at = time.time()
                self._alert_history.append(alert)
                del self._active_alerts[alert_id]
                self._notify(alert, f"RESOLVED: {alert.rule.name}")

    def get_active_alerts(
        self, severity: str = None
    ) -> List[Alert]:
        """获取所有活跃(触发或已确认)的告警。"""
        with self._lock:
            alerts = list(self._active_alerts.values())
            if severity:
                alerts = [a for a in alerts if a.rule.severity == severity]
            return sorted(alerts, key=lambda a: a.fired_at, reverse=True)

    # --- 内部方法 ---

    def _check_threshold(self, rule: AlertRule, value: float) -> bool:
        if rule.threshold is None:
            return False
        ops = {
            "gt": lambda v, t: v > t,
            "lt": lambda v, t: v < t,
            "gte": lambda v, t: v >= t,
            "lte": lambda v, t: v <= t,
        }
        op = ops.get(rule.comparison, ops["gt"])
        return op(value, rule.threshold)

    def _check_rate_of_change(
        self,
        rule: AlertRule,
        value: float,
        metric_name: str,
        dimensions: Dict[str, str],
    ) -> bool:
        if not self._detector:
            return False
        key = self._detector._make_key(metric_name, dimensions)
        window = self._detector._windows.get(key)
        if not window:
            return False
        roc = window.rate_of_change(periods=5)
        if roc is None or rule.threshold is None:
            return False
        return abs(roc) > rule.threshold

    def _fire_alert(
        self,
        rule: AlertRule,
        value: float,
        dimensions: Dict[str, str],
    ) -> Optional[Alert]:
        # 去重:检查相同规则+维度是否已触发
        dedup_key = f"{rule.name}:{self._dim_key(dimensions)}"
        with self._lock:
            for alert in self._active_alerts.values():
                existing_key = f"{alert.rule.name}:{self._dim_key(alert.dimensions)}"
                if existing_key == dedup_key:
                    # 更新现有告警而非创建新告警
                    alert.last_value = value
                    alert.notification_count += 1
                    return None

            alert = Alert(
                id=f"alert_{uuid.uuid4().hex[:12]}",
                rule=rule,
                state=AlertState.FIRING,
                fired_at=time.time(),
                last_value=value,
                message=f"{rule.name}: {rule.metric_name} = {value} "
                        f"(threshold: {rule.comparison} {rule.threshold})",
                dimensions=dimensions,
            )
            self._active_alerts[alert.id] = alert

        self._notify(alert, alert.message)
        return alert

    def _notify(self, alert: Alert, message: str):
        for channel in alert.rule.channels:
            handler = self._channel_handlers.get(channel)
            if handler:
                try:
                    handler(alert, message)
                except Exception:
                    pass

        # 同时通过 GreenHelix 注册事件驱动告警
        if self._client and alert.state == AlertState.FIRING:
            try:
                self._client.execute_tool("submit_metrics", {
                    "agent_id": self.agent_id,
                    "metrics": [{
                        "name": "alerts.fired",
                        "value": 1,
                        "dimensions": {
                            "rule": alert.rule.name,
                            "severity": alert.rule.severity,
                            "agent": self.agent_id,
                        },
                    }],
                })
            except Exception:
                pass

    def _check_resolutions(
        self,
        metric_name: str,
        value: float,
        dimensions: Dict[str, str],
    ):
        """当条件清除时自动解除告警。"""
        with self._lock:
            to_resolve = []
            for alert_id, alert in self._active_alerts.items():
                if alert.rule.metric_name != metric_name:
                    continue
                if alert.rule.condition == "threshold":
                    if not self._check_threshold(alert.rule, value):
                        to_resolve.append(alert_id)

            for alert_id in to_resolve:
                alert = self._active_alerts[alert_id]
                alert.state = AlertState.RESOLVED
                alert.resolved_at = time.time()
                self._alert_history.append(alert)
                del self._active_alerts[alert_id]
                self._notify(alert, f"RESOLVED: {alert.rule.name}")

    def _escalation_loop(self):
        """定期检查需要升级的告警。"""
        while self._running:
            time.sleep(60)
            now = time.time()
            with self._lock:
                for alert in self._active_alerts.values():
                    if alert.state != AlertState.FIRING:
                        continue
                    if alert.escalated:
                        continue
                    age_minutes = (now - alert.fired_at) / 60
                    if age_minutes >= alert.rule.escalation_minutes:
                        alert.escalated = True
                        self._notify(
                            alert,
                            f"ESCALATION: {alert.rule.name} 已持续触发 "
                            f"{age_minutes:.0f} 分钟且未被确认",
                        )

    @staticmethod
    def _dim_key(dimensions: Dict[str, str]) -> str:
        if not dimensions:
            return ""
        return ",".join(f"{k}={v}" for k, v in sorted(dimensions.items()))

配置告警规则

# Agent 观测性栈:多智能体系统的分布式追踪、指标监控与告警

## 第八章:SLA 监控与成本归因

可观测性栈的最后一个重要组成部分是将技术指标与业务承诺关联起来。SLA 监控用于跟踪智能体是否满足其公开的服务水平协议,成本归因则用于追踪资金消耗情况,并评估每个智能体的盈利能力。

### SLA 定义与监控

针对智能体电商系统,典型的 SLA 包括以下指标:

- **可用性**:智能体在接收和处理请求方面的正常运行时间占比(目标:99.9%)
- **延迟**:标准操作的 p95 或 p99 响应时间(目标:p95 低于 500ms)
- **错误率**:失败交易占总交易的比例(目标:低于 0.1%)
- **结算时长**:从创建托管账户到完成结算的时间(目标:小于 24 小时)

---

## 实际部署配置示例

以下是针对智能体电商系统的实用配置:

alert_manager = AlertManager(

agent_id="agent-marketplace",

client=client,

anomaly_detector=detector,

)

配置通知渠道

alert_manager.configure_webhook("https://alerts.myfleet.com/webhook")

alert_manager.configure_slack(

webhook_url="https://hooks.slack.com/services/T00/B00/xxx",

default_channel="#agent-alerts",

)

规则 1:高错误率

alert_manager.add_rule(AlertRule(

name="high_error_rate",

metric_name="transactions.errors",

condition="threshold",

threshold=0.05,

comparison="gt",

severity="high",

channels=[AlertChannel.SLACK, AlertChannel.WEBHOOK],

cooldown_minutes=15,

escalation_minutes=30,

description="交易错误率超过 5%",

runbook_url="https://wiki.myfleet.com/runbooks/high-error-rate",

))

规则 2:延迟突增

alert_manager.add_rule(AlertRule(

name="latency_spike",

metric_name="transactions.duration_ms",

condition="threshold",

threshold=5000,

comparison="gt",

severity="medium",

channels=[AlertChannel.SLACK],

cooldown_minutes=10,

description="交易延迟超过 5 秒",

))

规则 3:收入下降

alert_manager.add_rule(AlertRule(

name="revenue_drop",

metric_name="revenue.total",

condition="rate_of_change",

threshold=0.30, # 30% 下降

severity="critical",

channels=[AlertChannel.SLACK, AlertChannel.PAGERDUTY, AlertChannel.WEBHOOK],

cooldown_minutes=60,

escalation_minutes=15,

description="连续 5 个周期内收入下降超过 30%",

))

规则 4:基于异常检测(使用 AnomalyDetector)

alert_manager.add_rule(AlertRule(

name="escrow_anomaly",

metric_name="escrow.created",

condition="anomaly",

severity="medium",

channels=[AlertChannel.SLACK],

cooldown_minutes=30,

description="检测到异常的托管账户创建模式",

))

alert_manager.start()

实时评估指标

alert_manager.evaluate("transactions.errors", 0.08, {"operation": "process_order"})

alert_manager.evaluate("transactions.duration_ms", 7500.0, {"operation": "process_order"})

去重逻辑可防止告警风暴:当同一规则在相同维度下再次触发时,系统会更新已有告警而非生成新通知。冷却期机制避免告警在恢复后过快重新触发。升级机制确保未被确认的告警在设定超时后自动升级至其他通知渠道。

import time

from dataclasses import dataclass, field

from typing import Dict, List, Optional, Any

@dataclass

class SLADefinition:

"""SLA 定义,包含目标值和度量参数。"""

name: str

metric_name: str

target_value: float

comparison: str # "lte"(延迟)、"gte"(可用性)、"lte"(错误率)

measurement_window: str # "1h"、"24h"、"7d"、"30d"

description: str = ""

@dataclass

class SLAStatus:

"""当前 SLA 的状态。"""

definition: SLADefinition

current_value: float

compliant: bool

compliance_percentage: float # 满足 SLA 的度量窗口占比(%)

last_violation: Optional[float] = None

violation_count: int = 0

class SLAMonitor:

"""用于监控代理商业操作的 SLA 合规性。"""

def __init__(

self,

agent_id: str,

metrics_collector: "MetricsCollector",

client=None,

):

self.agent_id = agent_id

self._metrics = metrics_collector

self._client = client

self._sla_definitions: Dict[str, SLADefinition] = {}

self._compliance_history: Dict[str, List[bool]] = defaultdict(list)

self._violation_callbacks: List = []

def define_sla(self, sla: SLADefinition):

"""注册一个 SLA 定义。"""

self._sla_definitions[sla.name] = sla

def on_violation(self, callback):

"""为 SLA 违规事件注册回调函数。"""

self._violation_callbacks.append(callback)

def check_sla(

self,

sla_name: str,

current_value: float,

) -> SLAStatus:

"""检查指标值是否满足 SLA 目标。"""

sla = self._sla_definitions.get(sla_name)

if not sla:

raise ValueError(f"未知的 SLA:{sla_name}")

ops = {

"lte": lambda v, t: v <= t,

"gte": lambda v, t: v >= t,

"lt": lambda v, t: v < t,

"gt": lambda v, t: v > t,

}

op = ops.get(sla.comparison, ops["lte"])

compliant = op(current_value, sla.target_value)

# 跟踪合规历史

self._compliance_history[sla_name].append(compliant)

# 计算合规百分比(最近 100 次检查)

history = self._compliance_history[sla_name][-100:]

compliance_pct = sum(1 for c in history if c) / len(history) * 100

status = SLAStatus(

definition=sla,

current_value=current_value,

compliant=compliant,

compliance_percentage=compliance_pct,

violation_count=sum(1 for c in history if not c),

)

if not compliant:

status.last_violation = time.time()

# 记录违规指标

self._metrics.increment(

"sla.violations",

1.0,

{"sla": sla_name, "agent": self.agent_id},

)

for callback in self._violation_callbacks:

try:

callback(status)

except Exception:

pass

# 始终记录合规百分比

self._metrics.gauge_set(

"sla.compliance_pct",

compliance_pct,

{"sla": sla_name, "agent": self.agent_id},

)

return status

def get_all_sla_status(self) -> Dict[str, SLAStatus]:

"""获取所有 SLA 的最新状态。"""

results = {}

for name in self._sla_definitions:

history = self._compliance_history.get(name, [])

if not history:

continue

last_100 = history[-100:]

compliance_pct = sum(1 for c in last_100 if c) / len(last_100) * 100

results[name] = SLAStatus(

definition=self._sla_definitions[name],

current_value=0.0, # 需要获取上次观测值

compliant=last_100[-1] if last_100 else True,

compliance_percentage=compliance_pct,

violation_count=sum(1 for c in last_100 if not c),

)

return results

### 代理级成本归因

在多代理集群中,你需要了解哪些代理是盈利的,哪些正在产生高额成本。成本归因机制将每一笔支出追溯到实际产生该支出的代理。

@dataclass

class CostEntry:

"""单个由代理产生的成本条目。"""

agent_id: str

category: str

amount: float

currency: str

timestamp: float

transaction_id: str = ""

description: str = ""

class CostAttributor:

"""跟踪并分配代理集群中的各项成本。"""

def __init__(self, metrics_collector: "MetricsCollector"):

self._metrics = metrics_collector

self._costs: Dict[str, List[CostEntry]] = defaultdict(list)

self._budgets: Dict[str, float] = {}

self._budget_callbacks: List = []

def record_cost(

self,

agent_id: str,

category: str,

amount: float,

currency: str = "credits",

transaction_id: str = "",

description: str = "",

):

"""记录归属于某个代理的成本。"""

entry = CostEntry(

agent_id=agent_id,

category=category,

amount=amount,

currency=currency,

timestamp=time.time(),

transaction_id=transaction_id,

description=description,

)

self._costs[agent_id].append(entry)

self._metrics.record_cost(amount, category, currency)

self._metrics.increment(

"cost_attribution.total",

amount,

{"agent": agent_id, "category": category},

)

# 检查预算

if agent_id in self._budgets:

total = self.get_agent_total(agent_id)

budget = self._budgets[agent_id]

utilization = total / budget if budget > 0 else 0

self._metrics.gauge_set(

"budget.utilization_pct",

utilization * 100,

{"agent": agent_id},

)

if utilization >= 0.9:

for callback in self._budget_callbacks:

try:

callback(agent_id, total, budget, utilization)

except Exception:

pass

def set_budget(self, agent_id: str, budget: float):

"""为某个代理设置支出预算。"""

self._budgets[agent_id] = budget

def on_budget_warning(self, callback):

"""注册预算阈值警告的回调函数。"""

self._budget_callbacks.append(callback)

def get_agent_total(

self,

agent_id: str,

since: float = None,

) -> float:

"""获取自指定时间以来某个代理的总成本。"""

costs = self._costs.get(agent_id, [])

if since:

costs = [c for c in costs if c.timestamp >= since]

return sum(c.amount for c in costs)

def get_agent_breakdown(

self,

agent_id: str,

since: float = None,

) -> Dict[str, float]:

"""获取某个代理按类别划分的成本明细。"""

costs = self._costs.get(agent_id, [])

if since:

costs = [c for c in costs if c.timestamp >= since]

breakdown = defaultdict(float)

for cost in costs:

breakdown[cost.category] += cost.amount

return dict(breakdown)

def generate_monthly_report(

self,

month_start: float,

month_end: float,

) -> Dict[str, Any]:

"""生成月度成本归属报告。"""

report = {

"period_start": month_start,

"period_end": month_end,

"agents": {},

"total_cost": 0.0,

"top_categories": defaultdict(float),

}

for agent_id, costs in self._costs.items():

month_costs = [

c for c in costs

if month_start <= c.timestamp <= month_end

]

if not month_costs:

continue

agent_total = sum(c.amount for c in month_costs)

breakdown = defaultdict(float)

for cost in month_costs:

breakdown[cost.category] += cost.amount

report["top_categories"][cost.category] += cost.amount

budget = self._budgets.get(agent_id, 0)

report["agents"][agent_id] = {

"total_cost": agent_total,

"budget": budget,

"utilization_pct": (agent_total / budget * 100) if budget else 0,

"transaction_count": len(month_costs),

"breakdown": dict(breakdown),

}

report["total_cost"] += agent_total

report["top_categories"] = dict(

sorted(

report["top_categories"].items(),

key=lambda x: x[1],

reverse=True,

)

)

return report

### 将所有组件整合在一起

以下是生产环境中多代理部署中所有可观测性组件的集成方式:

markdown
# Agent 可观测性栈:多智能体系统的分布式追踪、指标监控与告警

版本:1.3.1
分块:17/17

from greenhelix import GreenHelixClient

初始化核心组件

client = GreenHelixClient(api_key="your-api-key")

tracer = AgentTracer(agent_id="agent-marketplace", client=client)

metrics = MetricsCollector(agent_id="agent-marketplace", client=client)

detector = AnomalyDetector()

alert_manager = AlertManager(

agent_id="agent-marketplace", client=client, anomaly_detector=detector

)

sla_monitor = SLAMonitor(

agent_id="agent-marketplace", metrics_collector=metrics, client=client

)

cost_tracker = CostAttributor(metrics_collector=metrics)

定义 SLA

sla_monitor.define_sla(SLADefinition(

name="availability",

metric_name="agent.availability_pct",

target_value=99.9,

comparison="gte",

measurement_window="24h",

))

sla_monitor.define_sla(SLADefinition(

name="latency_p95",

metric_name="transactions.duration_ms",

target_value=500,

comparison="lte",

measurement_window="1h",

))

设置预算

cost_tracker.set_budget("agent-marketplace", budget=1000.0)

cost_tracker.set_budget("agent-data-provider", budget=500.0)

将 SLA 违规事件关联到告警

sla_monitor.on_violation(lambda status: alert_manager.evaluate(

status.definition.metric_name,

status.current_value,

{"sla": status.definition.name},

))

将预算预警关联到告警

cost_tracker.on_budget_warning(

lambda agent, total, budget, util: alert_manager.evaluate(

"budget.utilization_pct",

util * 100,

{"agent": agent},

)

)

启动后台线程

metrics.start()

alert_manager.start()

现在每个事务都会自动完成追踪、度量、检查和告警:

def handle_transaction(request):

with tracer.trace_agent_call(

request["target"], request["operation"]

) as span:

start = time.time()

result = process(request)

duration_ms = (time.time() - start) * 1000

# 指标记录

metrics.record_transaction(

operation=request["operation"],

duration_ms=duration_ms,

amount=request.get("amount", 0),

status="success" if result["ok"] else "error",

peer_agent=request["target"],

)

# 异常检测 + 告警

alert_manager.evaluate("transactions.duration_ms", duration_ms)

alert_manager.evaluate(

"transactions.errors",

0 if result["ok"] else 1,

)

# SLA 检查

sla_monitor.check_sla("latency_p95", duration_ms)

# 成本归因

cost_tracker.record_cost(

agent_id="agent-marketplace",

category="gateway_fee",

amount=result.get("fee", 0),

transaction_id=result.get("transaction_id", ""),

)

return result

此集成意味着,通过你智能体的每个事务都会自动生成用于调试的追踪数据、用于仪表盘的指标、用于自动发现问题的异常检测、用于通知运维人员的告警、用于合规报告的 SLA 跟踪,以及用于业务分析的成本归因。操作开销极低——每笔事务仅增加几毫秒的内存缓冲时间——但带来的可见性提升是根本性的。

---

## 下一步

本指南从底层开始构建可观测性栈:分布式追踪、指标采集、异常检测、仪表盘、告警机制和 SLA 监控。各组件可独立运行,但整合后价值最大化。

投入生产时,建议从 `AgentTracer` 和 `MetricsCollector` 开始。这两个组件能以最小成本提供即时可见性。待积累数天基线数据后,再引入 `AnomalyDetector`。使用保守阈值配置 `AlertManager`,随着对系统正常行为的理解加深,逐步收紧阈值。当拥有付费客户并需要合规报告时,再部署 `SLAMonitor` 和 `CostAttributor`。

本系列后续指南将涵盖智能体安全加固(防范对抗性智能体)、智能体市场经济学(定价策略与佣金结构)以及智能体扩展模式(应对百倍流量增长而无需重写架构)。这些内容均建立在本节所奠定的可观测性基础之上,因为无法看见的事物,就无法保障安全、合理定价或实现扩展。
M
@mirni

已收录 3 个 Skill

相关推荐