Data Pipeline Agent

用于企业数据提取、清洗与加载的自动化流水线工具。

已扫描
适合谁
财务分析师、运营数据专员
不适合谁
需要实时流处理的用户、仅需手动导出数据的用户
国内可用性
需网络配置。可能需要网络配置或第三方服务可访问。
安装难度
新手友好(★☆☆)。基于终端操作、依赖、API Key 和本地环境要求的初步判断。

安装与下载

openclaw skills install @samledger67-dotcom/data-pipeline-agent

Skill 说明

命令、参数、文件名以原文为准

Data Pipeline Agent

构建、运行并监控业务数据的 ETL(提取 → 转换 → 加载)管道。专注于财务数据流、API 集成以及会计与运营团队常用的数据仓库加载模式。

何时使用

  • 从 API 提取数据(如 QBO、Stripe、Salesforce、银行流水等)
  • 清理和标准化杂乱的电子表格或 CSV 导出文件
  • 将多个数据源的数据合并为一个统一的数据集
  • 将转换后的数据加载到数据库、数据仓库或 Google Sheets
  • 定期调度数据同步任务(如每日总账拉取、每周应收账款账龄更新等)
  • 数据质量审计 — 检测空值、重复项、类型不匹配等问题

不应使用的情况

  • 实时流处理 — 若需亚秒级延迟,请使用 Kafka、Kinesis 或 Pub/Sub
  • 交互式仪表板 — 此代理输出数据;可视化应由 BI 工具完成
  • 原始 SQL 查询优化 — 查询计划与索引优化请使用 DBA 工具
  • 一次性手动导出 — 若仅发生一次,直接下载 CSV 即可
  • 对客户端系统的写入操作 — 默认仅支持只读提取,除非 Irfan 批准写入权限

管道模式

模式 1:API 提取 → 清理 → CSV

# 从 REST API 提取数据,清理后输出为 CSV
import requests, pandas as pd, json
from datetime import datetime, timedelta

def extract(api_url, headers, params=None):
    """从任意 REST 接口提取分页的 JSON 数据。"""
    results = []
    while api_url:
        r = requests.get(api_url, headers=headers, params=params)
        r.raise_for_status()
        data = r.json()
        results.extend(data.get("data", data if isinstance(data, list) else [data]))
        api_url = data.get("next_page_url")  # 分页处理
        params = None  # 仅首次调用时传递 params
    return results

def clean(records, rename_map=None, drop_nulls_on=None, date_cols=None):
    """标准化、重命名、解析日期、去除空值。"""
    df = pd.DataFrame(records)
    if rename_map:
        df = df.rename(columns=rename_map)
    if date_cols:
        for col in date_cols:
            df[col] = pd.to_datetime(df[col], errors="coerce")
    if drop_nulls_on:
        df = df.dropna(subset=drop_nulls_on)
    df = df.drop_duplicates()
    return df

def load_csv(df, output_path):
    df.to_csv(output_path, index=False)
    print(f"✅ 已保存 {len(df)} 行 → {output_path}")

# 示例:QBO 发票数据提取
HEADERS = {"Authorization": "Bearer <TOKEN>", "Accept": "application/json"}
records = extract("https://quickbooks.api.intuit.com/v3/company/<REALM>/query?query=SELECT * FROM Invoice", HEADERS)
df = clean(records, rename_map={"TxnDate": "invoice_date", "TotalAmt": "amount"}, date_cols=["invoice_date"])
load_csv(df, f"data/invoices_{datetime.today().date()}.csv")

模式 2:多源数据合并

import pandas as pd

def merge_gl_with_bank(gl_path, bank_path, match_on="amount", date_tolerance_days=3):
    """
    将总账条目与银行交易进行匹配。
    标记未匹配的行以供人工审查。
    """
    gl = pd.read_csv(gl_path, parse_dates=["date"])
    bank = pd.read_csv(bank_path, parse_dates=["date"])

    # 基于金额 + 日期接近度进行合并
    merged = pd.merge_asof(
        gl.sort_values("date"),
        bank.sort_values("date"),
        on="date",
        by=match_on,
        tolerance=pd.Timedelta(days=date_tolerance_days),
        direction="nearest",
        suffixes=("_gl", "_bank")
    )

    unmatched_gl = gl[~gl.index.isin(merged.dropna(subset=["date_bank"]).index)]
    unmatched_bank = bank[~bank.index.isin(merged.dropna(subset=["date_gl"]).index)]

    print(f"✅ 匹配成功: {len(merged.dropna())} | ⚠️ 未匹配总账: {len(unmatched_gl)} | 银行记录: {len(unmatched_bank)}")
    return merged, unmatched_gl, unmatched_bank

模式 3:数据质量审计

import pandas as pd

def audit_dataset(df, required_cols=None, expected_types=None):
    """
    执行数据质量检查。返回报告字典。
    """
    report = {
        "row_count": len(df),
        "duplicate_rows": int(df.duplicated().sum()),
        "null_summary": df.isnull().sum().to_dict(),
        "issues": []
    }

    if required_cols:
        missing = [c for c in required_cols if c not in df.columns]
        if missing:
            report["issues"].append(f"缺少必需字段: {missing}")

    if expected_types:
        for col, dtype in expected_types.items():
            if col in df.columns and not pd.api.types.is_dtype_equal(df[col].dtype, dtype):
                report["issues"].append(f"{col}: 期望类型 {dtype}, 实际类型 {df[col].dtype}")

    # 标记空值占比超过 20% 的列
    for col, nulls in report["null_summary"].items():
        pct = nulls / len(df) * 100
        if pct > 20:
            report["issues"].append(f"{col}: {pct:.1f}% 为空 — 需要复查")

    return report

# 使用示例
df = pd.read_csv("data/ar_aging.csv")
report = audit_dataset(
    df,
    required_cols=["customer_id", "invoice_date", "amount", "due_date"],
    expected_types={"amount": "float64", "customer_id": "object"}
)
print(report)

模式 4:定时 Cron 管道

#!/bin/bash
# daily-gl-sync.sh — 通过 cron 或 OpenClaw cron 工具执行
# 提取总账数据,清洗后加载至 SQLite,出错时发送通知

set -euo pipefail
LOG="logs/gl-sync-$(date +%Y-%m-%d).log"
mkdir -p logs data

echo "[$(date)] 开始总账同步..." | tee -a "$LOG"

python3 pipelines/gl_extract.py >> "$LOG" 2>&1 && \
python3 pipelines/gl_clean.py >> "$LOG" 2>&1 && \
python3 pipelines/gl_load.py >> "$LOG" 2>&1 && \
echo "[$(date)] ✅ 总账同步完成" | tee -a "$LOG" || \
echo "[$(date)] ❌ 总账同步失败 — 请查看 $LOG" | tee -a "$LOG"

模式 5:加载至 SQLite / PostgreSQL

markdown

Data Pipeline Agent

版本: 98.0.1

分块: 2/2

import pandas as pd
import sqlite3

def load_to_sqlite(df, db_path, table_name, if_exists="replace"):
    """
    将 DataFrame 加载到 SQLite 数据库中。使用 if_exists='append' 可实现增量加载。
    """
    conn = sqlite3.connect(db_path)
    df.to_sql(table_name, conn, if_exists=if_exists, index=False)
    conn.close()
    print(f"✅ 已加载 {len(df)} 行 → {db_path}::{table_name}")

# PostgreSQL 版本(需要 psycopg2 + sqlalchemy)
from sqlalchemy import create_engine

def load_to_postgres(df, conn_str, table_name, schema="public", if_exists="replace"):
    engine = create_engine(conn_str)
    df.to_sql(table_name, engine, schema=schema, if_exists=if_exists, index=False)
    print(f"✅ 已加载 {len(df)} 行 → {schema}.{table_name}")

常见业务数据流水线

应收账款账龄刷新流水线

1. 提取:QBO 发票 API → 原始 JSON
2. 转换:计算逾期天数,划分账龄区间(0-30、31-60、61-90、90+)
3. 补充:关联客户联系信息
4. 加载:Google Sheets "应收账款账龄" 表 + SQLite 历史存档
5. 警报:标记逾期超过 60 天的发票至跟进队列

银行流水对账流水线

1. 提取:银行 API(Plaid/CSV 导出) + QBO 总账数据
2. 转换:统一日期、金额、备注字段格式
3. 匹配:基于金额 + 日期进行模糊匹配(±3 天容差)
4. 标记:未匹配交易 → 生成需人工审核的 CSV 文件
5. 加载:对账日志 → SQLite 存储 + 邮件摘要通知

工资单 → 总账科目映射流水线

1. 提取:工资系统 CSV 导出(Gusto、ADP 等)
2. 转换:将工资代码映射为总账科目编号
3. 验证:核对总额是否与工资单一致
4. 加载:生成凭证模板 → QBO 批量导入格式
5. 归档:原始文件与转换后文件保存至按日期命名的文件夹

流水线设计检查清单

在构建任何流水线前,请确认以下事项:

  • [ ] 幂等性 — 流水线能否重复运行而不产生数据重复?
  • [ ] 错误处理 — 若 API 不可用?部分加载失败如何应对?
  • [ ] 日志记录 — 每一步是否包含时间戳的日志?
  • [ ] 数据质量 — 是否能捕获空值、重复项和类型不匹配?
  • [ ] 可回滚性 — 若出错,能否撤销已加载的数据?
  • [ ] 速率限制 — 来源 API 是否有调用频率限制?是否添加重试机制?
  • [ ] 密钥安全 — API 密钥是否存储在环境变量中,而非硬编码?
  • [ ] 调度安排 — 该流水线多久运行一次?由谁监控?

数据清洗速查表

问题解决方案
日期格式混杂pd.to_datetime(col, infer_datetime_format=True)
货币字符串("$1,234.56")col.str.replace(r'[$,]', '', regex=True).astype(float)
重复行df.drop_duplicates(subset=['id'])
金额为空df['amount'].fillna(0)df.dropna(subset=['amount'])
大小写不一致df['name'].str.strip().str.title()
首尾空白字符df.apply(lambda x: x.str.strip() if x.dtype == 'object' else x)
异常值检测df[df['amount'].between(df['amount'].quantile(.01), df['amount'].quantile(.99))]

使用 OpenClaw Cron 进行调度

# 每日格林威治标准时间 6 点(芝加哥时间)
Schedule: cron "0 6 * * *" tz=America/Chicago
Payload: agentTurn — "在 ~/workspace/pipelines/ 中运行每日 GL 同步流水线"
Delivery: 流水线完成或失败时通过 Telegram 发送通知

依赖项安装

安装 Python 数据处理相关包:

pip install pandas requests sqlalchemy psycopg2-binary openpyxl xlrd
# Google Sheets 支持
pip install gspread gspread-dataframe google-auth
# Plaid 银行数据支持
pip install plaid-python

文件目录结构

workspace/
  pipelines/
    gl_extract.py
    gl_clean.py
    gl_load.py
    ar_aging.py
    bank_reconcile.py
  data/
    raw/          ← API 响应、CSV 导入文件(从不手动修改)
    processed/    ← 清洗并转换后的数据
    archive/      ← 按日期命名的历史快照
  logs/
    pipeline-YYYY-MM-DD.log
  scripts/
    run-daily-pipelines.sh

安全规则

  1. 提取阶段始终只读 — 在提取过程中不得向源系统写入数据。
  2. 保留原始数据归档 — 在任何转换前,必须先备份原始数据。
  3. 验证行数 — 每次转换前后均需核对数据行数。
  4. 先用样本测试 — 全量运行前,务必在 10-100 行样本数据上测试。
  5. 客户端系统写入需 Irfan 审批 — 默认情况下,QBO、银行 API、工资系统均为只读提取。
  6. 禁止硬编码凭证 — 使用环境变量或 1Password CLI 管理密钥。
SD
@samledger67-dotcom

已收录 1 个 Skill

相关推荐