|
2019-05-17
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven build for the jar_demos Flink job.
  All Flink runtime dependencies are declared with "provided" scope, so the
  resulting jar contains only the project's own classes.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.oceanus</groupId>
    <artifactId>jar_demos</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <!-- Pin the source encoding so the build does not depend on the platform default. -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- Single place to change the Flink version used by every Flink artifact below. -->
        <flink.version>1.13.2</flink.version>
    </properties>

    <dependencies>
        <!-- The Oceanus platform already ships flink-java, flink-streaming and related dependencies. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Use the Kafka connector built into Oceanus. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- test -->
        <!-- flink-clients is used for local debugging. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>3.2.0</version>
                <configuration>
                    <!-- Set the main class written to the jar manifest. -->
                    <archive>
                        <manifestEntries>
                            <Main-Class>com.demos.HelloWorld</Main-Class>
                        </manifestEntries>
                    </archive>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
package com.demos; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import org.apache.flink.util.Collector; import java.util.ArrayList; import java.util.List; import java.util.Properties; public class HelloWorld { public static void main(String[] args) throws Exception { // 1. 设置运行环境 StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment(); List<Integer> data = new ArrayList<>(); for (int i = 0; i < 100; i++) { data.add(i); } // 2. 配置数据源读取数据 // 预定义数据源支持从文件、套接字、集合读入数据;自定义数据源支持 Kafka、MySQL 等使用 addSource() 函数读入数据 DataStreamSource<List<Integer>> dataStream = sEnv.fromElements(data); // 3. 数据加工 DataStream ds = dataStream.flatMap(new FlatMapFunction<List<Integer>, String>() { @Override public void flatMap(List<Integer> value, Collector<String> out) throws Exception { value.forEach(v -> out.collect(v.toString())); } }); // 4. 数据输出 // 预定义目的端支持把数据写入文件、标准输出(stdout)、标准错误输出(stderr)和 socket;自定义目的端支持 Kafka、MySQL 等使用 addSink() 函数写出数据 Properties sinkProps = new Properties(); String hosts = "10.0.0.29:9092"; sinkProps.setProperty("bootstrap.servers", hosts); String outTopic = "flink-demo9"; FlinkKafkaProducer<String> producer = new FlinkKafkaProducer(outTopic, new SimpleStringSchema(), sinkProps); ds.addSink(producer); // ds.print(); // 5. 执行程序 sEnv.execute("helloworld"); } }
mvn clean package
命令行打包后生成的 Jar 包可以在项目 target 目录下找到,Jar 名为 jar_demos-1.0-SNAPSHOT.jar。
编辑:航网科技 来源:腾讯云 本文版权归原作者所有 转载请注明出处
微信扫一扫咨询客服
全国免费服务热线
0755-36300002