|
2019-05-17
// 数据格式 { "id": 1, "message": "流计算 Oceanus 1元限量秒杀活动", "userInfo": { "name": "张三", "phone": ["12345678910", "8547942"] }, "companyInfo": { "name": "Tencent", "address": "深圳市腾讯大厦" } }
-- 建表语句 CREATE TABLE `oceanus_advanced2` ( `id` int (100) NOT NULL, `message` varchar (100) NULL DEFAULT '', `name` varchar (50) NULL DEFAULT '', `phone` varchar (11) NULL DEFAULT '', `company_name` varchar (100) NULL DEFAULT '', `company_address` varchar (100) NULL DEFAULT '', PRIMARY KEY (`id`) ) ENGINE = innodb
CREATE TABLE `kafka_json_source_table` ( `id` INT, `message` STRING, `userInfo` ROW<`name` STRING,`phone` ARRAY<STRING>>, -- 采用 ROW 嵌套 ARRAY 格式接收 JSON 字段 `companyInfo` MAP<STRING,STRING> -- 采用 MAP 格式接收 JSON 字段 ) WITH ( 'connector' = 'kafka', 'topic' = 'oceanus_advanced2', -- 替换为您要消费的 Topic 'scan.startup.mode' = 'earliest-offset', -- 可以是 latest-offset/earliest-offset/specific-offsets/group-offsets/timestamp 的任何一种 'properties.bootstrap.servers' = '10.0.0.29:9092', -- 替换为您的 Kafka 连接地址 'properties.group.id' = 'testGroup', -- 必选参数, 一定要指定 Group ID 'format' = 'json', -- 定义 JSON 格式,部分其他格式可能不支持抽取平铺 'json.fail-on-missing-field' = 'false', -- 如果设置为 false, 则遇到缺失字段不会报错。 'json.ignore-parse-errors' = 'true' -- 如果设置为 true,则忽略任何解析报错。 );
CREATE TABLE `jdbc_upsert_sink_table` ( `id` INT, `message` STRING, `name` STRING, `phone` STRING, `company_name` STRING, `company_address` STRING ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://10.0.0.158:3306/testdb?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai', -- 请替换为您的实际 MySQL 连接参数 'table-name' = 'oceanus_advanced2', -- 需要写入的数据表 'username' = 'root', -- 数据库访问的用户名(需要提供 INSERT 权限) 'password' = 'Tencent123$', -- 数据库访问的密码 'sink.buffer-flush.max-rows' = '200', -- 批量输出的条数 'sink.buffer-flush.interval' = '2s' -- 批量输出的间隔 );
INSERT INTO `jdbc_upsert_sink_table` SELECT id AS id, message AS message, userInfo.name AS name, -- 获取 Row 中成员采用.成员的方式 userInfo.phone[1] AS phone, -- 获取 Array 中成员采用 [数组下标] 的方式 companyInfo['name'] AS company_name, -- 获取 Map 中成员采用 ['属性名'] 的方式 companyInfo['address'] AS company_address FROM `kafka_json_source_table`;
注:新版 Flink 1.13 集群无需用户选择内置 Connector,平台自动匹配获取
[9] 内置运算符和函数:https://cloud.tencent.com/document/product/849/18083
编辑:航网科技 来源:腾讯云 本文版权归原作者所有 转载请注明出处
微信扫一扫咨询客服
全国免费服务热线
0755-36300002