Ot Aiops
支持多协议工业设备数据采集与智能诊断,具备高风险写入防护机制。
用于企业数据提取、清洗与加载的自动化流水线工具。
openclaw skills install @samledger67-dotcom/data-pipeline-agent命令、参数、文件名以原文为准
构建、运行并监控业务数据的 ETL(提取 → 转换 → 加载)管道。专注于财务数据流、API 集成以及会计与运营团队常用的数据仓库加载模式。
# 从 REST API 提取数据,清理后输出为 CSV
import requests, pandas as pd, json
from datetime import datetime, timedelta
def extract(api_url, headers, params=None):
"""从任意 REST 接口提取分页的 JSON 数据。"""
results = []
while api_url:
r = requests.get(api_url, headers=headers, params=params)
r.raise_for_status()
data = r.json()
results.extend(data.get("data", data if isinstance(data, list) else [data]))
api_url = data.get("next_page_url") # 分页处理
params = None # 仅首次调用时传递 params
return results
def clean(records, rename_map=None, drop_nulls_on=None, date_cols=None):
"""标准化、重命名、解析日期、去除空值。"""
df = pd.DataFrame(records)
if rename_map:
df = df.rename(columns=rename_map)
if date_cols:
for col in date_cols:
df[col] = pd.to_datetime(df[col], errors="coerce")
if drop_nulls_on:
df = df.dropna(subset=drop_nulls_on)
df = df.drop_duplicates()
return df
def load_csv(df, output_path):
df.to_csv(output_path, index=False)
print(f"✅ 已保存 {len(df)} 行 → {output_path}")
# 示例:QBO 发票数据提取
HEADERS = {"Authorization": "Bearer <TOKEN>", "Accept": "application/json"}
records = extract("https://quickbooks.api.intuit.com/v3/company/<REALM>/query?query=SELECT * FROM Invoice", HEADERS)
df = clean(records, rename_map={"TxnDate": "invoice_date", "TotalAmt": "amount"}, date_cols=["invoice_date"])
load_csv(df, f"data/invoices_{datetime.today().date()}.csv")import pandas as pd
def merge_gl_with_bank(gl_path, bank_path, match_on="amount", date_tolerance_days=3):
"""
将总账条目与银行交易进行匹配。
标记未匹配的行以供人工审查。
"""
gl = pd.read_csv(gl_path, parse_dates=["date"])
bank = pd.read_csv(bank_path, parse_dates=["date"])
# 基于金额 + 日期接近度进行合并
merged = pd.merge_asof(
gl.sort_values("date"),
bank.sort_values("date"),
on="date",
by=match_on,
tolerance=pd.Timedelta(days=date_tolerance_days),
direction="nearest",
suffixes=("_gl", "_bank")
)
unmatched_gl = gl[~gl.index.isin(merged.dropna(subset=["date_bank"]).index)]
unmatched_bank = bank[~bank.index.isin(merged.dropna(subset=["date_gl"]).index)]
print(f"✅ 匹配成功: {len(merged.dropna())} | ⚠️ 未匹配总账: {len(unmatched_gl)} | 银行记录: {len(unmatched_bank)}")
return merged, unmatched_gl, unmatched_bankimport pandas as pd
def audit_dataset(df, required_cols=None, expected_types=None):
"""
执行数据质量检查。返回报告字典。
"""
report = {
"row_count": len(df),
"duplicate_rows": int(df.duplicated().sum()),
"null_summary": df.isnull().sum().to_dict(),
"issues": []
}
if required_cols:
missing = [c for c in required_cols if c not in df.columns]
if missing:
report["issues"].append(f"缺少必需字段: {missing}")
if expected_types:
for col, dtype in expected_types.items():
if col in df.columns and not pd.api.types.is_dtype_equal(df[col].dtype, dtype):
report["issues"].append(f"{col}: 期望类型 {dtype}, 实际类型 {df[col].dtype}")
# 标记空值占比超过 20% 的列
for col, nulls in report["null_summary"].items():
pct = nulls / len(df) * 100
if pct > 20:
report["issues"].append(f"{col}: {pct:.1f}% 为空 — 需要复查")
return report
# 使用示例
df = pd.read_csv("data/ar_aging.csv")
report = audit_dataset(
df,
required_cols=["customer_id", "invoice_date", "amount", "due_date"],
expected_types={"amount": "float64", "customer_id": "object"}
)
print(report)#!/bin/bash
# daily-gl-sync.sh — 通过 cron 或 OpenClaw cron 工具执行
# 提取总账数据,清洗后加载至 SQLite,出错时发送通知
set -euo pipefail
LOG="logs/gl-sync-$(date +%Y-%m-%d).log"
mkdir -p logs data
echo "[$(date)] 开始总账同步..." | tee -a "$LOG"
python3 pipelines/gl_extract.py >> "$LOG" 2>&1 && \
python3 pipelines/gl_clean.py >> "$LOG" 2>&1 && \
python3 pipelines/gl_load.py >> "$LOG" 2>&1 && \
echo "[$(date)] ✅ 总账同步完成" | tee -a "$LOG" || \
echo "[$(date)] ❌ 总账同步失败 — 请查看 $LOG" | tee -a "$LOG"markdown
版本: 98.0.1
分块: 2/2
import pandas as pd
import sqlite3
def load_to_sqlite(df, db_path, table_name, if_exists="replace"):
"""
将 DataFrame 加载到 SQLite 数据库中。使用 if_exists='append' 可实现增量加载。
"""
conn = sqlite3.connect(db_path)
df.to_sql(table_name, conn, if_exists=if_exists, index=False)
conn.close()
print(f"✅ 已加载 {len(df)} 行 → {db_path}::{table_name}")
# PostgreSQL 版本(需要 psycopg2 + sqlalchemy)
from sqlalchemy import create_engine
def load_to_postgres(df, conn_str, table_name, schema="public", if_exists="replace"):
engine = create_engine(conn_str)
df.to_sql(table_name, engine, schema=schema, if_exists=if_exists, index=False)
print(f"✅ 已加载 {len(df)} 行 → {schema}.{table_name}")1. 提取:QBO 发票 API → 原始 JSON
2. 转换:计算逾期天数,划分账龄区间(0-30、31-60、61-90、90+)
3. 补充:关联客户联系信息
4. 加载:Google Sheets "应收账款账龄" 表 + SQLite 历史存档
5. 警报:标记逾期超过 60 天的发票至跟进队列1. 提取:银行 API(Plaid/CSV 导出) + QBO 总账数据
2. 转换:统一日期、金额、备注字段格式
3. 匹配:基于金额 + 日期进行模糊匹配(±3 天容差)
4. 标记:未匹配交易 → 生成需人工审核的 CSV 文件
5. 加载:对账日志 → SQLite 存储 + 邮件摘要通知1. 提取:工资系统 CSV 导出(Gusto、ADP 等)
2. 转换:将工资代码映射为总账科目编号
3. 验证:核对总额是否与工资单一致
4. 加载:生成凭证模板 → QBO 批量导入格式
5. 归档:原始文件与转换后文件保存至按日期命名的文件夹在构建任何流水线前,请确认以下事项:
| 问题 | 解决方案 |
|---|---|
| 日期格式混杂 | pd.to_datetime(col, infer_datetime_format=True) |
| 货币字符串("$1,234.56") | col.str.replace(r'[$,]', '', regex=True).astype(float) |
| 重复行 | df.drop_duplicates(subset=['id']) |
| 金额为空 | df['amount'].fillna(0) 或 df.dropna(subset=['amount']) |
| 大小写不一致 | df['name'].str.strip().str.title() |
| 首尾空白字符 | df.apply(lambda x: x.str.strip() if x.dtype == 'object' else x) |
| 异常值检测 | df[df['amount'].between(df['amount'].quantile(.01), df['amount'].quantile(.99))] |
# 每日格林威治标准时间 6 点(芝加哥时间)
Schedule: cron "0 6 * * *" tz=America/Chicago
Payload: agentTurn — "在 ~/workspace/pipelines/ 中运行每日 GL 同步流水线"
Delivery: 流水线完成或失败时通过 Telegram 发送通知安装 Python 数据处理相关包:
pip install pandas requests sqlalchemy psycopg2-binary openpyxl xlrd
# Google Sheets 支持
pip install gspread gspread-dataframe google-auth
# Plaid 银行数据支持
pip install plaid-pythonworkspace/
pipelines/
gl_extract.py
gl_clean.py
gl_load.py
ar_aging.py
bank_reconcile.py
data/
raw/ ← API 响应、CSV 导入文件(从不手动修改)
processed/ ← 清洗并转换后的数据
archive/ ← 按日期命名的历史快照
logs/
pipeline-YYYY-MM-DD.log
scripts/
run-daily-pipelines.sh已收录 1 个 Skill