|
2019-05-17
活动购买链接 1 元购买 Oceanus 集群。
进入 Oceanus 控制台,点击左侧【集群管理】,点击左上方【创建集群】,具体可参考 Oceanus 官方文档 创建独享集群。
进入 Elasticsearch 控制台,点击左上方【新建】,创建 Elasticsearch 实例,具体操作请访问 创建 Elasticsearch 集群
!创建 Oceanus 集群和 Elasticsearch 集群时所选 VPC 必须是同一 VPC。
-- 参见 https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/datagen.html CREATE TABLE random_source ( f_sequence INT, f_random INT, f_random_str VARCHAR ) WITH ( 'connector' = 'datagen', 'rows-per-second'='1', -- 每秒产生的数据条数 'fields.f_sequence.kind'='sequence', -- 有界序列(结束后自动停止输出) 'fields.f_sequence.start'='1', -- 序列的起始值 'fields.f_sequence.end'='10000', -- 序列的终止值 'fields.f_random.kind'='random', -- 无界的随机数 'fields.f_random.min'='1', -- 随机数的最小值 'fields.f_random.max'='1000', -- 随机数的最大值 'fields.f_random_str.length'='10' -- 随机字符串的长度 );
-- Elasticsearch 只能作为数据目的表(Sink)写入 -- 参见 https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#elasticsearch-connector CREATE TABLE Student ( `user_id` INT, `user_name` VARCHAR ) WITH ( 'connector.type' = 'elasticsearch', -- 输出到 Elasticsearch 'connector.version' = '6', -- 指定 Elasticsearch 的版本, 例如 '6', '7'. 注意务必要和所选的内置 Connector 版本一致 'connector.hosts' = 'http://10.0.0.175:9200', -- Elasticsearch 的连接地址 'connector.index' = 'Student', -- Elasticsearch 的 Index 名 'connector.document-type' = 'stu', -- Elasticsearch 的 Document 类型 'connector.username' = 'elastic', -- 可选参数: 请替换为实际 Elasticsearch 用户名 'connector.password' = 'xxxxxxxxxx', -- 可选参数: 请替换为实际 Elasticsearch 密码 'update-mode' = 'append', -- 可选无主键的 'append' 模式,或有主键的 'upsert' 模式 'connector.key-delimiter' = '$', -- 可选参数, 复合主键的连接字符 (默认是 _ 符号, 例如 key1_key2_key3) 'connector.key-null-literal' = 'n/a', -- 主键为 null 时的替代字符串,默认是 'null' 'connector.failure-handler' = 'retry-rejected', -- 可选的错误处理。可选择 'fail' (抛出异常)、'ignore'(忽略任何错误)、'retry-rejected'(重试) 'connector.connection-max-retry-timeout' = '300', -- 每次请求的最大超时时间 (ms) 'format.type' = 'json' -- 输出数据格式, 目前只支持 'json' );
INSERT INTO Student SELECT f_sequence AS user_id, f_random_str AS user_name FROM random_source;
点击【作业参数】,在【内置 Connector】选择 flink-connector-elasticsearch6
,点击【保存】>【发布草稿】运行作业。
?新版 Flink 1.13 集群不需要用户选择内置 Connector。其他版本集群请根据实际购买的 Elasticsearch 版本选择对应的 Connector。
进入 Elasticsearch 控制台,点击之前购买的 Elasticsearch 实例,点击右上角【Kibana】,进入 Kibana 查询数据。具体查询方法请参考 通过 Kibana 访问集群
本示例用 Datagen
连接器随机生成数据,经过 Oceanus 实现最基础的数据转换功能,最后 Sink 到Elasticsearch 中,用户无需提前在 Elasticsearch 中创建索引。
编辑:航网科技 来源:腾讯云 本文版权归原作者所有 转载请注明出处
微信扫一扫咨询客服
全国免费服务热线
0755-36300002