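Build distributed tracing, metrics monitoring, and alerting for multi-agent systems on OpenTelemetry and the GreenHelix platform.
openclaw skills install @mirni/greenhelix-agent-observability-stack
Note: this guide is for educational purposes and contains example code.
It executes no code and requires no credentials or dependency installation.
All examples use the GreenHelix sandbox (https://sandbox.greenhelix.net), which provides 500 free credits; no API key is needed to get started.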
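When Agent A calls Agent B, Agent B calls Agent C, and transaction latency climbs from 200 ms to 12 seconds, where is the bottleneck? When your agent fleet processes 10,000 transactions a day and revenue drops 15%, which agent is underperforming? When an escrow settlement fails silently at 3 a.m., how long until you notice? Traditional monitoring tools (ping checks, CPU graphs, uptime dashboards) cannot answer these questions about distributed agent systems. They were designed for monoliths and simple request-response services. Agent commerce is fundamentally different: transactions span multiple autonomous agents, each with its own state, pricing, and failure modes. A single customer-facing operation may pass through five agents, two escrow contracts, and three separate billing events before it completes. The failure surface is combinatorial, not linear.
You need observability, not just monitoring. Monitoring tells you that something is wrong; observability tells you why it is wrong, where, and how to fix it. For agent commerce, that means distributed tracing that follows transaction flow across agent boundaries, custom metrics that measure business outcomes (not just infrastructure health), and intelligent alerting that catches anomalies before users notice. The difference between a 15-minute outage and 4 hours of lost revenue is whether your observability stack understands how transactions flow between agents.
This guide builds a complete observability stack from scratch. We adopt the OpenTelemetry standard for interoperability, integrate directly with GreenHelix's metrics and event tools for agent-specific telemetry, and build anomaly detection that understands the patterns unique to agent commerce. Every component is production-grade Python code, ready to deploy. By the end you will have distributed tracing across agent calls, custom business metrics, anomaly detection, dashboards, alerting with escalation policies, and SLA monitoring with cost attribution.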
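Observability for distributed systems rests on three pillars: traces, metrics, and logs. Each solves a different class of problem, and none is sufficient on its own. For agent commerce, each pillar has requirements that differ from traditional web service observability.
A trace is the complete record of a single transaction as it moves through the system. In agent commerce, a trace might begin when a customer agent issues a service request, pass through a gateway agent, invoke a specialist agent, trigger escrow creation, wait for fulfillment, and end with settlement and payout. Each step of the journey is a span, and the set of all spans forms the complete trace.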
Agent Observability Stack: Distributed Tracing, Metrics, and Alerting for Multi-Agent Systems
Version: 1.3.1
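The key difference from traditional distributed tracing is that an agent boundary is a trust boundary, not just a service boundary. When Agent A calls Agent B, the call carries an economic transaction. A trace record therefore needs more than latency and status codes: it must also capture billing events, escrow state changes, and the economic context of every span. A span that says only "200 OK, 150 ms" is incomplete unless it also says "billed 0.003 credits, created escrow ID esw_abc123".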
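To make the "incomplete span" point concrete, here is a minimal sketch that builds a span attribute dictionary and reports which economic fields are missing. The attribute keys follow the `agent.*` names used by this guide; the helper names `economic_span_attributes` and `missing_economic_context`, and the `duration_ms` and `http.status_code` keys, are assumptions made for illustration only.

```python
from typing import Dict, List, Optional, Union

AttrValue = Union[str, int, float]


def economic_span_attributes(
    agent_id: str,
    peer_agent_id: str,
    duration_ms: float,
    status_code: int,
    billed_amount: Optional[float] = None,
    escrow_id: Optional[str] = None,
    currency: str = "credits",
) -> Dict[str, AttrValue]:
    """Build span attributes that carry both technical and economic context."""
    attrs: Dict[str, AttrValue] = {
        "agent.id": agent_id,
        "agent.peer.id": peer_agent_id,
        "http.status_code": status_code,   # hypothetical key for this sketch
        "duration_ms": duration_ms,        # hypothetical key for this sketch
    }
    if billed_amount is not None:
        attrs["agent.billing.amount"] = billed_amount
        attrs["agent.billing.currency"] = currency
    if escrow_id is not None:
        attrs["agent.escrow.id"] = escrow_id
    return attrs


def missing_economic_context(attrs: Dict[str, AttrValue]) -> List[str]:
    """Return the economic attribute keys a span is missing."""
    required = ("agent.billing.amount", "agent.escrow.id")
    return [key for key in required if key not in attrs]


# "200 OK, 150 ms" alone: both economic fields are missing.
bare = economic_span_attributes("agent-a", "agent-b", 150.0, 200)
# The same span with billing and escrow context attached.
full = economic_span_attributes(
    "agent-a", "agent-b", 150.0, 200, billed_amount=0.003, escrow_id="esw_abc123"
)
```

A check like `missing_economic_context` could run in tests or in an exporter to flag spans that cross an agent boundary without their billing or escrow context.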
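Traces help answer questions about an individual transaction: which agent or step was slow, and where a failure occurred.
Metrics are numeric measurements aggregated over time. Where traces give detail on a single transaction, metrics show the overall picture. In agent commerce, metrics fall into three categories.
Infrastructure metrics measure the health of an agent as a software system: CPU usage, memory consumption, request queue depth, connection pool utilization, and so on. These are table stakes, and most monitoring tools handle them well.
Performance metrics describe how an agent behaves under load: request latency (p50, p95, p99), throughput (requests per second), error rate, and timeout rate. These need histograms and percentile calculations rather than averages alone.
Business metrics are where agent commerce differs most from traditional services. Track revenue per agent, cost per transaction, escrow settlement success rate, dispute rate, SLA compliance rate, and proxies for customer satisfaction. These metrics tell you directly whether your agent fleet is making or losing money.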
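The remark that performance metrics need percentiles rather than averages can be seen in a small worked example. This is a sketch with made-up latency samples, using the nearest-rank percentile definition; the `percentile` helper is written for this example and is not part of any library.

```python
import math
from typing import List


def percentile(values: List[float], p: float) -> float:
    """Nearest-rank percentile: smallest value with at least p% of samples at or below it."""
    if not values:
        raise ValueError("percentile() needs at least one value")
    ordered = sorted(values)
    rank = math.ceil(p / 100.0 * len(ordered))
    return ordered[max(rank, 1) - 1]


# 90 fast requests at 200 ms and 10 slow ones at 12 seconds (illustrative data).
latencies_ms = [200.0] * 90 + [12000.0] * 10

mean_ms = sum(latencies_ms) / len(latencies_ms)
p50_ms = percentile(latencies_ms, 50)
p95_ms = percentile(latencies_ms, 95)
p99_ms = percentile(latencies_ms, 99)

print(f"mean={mean_ms:.0f}ms p50={p50_ms:.0f}ms p95={p95_ms:.0f}ms p99={p99_ms:.0f}ms")
```

The mean (1380 ms) describes no real request: the typical request takes 200 ms and the slow tail takes 12 seconds, which only the percentiles reveal.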
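GreenHelix's submit_metrics tool accepts custom metric submissions, which makes it a natural collection point for all three categories. The tool takes a metric name, value, dimensions, and timestamp, so you can build rich, queryable metric series.
Logs are discrete events with context. In agent commerce, structured logs are essential because log events must be correlated across agent boundaries. A plain-text line such as "error processing request" is worthless when 50 agents each produce thousands of log lines per hour.
Structured logs should contain machine-parseable fields such as trace ID, span ID, agent ID, transaction ID, and the relevant business context. When something goes wrong, you can filter logs by trace ID and see every event produced by every agent involved in that transaction, in chronological order.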
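As one possible shape for such logs, the sketch below uses only the Python standard library to emit one JSON object per log line and then filter lines by trace ID. The class and function names (`JsonLogFormatter`, `build_logger`, `filter_by_trace`) and the sample IDs are hypothetical; a real deployment would more likely ship these lines to a log backend than filter them in process.

```python
import io
import json
import logging
from typing import Dict, List


class JsonLogFormatter(logging.Formatter):
    """Render each log record as a single JSON object with correlation fields."""

    CONTEXT_FIELDS = ("trace_id", "span_id", "agent_id", "transaction_id")

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "ts": record.created,          # Unix timestamp of the record
            "level": record.levelname,
            "message": record.getMessage(),
        }
        # Fields passed via `extra=` become attributes on the record.
        for field_name in self.CONTEXT_FIELDS:
            value = getattr(record, field_name, None)
            if value is not None:
                payload[field_name] = value
        return json.dumps(payload, ensure_ascii=False)


def build_logger(name: str, stream) -> logging.Logger:
    """Create a logger that writes JSON lines to the given stream."""
    logger = logging.getLogger(name)
    logger.handlers.clear()
    handler = logging.StreamHandler(stream)
    handler.setFormatter(JsonLogFormatter())
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    logger.propagate = False
    return logger


def filter_by_trace(lines: List[str], trace_id: str) -> List[Dict]:
    """Return the parsed log entries that belong to one trace."""
    entries = (json.loads(line) for line in lines if line.strip())
    return [entry for entry in entries if entry.get("trace_id") == trace_id]


stream = io.StringIO()
log = build_logger("agent-payment-processor", stream)
log.info("escrow created", extra={"trace_id": "trace-1", "agent_id": "agent-a"})
log.info("billing charged", extra={"trace_id": "trace-2", "agent_id": "agent-b"})
log.error("escrow settle failed", extra={"trace_id": "trace-1", "agent_id": "agent-c"})

matched = filter_by_trace(stream.getvalue().splitlines(), "trace-1")
```

Because every line is valid JSON with the same field names, the same filter works regardless of which agent wrote the line.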
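GreenHelix's get_events tool retrieves the event stream and serves as a structured log source. Events include agent interactions, billing events, escrow state changes, webhook deliveries, and more. Correlating application logs with GreenHelix events lets you reconstruct what happened and why.
Without metrics, you learn about problems only after users complain. Without traces, you cannot pinpoint which agent or which step caused a failure. Without logs, you cannot understand the root cause well enough to fix it. Agent commerce multiplies this dependency: interactions are more complex, failure modes are subtler, and the economic cost of a missed problem is direct and quantifiable.
Three GreenHelix tools form the foundation of the observability stack:
submit_metrics accepts custom metric data points with dimensions. We use it to push agent performance metrics, business metrics, and custom counters to GreenHelix's metrics store.
get_events retrieves the event stream, filtered by agent, time range, and event type. We use it to pull structured event data for trace correlation and log enrichment.
register_webhook creates a webhook subscription for specific event types. We use it to trigger real-time alerts when critical events occur, instead of polling to find problems.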
```python
from greenhelix import GreenHelixClient

client = GreenHelixClient(api_key="your-api-key")

# Submit a custom metric
client.execute_tool("submit_metrics", {
    "agent_id": "agent-payment-processor",
    "metrics": [
        {
            "name": "transaction.latency_ms",
            "value": 245.3,
            "dimensions": {
                "agent": "agent-payment-processor",
                "operation": "process_payment",
                "status": "success"
            },
            "timestamp": "2026-04-07T14:30:00Z"
        }
    ]
})

# Fetch events for trace correlation
events = client.execute_tool("get_events", {
    "agent_id": "agent-payment-processor",
    "event_types": ["escrow.created", "escrow.settled", "billing.charged"],
    "start_time": "2026-04-07T14:00:00Z",
    "end_time": "2026-04-07T15:00:00Z"
})

# Register a webhook for real-time alerting
client.execute_tool("register_webhook", {
    "url": "https://alerts.myfleet.com/webhook",
    "event_types": ["escrow.failed", "billing.error"],
    "agent_id": "agent-payment-processor"
})
```

Combined with OpenTelemetry's tracing and metrics APIs, these three tools are enough to build production-grade observability.
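At the core of our observability stack is a tracer that understands agent commerce. A standard OpenTelemetry tracer creates spans only for HTTP requests and database calls. Our AgentTracer wraps OpenTelemetry and adds agent-specific context: billing events, escrow state, economic metadata, and trace propagation across agents.
The tracer must be lightweight. Adding observability should not noticeably increase transaction latency. We target less than 1 ms of overhead per span, which means buffering in memory and exporting asynchronously.
The tracer must be OpenTelemetry-compatible. Trace data can then be exported to Jaeger, Zipkin, Grafana Tempo, or any other OTel-compatible backend. We do not lock you into a proprietary format.
The tracer must understand agent commerce primitives. Spans should automatically capture billing amounts, escrow IDs, agent identity, and transaction type, without hand-written instrumentation at every call site.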
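This skill provides a complete observability solution designed for multi-agent systems, covering distributed tracing, metrics collection, and alerting. The OpenTelemetry-compatible AgentTracer gives end-to-end tracing across agent calls and turns key operations (transactions, escrow, billing) into structured metrics.

`AgentSpanAttributes` defines the standard attribute keys for agent commerce operations:

| Attribute | Description |
|---|---|
| agent.id | Unique agent identifier |
| agent.role | Agent role (for example buyer, seller) |
| agent.transaction.id | Transaction ID |
| agent.escrow.id | Escrow ID |
| agent.billing.amount | Billing amount |
| agent.billing.currency | Billing currency |
| agent.operation.type | Operation type (for example payment, transfer) |
| agent.peer.id | Peer agent ID |
| agent.sla.tier | SLA tier |
| agent.cost_center | Cost center |

`GreenHelixSpanExporter` exports trace spans as metrics and submits them to the GreenHelix platform.

Parameters:
- client: GreenHelix client instance
- agent_id: ID of the current agent
- batch_size: batch size for sending, default 50

Metrics emitted:
- agent.span.duration_ms: span duration metric
- agent.billing.amount: billing amount metric (only when billing attributes are present)

Notes:
- Metrics are submitted through the submit_metrics tool.
- A failed export reports FAILURE.
- The shutdown() method is a no-op.

`_ns_to_iso(ns_timestamp: int) -> str` converts a nanosecond timestamp to an ISO 8601 string.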
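```python
import datetime


def _ns_to_iso(ns_timestamp: int) -> str:
    dt = datetime.datetime.fromtimestamp(ns_timestamp / 1e9, tz=datetime.timezone.utc)
    return dt.isoformat()
```

`AgentTracer` is an OpenTelemetry-compatible agent tracer that records interactions between agents.

Parameters:
- agent_id: unique agent identifier
- client: GreenHelix client object (optional)
- service_name: service name, default agent-{agent_id}
- exporters: list of additional span exporters (optional)

`start_span` creates a new span with agent context.

Parameters:
- name: span name
- kind: span kind (default INTERNAL)
- attributes: custom attributes
- peer_agent_id: peer agent ID (used for correlation)

Usage example:

```python
with tracer.start_span("process.payment", kind=SpanKind.SERVER, attributes={"status": "success"}) as span:
    # Run the business logic
    pass
```

`trace_agent_call` traces a call from the current agent to a target agent.

Parameters:
- target_agent_id: target agent ID
- operation: operation type (for example transfer, verify)
- transaction_id: transaction ID (optional; a UUID is generated automatically)

Generated span name: call.{target_agent_id}.{operation}
Span kind: CLIENT

`trace_escrow` traces escrow lifecycle operations.

Parameters:
- escrow_id: escrow ID
- operation: operation type (for example create, release, refund)

Generated span name: escrow.{operation}
Span kind: INTERNAL

`trace_billing` traces billing events.

Parameters:
- amount: billing amount
- currency: currency unit (default credits)

Generated span name: billing.charge
Span kind: INTERNAL

`get_trace_context()` extracts the current trace context for propagation across agents.

Return value:
- traceparent: W3C Trace Context header
- x-agent-id: ID of the current agent

`inject_context(headers: Dict[str, str])` injects the trace context into HTTP request headers.

Example:

```python
headers = {"Content-Type": "application/json"}
headers = tracer.inject_context(headers)
```

`shutdown()` flushes all pending spans and shuts down the tracer.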
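```python
from agent_tracing import AgentTracer
from opentelemetry.trace import StatusCode

# Initialize the tracer (greenhelix_client is an existing GreenHelixClient instance)
tracer = AgentTracer(
    agent_id="agent-123",
    client=greenhelix_client,
    service_name="payment-service"
)

# Trace a payment call
with tracer.trace_agent_call(target_agent_id="agent-456", operation="pay") as span:
    # Perform the remote call (remote_api is a placeholder)
    result = remote_api.call(...)
    if not result.success:
        span.set_status(StatusCode.ERROR)
```

Notes:
- Use spans inside a with block so that they are ended correctly.
- Use inject_context to inject the trace context into request headers.
- GreenHelixSpanExporter depends on the external submit_metrics tool; make sure it is available.
- Choose batch_size with care to avoid excessive memory pressure.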
AgentTracer integrates with GreenHelix client calls. Each API interaction produces a span, and these spans automatically capture duration, status, and agent transaction attributes.

```python
from agent_tracing import AgentTracer
from greenhelix import GreenHelixClient
from opentelemetry.trace import StatusCode

client = GreenHelixClient(api_key="your-api-key")
tracer = AgentTracer(agent_id="agent-marketplace", client=client)

# Trace a multi-step transaction
with tracer.trace_agent_call("agent-data-provider", "fetch_dataset") as span:
    result = client.execute_tool("call_agent", {
        "target": "agent-data-provider",
        "operation": "fetch_dataset",
        "params": {"dataset_id": "ds_12345"},
        "headers": tracer.get_trace_context(),
    })
    span.set_attribute("dataset.size_bytes", result.get("size", 0))
    if result.get("status") == "error":
        span.set_status(StatusCode.ERROR, result.get("message", ""))
    else:
        span.set_status(StatusCode.OK)

# Trace the escrow lifecycle
with tracer.trace_escrow("esw_abc123", "create") as span:
    escrow = client.execute_tool("create_escrow", {
        "amount": 5.00,
        "payer": "agent-buyer",
        "payee": "agent-seller",
    })
    span.set_attribute("escrow.amount", 5.00)
    span.set_status(StatusCode.OK)
```

The core design principle is that AgentTracer wraps OpenTelemetry rather than replacing it. You can use standard OTel exporters (Jaeger, OTLP, console, and so on) alongside the GreenHelix exporter. Your agent traces therefore show up in your existing observability infrastructure with no migration.
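GreenHelixSpanExporter converts spans into metrics through submit_metrics. With this dual export, every traced operation automatically produces latency and billing metrics, with no extra hand-written metric instrumentation.
Tracing a single agent is relatively simple. The real challenge is tracing a transaction as it moves between several independent agents, which may run on different infrastructure, be maintained by different teams, and communicate through the GreenHelix gateway.
When Agent A calls Agent B, Agent B does not inherently know about Agent A's trace. Without explicit context propagation, Agent B starts a new, isolated trace. You end up with five disconnected traces instead of one unified trace of the whole transaction.
The W3C Trace Context standard solves this with two HTTP headers: traceparent (trace ID, span ID, and sampling flags) and tracestate (vendor-specific data). Our AgentTracer already produces these headers through get_trace_context(). The key challenge is making sure every agent in the chain correctly extracts, uses, and continues to propagate them.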
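For readers who have not looked inside a traceparent header, the following standard-library sketch parses one and derives the header a child span would send onward. The format (`version-traceid-parentid-flags`, lowercase hex, 2/32/16/2 characters) comes from the W3C Trace Context specification; the helper names `parse_traceparent` and `child_traceparent` are hypothetical and are not part of AgentTracer or OpenTelemetry, whose propagators do this work for you.

```python
import re
import secrets
from typing import NamedTuple, Optional

_TRACEPARENT_RE = re.compile(
    r"^([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$"
)


class TraceParent(NamedTuple):
    version: str
    trace_id: str
    parent_id: str
    flags: str

    @property
    def sampled(self) -> bool:
        """The lowest bit of the flags field is the 'sampled' flag."""
        return bool(int(self.flags, 16) & 0x01)


def parse_traceparent(header: str) -> Optional[TraceParent]:
    """Parse a traceparent header; return None if it is malformed."""
    match = _TRACEPARENT_RE.match(header.strip().lower())
    if not match:
        return None
    parsed = TraceParent(*match.groups())
    # Version ff and all-zero IDs are invalid per the specification.
    if parsed.version == "ff":
        return None
    if set(parsed.trace_id) == {"0"} or set(parsed.parent_id) == {"0"}:
        return None
    return parsed


def child_traceparent(parent: TraceParent) -> str:
    """Keep the trace ID and flags, but issue a fresh 8-byte span ID."""
    new_span_id = secrets.token_hex(8)
    return f"00-{parent.trace_id}-{new_span_id}-{parent.flags}"


incoming = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01"
parent = parse_traceparent(incoming)
outgoing = child_traceparent(parent)
child = parse_traceparent(outgoing)
```

The trace ID is what ties all agents together: each hop replaces only the span ID, so every span in the chain can be grouped under the same trace.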
The following is the standard pattern for a traced agent-to-agent call: the caller injects the context into the request headers, and the receiver extracts the context and creates a child span.

```python
from typing import Any, Dict, Optional

from opentelemetry.context import attach, detach
from opentelemetry.trace import SpanKind, StatusCode
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

# AgentTracer and AgentSpanAttributes are the classes described earlier;
# importing both from agent_tracing is an assumption about the module layout.
from agent_tracing import AgentTracer, AgentSpanAttributes


# === Calling agent (Agent A) ===
class TracedAgentClient:
    """Client that propagates trace context automatically."""

    def __init__(self, tracer: AgentTracer, client):
        self._tracer = tracer
        self._client = client

    def call_agent(
        self,
        target_agent: str,
        operation: str,
        params: Dict[str, Any],
        transaction_id: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Call another agent and propagate the trace context."""
        with self._tracer.trace_agent_call(
            target_agent, operation, transaction_id
        ) as span:
            headers = self._tracer.get_trace_context()
            headers["x-transaction-id"] = transaction_id or ""
            result = self._client.execute_tool("call_agent", {
                "target": target_agent,
                "operation": operation,
                "params": params,
                "headers": headers,
            })
            span.set_attribute("response.status", result.get("status", "unknown"))
            if result.get("error"):
                span.set_status(StatusCode.ERROR, result["error"])
            else:
                span.set_status(StatusCode.OK)
            return result


# === Receiving agent (Agent B) ===
class TracedAgentHandler:
    """Request handler that extracts and continues the trace context."""

    def __init__(self, tracer: AgentTracer):
        self._tracer = tracer
        self._propagator = TraceContextTextMapPropagator()

    def handle_request(
        self,
        operation: str,
        params: Dict[str, Any],
        headers: Dict[str, str],
    ) -> Dict[str, Any]:
        """Handle an incoming agent request and continue the trace."""
        # Extract the trace context from the incoming headers
        ctx = self._propagator.extract(carrier=headers)
        # Make the extracted context current so new spans become its children
        token = attach(ctx)
        try:
            with self._tracer.start_span(
                f"handle.{operation}",
                kind=SpanKind.SERVER,
                attributes={
                    "caller.agent_id": headers.get("x-agent-id", "unknown"),
                    AgentSpanAttributes.OPERATION_TYPE: operation,
                },
            ) as span:
                result = self._dispatch(operation, params, span)
                return result
        finally:
            # Always restore the previous context, even on error
            detach(token)

    def _dispatch(
        self, operation: str, params: Dict[str, Any], span
    ) -> Dict[str, Any]:
        """Route to the matching op_<operation> handler method."""
        handler = getattr(self, f"op_{operation}", None)
        if not handler:
            span.set_status(StatusCode.ERROR, f"Unknown operation: {operation}")
            return {"error": f"Unknown operation: {operation}"}
        return handler(params, span)
```

Consider a commerce transaction in which a customer agent orders a data analysis report. The transaction flows through five agents:
```python
import uuid
from typing import Any, Dict

from opentelemetry.trace import SpanKind, StatusCode


def traced_execute(
    client: TracedAgentClient,
    tracer: AgentTracer,
    request: Dict[str, Any],
) -> Dict[str, Any]:
    """Run a complete traced transaction across multiple agents."""
    transaction_id = str(uuid.uuid4())
    with tracer.start_span(
        "transaction.data_analysis_report",
        kind=SpanKind.INTERNAL,
        attributes={
            AgentSpanAttributes.TRANSACTION_ID: transaction_id,
            AgentSpanAttributes.OPERATION_TYPE: "data_analysis_report",
        },
    ) as root_span:
        # Step 1: find a provider through the marketplace agent
        with tracer.start_span("step.find_provider") as step_span:
            match = client.call_agent(
                "agent-marketplace",
                "find_provider",
                {"capability": "data_analysis", "budget": request["budget"]},
                transaction_id=transaction_id,
            )
            step_span.set_attribute("provider.matched", match.get("provider_id", ""))
            provider_id = match["provider_id"]

        # Step 2: create the transaction escrow
        with tracer.trace_escrow(f"esw_{transaction_id[:8]}", "create") as esc_span:
            escrow = client.call_agent(
                "agent-marketplace",
                "create_escrow",
                {
                    "amount": match["price"],
                    "payer": request["customer_agent"],
                    "payee": provider_id,
                },
                transaction_id=transaction_id,
            )
            esc_span.set_attribute("escrow.id", escrow.get("escrow_id", ""))

        # Step 3: fetch the raw data
        with tracer.start_span("step.fetch_data") as step_span:
            data = client.call_agent(
                match["data_source"],
                "fetch_dataset",
                {"dataset_id": request["dataset_id"]},
                transaction_id=transaction_id,
            )
            step_span.set_attribute("data.records", data.get("record_count", 0))

        # Step 4: run the analysis
        with tracer.start_span("step.analyze") as step_span:
            analysis = client.call_agent(
                provider_id,
                "analyze_data",
                {"data": data["data"], "analysis_type": request["analysis_type"]},
                transaction_id=transaction_id,
            )
            step_span.set_attribute("analysis.confidence", analysis.get("confidence", 0))

        # Step 5: deliver the report
        with tracer.start_span("step.deliver") as step_span:
            delivery = client.call_agent(
                "agent-delivery",
                "deliver_report",
                {
                    "report": analysis["report"],
                    "recipient": request["customer_agent"],
                    "format": request.get("format", "pdf"),
                },
                transaction_id=transaction_id,
            )
            step_span.set_attribute("delivery.channel", delivery.get("channel", ""))

        # Step 6: settle the escrow
        with tracer.trace_escrow(escrow["escrow_id"], "settle") as esc_span:
            settlement = client.call_agent(
                "agent-marketplace",
                "settle_escrow",
                {"escrow_id": escrow["escrow_id"]},
                transaction_id=transaction_id,
            )
            esc_span.set_attribute("settlement.status", settlement.get("status", ""))

        root_span.set_status(StatusCode.OK)
        return {
            "transaction_id": transaction_id,
            "report_url": delivery.get("url"),
            "total_cost": match["price"],
        }
```

GreenHelix events have their own event IDs. For a complete picture, OpenTelemetry trace IDs need to be correlated with GreenHelix event IDs. The approach is to embed the trace ID when calling the GreenHelix API and then join on that ID during analysis.
```python
from typing import Any, Dict

from opentelemetry.trace import StatusCode


def trace_escrow_lifecycle(
    tracer: AgentTracer,
    client,
    escrow_id: str,
) -> Dict[str, Any]:
    """Trace an escrow's full lifecycle and correlate it with GreenHelix events."""
    trace_ctx = tracer.get_trace_context() or {}
    # traceparent looks like "00-<trace_id>-<span_id>-<flags>"; guard against
    # a missing or malformed header instead of indexing blindly.
    parts = trace_ctx.get("traceparent", "").split("-")
    trace_id = parts[1] if len(parts) == 4 else ""

    with tracer.trace_escrow(escrow_id, "lifecycle") as span:
        # Fetch the GreenHelix events related to this escrow
        events = client.execute_tool("get_events", {
            "filters": {"escrow_id": escrow_id},
            "start_time": "2026-04-07T00:00:00Z",
            "end_time": "2026-04-07T23:59:59Z",
        })
        lifecycle = {
            "escrow_id": escrow_id,
            "trace_id": trace_id,
            "events": [],
        }
        for event in events.get("events", []):
            event_type = event.get("type", "")
            lifecycle["events"].append({
                "type": event_type,
                "timestamp": event.get("timestamp"),
                "greenhelix_event_id": event.get("event_id"),
                "trace_id": trace_id,
            })
            # Create a child span for each GreenHelix event
            with tracer.start_span(
                f"greenhelix.event.{event_type}",
                attributes={
                    "greenhelix.event_id": event.get("event_id", ""),
                    "greenhelix.event_type": event_type,
                    AgentSpanAttributes.ESCROW_ID: escrow_id,
                },
            ) as event_span:
                event_span.set_status(StatusCode.OK)
        span.set_attribute("lifecycle.event_count", len(lifecycle["events"]))
        return lifecycle
```

OpenTelemetry manages parent-child span relationships automatically through Python context managers. When you nest start_span calls, each inner span becomes a child of the outer span. In a trace visualization tool such as Jaeger, this forms a tree.
For agent commerce, a typical hierarchy looks like this:

```
transaction.data_analysis_report (root span)
|-- step.find_provider
|   |-- call.agent-marketplace.find_provider (CLIENT)
|-- escrow.create
|   |-- call.agent-marketplace.create_escrow (CLIENT)
|-- step.fetch_data
|   |-- call.agent-data-source.fetch_dataset (CLIENT)
|-- step.analyze
|   |-- call.agent-analysis.analyze_data (CLIENT)
|-- step.deliver
|   |-- call.agent-delivery.deliver_report (CLIENT)
|-- escrow.settle
    |-- call.agent-marketplace.settle_escrow (CLIENT)
```

Each CLIENT span on the calling side has a matching SERVER span on the receiving side. All agents share the same trace ID, so a trace visualization tool can present the whole transaction as a single, unified trace spanning five agents.
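Traces give detail about a single transaction. Metrics give the aggregate view: how is the fleet performing right now? How does today compare with yesterday? Are we meeting business targets? The MetricsCollector class provides a clean API for collecting, buffering, and exporting metrics from an agent fleet.
Agent commerce uses three basic metric types:
Counters are monotonically increasing values. Use them for total requests, total errors, total revenue, total transactions, and so on. They answer "how many?".
Gauges are point-in-time values that can go up or down. Use them for active connections, queue depth, current balance, active escrows, and so on. They answer "how much right now?".
Histograms track the distribution of values. Use them for latency, transaction amounts, response sizes, and so on. Histograms provide percentiles (p50, p95, p99) rather than just an average, so they answer "what does the distribution look like?".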
```python
import time
import math
import threading
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Callable, Any


@dataclass
class MetricPoint:
    """A single metric data point."""
    name: str
    value: float
    dimensions: Dict[str, str]
    timestamp: float
    metric_type: str  # "counter", "gauge", "histogram"


class HistogramBuckets:
    """Tracks the value distribution for a histogram metric."""

    def __init__(self, boundaries: Optional[List[float]] = None):
        self.boundaries = boundaries or [
            5, 10, 25, 50, 100, 250, 500, 1000, 2500, 5000, 10000
        ]
        # One bucket per boundary plus an overflow bucket
        self.buckets = [0] * (len(self.boundaries) + 1)
        self.sum = 0.0
        self.count = 0
        self.min = float("inf")
        self.max = float("-inf")
        # Raw values are kept for exact percentiles; this grows until reset()
        self._values: List[float] = []

    def observe(self, value: float):
        self.sum += value
        self.count += 1
        self.min = min(self.min, value)
        self.max = max(self.max, value)
        self._values.append(value)
        for i, boundary in enumerate(self.boundaries):
            if value <= boundary:
                self.buckets[i] += 1
                return
        self.buckets[-1] += 1

    def percentile(self, p: float) -> float:
        """Nearest-rank percentile over the raw observed values."""
        if not self._values:
            return 0.0
        sorted_vals = sorted(self._values)
        idx = int(math.ceil(p / 100.0 * len(sorted_vals))) - 1
        return sorted_vals[max(0, idx)]

    def reset(self):
        self.buckets = [0] * (len(self.boundaries) + 1)
        self.sum = 0.0
        self.count = 0
        self.min = float("inf")
        self.max = float("-inf")
        self._values = []
```
```python
class MetricsCollector:
    """Collects, buffers, and exports agent commerce metrics."""

    def __init__(
        self,
        agent_id: str,
        client=None,
        flush_interval_seconds: float = 60.0,
        buffer_size: int = 1000,
    ):
        self.agent_id = agent_id
        self._client = client
        self._flush_interval = flush_interval_seconds
        self._buffer_size = buffer_size
        self._counters: Dict[str, float] = defaultdict(float)
        self._gauges: Dict[str, float] = {}
        self._histograms: Dict[str, HistogramBuckets] = {}
        self._buffer: List[MetricPoint] = []
        self._lock = threading.Lock()
        self._dimensions_registry: Dict[str, Dict[str, str]] = {}
        self._flush_callbacks: List[Callable] = []
        self._running = False
        self._flush_thread: Optional[threading.Thread] = None

    def start(self):
        """Start the background flush thread."""
        self._running = True
        self._flush_thread = threading.Thread(
            target=self._flush_loop, daemon=True
        )
        self._flush_thread.start()

    def stop(self):
        """Stop the background flush thread and flush remaining metrics."""
        self._running = False
        if self._flush_thread:
            self._flush_thread.join(timeout=5.0)
        self.flush()

    # --- Counter operations ---
    def increment(
        self,
        name: str,
        value: float = 1.0,
        dimensions: Dict[str, str] = None,
    ):
        """Increment a counter metric."""
        key = self._make_key(name, dimensions)
        with self._lock:
            self._counters[key] += value
            self._dimensions_registry[key] = dimensions or {}
            # Buffer under the lock so flush() cannot drop a concurrent append
            self._buffer_point(name, self._counters[key], dimensions, "counter")

    # --- Gauge operations ---
    def gauge_set(
        self,
        name: str,
        value: float,
        dimensions: Dict[str, str] = None,
    ):
        """Set a gauge metric to an absolute value."""
        key = self._make_key(name, dimensions)
        with self._lock:
            self._gauges[key] = value
            self._dimensions_registry[key] = dimensions or {}
            self._buffer_point(name, value, dimensions, "gauge")

    def gauge_increment(
        self,
        name: str,
        value: float = 1.0,
        dimensions: Dict[str, str] = None,
    ):
        """Increment a gauge metric."""
        key = self._make_key(name, dimensions)
        with self._lock:
            self._gauges[key] = self._gauges.get(key, 0.0) + value
            self._dimensions_registry[key] = dimensions or {}
            self._buffer_point(name, self._gauges[key], dimensions, "gauge")

    # --- Histogram operations ---
    def observe(
        self,
        name: str,
        value: float,
        dimensions: Dict[str, str] = None,
        boundaries: List[float] = None,
    ):
        """Record an observation for a histogram metric."""
        key = self._make_key(name, dimensions)
        with self._lock:
            if key not in self._histograms:
                self._histograms[key] = HistogramBuckets(boundaries)
            self._histograms[key].observe(value)
            self._dimensions_registry[key] = dimensions or {}
            self._buffer_point(name, value, dimensions, "histogram")

    # --- Business metric helpers ---
    def record_transaction(
        self,
        operation: str,
        duration_ms: float,
        amount: float,
        status: str,
        peer_agent: str = "",
    ):
        """Record a complete transaction with all standard metrics."""
        dims = {
            "operation": operation,
            "status": status,
            "peer_agent": peer_agent,
        }
        self.increment("transactions.total", 1.0, dims)
        self.observe("transactions.duration_ms", duration_ms, dims)
        self.observe("transactions.amount", amount, dims)
        if status == "error":
            self.increment("transactions.errors", 1.0, dims)

    def record_revenue(self, amount: float, source: str, currency: str = "credits"):
        """Record revenue from an agent operation."""
        dims = {"source": source, "currency": currency}
        self.increment("revenue.total", amount, dims)

    def record_cost(self, amount: float, category: str, currency: str = "credits"):
        """Record a cost incurred by the agent."""
        dims = {"category": category, "currency": currency}
        self.increment("costs.total", amount, dims)

    def record_escrow_event(self, event_type: str, amount: float, escrow_id: str):
        """Record an escrow lifecycle event."""
        dims = {"event_type": event_type}
        self.increment(f"escrow.{event_type}", 1.0, dims)
        self.observe("escrow.amount", amount, dims)

    # --- Export and flush ---
    def flush(self):
        """Send buffered metrics to GreenHelix."""
        with self._lock:
            if not self._buffer:
                return
            batch = self._buffer[:]
            self._buffer.clear()
        if self._client:
            metrics_payload = [
                {
                    "name": point.name,
                    "value": point.value,
                    "dimensions": {
                        "agent": self.agent_id,
                        **point.dimensions,
                    },
                    "timestamp": _ts_to_iso(point.timestamp),
                }
                for point in batch
            ]
            try:
                self._client.execute_tool("submit_metrics", {
                    "agent_id": self.agent_id,
                    "metrics": metrics_payload,
                })
            except Exception:
                # On failure, put the batch back for the next flush cycle
                with self._lock:
                    self._buffer = batch + self._buffer
                    # Cap the buffer so it cannot grow without bound;
                    # the oldest points are dropped first
                    if len(self._buffer) > self._buffer_size * 2:
                        self._buffer = self._buffer[-self._buffer_size:]
        for callback in self._flush_callbacks:
            try:
                callback(batch)
            except Exception:
                pass

    def get_histogram_summary(
        self, name: str, dimensions: Dict[str, str] = None
    ) -> Dict[str, float]:
        """Return summary statistics for a histogram metric."""
        key = self._make_key(name, dimensions)
        with self._lock:
            hist = self._histograms.get(key)
            if not hist or hist.count == 0:
                return {}
            return {
                "count": hist.count,
                "sum": hist.sum,
                "min": hist.min,
                "max": hist.max,
                "avg": hist.sum / hist.count,
                "p50": hist.percentile(50),
                "p95": hist.percentile(95),
                "p99": hist.percentile(99),
            }

    def on_flush(self, callback: Callable):
        """Register a callback that is invoked on every flush."""
        self._flush_callbacks.append(callback)

    # --- Internal methods ---
    def _buffer_point(
        self,
        name: str,
        value: float,
        dimensions: Dict[str, str],
        metric_type: str,
    ):
        """Append a point to the buffer. The caller must hold self._lock."""
        point = MetricPoint(
            name=name,
            value=value,
            dimensions=dimensions or {},
            timestamp=time.time(),
            metric_type=metric_type,
        )
        self._buffer.append(point)
        if len(self._buffer) >= self._buffer_size:
            # Flush on a separate thread; it waits until the lock is released
            threading.Thread(target=self.flush, daemon=True).start()

    def _flush_loop(self):
        while self._running:
            time.sleep(self._flush_interval)
            self.flush()

    @staticmethod
    def _make_key(name: str, dimensions: Dict[str, str] = None) -> str:
        if not dimensions:
            return name
        dim_str = ",".join(f"{k}={v}" for k, v in sorted(dimensions.items()))
        return f"{name}{{{dim_str}}}"


def _ts_to_iso(ts: float) -> str:
    import datetime
    dt = datetime.datetime.fromtimestamp(ts, tz=datetime.timezone.utc)
    return dt.isoformat()
```

```python
client = GreenHelixClient(api_key="your-api-key")
metrics = MetricsCollector(agent_id="agent-marketplace", client=client)
metrics.start()

# Record a transaction (process_order and order are placeholders)
start = time.time()
result = process_order(order)
duration_ms = (time.time() - start) * 1000
metrics.record_transaction(
    operation="process_order",
    duration_ms=duration_ms,
    amount=order["amount"],
    status="success" if result["ok"] else "error",
    peer_agent=order["seller_agent"],
)

# Record revenue and cost
metrics.record_revenue(order["commission"], source="marketplace_fee")
metrics.record_cost(order["gateway_fee"], category="gateway")

# Record an escrow event
metrics.record_escrow_event("created", order["amount"], result["escrow_id"])

# Read latency percentiles; the dimensions must match those used when
# recording, and the summary is empty if nothing was recorded under them
summary = metrics.get_histogram_summary(
    "transactions.duration_ms",
    {"operation": "process_order", "status": "success", "peer_agent": order["seller_agent"]},
)
if summary:
    print(f"p50={summary['p50']:.0f}ms p95={summary['p95']:.0f}ms p99={summary['p99']:.0f}ms")

# Shut down cleanly
metrics.stop()
```

MetricsCollector buffers metrics in memory and sends them in batches, either periodically (every 60 seconds by default) or when the buffer fills. Batching reduces the number of API calls to GreenHelix. If a send fails, the batch is put back in the buffer and retried on the next cycle, so transient failures do not lose metrics. The buffer is capped to prevent unbounded memory growth, which means the oldest points are dropped during a prolonged outage.
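Raw metrics are necessary but not sufficient. An operator managing 50 agents cannot watch 200 dashboards continuously to spot emerging problems. You need automated anomaly detection that understands normal behavior and raises an alert when behavior deviates from it.
Anomaly detection for agent commerce differs from traditional infrastructure monitoring in two ways. First, agent traffic is often bursty and uneven: a marketplace agent might handle 100 requests per minute during business hours and only 5 at night. Second, business metrics such as revenue follow seasonal patterns (daily, weekly, monthly) that detection must take into account.
Our AnomalyDetector uses three complementary methods: a Z-score to identify sudden spikes, percentage change to catch trend shifts, and a seasonal (hour-of-day) profile to find violations of the expected pattern.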
```python
import math
import time
import datetime
from collections import deque
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple
from enum import Enum


class AnomalyType(Enum):
    SPIKE = "spike"
    DROP = "drop"
    TREND_CHANGE = "trend_change"
    SEASONAL_VIOLATION = "seasonal_violation"


class Severity(Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


@dataclass
class Anomaly:
    """An anomaly detected in a metric series."""
    metric_name: str
    anomaly_type: AnomalyType
    severity: Severity
    current_value: float
    expected_value: float
    deviation: float
    timestamp: float
    dimensions: Dict[str, str] = field(default_factory=dict)
    message: str = ""


class MetricWindow:
    """Sliding window of metric values for statistical analysis."""

    def __init__(self, window_size: int = 60):
        self._values: deque = deque(maxlen=window_size)
        self._timestamps: deque = deque(maxlen=window_size)

    def add(self, value: float, timestamp: float = None):
        self._values.append(value)
        self._timestamps.append(timestamp or time.time())

    @property
    def count(self) -> int:
        return len(self._values)

    @property
    def mean(self) -> float:
        if not self._values:
            return 0.0
        return sum(self._values) / len(self._values)

    @property
    def std(self) -> float:
        """Sample standard deviation (n - 1 in the denominator)."""
        if len(self._values) < 2:
            return 0.0
        m = self.mean
        variance = sum((v - m) ** 2 for v in self._values) / (len(self._values) - 1)
        return math.sqrt(variance)

    @property
    def values(self) -> List[float]:
        return list(self._values)

    @property
    def latest(self) -> Optional[float]:
        return self._values[-1] if self._values else None

    def rate_of_change(self, periods: int = 5) -> Optional[float]:
        """Relative change between the latest value and the value N periods ago."""
        if len(self._values) < periods + 1:
            return None
        recent = list(self._values)[-periods:]
        older = list(self._values)[-(periods + 1)]
        if older == 0:
            return None
        return (recent[-1] - older) / older


class SeasonalProfile:
    """Tracks a metric's seasonal pattern, bucketed by hour of day (UTC)."""

    def __init__(self, num_buckets: int = 24):
        self._num_buckets = num_buckets
        self._bucket_values: Dict[int, List[float]] = {
            i: [] for i in range(num_buckets)
        }

    def add(self, value: float, timestamp: float):
        dt = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)
        bucket = dt.hour  # bucket by hour of day
        self._bucket_values[bucket].append(value)

    def expected_value(self, timestamp: float) -> Optional[Tuple[float, float]]:
        """Return (mean, standard deviation) expected for this hour of day."""
        dt = datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)
        bucket = dt.hour
        values = self._bucket_values.get(bucket, [])
        # Require at least 7 samples in this hour bucket. This equals a week
        # of data only if the metric is observed once per hour.
        if len(values) < 7:
            return None
        m = sum(values) / len(values)
        variance = sum((v - m) ** 2 for v in values) / (len(values) - 1)
        return m, math.sqrt(variance)
```
```python
class AnomalyDetector:
    """Detects anomalies in agent commerce metric streams."""

    def __init__(
        self,
        z_score_threshold: float = 3.0,
        pct_change_threshold: float = 0.30,
        seasonal_z_threshold: float = 2.5,
        min_data_points: int = 20,
    ):
        self._z_threshold = z_score_threshold
        self._pct_threshold = pct_change_threshold
        self._seasonal_z_threshold = seasonal_z_threshold
        self._min_data_points = min_data_points
        self._windows: Dict[str, MetricWindow] = {}
        self._seasonal: Dict[str, SeasonalProfile] = {}
        self._anomaly_callbacks: List = []

    def observe(
        self,
        metric_name: str,
        value: float,
        dimensions: Dict[str, str] = None,
        timestamp: float = None,
    ) -> List[Anomaly]:
        """Observe a metric value and check it for anomalies."""
        ts = timestamp or time.time()
        key = self._make_key(metric_name, dimensions)
        if key not in self._windows:
            self._windows[key] = MetricWindow(window_size=120)
            self._seasonal[key] = SeasonalProfile()
        window = self._windows[key]
        seasonal = self._seasonal[key]
        # Note: the new value is added before the checks run, so it is
        # included in the mean and standard deviation it is compared against.
        window.add(value, ts)
        seasonal.add(value, ts)
        anomalies = []
        if window.count >= self._min_data_points:
            # Z-score check
            z_anomaly = self._check_z_score(
                metric_name, value, window, dimensions, ts
            )
            if z_anomaly:
                anomalies.append(z_anomaly)
            # Percentage-change check
            pct_anomaly = self._check_pct_change(
                metric_name, value, window, dimensions, ts
            )
            if pct_anomaly:
                anomalies.append(pct_anomaly)
        # Seasonal check
        seasonal_anomaly = self._check_seasonal(
            metric_name, value, seasonal, dimensions, ts
        )
        if seasonal_anomaly:
            anomalies.append(seasonal_anomaly)
        for anomaly in anomalies:
            for callback in self._anomaly_callbacks:
                try:
                    callback(anomaly)
                except Exception:
                    pass
        return anomalies

    def on_anomaly(self, callback):
        """Register a callback invoked for each detected anomaly."""
        self._anomaly_callbacks.append(callback)

    def _check_z_score(
        self,
        metric_name: str,
        value: float,
        window: MetricWindow,
        dimensions: Dict[str, str],
        timestamp: float,
    ) -> Optional[Anomaly]:
        """Detect anomalies by Z-score (standard deviations from the mean)."""
        std = window.std
        if std == 0:
            return None
        z_score = abs(value - window.mean) / std
        if z_score < self._z_threshold:
            return None
        anomaly_type = AnomalyType.SPIKE if value > window.mean else AnomalyType.DROP
        severity = self._z_to_severity(z_score)
        return Anomaly(
            metric_name=metric_name,
            anomaly_type=anomaly_type,
            severity=severity,
            current_value=value,
            expected_value=window.mean,
            deviation=z_score,
            timestamp=timestamp,
            dimensions=dimensions or {},
            message=f"{metric_name} is {z_score:.1f} standard deviations "
                    f"{'above' if value > window.mean else 'below'} the mean "
                    f"(current={value:.2f}, mean={window.mean:.2f}, std={std:.2f})",
        )

    def _check_pct_change(
        self,
        metric_name: str,
        value: float,
        window: MetricWindow,
        dimensions: Dict[str, str],
        timestamp: float,
    ) -> Optional[Anomaly]:
        """Detect anomalies from the recent percentage change."""
        roc = window.rate_of_change(periods=5)
        if roc is None or abs(roc) < self._pct_threshold:
            return None
        anomaly_type = AnomalyType.TREND_CHANGE
        severity = Severity.MEDIUM if abs(roc) < 0.5 else Severity.HIGH
        return Anomaly(
            metric_name=metric_name,
            anomaly_type=anomaly_type,
            severity=severity,
            current_value=value,
            expected_value=window.mean,
            deviation=roc,
            timestamp=timestamp,
            dimensions=dimensions or {},
            message=f"{metric_name} changed by {roc*100:.1f}% over the last 5 periods "
                    f"(current={value:.2f})",
        )

    def _check_seasonal(
        self,
        metric_name: str,
        value: float,
        seasonal: SeasonalProfile,
        dimensions: Dict[str, str],
        timestamp: float,
    ) -> Optional[Anomaly]:
        """Detect anomalies relative to the seasonal pattern."""
        expected = seasonal.expected_value(timestamp)
        if expected is None:
            return None
        mean, std = expected
        if std == 0:
            return None
        z_score = abs(value - mean) / std
        if z_score < self._seasonal_z_threshold:
            return None
        return Anomaly(
            metric_name=metric_name,
            anomaly_type=AnomalyType.SEASONAL_VIOLATION,
            severity=self._z_to_severity(z_score),
            current_value=value,
            expected_value=mean,
            deviation=z_score,
            timestamp=timestamp,
            dimensions=dimensions or {},
            message=f"{metric_name} is {z_score:.1f} standard deviations from the "
                    f"seasonal expectation (current={value:.2f}, "
                    f"expected={mean:.2f} ± {std:.2f})",
        )

    @staticmethod
    def _z_to_severity(z_score: float) -> Severity:
        if z_score >= 5.0:
            return Severity.CRITICAL
        elif z_score >= 4.0:
            return Severity.HIGH
        elif z_score >= 3.0:
            return Severity.MEDIUM
        return Severity.LOW

    @staticmethod
    def _make_key(name: str, dimensions: Dict[str, str] = None) -> str:
        if not dimensions:
            return name
        dim_str = ",".join(f"{k}={v}" for k, v in sorted(dimensions.items()))
        return f"{name}{{{dim_str}}}"
```

AnomalyDetector works together with MetricsCollector. Pass each metric observation to both:
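```python
detector = AnomalyDetector(
    z_score_threshold=3.0,
    pct_change_threshold=0.30,
    min_data_points=20,
)
metrics = MetricsCollector(agent_id="agent-marketplace", client=client)


def record_and_detect(
    metric_name: str,
    value: float,
    dimensions: Dict[str, str] = None,
):
    """Record a metric and check it for anomalies in one call."""
    metrics.observe(metric_name, value, dimensions)
    anomalies = detector.observe(metric_name, value, dimensions)
    for anomaly in anomalies:
        print(f"ANOMALY: {anomaly.message}")
        # Report the anomaly back as a metric, for meta-monitoring
        metrics.increment(
            "anomalies.detected",
            1.0,
            {
                "metric": anomaly.metric_name,
                "type": anomaly.anomaly_type.value,
                "severity": anomaly.severity.value,
            },
        )
    return anomalies


# Example: detect a latency spike.
# 12.5 seconds is very likely anomalous, but the detector only flags it once
# it has seen at least min_data_points earlier observations for this metric.
record_and_detect(
    "transactions.duration_ms",
    12500.0,
    {"operation": "process_order"},
)
```

The detector uses a sliding window, so it adapts to a changing baseline. If an agent's latency rises gradually from 200 ms to 300 ms over a week, the baseline moves with it and no false alarm is raised. If latency jumps from 300 ms to 3000 ms within a minute, the Z-score check catches it right away.
The seasonal profile prevents false alarms on known traffic patterns. For example, if an agent handles less traffic at night, a drop from 100 transactions per second to 10 per second at midnight is normal. The seasonal profile knows what that hour usually looks like and does not flag it.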
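The sliding-window Z-score idea can be tried in isolation with the standard library. The sketch below is a simplified stand-in, not the AnomalyDetector from this guide: the `RollingZScore` class is hypothetical, and unlike the detector above it scores each value against the window before appending it, so a spike does not inflate its own baseline.

```python
import statistics
from collections import deque
from typing import Deque, Optional


class RollingZScore:
    """Score each new value against a sliding window of earlier values."""

    def __init__(self, window_size: int = 120, min_points: int = 20):
        self._values: Deque[float] = deque(maxlen=window_size)
        self._min_points = min_points

    def score(self, value: float) -> Optional[float]:
        """Return the signed Z-score, or None while the baseline is too short."""
        z = None
        if len(self._values) >= self._min_points:
            mean = statistics.fmean(self._values)
            std = statistics.stdev(self._values)  # sample standard deviation
            if std > 0:
                z = (value - mean) / std
        # Append after scoring so the value joins the baseline for later calls.
        self._values.append(value)
        return z


scorer = RollingZScore(window_size=120, min_points=20)

# Warm-up: 20 latency samples around 300 ms (illustrative data).
warmup_scores = [scorer.score(290.0 if i % 2 == 0 else 310.0) for i in range(20)]

normal_z = scorer.score(305.0)   # close to the baseline
spike_z = scorer.score(3000.0)   # a tenfold jump

print(f"normal z={normal_z:.2f}, spike z={spike_z:.1f}")
```

During warm-up every call returns None, which mirrors why a detector with `min_data_points=20` stays silent on its first observations.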
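Anomaly detection finds problems automatically, but operators still need dashboards for situational awareness, capacity planning, and stakeholder reporting. This chapter covers the three core dashboards for agent commerce operations, along with the data sources and queries needed to populate them.
The fleet health dashboard is the first thing you look at when you suspect something is wrong. It answers the question: "Is everything healthy right now?"
Key panels: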
- Fleet error rate: transactions.errors / transactions.total, aggregated per minute.
- Active escrows: escrow.active_count.
- Latency heatmap: transactions.duration_ms, aggregated at p95.

```python
from typing import Any, Dict, List


def build_fleet_health_query(agent_ids: List[str]) -> Dict[str, Any]:
    """Build the query configuration for the fleet health dashboard."""
    return {
        "panels": [
            {
                "title": "Agent status grid",
                "type": "status_grid",
                "query": {
                    "metrics": ["agent.heartbeat.age_seconds", "transactions.errors"],
                    "group_by": ["agent"],
                    "thresholds": {
                        "green": {"error_rate": 0.01, "heartbeat_age": 60},
                        "yellow": {"error_rate": 0.05, "heartbeat_age": 120},
                        "red": {"error_rate": 1.0, "heartbeat_age": float("inf")},
                    },
                },
            },
            {
                "title": "Fleet error rate",
                "type": "time_series",
                "query": {
                    "metric": "transactions.errors",
                    "aggregation": "rate",
                    "interval": "1m",
                    "time_range": "24h",
                },
            },
            {
                "title": "Active escrows",
                "type": "gauge_with_trend",
                "query": {
                    "metric": "escrow.active_count",
                    "aggregation": "latest",
                    "trend_window": "6h",
                },
            },
            {
                "title": "Latency heatmap",
                "type": "heatmap",
                "query": {
                    "metric": "transactions.duration_ms",
                    "aggregation": "p95",
                    "group_by": ["agent"],
                    "interval": "15m",
                    "time_range": "6h",
                },
            },
        ],
    }
```

The transaction flow dashboard shows how transactions move through the agent fleet. It answers the question: "What is happening to our transactions?"
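Key panels:
- The `transactions.total` counter, broken down by the `peer_agent` dimension.
- The `transactions.duration_ms` histogram grouped by `operation`, as a Top-N query.
- The `transactions.total` counter grouped by `operation`.
The revenue and cost dashboard serves business stakeholders and capacity planners. It answers the core question: "Are we making money, and where is it going?"
Key panels:
- The `revenue.total` counter minus the `costs.total` counter.
- The `revenue.total` counter grouped by `agent`.
- The `costs.total` counter grouped by `category`.
- `revenue.total`.
- `revenue.total` and `costs.total` divided by `transactions.total`, grouped by `operation`.
Give each dashboard an auto-refresh interval that matches its purpose: 30 seconds for fleet health, 1 minute for transaction flow, and 5 minutes for revenue and cost. All of the underlying data comes from `MetricsCollector` metrics pushed to GreenHelix via `submit_metrics` and then queried for visualization.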
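The last panel, revenue and cost per transaction grouped by operation, is plain arithmetic over the three counters. The sketch below illustrates that calculation on hypothetical in-memory rows; the `unit_economics` function and the sample data are assumptions for illustration, not part of the GreenHelix query API.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple

# Hypothetical rows: (operation, revenue, cost), one per transaction. In practice
# these figures would come from revenue.total, costs.total, and transactions.total.
Row = Tuple[str, float, float]


def unit_economics(rows: Iterable[Row]) -> Dict[str, Dict[str, float]]:
    """Net revenue plus per-transaction revenue and cost for each operation."""
    totals = defaultdict(lambda: {"revenue": 0.0, "cost": 0.0, "count": 0})
    for operation, revenue, cost in rows:
        entry = totals[operation]
        entry["revenue"] += revenue
        entry["cost"] += cost
        entry["count"] += 1

    result = {}
    for operation, entry in totals.items():
        count = entry["count"]
        result[operation] = {
            "net": entry["revenue"] - entry["cost"],
            "revenue_per_txn": entry["revenue"] / count,
            "cost_per_txn": entry["cost"] / count,
        }
    return result


sample_rows = [
    ("process_order", 10.0, 4.0),
    ("process_order", 14.0, 6.0),
    ("refund", 0.0, 2.0),
]
report = unit_economics(sample_rows)
```

Here `process_order` nets 14.0 across two transactions, while `refund` shows a negative net, which is the kind of per-operation loss this dashboard is meant to surface.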
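Dashboards only help when someone is looking at them. Alerting pushes problems to the right people as they happen. `AlertManager` evaluates alert rules, deduplicates and groups related alerts, routes them through the appropriate channels, and escalates alerts that go unacknowledged.
Three kinds of alert rule cover the scenarios that matter in agent commerce:
**Threshold alerts** fire when a metric crosses a static or dynamic boundary, for example "error rate above 5%". They are simple and reliable, and suit limits that are known and well defined.
**Rate-of-change alerts** fire when a metric moves too fast, for example "transaction volume dropped more than 30% within 10 minutes". They catch problems that threshold alerts miss, because the absolute value may still sit in the "normal" range.
**Anomaly-based alerts** fire when `AnomalyDetector` identifies a statistical anomaly. They adapt to baseline shifts and seasonal patterns, and catch subtle problems that static thresholds would only catch with constant retuning.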
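The difference between the first two rule types is easiest to see on a small series. The following sketch uses made-up transaction volumes and a hypothetical `rate_of_change` helper to show a drop that a static floor misses but a rate-of-change check catches.

```python
from typing import Sequence


def rate_of_change(series: Sequence[float]) -> float:
    """Fractional change from the first to the last value in the series."""
    first, last = series[0], series[-1]
    if first == 0:
        raise ValueError("cannot compute relative change from a zero baseline")
    return (last - first) / first


# Hypothetical transactions-per-minute readings over a 10-minute window.
volume = [100.0, 95.0, 80.0, 70.0, 65.0]

# Threshold rule: alert only if volume falls below a static floor.
floor = 50.0
below_floor = volume[-1] < floor

# Rate-of-change rule: alert if volume fell by 30% or more across the window.
change = rate_of_change(volume)
dropped_fast = change <= -0.30
```

The final reading of 65 is still above the floor of 50, so the threshold rule stays quiet, yet volume has fallen by 35% across the window and the rate-of-change rule fires.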
```python
import json
import threading
import time
import urllib.request
import uuid
from collections import defaultdict
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable, Dict, List, Optional


class AlertState(Enum):
    FIRING = "firing"
    ACKNOWLEDGED = "acknowledged"
    RESOLVED = "resolved"


class AlertChannel(Enum):
    WEBHOOK = "webhook"
    EMAIL = "email"
    SLACK = "slack"
    PAGERDUTY = "pagerduty"


@dataclass
class AlertRule:
    """Definition of an alert rule."""
    name: str
    metric_name: str
    condition: str  # "threshold", "rate_of_change", "anomaly"
    threshold: Optional[float] = None
    comparison: str = "gt"  # "gt", "lt", "gte", "lte"
    window_minutes: int = 5
    severity: str = "medium"
    channels: List[AlertChannel] = field(default_factory=list)
    dimensions_filter: Dict[str, str] = field(default_factory=dict)
    cooldown_minutes: int = 15
    escalation_minutes: int = 30
    description: str = ""
    runbook_url: str = ""


@dataclass
class Alert:
    """An active alert instance."""
    id: str
    rule: AlertRule
    state: AlertState
    fired_at: float
    last_value: float
    message: str
    acknowledged_at: Optional[float] = None
    acknowledged_by: Optional[str] = None
    resolved_at: Optional[float] = None
    escalated: bool = False
    notification_count: int = 0
    dimensions: Dict[str, str] = field(default_factory=dict)


class AlertManager:
    """Manages alert rules, routing, deduplication, and escalation."""

    def __init__(
        self,
        agent_id: str,
        client=None,
        anomaly_detector: "AnomalyDetector" = None,
    ):
        self.agent_id = agent_id
        self._client = client
        self._detector = anomaly_detector
        self._rules: Dict[str, AlertRule] = {}
        self._active_alerts: Dict[str, Alert] = {}
        self._alert_history: List[Alert] = []
        self._cooldowns: Dict[str, float] = {}
        self._lock = threading.Lock()
        # Channel handlers
        self._channel_handlers: Dict[AlertChannel, Callable] = {}
        # Escalation thread
        self._running = False
        self._escalation_thread: Optional[threading.Thread] = None

    def start(self):
        """Start the escalation monitoring thread."""
        self._running = True
        self._escalation_thread = threading.Thread(
            target=self._escalation_loop, daemon=True
        )
        self._escalation_thread.start()

    def stop(self):
        self._running = False
        if self._escalation_thread:
            self._escalation_thread.join(timeout=5.0)

    # --- Rule management ---

    def add_rule(self, rule: AlertRule):
        """Add an alert rule."""
        self._rules[rule.name] = rule

    def remove_rule(self, name: str):
        """Remove an alert rule."""
        self._rules.pop(name, None)

    # --- Channel configuration ---

    def register_channel(self, channel: AlertChannel, handler: Callable):
        """Register a notification handler for a channel."""
        self._channel_handlers[channel] = handler

    def configure_webhook(self, url: str):
        """Configure the webhook channel with a URL."""

        def webhook_handler(alert: Alert, message: str):
            payload = json.dumps({
                "alert_id": alert.id,
                "rule": alert.rule.name,
                "severity": alert.rule.severity,
                "state": alert.state.value,
                "message": message,
                "value": alert.last_value,
                "fired_at": alert.fired_at,
                "agent_id": self.agent_id,
                "dimensions": alert.dimensions,
            }).encode()
            req = urllib.request.Request(
                url,
                data=payload,
                headers={"Content-Type": "application/json"},
                method="POST",
            )
            try:
                urllib.request.urlopen(req, timeout=10)
            except Exception:
                # Delivery failures are swallowed so alerting never breaks the caller
                pass

        self._channel_handlers[AlertChannel.WEBHOOK] = webhook_handler

    def configure_slack(self, webhook_url: str, default_channel: str = "#alerts"):
        """Configure the Slack channel."""

        def slack_handler(alert: Alert, message: str):
            severity_emoji = {
                "critical": ":red_circle:",
                "high": ":large_orange_circle:",
                "medium": ":large_yellow_circle:",
                "low": ":white_circle:",
            }
            emoji = severity_emoji.get(alert.rule.severity, ":grey_question:")
            payload = json.dumps({
                "channel": default_channel,
                "text": f"{emoji} *{alert.rule.name}*\n{message}",
                "attachments": [
                    {
                        "color": {
                            "critical": "#ff0000",
                            "high": "#ff8800",
                            "medium": "#ffcc00",
                            "low": "#cccccc",
                        }.get(alert.rule.severity, "#cccccc"),
                        "fields": [
                            {"title": "Agent", "value": self.agent_id, "short": True},
                            {"title": "Value", "value": str(alert.last_value), "short": True},
                            {"title": "Severity", "value": alert.rule.severity, "short": True},
                            {"title": "Alert ID", "value": alert.id, "short": True},
                        ],
                    }
                ],
            }).encode()
            req = urllib.request.Request(
                webhook_url,
                data=payload,
                headers={"Content-Type": "application/json"},
                method="POST",
            )
            try:
                urllib.request.urlopen(req, timeout=10)
            except Exception:
                pass

        self._channel_handlers[AlertChannel.SLACK] = slack_handler

    # --- Alert evaluation ---

    def evaluate(
        self,
        metric_name: str,
        value: float,
        dimensions: Dict[str, str] = None,
    ) -> List[Alert]:
        """Evaluate every rule against a metric value."""
        new_alerts = []
        dims = dimensions or {}
        for rule in self._rules.values():
            if rule.metric_name != metric_name:
                continue
            # Check the dimension filter
            if rule.dimensions_filter:
                if not all(
                    dims.get(k) == v for k, v in rule.dimensions_filter.items()
                ):
                    continue
            # Check the cooldown period
            cooldown_key = f"{rule.name}:{self._dim_key(dims)}"
            if cooldown_key in self._cooldowns:
                if time.time() - self._cooldowns[cooldown_key] < rule.cooldown_minutes * 60:
                    continue
            triggered = False
            if rule.condition == "threshold":
                triggered = self._check_threshold(rule, value)
            elif rule.condition == "rate_of_change":
                triggered = self._check_rate_of_change(rule, value, metric_name, dims)
            elif rule.condition == "anomaly" and self._detector:
                anomalies = self._detector.observe(metric_name, value, dims)
                triggered = len(anomalies) > 0
            if triggered:
                alert = self._fire_alert(rule, value, dims)
                if alert:
                    new_alerts.append(alert)
                    self._cooldowns[cooldown_key] = time.time()
        # Check for auto-resolution
        self._check_resolutions(metric_name, value, dims)
        return new_alerts

    def acknowledge(self, alert_id: str, acknowledged_by: str = "operator"):
        """Acknowledge an active alert."""
        with self._lock:
            alert = self._active_alerts.get(alert_id)
            if alert and alert.state == AlertState.FIRING:
                alert.state = AlertState.ACKNOWLEDGED
                alert.acknowledged_at = time.time()
                alert.acknowledged_by = acknowledged_by

    def resolve(self, alert_id: str):
        """Resolve an alert manually."""
        with self._lock:
            alert = self._active_alerts.get(alert_id)
            if alert:
                alert.state = AlertState.RESOLVED
                alert.resolved_at = time.time()
                self._alert_history.append(alert)
                del self._active_alerts[alert_id]
                self._notify(alert, f"RESOLVED: {alert.rule.name}")

    def get_active_alerts(
        self, severity: str = None
    ) -> List[Alert]:
        """Return all active (firing or acknowledged) alerts."""
        with self._lock:
            alerts = list(self._active_alerts.values())
        if severity:
            alerts = [a for a in alerts if a.rule.severity == severity]
        return sorted(alerts, key=lambda a: a.fired_at, reverse=True)

    # --- Internal methods ---

    def _check_threshold(self, rule: AlertRule, value: float) -> bool:
        if rule.threshold is None:
            return False
        ops = {
            "gt": lambda v, t: v > t,
            "lt": lambda v, t: v < t,
            "gte": lambda v, t: v >= t,
            "lte": lambda v, t: v <= t,
        }
        op = ops.get(rule.comparison, ops["gt"])
        return op(value, rule.threshold)

    def _check_rate_of_change(
        self,
        rule: AlertRule,
        value: float,
        metric_name: str,
        dimensions: Dict[str, str],
    ) -> bool:
        if not self._detector:
            return False
        # Reads the detector's window for this series; the window only exists
        # if detector.observe() has already been called for this metric.
        key = self._detector._make_key(metric_name, dimensions)
        window = self._detector._windows.get(key)
        if not window:
            return False
        roc = window.rate_of_change(periods=5)
        if roc is None or rule.threshold is None:
            return False
        # abs() means a rise of the same size triggers as well as a drop
        return abs(roc) > rule.threshold

    def _fire_alert(
        self,
        rule: AlertRule,
        value: float,
        dimensions: Dict[str, str],
    ) -> Optional[Alert]:
        # Deduplicate: check whether the same rule + dimensions is already firing
        dedup_key = f"{rule.name}:{self._dim_key(dimensions)}"
        with self._lock:
            for alert in self._active_alerts.values():
                existing_key = f"{alert.rule.name}:{self._dim_key(alert.dimensions)}"
                if existing_key == dedup_key:
                    # Update the existing alert instead of creating a new one
                    alert.last_value = value
                    alert.notification_count += 1
                    return None
            alert = Alert(
                id=f"alert_{uuid.uuid4().hex[:12]}",
                rule=rule,
                state=AlertState.FIRING,
                fired_at=time.time(),
                last_value=value,
                message=f"{rule.name}: {rule.metric_name} = {value} "
                        f"(threshold: {rule.comparison} {rule.threshold})",
                dimensions=dimensions,
            )
            self._active_alerts[alert.id] = alert
            self._notify(alert, alert.message)
            return alert

    def _notify(self, alert: Alert, message: str):
        # Channels without a registered handler are skipped silently
        for channel in alert.rule.channels:
            handler = self._channel_handlers.get(channel)
            if handler:
                try:
                    handler(alert, message)
                except Exception:
                    pass
        # Also record the firing alert in GreenHelix as a metric
        if self._client and alert.state == AlertState.FIRING:
            try:
                self._client.execute_tool("submit_metrics", {
                    "agent_id": self.agent_id,
                    "metrics": [{
                        "name": "alerts.fired",
                        "value": 1,
                        "dimensions": {
                            "rule": alert.rule.name,
                            "severity": alert.rule.severity,
                            "agent": self.agent_id,
                        },
                    }],
                })
            except Exception:
                pass

    def _check_resolutions(
        self,
        metric_name: str,
        value: float,
        dimensions: Dict[str, str],
    ):
        """Auto-resolve alerts once their condition clears."""
        with self._lock:
            to_resolve = []
            for alert_id, alert in self._active_alerts.items():
                if alert.rule.metric_name != metric_name:
                    continue
                # Only a sample from the same series may clear the alert
                if alert.dimensions != dimensions:
                    continue
                if alert.rule.condition == "threshold":
                    if not self._check_threshold(alert.rule, value):
                        to_resolve.append(alert_id)
            for alert_id in to_resolve:
                alert = self._active_alerts[alert_id]
                alert.state = AlertState.RESOLVED
                alert.resolved_at = time.time()
                self._alert_history.append(alert)
                del self._active_alerts[alert_id]
                self._notify(alert, f"RESOLVED: {alert.rule.name}")

    def _escalation_loop(self):
        """Periodically check for alerts that need escalation."""
        while self._running:
            time.sleep(60)
            now = time.time()
            with self._lock:
                for alert in self._active_alerts.values():
                    if alert.state != AlertState.FIRING:
                        continue
                    if alert.escalated:
                        continue
                    age_minutes = (now - alert.fired_at) / 60
                    if age_minutes >= alert.rule.escalation_minutes:
                        alert.escalated = True
                        self._notify(
                            alert,
                            f"ESCALATION: {alert.rule.name} has been firing for "
                            f"{age_minutes:.0f} minutes without acknowledgement",
                        )

    @staticmethod
    def _dim_key(dimensions: Dict[str, str]) -> str:
        if not dimensions:
            return ""
        return ",".join(f"{k}={v}" for k, v in sorted(dimensions.items()))
```
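## Chapter 8: SLA Monitoring and Cost Attribution
The last major piece of the observability stack ties technical metrics to business commitments. SLA monitoring tracks whether agents are meeting their published service level agreements; cost attribution tracks where money is spent and how profitable each agent is.
### SLA Definition and Monitoring
Typical SLAs for an agent commerce system cover the following metrics:
- **Availability**: percentage of time the agent is up and able to accept and process requests (target: 99.9%)
- **Latency**: p95 or p99 response time for standard operations (target: p95 under 500 ms)
- **Error rate**: failed transactions as a share of all transactions (target: under 0.1%)
- **Settlement time**: time from escrow creation to completed settlement (target: under 24 hours)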
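To give a feel for what these targets mean in numbers, the sketch below computes a nearest-rank p95 from latency samples and converts an availability target into a downtime allowance. Both helper functions and the sample data are hypothetical illustrations, not part of the guide's `SLAMonitor`.

```python
import math
from typing import Sequence


def percentile(samples: Sequence[float], pct: float) -> float:
    """Nearest-rank percentile: the smallest sample with pct% of samples at or below it."""
    ordered = sorted(samples)
    rank = max(1, math.ceil(pct * len(ordered) / 100))
    return ordered[rank - 1]


def allowed_downtime_minutes(availability_target_pct: float, window_days: int) -> float:
    """Minutes of downtime permitted in the window while still meeting the target."""
    total_minutes = window_days * 24 * 60
    return total_minutes * (1 - availability_target_pct / 100)


# Hypothetical latency samples in milliseconds: 100, 120, ..., 480.
latencies_ms = [100 + 20 * i for i in range(20)]
p95_ms = percentile(latencies_ms, 95)
latency_ok = p95_ms <= 500  # target: p95 under 500 ms

# A 99.9% availability target over a 30-day window.
downtime_budget = allowed_downtime_minutes(99.9, 30)
```

For a 30-day window, a 99.9% target leaves roughly 43 minutes of total downtime, which is why availability SLAs are usually paired with fast alerting.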
---
## Example Deployment Configuration
The following is a practical configuration for an agent commerce system:
```python
alert_manager = AlertManager(
    agent_id="agent-marketplace",
    client=client,
    anomaly_detector=detector,
)
alert_manager.configure_webhook("https://alerts.myfleet.com/webhook")
alert_manager.configure_slack(
    webhook_url="https://hooks.slack.com/services/T00/B00/xxx",
    default_channel="#agent-alerts",
)

alert_manager.add_rule(AlertRule(
    name="high_error_rate",
    metric_name="transactions.errors",
    condition="threshold",
    threshold=0.05,
    comparison="gt",
    severity="high",
    channels=[AlertChannel.SLACK, AlertChannel.WEBHOOK],
    cooldown_minutes=15,
    escalation_minutes=30,
    description="Transaction error rate above 5%",
    runbook_url="https://wiki.myfleet.com/runbooks/high-error-rate",
))

alert_manager.add_rule(AlertRule(
    name="latency_spike",
    metric_name="transactions.duration_ms",
    condition="threshold",
    threshold=5000,
    comparison="gt",
    severity="medium",
    channels=[AlertChannel.SLACK],
    cooldown_minutes=10,
    description="Transaction latency above 5 seconds",
))

# Note: no PAGERDUTY handler is registered above, so _notify() skips that
# channel until one is added with register_channel().
alert_manager.add_rule(AlertRule(
    name="revenue_drop",
    metric_name="revenue.total",
    condition="rate_of_change",
    threshold=0.30,  # 30% drop
    severity="critical",
    channels=[AlertChannel.SLACK, AlertChannel.PAGERDUTY, AlertChannel.WEBHOOK],
    cooldown_minutes=60,
    escalation_minutes=15,
    description="Revenue dropped more than 30% over 5 consecutive periods",
))

alert_manager.add_rule(AlertRule(
    name="escrow_anomaly",
    metric_name="escrow.created",
    condition="anomaly",
    severity="medium",
    channels=[AlertChannel.SLACK],
    cooldown_minutes=30,
    description="Unusual escrow creation pattern detected",
))

alert_manager.start()

alert_manager.evaluate("transactions.errors", 0.08, {"operation": "process_order"})
alert_manager.evaluate("transactions.duration_ms", 7500.0, {"operation": "process_order"})
```
Deduplication prevents alert storms: when the same rule fires again for the same dimensions, the existing alert is updated instead of a new notification being sent. The cooldown period keeps an alert from re-firing too soon after it recovers. Escalation ensures that an alert left unacknowledged past its configured timeout is re-sent as an `ESCALATION` notification through the rule's channels.
```python
import time
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any


@dataclass
class SLADefinition:
    """An SLA definition with its target value and measurement parameters."""
    name: str
    metric_name: str
    target_value: float
    comparison: str  # "lte" (latency, error rate) or "gte" (availability)
    measurement_window: str  # "1h", "24h", "7d", "30d"
    description: str = ""


@dataclass
class SLAStatus:
    """Current status of an SLA."""
    definition: SLADefinition
    current_value: float
    compliant: bool
    compliance_percentage: float  # share of recent checks that met the SLA (%)
    last_violation: Optional[float] = None
    violation_count: int = 0


class SLAMonitor:
    """Monitors SLA compliance for agent commerce operations."""

    def __init__(
        self,
        agent_id: str,
        metrics_collector: "MetricsCollector",
        client=None,
    ):
        self.agent_id = agent_id
        self._metrics = metrics_collector
        self._client = client
        self._sla_definitions: Dict[str, SLADefinition] = {}
        self._compliance_history: Dict[str, List[bool]] = defaultdict(list)
        self._violation_callbacks: List = []

    def define_sla(self, sla: SLADefinition):
        """Register an SLA definition."""
        self._sla_definitions[sla.name] = sla

    def on_violation(self, callback):
        """Register a callback for SLA violations."""
        self._violation_callbacks.append(callback)

    def check_sla(
        self,
        sla_name: str,
        current_value: float,
    ) -> SLAStatus:
        """Check whether a metric value meets the SLA target."""
        sla = self._sla_definitions.get(sla_name)
        if not sla:
            raise ValueError(f"Unknown SLA: {sla_name}")
        ops = {
            "lte": lambda v, t: v <= t,
            "gte": lambda v, t: v >= t,
            "lt": lambda v, t: v < t,
            "gt": lambda v, t: v > t,
        }
        op = ops.get(sla.comparison, ops["lte"])
        compliant = op(current_value, sla.target_value)
        # Track compliance history
        self._compliance_history[sla_name].append(compliant)
        # Compute the compliance percentage over the last 100 checks
        history = self._compliance_history[sla_name][-100:]
        compliance_pct = sum(1 for c in history if c) / len(history) * 100
        status = SLAStatus(
            definition=sla,
            current_value=current_value,
            compliant=compliant,
            compliance_percentage=compliance_pct,
            violation_count=sum(1 for c in history if not c),
        )
        if not compliant:
            status.last_violation = time.time()
            # Record the violation as a metric
            self._metrics.increment(
                "sla.violations",
                1.0,
                {"sla": sla_name, "agent": self.agent_id},
            )
            for callback in self._violation_callbacks:
                try:
                    callback(status)
                except Exception:
                    pass
        # Always record the compliance percentage
        self._metrics.gauge_set(
            "sla.compliance_pct",
            compliance_pct,
            {"sla": sla_name, "agent": self.agent_id},
        )
        return status

    def get_all_sla_status(self) -> Dict[str, SLAStatus]:
        """Return the latest status of every SLA."""
        results = {}
        for name in self._sla_definitions:
            history = self._compliance_history.get(name, [])
            if not history:
                continue
            last_100 = history[-100:]
            compliance_pct = sum(1 for c in last_100 if c) / len(last_100) * 100
            results[name] = SLAStatus(
                definition=self._sla_definitions[name],
                current_value=0.0,  # placeholder: the last observed value is not stored
                compliant=last_100[-1] if last_100 else True,
                compliance_percentage=compliance_pct,
                violation_count=sum(1 for c in last_100 if not c),
            )
        return results
```
### Per-Agent Cost Attribution
In a multi-agent fleet you need to know which agents are profitable and which are running up high costs. Cost attribution traces every expense back to the agent that actually incurred it.
```python
import time
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class CostEntry:
    """A single cost entry incurred by an agent."""
    agent_id: str
    category: str
    amount: float
    currency: str
    timestamp: float
    transaction_id: str = ""
    description: str = ""


class CostAttributor:
    """Tracks and attributes costs across the agent fleet."""

    def __init__(self, metrics_collector: "MetricsCollector"):
        self._metrics = metrics_collector
        self._costs: Dict[str, List[CostEntry]] = defaultdict(list)
        self._budgets: Dict[str, float] = {}
        self._budget_callbacks: List = []

    def record_cost(
        self,
        agent_id: str,
        category: str,
        amount: float,
        currency: str = "credits",
        transaction_id: str = "",
        description: str = "",
    ):
        """Record a cost attributed to an agent."""
        entry = CostEntry(
            agent_id=agent_id,
            category=category,
            amount=amount,
            currency=currency,
            timestamp=time.time(),
            transaction_id=transaction_id,
            description=description,
        )
        self._costs[agent_id].append(entry)
        self._metrics.record_cost(amount, category, currency)
        self._metrics.increment(
            "cost_attribution.total",
            amount,
            {"agent": agent_id, "category": category},
        )
        # Check the budget
        if agent_id in self._budgets:
            total = self.get_agent_total(agent_id)
            budget = self._budgets[agent_id]
            utilization = total / budget if budget > 0 else 0
            self._metrics.gauge_set(
                "budget.utilization_pct",
                utilization * 100,
                {"agent": agent_id},
            )
            # Warn once spending reaches 90% of the budget
            if utilization >= 0.9:
                for callback in self._budget_callbacks:
                    try:
                        callback(agent_id, total, budget, utilization)
                    except Exception:
                        pass

    def set_budget(self, agent_id: str, budget: float):
        """Set a spending budget for an agent."""
        self._budgets[agent_id] = budget

    def on_budget_warning(self, callback):
        """Register a callback for budget threshold warnings."""
        self._budget_callbacks.append(callback)

    def get_agent_total(
        self,
        agent_id: str,
        since: float = None,
    ) -> float:
        """Return an agent's total cost since the given time."""
        costs = self._costs.get(agent_id, [])
        if since:
            costs = [c for c in costs if c.timestamp >= since]
        return sum(c.amount for c in costs)

    def get_agent_breakdown(
        self,
        agent_id: str,
        since: float = None,
    ) -> Dict[str, float]:
        """Return an agent's costs broken down by category."""
        costs = self._costs.get(agent_id, [])
        if since:
            costs = [c for c in costs if c.timestamp >= since]
        breakdown = defaultdict(float)
        for cost in costs:
            breakdown[cost.category] += cost.amount
        return dict(breakdown)

    def generate_monthly_report(
        self,
        month_start: float,
        month_end: float,
    ) -> Dict[str, Any]:
        """Generate a monthly cost attribution report."""
        report = {
            "period_start": month_start,
            "period_end": month_end,
            "agents": {},
            "total_cost": 0.0,
            "top_categories": defaultdict(float),
        }
        for agent_id, costs in self._costs.items():
            month_costs = [
                c for c in costs
                if month_start <= c.timestamp <= month_end
            ]
            if not month_costs:
                continue
            agent_total = sum(c.amount for c in month_costs)
            breakdown = defaultdict(float)
            for cost in month_costs:
                breakdown[cost.category] += cost.amount
                report["top_categories"][cost.category] += cost.amount
            budget = self._budgets.get(agent_id, 0)
            report["agents"][agent_id] = {
                "total_cost": agent_total,
                "budget": budget,
                "utilization_pct": (agent_total / budget * 100) if budget else 0,
                "transaction_count": len(month_costs),
                "breakdown": dict(breakdown),
            }
            report["total_cost"] += agent_total
        # Sort categories by total spend, highest first
        report["top_categories"] = dict(
            sorted(
                report["top_categories"].items(),
                key=lambda x: x[1],
                reverse=True,
            )
        )
        return report
```
### Putting It All Together
The following shows how all of the observability components fit together in a production multi-agent deployment:
```python
import time

from greenhelix import GreenHelixClient

client = GreenHelixClient(api_key="your-api-key")

tracer = AgentTracer(agent_id="agent-marketplace", client=client)
metrics = MetricsCollector(agent_id="agent-marketplace", client=client)
detector = AnomalyDetector()
alert_manager = AlertManager(
    agent_id="agent-marketplace", client=client, anomaly_detector=detector
)
sla_monitor = SLAMonitor(
    agent_id="agent-marketplace", metrics_collector=metrics, client=client
)
cost_tracker = CostAttributor(metrics_collector=metrics)

sla_monitor.define_sla(SLADefinition(
    name="availability",
    metric_name="agent.availability_pct",
    target_value=99.9,
    comparison="gte",
    measurement_window="24h",
))
sla_monitor.define_sla(SLADefinition(
    name="latency_p95",
    metric_name="transactions.duration_ms",
    target_value=500,
    comparison="lte",
    measurement_window="1h",
))

cost_tracker.set_budget("agent-marketplace", budget=1000.0)
cost_tracker.set_budget("agent-data-provider", budget=500.0)

# Route SLA violations and budget warnings through the alert manager
sla_monitor.on_violation(lambda status: alert_manager.evaluate(
    status.definition.metric_name,
    status.current_value,
    {"sla": status.definition.name},
))
cost_tracker.on_budget_warning(
    lambda agent, total, budget, util: alert_manager.evaluate(
        "budget.utilization_pct",
        util * 100,
        {"agent": agent},
    )
)

metrics.start()
alert_manager.start()


def handle_transaction(request):
    with tracer.trace_agent_call(
        request["target"], request["operation"]
    ) as span:
        start = time.time()
        # process() stands for your application's own request handler
        result = process(request)
        duration_ms = (time.time() - start) * 1000
        # Metrics
        metrics.record_transaction(
            operation=request["operation"],
            duration_ms=duration_ms,
            amount=request.get("amount", 0),
            status="success" if result["ok"] else "error",
            peer_agent=request["target"],
        )
        # Anomaly detection + alerting
        alert_manager.evaluate("transactions.duration_ms", duration_ms)
        # Note: this passes a per-transaction 0/1 value; a rule written against
        # an error rate (for example > 0.05) needs an aggregated rate instead.
        alert_manager.evaluate(
            "transactions.errors",
            0 if result["ok"] else 1,
        )
        # SLA check
        sla_monitor.check_sla("latency_p95", duration_ms)
        # Cost attribution
        cost_tracker.record_cost(
            agent_id="agent-marketplace",
            category="gateway_fee",
            amount=result.get("fee", 0),
            transaction_id=result.get("transaction_id", ""),
        )
        return result
```
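With this integration, every transaction that passes through your agent automatically produces traces for debugging, metrics for dashboards, anomaly detection for finding problems automatically, alerts for notifying operators, SLA tracking for compliance reporting, and cost attribution for business analysis. The operational overhead is small, a few milliseconds of in-memory buffering per transaction, but the gain in visibility is fundamental.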
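One way such low overhead can be achieved is to keep the per-transaction path to an in-memory append and send metrics in batches. The sketch below illustrates that idea only; `BufferedMetrics` and its `send` callback are hypothetical and do not describe how `MetricsCollector` is actually implemented.

```python
from typing import Callable, Dict, List

Metric = Dict[str, object]


class BufferedMetrics:
    """Hypothetical buffer: collects observations and hands them to `send` in batches."""

    def __init__(self, send: Callable[[List[Metric]], None], batch_size: int = 100):
        self._send = send
        self._batch_size = batch_size
        self._buffer: List[Metric] = []

    def observe(self, name: str, value: float) -> None:
        # Hot path: a list append, with no network call per observation.
        self._buffer.append({"name": name, "value": value})
        if len(self._buffer) >= self._batch_size:
            self.flush()

    def flush(self) -> None:
        """Send whatever is buffered as one batch and start a fresh buffer."""
        if not self._buffer:
            return
        batch, self._buffer = self._buffer, []
        self._send(batch)


# Stand-in for a network sender: batches are simply collected in a list.
sent_batches: List[List[Metric]] = []
buffered = BufferedMetrics(send=sent_batches.append, batch_size=3)
for i in range(7):
    buffered.observe("transactions.duration_ms", 100.0 + i)
```

Seven observations with a batch size of three produce two sends and leave one observation waiting for the next flush, so the cost of the network call is spread across several transactions.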
---
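## Next Steps
This guide built an observability stack from the ground up: distributed tracing, metrics collection, anomaly detection, dashboards, alerting, and SLA monitoring. Each component works on its own, but they deliver the most value together.
When you move to production, start with `AgentTracer` and `MetricsCollector`. These two give you immediate visibility at minimal cost. Add `AnomalyDetector` once you have a few days of baseline data. Configure `AlertManager` with conservative thresholds and tighten them as you learn what normal looks like for your system. Deploy `SLAMonitor` and `CostAttributor` once you have paying customers and need compliance reporting.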
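Later guides in this series cover agent security hardening (defending against adversarial agents), agent marketplace economics (pricing strategies and commission structures), and agent scaling patterns (absorbing 100x traffic growth without rewriting the architecture). All of them build on the observability foundation laid here, because you cannot secure, price, or scale what you cannot see.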