Flink 实践入门9-Jar作业开发

| 2019-05-17

前置准备

创建流计算 Oceanus 集群
 
进入 Oceanus 控制台 [1],点击左侧【集群管理】,点击左上方【创建集群】,具体可参考 Oceanus 官方文档 创建独享集群 [2]。
 
创建消息队列 CKafka
进入 CKafka 控制台 [3],点击左上角【新建】,即可完成 CKafka 的创建,具体可参考 CKafka 创建实例 [4]。
 
创建 Topic:
进入 CKafka 实例,点击【topic 管理】>【新建】,即可完成 Topic 的创建,具体可参考 CKafka 创建 Topic [5]。
 
开发 DataStream 作业
1. 新建 Maven 工程。
在本地 IDEA 中新建Maven Project,并配置 pom.xml 文件。pom.xml 文件内容如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion><groupId>com.oceanus</groupId>
    <artifactId>jar_demos</artifactId>
    <version>1.0-SNAPSHOT</version><properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>
    <dependencies>
        <!-- Oceanus 平台自带了 flink-java、flink-streaming 等依赖 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.13.2</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.13.2</version>
            <scope>provided</scope>
        </dependency>
        <!-- 使用 Oceanus 内置 Kafka Connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.13.2</version>
            <scope>provided</scope>
        </dependency><!-- test -->
        <!-- flink-clients 用于本地调试 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.13.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies><build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>3.2.0</version>
                <configuration>
                    <!-- 设置主类 -->
                    <archive>
                        <manifestEntries>
                            <Main-Class>com.demos.HelloWorld</Main-Class>
                        </manifestEntries>
                    </archive>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

2. 代码编写

Flink DataStream 作业代码如下:
package com.demos;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
​
​
public class HelloWorld {
    public static void main(String[] args) throws Exception {
        // 1. 设置运行环境
        StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();List<Integer> data = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            data.add(i);
        }// 2. 配置数据源读取数据
        // 预定义数据源支持从文件、套接字、集合读入数据;自定义数据源支持 Kafka、MySQL 等使用 addSource() 函数读入数据
        DataStreamSource<List<Integer>> dataStream = sEnv.fromElements(data);// 3. 数据加工
        DataStream ds = dataStream.flatMap(new FlatMapFunction<List<Integer>, String>() {
            @Override
            public void flatMap(List<Integer> value, Collector<String> out) throws Exception {
                value.forEach(v -> out.collect(v.toString()));
            }
        });// 4. 数据输出
        // 预定义目的端支持把数据写入文件、标准输出(stdout)、标准错误输出(stderr)和 socket;自定义目的端支持 Kafka、MySQL 等使用 addSink() 函数写出数据
        Properties sinkProps = new Properties();
        String hosts = "10.0.0.29:9092";
        sinkProps.setProperty("bootstrap.servers", hosts);
        String outTopic = "flink-demo9";
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer(outTopic, new SimpleStringSchema(), sinkProps);
        ds.addSink(producer);
        // ds.print();// 5. 执行程序
        sEnv.execute("helloworld");
    }
}
打包 Jar 包
使用 IDEA 自带打包工具 Build Artifacts 或者命令行进行打包。 命令行打包命令:
mvn clean package

命令行打包后生成的 Jar 包可以在项目 target 目录下找到,Jar 名为 jar_demos-1.0-SNAPSHOT.jar。  

流计算 Oceanus 作业
1. 上传依赖
在 Oceanus 控制台,点击左侧【依赖管理】,点击左上角【新建】新建依赖,上传本地 Jar 包。
 
2. 创建作业
在 Oceanus 控制台,点击左侧【作业管理】,点击左上角【新建】新建作业,作业类型选择 Jar 作业,点击【开发调试】进入作业编辑页面。
 
【主程序包】选择刚刚上传的依赖,并选择最新版本。参考 pom.xml 文件填写主类,此处填入 com.demos.HelloWorld。
 
3. 运行作业
   点击【发布草稿】即可运行,可通过【日志】面板 TaskManager 或 Flink UI 查看运行信息。
 
总结
DataStream 作业支持各类异构数据源与数据目的端。自定义数据源支持 Kafka、MySQL 等,使用 addSource() 函数读入数据;自定义目的端支持 Kafka、MySQL 等,使用 addSink() 函数写出数据。
打包时无需打包 flink 核心依赖,流计算 Oceanus 平台已提供。
 
阅读参考
[1] Oceanus 控制台:https://console.cloud.tencent.com/oceanus
[2] 创建独享集群:https://cloud.tencent.com/document/product/849/48298  
[3] CKafka 控制台:https://console.cloud.tencent.com/ckafka
[4] CKafka 创建实例:https://cloud.tencent.com/document/product/597/54839  
[5] Ckafka 创建 Topic:https://cloud.tencent.com/document/product/597/54854 

编辑:航网科技 来源:腾讯云 本文版权归原作者所有 转载请注明出处

在线客服

微信扫一扫咨询客服


全国免费服务热线
0755-36300002

返回顶部