2019-05-17
Prerequisites
#!/usr/bin/python3
# Before first use, install the Kafka client module with "pip3 install kafka-python".
import json
import random
import time
from kafka import KafkaProducer

broker_lists = ['10.0.0.29:9092']
kafka_topic_oceanus = 'oceanus_advanced4_input'
producer = KafkaProducer(bootstrap_servers=broker_lists,
                         value_serializer=lambda m: json.dumps(m).encode('ascii'))

def send_data(topic):
    user_id = random.randint(1, 50)
    item_id = random.randint(1, 1000)
    category_id = random.randint(1, 20)
    user_behaviors = ['pv', 'buy', 'cart', 'fav']
    current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    msg = {
        'user_id': user_id,
        'item_id': item_id,
        'category_id': category_id,
        'user_behavior': user_behaviors[random.randint(0, len(user_behaviors) - 1)],
        'time_stamp': current_time
    }
    producer.send(topic, msg)
    print(msg)
    producer.flush()

if __name__ == '__main__':
    while True:
        # Send one record per second.
        time.sleep(1)
        send_data(kafka_topic_oceanus)
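To confirm that records are actually reaching Kafka, you can consume a few messages back from the same topic. The following is a minimal verification sketch (not part of the tutorial's original scripts), using the same kafka-python module, broker address, and topic as the producer above:

#!/usr/bin/python3
# Minimal consumer to verify the producer is writing records.
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'oceanus_advanced4_input',                 # same topic as the producer
    bootstrap_servers=['10.0.0.29:9092'],      # same broker as the producer
    auto_offset_reset='latest',                # only read records produced from now on
    value_deserializer=lambda m: json.loads(m.decode('ascii')))

for message in consumer:
    # Each value is a dict such as:
    # {'user_id': 7, 'item_id': 431, 'category_id': 12,
    #  'user_behavior': 'buy', 'time_stamp': '2019-05-17 12:00:01'}
    print(message.value)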
-- Table creation statement
create table public.oceanus_advanced4_output (
    win_start   TIMESTAMP,
    category_id INT,
    buy_count   INT,
    PRIMARY KEY (win_start, category_id)
);
1. Create the Source
CREATE TABLE `kafka_json_source_table` (
    user_id INT,
    item_id INT,
    category_id INT,
    user_behavior VARCHAR,
    time_stamp TIMESTAMP(3),
    WATERMARK FOR time_stamp AS time_stamp - INTERVAL '3' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'oceanus_advanced4_input',                -- replace with the topic you want to consume
    'scan.startup.mode' = 'latest-offset',              -- any of latest-offset / earliest-offset / specific-offsets / group-offsets / timestamp
    'properties.bootstrap.servers' = '10.0.0.29:9092',  -- replace with your Kafka connection address
    'properties.group.id' = 'testGroup',                -- required: a group ID must be specified
    'format' = 'json',
    'json.fail-on-missing-field' = 'false',             -- if false, missing fields do not raise errors
    'json.ignore-parse-errors' = 'true'                 -- if true, any parse error is ignored
);
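The WATERMARK clause decides when event-time windows are allowed to fire: the watermark trails the largest time_stamp seen so far by 3 seconds, so a window only closes once a record arrives whose timestamp is past the window end by that margin. The following is not Flink code, just a rough sketch of the arithmetic, assuming records arrive in timestamp order:

# Illustration only: a "max event time minus 3 seconds" watermark deciding
# when the window covering [+0s, +60s) may fire.
def watermark(max_event_time_seen):
    return max_event_time_seen - 3  # mirrors INTERVAL '3' SECOND

window_end = 60  # seconds
for event_time in (58, 61, 62, 64):
    fires = watermark(event_time) >= window_end
    print(f"event at +{event_time}s -> watermark +{watermark(event_time)}s, window fires: {fires}")
# Only the record at +64s pushes the watermark (+61s) past the window end,
# so the window result is emitted at that point.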
2. Create the Sink
CREATE TABLE `jdbc_upsert_sink_table` (
    win_start TIMESTAMP(3),
    category_id INT,
    buy_count INT,
    PRIMARY KEY (win_start, category_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://10.0.0.236:5432/postgres?currentSchema=public&reWriteBatchedInserts=true',  -- replace with your actual PostgreSQL connection parameters
    'table-name' = 'oceanus_advanced4_output',  -- the table to write to
    'username' = 'root',                        -- database username (needs INSERT permission)
    'password' = 'yourpassword',                -- database password
    'sink.buffer-flush.max-rows' = '200',       -- number of rows per batched write
    'sink.buffer-flush.interval' = '2s'         -- interval between batched writes
);
3. Write the business SQL
-- Create a temporary view that filters the raw data and performs the windowed aggregation.
CREATE VIEW `kafka_json_source_view` AS
SELECT
    TUMBLE_START(time_stamp, INTERVAL '1' MINUTE) AS win_start,
    category_id,
    COUNT(1) AS buy_count
FROM `kafka_json_source_table`
WHERE user_behavior = 'buy'
GROUP BY TUMBLE(time_stamp, INTERVAL '1' MINUTE), category_id;
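TUMBLE assigns each record to a fixed, non-overlapping 1-minute bucket whose start is the event timestamp rounded down to the minute, and TUMBLE_START exposes that bucket start as win_start. A small sketch of the bucketing arithmetic (illustration only, not part of the job):

# Illustration only: TUMBLE(time_stamp, INTERVAL '1' MINUTE) bucketing.
from datetime import datetime, timedelta

def tumble_start(ts, size=timedelta(minutes=1)):
    # Round the event timestamp down to the start of its window.
    epoch = datetime(1970, 1, 1)
    return ts - (ts - epoch) % size

for raw in ("2019-05-17 12:00:15", "2019-05-17 12:00:59", "2019-05-17 12:01:02"):
    ts = datetime.strptime(raw, "%Y-%m-%d %H:%M:%S")
    print(raw, "->", tumble_start(ts))
# The first two records share win_start 12:00:00 and are counted together;
# the third opens a new window at 12:01:00.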
-- Compute the Top 3 purchased categories per minute.
INSERT INTO `jdbc_upsert_sink_table`
SELECT
    b.win_start,
    b.category_id,
    CAST(b.buy_count AS INT) AS buy_count
FROM (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY win_start ORDER BY buy_count DESC) AS rn
    FROM `kafka_json_source_view`
) b
WHERE b.rn <= 3;
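The Top-N result is an updating stream: as new window results arrive, rows can enter or leave the top 3, so the sink must upsert on (win_start, category_id), which is why the primary key is declared on both the sink table and the PostgreSQL table. Once the job is running, you can check the output; a minimal sketch, assuming psycopg2 is installed ("pip3 install psycopg2-binary") and reusing the connection parameters from the sink definition:

#!/usr/bin/python3
# Minimal sketch to inspect the sink output in PostgreSQL.
import psycopg2

conn = psycopg2.connect(host='10.0.0.236', port=5432, dbname='postgres',
                        user='root', password='yourpassword')
with conn, conn.cursor() as cur:
    cur.execute("""
        SELECT win_start, category_id, buy_count
        FROM public.oceanus_advanced4_output
        ORDER BY win_start DESC, buy_count DESC
        LIMIT 9  -- each window holds at most 3 rows, so this covers the latest windows
    """)
    for row in cur.fetchall():
        print(row)
conn.close()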