Flink 实践教程:进阶5-乱序调整

| 2019-05-17


前置准备

创建流计算 Oceanus 集群
 
进入 Oceanus 控制台 [1],点击左侧【集群管理】,点击左上方【创建集群】,具体可参考 Oceanus 官方文档 创建独享集群 [2]。
 
创建消息队列 CKafka
进入 CKafka 控制台 [3],点击左上角【新建】,即可完成 CKafka 的创建,具体可参考 CKafka 创建实例 [4]。
 
创建 Topic:  进入 CKafka 实例,点击【topic 管理】>【新建】,即可完成 Topic 的创建,具体可参考 CKafka 创建 Topic [5]。
 
数据准备:  进入同子网的 CVM 下,启动 Kafka 客户端,模拟发送数据,具体操作参见 运行 Kafka 客户端 [6]。
# 启动 Kafka 生产者命令
bash kafka-console-producer.sh --broker-list 10.0.0.29:9092 --topic oceanus_advanced5_input --producer.config ../config/producer.properties
// 按顺序插入如下数据,注意这里数据时间是乱序的
{"order_id":"10000","num":1,"event_time":"2021-12-22 14:29:16"}
{"order_id":"10000","num":1,"event_time":"2021-12-22 14:29:30"}
{"order_id":"10000","num":1,"event_time":"2021-12-22 14:29:50"}
{"order_id":"10000","num":1,"event_time":"2021-12-22 14:29:59"}
{"order_id":"10000","num":1,"event_time":"2021-12-22 14:29:43"}
{"order_id":"10000","num":1,"event_time":"2021-12-22 14:30:09"}
{"order_id":"10000","num":1,"event_time":"2021-12-22 14:30:01"}
{"order_id":"10000","num":1,"event_time":"2021-12-22 14:29:50"}
{"order_id":"10000","num":1,"event_time":"2021-12-22 14:30:15"}
{"order_id":"10000","num":1,"event_time":"2021-12-22 14:30:50"}
{"order_id":"10000","num":1,"event_time":"2021-12-22 14:31:15"}
创建 MySQL 实例
进入 MySQL 控制台 [7],点击【新建】。具体可参考官方文档 创建 MySQL 实例 [8]。
-- 建表语句
CREATE TABLE `oceanus_advanced5_output` (
  `window_start`   datetime NOT NULL,
  `window_end`     datetime NOT NULL,
  `num`            int(11) DEFAULT NULL,
  PRIMARY KEY (`window_start`,`window_end`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
流计算 Oceanus 作业
1. 创建 Source
CREATE TABLE `kafka_json_source_table` (
  `order_id`    VARCHAR,
  `num`         INT,
  `event_time`  TIMESTAMP(3),
  -- 根据事件时间 `event_time` 设置 10s 的延迟水印
  WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND 
) WITH (
  'connector' = 'kafka',
  'topic' = 'oceanus_advanced5_input',     -- 替换为您要消费的 Topic
  'scan.startup.mode' = 'latest-offset',   -- 可以是 latest-offset / earliest-offset / specific-offsets / group-offsets / timestamp 的任何一种
  'properties.bootstrap.servers' = '10.0.0.29:9092',  -- 替换为您的 Kafka 连接地址
  'properties.group.id' = 'testGroup',     -- 必选参数, 一定要指定 Group ID
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',  -- 如果设置为 false, 则遇到缺失字段不会报错。
  'json.ignore-parse-errors' = 'true'      -- 如果设置为 true,则忽略任何解析报错。
);
2. 创建 Sink
CREATE TABLE `jdbc_upsert_sink_table` (
    window_start  TIMESTAMP(3),
    window_end    TIMESTAMP(3),
    num           INT,
    PRIMARY KEY(window_start,window_end) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://10.0.0.158:3306/testdb?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai',         -- 请替换为您的实际 MySQL 连接参数
    'table-name' = 'oceanus_advanced5_output',  -- 需要写入的数据表
    'username' = 'root',                        -- 数据库访问的用户名(需要提供 INSERT 权限)
    'password' = 'Yourpassword',                 -- 数据库访问的密码
    'sink.buffer-flush.max-rows' = '200',       -- 批量输出的条数
    'sink.buffer-flush.interval' = '2s'         -- 批量输出的间隔
);
3. 编写业务 SQL
INSERT INTO `jdbc_upsert_sink_table`
SELECT
window_start,window_end,SUM(num) AS num
FROM TABLE(
    -- Windowing TVF
    TUMBLE(TABLE `kafka_json_source_table`,DESCRIPTOR(event_time),INTERVAL '1' MINUTES)
) GROUP BY window_start,window_end;
4. 查询数据
进入 MySQL 控制台 [7],单击右侧【登陆】快速登陆数据库,选择相应的库表查询数据。

MySQL 查询数据
总结
a.  WARTERMARK是跟随在每条数据上的一条特殊标签,而且只增不减(可以相等)。WARTERMARK并不能影响数据出现在哪个窗口(本例中由event_time决定),其主要决定窗口是否关闭(当水印时间大于窗口结束时间时,窗口关闭并计算)。
b.  如果数据延时过大,例如小时级别,可以配合allowedLateness算子合理性使用WARTERMARK,当达到水印结束时间时,窗口并不关闭,只进行计算操作,当时间到达allowedLateness算子设置的时间后,窗口才真正关闭,并在原先的基础上再次进行计算。如在allowedLateness算子设置的时间后才达到的数据,我们可以使用sideOutputLateData算子将迟到的数据输出到侧输出流进行计算。这里需要注意allowedLateness和sideOutputLateData算子目前只能使用 Stream API 实现。
c.  目前 flink 1.13 的 Windowing TVF 函数并不能单独使用,需配合AGGREGATE、JOIN、TOPN使用。建议优先使用 Windowing TVF 实现窗口聚合等功能,因为 Windowing TVF 更符合 SQL 书写规范,底层优化逻辑也更好。

 
参考链接
[1] Oceanus 控制台:https://console.cloud.tencent.com/oceanus/overview  
[2] 创建独享集群:https://cloud.tencent.com/document/product/849/48298  
[3] CKafka 控制台:https://console.cloud.tencent.com/ckafka/index?rid=1  
[4] CKafka 创建实例:https://cloud.tencent.com/document/product/597/54839  
[5] Ckafka 创建 Topic:https://cloud.tencent.com/document/product/597/54854  
[6] 运行 Kafka 客户端:https://cloud.tencent.com/document/product/597/56840  
[7] MySQL 控制台:https://console.cloud.tencent.com/cdb  
[8] 创建 MySQL 实例:https://cloud.tencent.com/document/product/236/46433 

编辑:航网科技 来源:腾讯云 本文版权归原作者所有 转载请注明出处

在线客服

微信扫一扫咨询客服


全国免费服务热线
0755-36300002

返回顶部