|
2019-05-17
# Kafka 客户端启动生产者命令 bash kafka-console-producer.sh --broker-list 10.0.0.29:9092 --topic oceanus_advanced3_input --producer.config ../config/producer.properties
// 数据格式 { "id": 1, "amount": 10, "times": "2021-10-01 10:00:00" }
# 下载 ClickHouse-Client 安装包
wget https://repo.yandex.ru/clickhouse/rpm/stable/x86_64/clickhouse-client-20.7.2.30-2.noarch.rpm
wget https://repo.yandex.ru/clickhouse/rpm/stable/x86_64/clickhouse-common-static-20.7.2.30-2.x86_64.rpm
# 安装客户端
rpm -ivh *.rpm
# 使用 tcp 端口登陆 ClickHouse 集群,IP 地址可通过控制台查看
clickhouse-client -hxx.xx.xx.xx --port 9000 -m
-- ClickHouse result table for the sliding-window aggregation.
-- The Sign column is the collapsing flag used by CollapsingMergeTree so the
-- Flink sink can retract/replace rows for an updated window result.
CREATE TABLE default.oceanus_advanced3_output1 ON CLUSTER default_cluster
(
    win_start  TIMESTAMP,
    win_end    TIMESTAMP,
    id         Int8,
    amount_all Int16,
    Sign       Int8
)
ENGINE = ReplicatedCollapsingMergeTree(
    '/clickhouse/tables/{layer}-{shard}/default/oceanus_advanced3_output1',
    '{replica}',
    Sign
)
ORDER BY (win_start, win_end, id);
-- 1. Create the Source: Flink table reading JSON events from Kafka.
-- Expected payload shape (see sample above): { "id": 1, "amount": 10, "times": "2021-10-01 10:00:00" }
CREATE TABLE `kafka_json_source_table` (
    `id`     INT,
    `amount` INT,
    `times`  TIMESTAMP(3),
    -- Event-time watermark: tolerate up to 3 seconds of out-of-order events.
    WATERMARK FOR times AS times - INTERVAL '3' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'oceanus_advanced3_input',                -- replace with the topic you consume
    'scan.startup.mode' = 'earliest-offset',            -- one of: latest-offset / earliest-offset / specific-offsets / group-offsets / timestamp
    'properties.bootstrap.servers' = '10.0.0.29:9092',  -- replace with your Kafka broker address
    'properties.group.id' = 'testGroup',                -- required: the consumer Group ID
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',             -- false: a missing field does not fail the job
    'json.ignore-parse-errors' = 'true'                 -- true: any parse error is ignored
);

-- 2. Create the Sink
-- Flink sink table writing windowed aggregates into the ClickHouse table
-- default.oceanus_advanced3_output1 created earlier.
CREATE TABLE `clickhouse_sink` (
    `win_start`  TIMESTAMP(3),
    `win_end`    TIMESTAMP(3),
    `id`         INT,
    `amount_all` INT,
    PRIMARY KEY (win_start, win_end, id) NOT ENFORCED
) WITH (
    'connector' = 'clickhouse',
    'url' = 'clickhouse://10.0.0.178:8123',
    'database-name' = 'default',
    'table-name' = 'oceanus_advanced3_output1',
    'table.collapsing.field' = 'Sign'  -- name of the CollapsingMergeTree sign column
);

-- 3. Write the business SQL
-- Per-user amount totals over a 1-minute sliding (HOP) window that advances
-- every 30 seconds, i.e. each result row is refreshed twice per minute.
INSERT INTO clickhouse_sink
SELECT
    HOP_START(times, INTERVAL '30' SECOND, INTERVAL '1' MINUTE) AS win_start,  -- sliding-window start time
    HOP_END(times, INTERVAL '30' SECOND, INTERVAL '1' MINUTE)   AS win_end,    -- sliding-window end time
    id,
    SUM(amount) AS amount_all
FROM kafka_json_source_table
-- Group by the sliding window and the user id to aggregate per user per window.
GROUP BY
    HOP(times, INTERVAL '30' SECOND, INTERVAL '1' MINUTE),
    id;
ps:新版 Flink 1.13 集群 SQL 作业不需要用户自己选择内置 Connector 总结 HOP WINDOW(滑动窗口)将元素分配到固定长度的窗口中,窗口的大小由窗口大小参数来配置,另一个窗口滑动参数控制滑动窗口开始的频率。HOP WINDOW(滑动窗口)保持窗口大小(Size:INTERVAL '1' MINUTE)不变,每次滑动指定的时间周期(Slide:INTERVAL '30' SECOND),因而允许窗口之间的相互重叠。 Slide 的大小决定了 Flink 创建新窗口的频率。 a.当 Slide 小于 Size 时,相邻窗口会重叠,一个事件会被分配到多个窗口。 b.当 Slide 大于 Size 时,可能会导致有些事件被丢弃。 c.当 Slide 等于 Size 时,等于是 TUMBLE WINDOW(滚动窗口)。 更多时间窗口函数请参考 Oceanus 官方文档 [9]。 参考链接 [1] Oceanus 控制台:https://console.cloud.tencent.com/oceanus/overview [2] 创建独享集群:https://cloud.tencent.com/document/product/849/48298 [3] CKafka 控制台:https://console.cloud.tencent.com/ckafka/index?rid=1 [4] CKafka 创建实例:https://cloud.tencent.com/document/product/597/54839 [5] Ckafka 创建 Topic:https://cloud.tencent.com/document/product/597/54854 [6] 运行 Kafka 客户端:https://cloud.tencent.com/document/product/597/56840 [7] ClickHouse 控制台:https://console.cloud.tencent.com/cdwch?region=ap-guangzhou [8] ClickHouse 快速入门:https://cloud.tencent.com/document/product/1299/49824 [9] 时间窗口函数:https://cloud.tencent.com/document/product/849/18077
编辑:航网科技 来源:腾讯云 本文版权归原作者所有 转载请注明出处
微信扫一扫咨询客服
全国免费服务热线
0755-36300002