|
2019-05-17
-- 建表语句 create table public.test1 ( id INT, str_one VARCHAR(50), str_two VARCHAR(50), str_thr VARCHAR(50), PRIMARY key(id) ); -- 插入语句 INSERT INTO public.test1 VALUES (1, 'hello world', 'b', 'Oceanus-1'); INSERT INTO public.test1 VALUES (2, 'good job', 'c', 'Oceanus-2'); INSERT INTO public.test1 VALUES (3, 'hello oceanus', 'd', 'Oceanus-3');
CREATE TABLE default.pg_to_ck on cluster default_cluster ( id Int8, str_one String, str_two String, str_thr String, Sign Int8 ) ENGINE = ReplicatedCollapsingMergeTree('/clickhouse/tables/{layer}-{shard}/default/pg_to_ck', '{replica}',Sign) ORDER BY (id);
-- PostgreSQL CDC Source。 CREATE TABLE PostgreSourceTable ( id INT, str_one VARCHAR, str_two VARCHAR, str_thr VARCHAR, PRIMARY KEY (id) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义 ) WITH ( 'connector' = 'postgres-cdc', -- 必须为 'postgres-cdc' 'hostname' = '10.0.0.236', -- 数据库的 IP 'port' = '5432', -- 数据库的访问端口 'username' = 'root', -- 数据库访问使用的用户名(需要提供 REPLICATION 权限, 日志级别必须大于等于 logical, 且设置后需要重启实例) 'password' = 'xxxxxxxxxxx', -- 数据库访问使用的密码 'database-name' = 'postgres', -- 需要同步的数据库名 'schema-name' = 'public', -- 需要同步的数据库模式 (Schema) 'table-name' = 'test1' -- 需要同步的数据表名 );
-- ClickHouse Sink (不完全支持upsert,详见说明文档)。配合 flink-connector-clickhouse 使用。 CREATE TABLE clickhouse_sink ( id INT, str_one VARCHAR, str_two VARCHAR, str_thr VARCHAR, PRIMARY KEY (id) NOT ENFORCED -- 如果要同步的数据库表定义了主键, 则这里也需要定义 ) WITH ( 'connector' = 'clickhouse', -- connector 类型为 clickhouse 'url' = 'clickhouse://10.0.0.178:8123', -- 指定数据库链接 url 'database-name' = 'default', -- 需要写入的 clickhouse 库名 'table-name' = 'pg_to_ck', -- 需要写入的 clickhouse 表名 'table.collapsing.field' = 'Sign' -- 采用 CollapsingMergeTree 引擎的 clickhouse 表,Collapsing 列字段的名称 );
INSERT INTO clickhouse_sink SELECT id, --INITCAP:将 str_one 中的单词转为大写开头,例如 INITCAP('i have a dream') 返回 'I Have A Dream'。 INITCAP(str_one) AS str_one, --TO_BASE64:将 string 表示的字符串编码为 Base64 字符串。 TO_BASE64(str_two) AS str_two, --REPLACE:将 string1 字符串中所有的 string2 替换为 string3。例如 REPLACE('banana', 'a', 'A') 返回 'bAnAnA'。 REPLACE(str_thr,'Oceanus','Hello Oceanus') AS str_thr FROM PostgreSourceTable;
CREATE ROLE debezium_user REPLICATION LOGIN; GRANT USAGE ON DATABASE database_name TO debezium_user; GRANT USAGE ON SCHEMA schema_name TO debezium_user; GRANT SELECT ON scheam_name.table_name, scheam_name.table_name TO debezium_user;
编辑:航网科技 来源:腾讯云 本文版权归原作者所有 转载请注明出处
微信扫一扫咨询客服
全国免费服务热线
0755-36300002