2019-05-17
#!/usr/bin/python3
# Before first use, install the Kafka client module: "pip3 install kafka-python"
import json
import random
import time

from kafka import KafkaProducer

broker_lists = ['10.0.0.29:9092']
topic_oceanus_quickstart = 'oceanus7_test1'

producer = KafkaProducer(bootstrap_servers=broker_lists,
                         value_serializer=lambda m: json.dumps(m).encode('ascii'))

def generate_oceanus_test_data():
    # Build a batch of 10 random test messages
    results = []
    for _ in range(0, 10):
        int_one = random.randint(1000, 10000)
        int_two = random.randint(1, 10)
        random_thr = random.random()
        msg_kv = {"int_one": int_one, "int_two": int_two, "random_thr": random_thr}
        results.append(msg_kv)
    return results

def send_data(topic, msgs):
    # Send one message per second, then flush the batch
    for msg in msgs:
        time.sleep(1)
        producer.send(topic, msg)
        print(msg)
    producer.flush()

if __name__ == '__main__':
    while True:
        msg_oceanus_test_data = generate_oceanus_test_data()
        send_data(topic_oceanus_quickstart, msg_oceanus_test_data)
        time.sleep(30)
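When run, the script emits a batch of ten JSON messages (one per second) to the topic, then pauses 30 seconds before the next batch. The printed output looks roughly like the following (values are random; shown purely for illustration):

{"int_one": 4821, "int_two": 7, "random_thr": 0.5239118281}
{"int_one": 1377, "int_two": 3, "random_thr": 0.0731455201}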
-- Table creation statement (run against the destination PostgreSQL database)
create table public.oceanus7_test1 (
    id INT,
    random_thr DOUBLE PRECISION,
    PRIMARY KEY(id)
);
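Before starting the Flink job, it is worth confirming the destination table exists; a minimal check such as the following can be run in the same database (the table stays empty until the job writes to it):

SELECT * FROM public.oceanus7_test1 ORDER BY id LIMIT 5;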
1. Create the Source

CREATE TABLE `kafka_json_source_table` (
    int_one INT,
    int_two INT,
    random_thr DOUBLE
) WITH (
    'connector' = 'kafka',
    'topic' = 'oceanus7_test1',                        -- Replace with the Topic you want to consume
    'scan.startup.mode' = 'earliest-offset',           -- Can be any of latest-offset / earliest-offset / specific-offsets / group-offsets / timestamp
    'properties.bootstrap.servers' = '10.0.0.29:9092', -- Replace with your Kafka connection address
    'properties.group.id' = 'oceanus_group2',          -- Required parameter; a Group ID must be specified

    -- Define the data format (JSON)
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',            -- If false, missing fields do not raise errors
    'json.ignore-parse-errors' = 'true'                -- If true, any parse errors are ignored
);
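Once the source table is registered, the incoming stream can be previewed with a plain Flink SQL query (a minimal sketch; this assumes your environment, e.g. a Flink SQL client or the Oceanus SQL editor, allows ad-hoc SELECTs on source tables):

SELECT int_one, int_two, random_thr
FROM kafka_json_source_table;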
2. Create the Sink

CREATE TABLE jdbc_sink (
    id INT,
    random_thr DOUBLE,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',                    -- Connector type is 'jdbc'
    'url' = 'jdbc:postgresql://10.0.0.236:5432/postgres?currentSchema=public&reWriteBatchedInserts=true', -- Replace with your actual PostgreSQL connection parameters
    'table-name' = 'oceanus7_test1',         -- Table to write data to
    'username' = 'root',                     -- Database username (must have INSERT privileges)
    'password' = 'Tencent123$',              -- Database password

    -- Sink performance tuning parameters
    'sink.buffer-flush.max-rows' = '5000',   -- Optional; maximum number of rows buffered per batch, default 5000
    'sink.buffer-flush.interval' = '2s',     -- Optional; flush interval for each batch, default 0s
    'sink.max-retries' = '3'                 -- Optional; maximum number of retries when a database write fails
);
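Because the sink declares a primary key, the JDBC connector runs in upsert mode; against PostgreSQL the write is typically rendered as an INSERT ... ON CONFLICT statement, roughly like the following (illustrative only; the actual statement is generated internally by the connector):

INSERT INTO oceanus7_test1 (id, random_thr)
VALUES (?, ?)
ON CONFLICT (id) DO UPDATE SET random_thr = EXCLUDED.random_thr;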
3. Write the business SQL

INSERT INTO jdbc_sink
SELECT
    MOD(int_one, int_two) AS id,
    TRUNCATE(random_thr, 2) AS random_thr
FROM kafka_json_source_table;
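Since int_two is drawn from 1 to 10, MOD(int_one, int_two) can only produce ids 0 through 9, so the upsert keeps at most ten rows in PostgreSQL, each holding the latest truncated value for that id. The result can be checked with a query like this (a sketch, run against the destination database):

SELECT id, random_thr
FROM public.oceanus7_test1
ORDER BY id;
-- Expect at most 10 rows (id 0..9), with random_thr truncated to 2 decimal places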