|
2019-05-17
在表中插入2条数据。
INSERT INTO `user` (`user_id`, `user_name`, `create_time`) VALUES (1001, '小明', '2021-10-01 00:00:00'); INSERT INTO `user` (`user_id`, `user_name`, `create_time`) VALUES (1002, 'TONY', '2021-10-02 00:00:00');
点击实例 ID,在实例详情页面点击【数据库管理】进入【参数设置】面板,设置binlog_row_image=FULL来开启数据库变化的同步。
通过MySQL集成数据到 Oceanus (Flink) 集群,可以使用flink-connector-jdbc或者flink-connector-mysq-cdc。使用MySQL-cdc特性时,flink-connector-mysq-cdc 连接器需要设置 MySQL 数据库的参数 binlog_row_image=FULL。
创建 Oceanus 集群和 MySQL 集群时所选 VPC 必须是同一 VPC。
进入 Elasticsearch 控制台[5],点击左上方【新建】,创建 Elasticsearch 实例,具体操作请访问创建 Elasticsearch 集群[6]。
创建 ES 集群和 Oceanus 集群时所选私有网络 VPC 必须是同一 VPC。
CREATE TABLE `user_source` ( `user_id` int, `user_name` varchar(50), PRIMARY KEY (`user_id`) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义 ) WITH ( 'connector' = 'mysql-cdc', -- 必须为 'mysql-cdc' 'hostname' = '10.0.0.158', -- 数据库的 IP 'port' = '3306', -- 数据库的访问端口 'username' = 'root', -- 数据库访问的用户名(需要提供 SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, SELECT, RELOAD 权限) 'password' = 'yourpassword', -- 数据库访问的密码 'database-name' = 'testdb', -- 需要同步的数据库 'table-name' = 'user' -- 需要同步的数据表名 );
-- Elasticsearch 只能作为数据目的表(Sink)写入 -- 参见 https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#elasticsearch-connector CREATE TABLE es_sink ( `user_id` INT, `user_name` VARCHAR ) WITH ( 'connector.type' = 'elasticsearch', -- 输出到 Elasticsearch 'connector.version' = '6', -- 指定 Elasticsearch 的版本, 例如 '6', '7'. 'connector.hosts' = 'http://10.0.0.175:9200', 'connector.index' = 'User', 'connector.document-type' = 'user', 'connector.username' = 'elastic', 'connector.password' = 'yourpassword', 'update-mode' = 'upsert', -- 捕捉数据库变化时,需使用 'upsert' 模式 'connector.key-delimiter' = '$', -- 可选参数, 复合主键的连接字符 (默认是 _ 符号) 'connector.key-null-literal' = 'n/a', -- 主键为 null 时的替代字符串,默认是 'null' 'connector.connection-max-retry-timeout' = '300', -- 每次请求的最大超时时间 (ms) 'format.type' = 'json' -- 输出数据格式, 目前只支持 'json' );
insert into es_sink ( select user_id, LOWER(user_name) -- LOWER()函数会将用户名转换为小写 from user_source );
点击【保存】>【发布草稿】运行作业。
请根据实际购买的 Elasticsearch 版本选择对应的 Connector ,1.13 版本之后无需选择可自动匹配 Connector。
ES 作为Source/Sink , 使用时间戳 timestamp 类型字段时长度需指定,如:timestamp(3)
编辑:航网科技 来源:腾讯云 本文版权归原作者所有 转载请注明出处
微信扫一扫咨询客服
全国免费服务热线
0755-36300002