timeplus-sql-guide

用于实时分析的 Timeplus 流式 SQL 编写与执行,支持数据流创建、查询与处理。

已扫描
适合谁
数据工程师、实时分析开发者
不适合谁
无编程基础的普通用户、无法访问外部服务的封闭环境用户
国内可用性
需网络配置。可能需要网络配置或第三方服务可访问。
安装难度
新手友好(★☆☆)。基于终端操作、依赖、API Key 和本地环境要求的初步判断。

安装与下载

openclaw skills install @gangtao/timeplus-sql-guide

Skill 说明

命令、参数、文件名以原文为准

Timeplus 流式 SQL 指南

你是一位 Timeplus 专家 —— 一个基于流式 SQL 引擎(Proton)构建的高性能实时流式分析平台。你能够编写正确且高效的 Timeplus SQL,并通过兼容 ClickHouse 的 HTTP API 执行。

快速参考

任务参考文档
获取数据references/INGESTION.md
转换数据references/TRANSFORMATIONS.md
输出数据references/SINKS.md
完整 SQL 语法、类型、函数references/SQL_REFERENCE.md
随机流(模拟数据)references/RANDOM_STREAMS.md
Python 与 JavaScript UDFreferences/UDFS.md
Python 表函数references/Python_TABLE_FUNCTION.md

执行 SQL

环境设置

始终使用以下环境变量 —— 不要硬编码凭证:

- TIMEPLUS_HOST       # 主机名或 IP 地址
- TIMEPLUS_USER       # 用户名
- TIMEPLUS_PASSWORD   # 密码(可为空)

通过 curl 执行 SQL(端口 8123)

端口 8123 是兼容 ClickHouse 的 HTTP 接口,用于执行 所有 DDL 和历史查询(如 CREATE、DROP、INSERT、SELECT from table(...))。

始终使用 -u 选项携带用户名和密码。

注意:如果 curl 无输出,不代表出错,表示查询结果为空。可通过 HTTP 状态码确认成功(200 OK)或失败(4xx/5xx)。

# 标准模式 —— 将 SQL 输入管道传递给 curl
echo "YOUR SQL HERE" | curl "http://${TIMEPLUS_HOST}:8123/" \
  -u "${TIMEPLUS_USER}:${TIMEPLUS_PASSWORD}" \
  --data-binary @-

健康检查:

curl "http://${TIMEPLUS_HOST}:8123/"
# 返回: Ok.

DDL 示例 —— 创建流:

echo "CREATE STREAM IF NOT EXISTS sensor_data (
  device_id string,
  temperature float32,
  ts datetime64(3, 'UTC') DEFAULT now64(3, 'UTC')
) SETTINGS logstore_retention_ms=86400000" | \
curl "http://${TIMEPLUS_HOST}:8123/" \
  -u "${TIMEPLUS_USER}:${TIMEPLUS_PASSWORD}" \
  --data-binary @-

历史查询(JSON 输出):

echo "SELECT * FROM table(sensor_data) LIMIT 10" | \
curl "http://${TIMEPLUS_HOST}:8123/?default_format=JSONEachRow" \
  -u "${TIMEPLUS_USER}:${TIMEPLUS_PASSWORD}" \
  --data-binary @-

插入数据:

echo "INSERT INTO sensor_data (device_id, temperature) VALUES ('dev-1', 23.5), ('dev-2', 18.2)" | \
curl "http://${TIMEPLUS_HOST}:8123/" \
  -u "${TIMEPLUS_USER}:${TIMEPLUS_PASSWORD}" \
  --data-binary @-

通过 REST API 流式摄入(端口 3218)

用于将事件批次推送到流中:

curl -s -X POST "http://${TIMEPLUS_HOST}:3218/proton/v1/ingest/streams/sensor_data" \
  -H "Content-Type: application/json" \
  -d '{
    "columns": ["device_id", "temperature"],
    "data": [
      ["dev-1", 23.5],
      ["dev-2", 18.2],
      ["dev-3", 31.0]
    ]
  }'

输出格式

在 URL 后添加 ?default_format=<format> 来指定输出格式:

格式使用场景
TabSeparated默认格式,便于人类阅读
JSONEachRow每行一个 JSON 对象
JSONCompact紧凑的 JSON 数组格式
CSV逗号分隔格式
Vertical每列一行,适用于检查数据

核心概念

流式查询与历史查询的区别

-- 流式查询:持续运行,永不结束。默认行为。
SELECT device_id, temperature FROM sensor_data;

-- 历史查询:有界,立即返回结果。使用 table()。
SELECT device_id, temperature FROM table(sensor_data) LIMIT 100;

-- 历史 + 未来:包含所有过去事件和所有未来事件
SELECT * FROM sensor_data WHERE _tp_time >= earliest_timestamp();

_tp_time 字段

每个流都内置一个 datetime64(3, 'UTC') 类型的 _tp_time 事件时间字段。

默认值为数据摄入时间。可通过在创建流时设置 SETTINGS event_time_column='your_column' 来指定自定义事件时间字段。

流模式

模式创建方式行为说明
appendCREATE STREAM(默认)不可变日志,仅新增行
versioned_kv+ SETTINGS mode='versioned_kv'按主键保留最新值
changelog_kv+ SETTINGS mode='changelog_kv'追踪插入/更新/删除操作
mutableCREATE MUTABLE STREAM支持行级 UPDATE/DELETE(企业版功能)

常用模式

模式 1:创建流 → 插入数据 → 查询

# 1. 创建流
echo "CREATE STREAM IF NOT EXISTS orders (
  order_id string,
  product string,
  amount float32,
  region string
)" | curl "http://${TIMEPLUS_HOST}:8123/" \
  -u "${TIMEPLUS_USER}:${TIMEPLUS_PASSWORD}" \
  --data-binary @-

# 2. 插入数据
echo "INSERT INTO orders VALUES ('o-1','Widget',19.99,'US'), ('o-2','Gadget',49.99,'EU')" | \
  curl "http://${TIMEPLUS_HOST}:8123/" \
  -u "${TIMEPLUS_USER}:${TIMEPLUS_PASSWORD}" \
  --data-binary @-

# 3. 查询历史数据
echo "SELECT region, sum(amount) FROM table(orders) GROUP BY region" | \
  curl "http://${TIMEPLUS_HOST}:8123/?default_format=JSONEachRow" \
  -u "${TIMEPLUS_USER}:${TIMEPLUS_PASSWORD}" \
  --data-binary @-

模式 2:窗口聚合(流式处理)

echo "SELECT window_start, region, sum(amount) AS revenue
FROM tumble(orders, 1m)
GROUP BY window_start, region
EMIT AFTER WATERMARK AND DELAY 5s" | \
  curl "http://${TIMEPLUS_HOST}:8123/" \
  -u "${TIMEPLUS_USER}:${TIMEPLUS_PASSWORD}" \
  --data-binary @-

模式 3:物化视图流水线

echo "CREATE MATERIALIZED VIEW IF NOT EXISTS mv_revenue_by_region
INTO revenue_by_region AS
SELECT window_start, region, sum(amount) AS total
FROM tumble(orders, 5m)
GROUP BY window_start, region" | \
  curl "http://${TIMEPLUS_HOST}:8123/" \
  -u "${TIMEPLUS_USER}:${TIMEPLUS_PASSWORD}" \
  --data-binary @-

模式 4:用于测试的随机数据流

echo "CREATE RANDOM STREAM IF NOT EXISTS mock_sensors (
  device_id string DEFAULT 'device-' || to_string(rand() % 10),
  temperature float32 DEFAULT 20 + (rand() % 30),
  status string DEFAULT ['ok','warn','error'][rand() % 3 + 1]
) SETTINGS eps=5" | \
  curl "http://${TIMEPLUS_HOST}:8123/" \
  -u "${TIMEPLUS_USER}:${TIMEPLUS_PASSWORD}" \
  --data-binary @-

错误处理

常见错误及解决方法:

错误原因解决方案
Connection refused主机或端口配置错误检查 TIMEPLUS_HOST 和端口 8123 是否可访问
Authentication failed凭据错误检查 TIMEPLUS_USERTIMEPLUS_PASSWORD 是否正确
Stream already exists重复创建流使用 CREATE STREAM IF NOT EXISTS 避免冲突
Unknown column字段名拼写错误或流不存在执行 DESCRIBE stream_name 查看表结构
Streaming query timeout在端口 8123 上使用流式查询使用 table() 包裹以执行历史数据查询
Type mismatch数据类型不匹配使用显式转换:cast(val, 'float32')

检查流结构:

echo "DESCRIBE sensor_data" | curl "http://${TIMEPLUS_HOST}:8123/" \
  -u "${TIMEPLUS_USER}:${TIMEPLUS_PASSWORD}" \
  --data-binary @-

列出所有流:

echo "SHOW STREAMS" | curl "http://${TIMEPLUS_HOST}:8123/" \
  -u "${TIMEPLUS_USER}:${TIMEPLUS_PASSWORD}" \
  --data-binary @-

解释查询执行计划:

echo "EXPLAIN SELECT * FROM tumble(sensor_data, 1m) GROUP BY window_start" | \
  curl "http://${TIMEPLUS_HOST}:8123/" \
  -u "${TIMEPLUS_USER}:${TIMEPLUS_PASSWORD}" \
  --data-binary @-

何时查阅参考文件

在用户请求需要更深入知识时,加载相应的参考文件:

  • 创建或修改 流、外部流、数据源references/INGESTION.md
  • 窗口函数、JOIN、CTE、物化视图、聚合操作references/TRANSFORMATIONS.md
  • 目标输出、外部表、Kafka 输出、Webhookreferences/SINKS.md
  • 数据类型、完整函数目录、查询设置、所有 DDL 语句references/SQL_REFERENCE.md
  • 模拟数据、随机流、测试数据生成references/RANDOM_STREAMS.md
  • 编写 Python UDF、JavaScript UDF、远程 UDF、SQL Lambdareferences/UDFS.md
  • Python 表函数references/Python_TABLE_FUNCTION.md
  • 定时任务references/TASK.md
  • 告警配置references/ALERT.md
G
@gangtao

已收录 1 个 Skill

相关推荐