|
2019-05-17
# 启动 Kafka 生产者的命令如下(在 Kafka bin 目录下执行):
bash kafka-console-producer.sh --broker-list 10.0.0.29:9092 --topic oceanus_advanced5_input --producer.config ../config/producer.properties
// 按顺序插入如下数据(每行一条),注意这里数据的事件时间是乱序的
{"order_id":"10000","num":1,"event_time":"2021-12-22 14:29:16"}
{"order_id":"10000","num":1,"event_time":"2021-12-22 14:29:30"}
{"order_id":"10000","num":1,"event_time":"2021-12-22 14:29:50"}
{"order_id":"10000","num":1,"event_time":"2021-12-22 14:29:59"}
{"order_id":"10000","num":1,"event_time":"2021-12-22 14:29:43"}
{"order_id":"10000","num":1,"event_time":"2021-12-22 14:30:09"}
{"order_id":"10000","num":1,"event_time":"2021-12-22 14:30:01"}
{"order_id":"10000","num":1,"event_time":"2021-12-22 14:29:50"}
{"order_id":"10000","num":1,"event_time":"2021-12-22 14:30:15"}
{"order_id":"10000","num":1,"event_time":"2021-12-22 14:30:50"}
{"order_id":"10000","num":1,"event_time":"2021-12-22 14:31:15"}
-- MySQL DDL: result table for the Flink job (run this in MySQL beforehand).
-- NOTE: on the original single line the leading `--` comment swallowed the
-- whole CREATE TABLE; the comment must be on its own line, and the statement
-- needs a terminating semicolon.
CREATE TABLE `oceanus_advanced5_output` (
    `window_start` datetime NOT NULL,
    `window_end`   datetime NOT NULL,
    `num`          int(11) DEFAULT NULL,
    PRIMARY KEY (`window_start`, `window_end`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- 1. Flink SQL source: Kafka topic with JSON records.
-- NOTE: as a single physical line, each inline `--` comment commented out the
-- rest of the statement (including the WATERMARK clause); reformatted so every
-- comment ends at its own line break.
CREATE TABLE `kafka_json_source_table` (
    `order_id`   VARCHAR,
    `num`        INT,
    `event_time` TIMESTAMP(3),
    -- declare a watermark with a 10-second delay on event-time column `event_time`
    WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'oceanus_advanced5_input',                -- replace with the topic to consume
    'scan.startup.mode' = 'latest-offset',              -- one of: latest-offset / earliest-offset / specific-offsets / group-offsets / timestamp
    'properties.bootstrap.servers' = '10.0.0.29:9092',  -- replace with your Kafka bootstrap address
    'properties.group.id' = 'testGroup',                -- required: a consumer group ID must be specified
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',             -- false: missing fields do not raise an error
    'json.ignore-parse-errors' = 'true'                 -- true: any parse error is ignored
);
-- 2. Create the Sink
-- 2. Flink SQL sink: JDBC upsert into the MySQL table created above.
-- The declared PRIMARY KEY (NOT ENFORCED) makes the connector run in upsert
-- mode, matching the MySQL table's composite primary key.
-- NOTE: on the original single line everything after the first inline `--`
-- comment was commented out; reformatted one option per line.
CREATE TABLE `jdbc_upsert_sink_table` (
    window_start TIMESTAMP(3),
    window_end   TIMESTAMP(3),
    num          INT,
    PRIMARY KEY (window_start, window_end) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    -- replace with your actual MySQL connection parameters
    'url' = 'jdbc:mysql://10.0.0.158:3306/testdb?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai',
    'table-name' = 'oceanus_advanced5_output',  -- the table to write into
    'username' = 'root',                        -- database user (needs INSERT privilege)
    'password' = 'Yourpassword',                -- database password
    'sink.buffer-flush.max-rows' = '200',       -- rows per batched flush
    'sink.buffer-flush.interval' = '2s'         -- interval between batched flushes
);
-- 3. Write the business SQL
-- 3. Business SQL: sum `num` per 1-minute tumbling event-time window.
-- NOTE: on the original single line the inline `-- Windowing TVF` comment
-- commented out the TUMBLE(...) call itself, leaving an empty TABLE(); the
-- comment must sit on its own line.
INSERT INTO `jdbc_upsert_sink_table`
SELECT
    window_start,
    window_end,
    SUM(num) AS num
FROM TABLE(
    -- Windowing TVF: 1-minute tumbling windows over `event_time`
    TUMBLE(TABLE `kafka_json_source_table`, DESCRIPTOR(event_time), INTERVAL '1' MINUTES)
)
GROUP BY window_start, window_end;
编辑:航网科技 来源:腾讯云 本文版权归原作者所有 转载请注明出处
微信扫一扫咨询客服
全国免费服务热线
0755-36300002