|
2019-05-17
-- Create the MySQL source table: one row per student, keyed by student name.
-- NOTE: int(3) only sets a display width; it does not restrict the value range.
CREATE TABLE `oceanus_advanced1_student_grade` (
  `name` varchar(50) NOT NULL DEFAULT '',
  `grade` int(3) DEFAULT NULL,
  PRIMARY KEY (`name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- Insert sample data.
INSERT INTO `oceanus_advanced1_student_grade` (`name`, `grade`) VALUES ('Oceanus-1', 85);
INSERT INTO `oceanus_advanced1_student_grade` (`name`, `grade`) VALUES ('Oceanus-2', 95);
# 进入 HBase Shell 的命令
root@yourhostname~# hbase shell
# Create the HBase table with a single column family, 'StuInfo'.
# (HBase shell is JRuby, so comments start with '#', not '--'.)
create 'oceanus_advanced1_student_info', 'StuInfo'

# Insert sample data. Arguments: table, row key, 'family:qualifier', value.
# Each student (row key) gets a Class and an Age cell; all values are strings.
put 'oceanus_advanced1_student_info', 'Oceanus-1', 'StuInfo:Class', '01'
put 'oceanus_advanced1_student_info', 'Oceanus-1', 'StuInfo:Age', '17'
put 'oceanus_advanced1_student_info', 'Oceanus-2', 'StuInfo:Class', '01'
put 'oceanus_advanced1_student_info', 'Oceanus-2', 'StuInfo:Age', '20'
put 'oceanus_advanced1_student_info', 'Oceanus-3', 'StuInfo:Class', '01'
put 'oceanus_advanced1_student_info', 'Oceanus-3', 'StuInfo:Age', '18'
-- Flink source table: reads the MySQL table `oceanus_advanced1_student_grade`
-- through the mysql-cdc connector.
CREATE TABLE `mysql_cdc_source_table` (
    `name`      STRING,
    `grade`     STRING,
    -- Processing-time attribute; used by the stream/dimension-table join
    -- (FOR SYSTEM_TIME AS OF) further down.
    `proc_time` AS PROCTIME(),
    -- If the synchronized database table defines a primary key,
    -- it must be declared here as well.
    PRIMARY KEY (`name`) NOT ENFORCED
) WITH (
    -- Fixed value 'mysql-cdc'.
    'connector'     = 'mysql-cdc',
    -- Database IP.
    'hostname'      = '10.0.0.158',
    -- Database access port.
    'port'          = '3306',
    -- Database user name. The account needs the SHOW DATABASES,
    -- REPLICATION SLAVE, REPLICATION CLIENT, SELECT and RELOAD privileges.
    'username'      = 'root',
    -- Database password.
    'password'      = 'Tencent123$',
    -- Database to synchronize.
    'database-name' = 'testdb',
    -- Table to synchronize.
    'table-name'    = 'oceanus_advanced1_student_grade'
);
-- Flink table over the HBase table 'oceanus_advanced1_student_info'.
-- The column family `StuInfo` is declared as a ROW holding its qualifiers.
CREATE TABLE hbase_table (
    rowkey  STRING,
    StuInfo ROW<Class STRING, Age STRING>,
    PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
    -- Flink 1.13 supports 'hbase-2.2'.
    'connector'        = 'hbase-1.4',
    -- HBase table name.
    'table-name'       = 'oceanus_advanced1_student_info',
    -- ZooKeeper address of the HBase cluster.
    'zookeeper.quorum' = '10.0.0.118:2181,10.0.0.119:2181,10.0.0.3:2181'
);
-- Flink sink table: writes one document per class into Elasticsearch 6.
CREATE TABLE elasticsearch6_sink_table (
    `class`  STRING,
    `amount` BIGINT,
    PRIMARY KEY (`class`) NOT ENFORCED
) WITH (
    -- Output to Elasticsearch 6.
    'connector'                   = 'elasticsearch-6',
    -- Optional: user name.
    'username'                    = 'elastic',
    -- Optional: password.
    'password'                    = 'Tencent123$',
    -- Elasticsearch connection address.
    'hosts'                       = 'http://10.0.0.97:9200',
    -- Elasticsearch index name.
    'index'                       = 'oceanus_advanced1',
    -- Elasticsearch document type.
    'document-type'               = '_doc',
    -- Flush after every single record.
    'sink.bulk-flush.max-actions' = '1',
    -- Output data format; only 'json' is currently supported.
    'format'                      = 'json'
);
-- For each class, count the students whose grade is >= 90 and whose age is
-- >= 18, and write the result to the Elasticsearch sink.
INSERT INTO elasticsearch6_sink_table
SELECT
    b.StuInfo.Class AS class,
    COUNT(a.name)   AS amount
FROM mysql_cdc_source_table AS a
-- The FOR SYSTEM_TIME AS OF clause is mandatory here. Without it the JOIN
-- still runs, but the dimension table is read in full only once and the
-- result may not be what you expect.
INNER JOIN hbase_table FOR SYSTEM_TIME AS OF a.proc_time AS b
    ON a.name = b.rowkey
-- Both grade and Age are declared as STRING, so cast before comparing.
WHERE CAST(a.grade AS INT) >= 90
  AND CAST(b.StuInfo.Age AS INT) >= 18
GROUP BY b.StuInfo.Class;
[9] 内置维表参考列表: https://cloud.tencent.com/document/product/849/48264
编辑:航网科技 来源:腾讯云 本文版权归原作者所有 转载请注明出处
微信扫一扫咨询客服
全国免费服务热线
0755-36300002