Flink Tutorial in Practice: Advanced 6 - Complex Event Processing with CEP

2019-05-17

Prerequisites

Create an Oceanus cluster
Go to the Oceanus console [2], click "Cluster Management" on the left, then "Create Cluster" at the top left. For details, see "Creating a dedicated cluster" in the Oceanus documentation [3].

Create Kafka Topics
Go to the CKafka console [4] and click "New" at the top left to create a CKafka instance, then create two topics in it: demo6-cep-source and demo6-cep-dest.
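If you prefer the command line over the console, the topics can also be created with the Kafka CLI that ships with the broker. A minimal sketch, assuming the same self-built Kafka client directory used in the data-simulation step below and a broker at 127.0.0.1:9092 (replace with your CKafka access address):

[root@VM-3-centos bin]# bash kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --topic demo6-cep-source --partitions 1 --replication-factor 1
[root@VM-3-centos bin]# bash kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --topic demo6-cep-dest --partitions 1 --replication-factor 1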
 
Develop the DataStream Job
1. Create a Maven project.
Create a Maven project in local IDEA and configure the pom.xml file with the following content:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.demos</groupId>
    <artifactId>DemoCEP</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.13.2</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.13.2</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.13.2</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.13.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_2.11</artifactId>
            <version>1.13.2</version>
        </dependency>

        <!-- test -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.13.2</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.2.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>com.demos.CEPTest</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

2. Write the code

In the Flink DataStream job, the Stock POJO class receives JSON-formatted data from Kafka, the StockSerializerDeserializer class handles serialization and deserialization, and CEPTest is the main program class.

Stock class
import java.util.Objects;

// A POJO used for pattern matching must implement equals() and hashCode().
public class Stock {
    private String symbol;
    private String rowtime;
    private int price;
    private int tax;

    public Stock() {
    }

    public Stock(String symbol, String rowtime, int price, int tax) {
        this.symbol = symbol;
        this.rowtime = rowtime;
        this.price = price;
        this.tax = tax;
    }

    public String getSymbol() {
        return symbol;
    }

    public void setSymbol(String symbol) {
        this.symbol = symbol;
    }

    public String getRowtime() {
        return rowtime;
    }

    public void setRowtime(String rowtime) {
        this.rowtime = rowtime;
    }

    public int getPrice() {
        return price;
    }

    public void setPrice(int price) {
        this.price = price;
    }

    public int getTax() {
        return tax;
    }

    public void setTax(int tax) {
        this.tax = tax;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        Stock stock = (Stock) o;
        return Objects.equals(symbol, stock.symbol) && Objects.equals(rowtime, stock.rowtime) && price == stock.price && tax == stock.tax;
    }

    @Override
    public int hashCode() {
        return Objects.hash(symbol, rowtime, price, tax);
    }

    @Override
    public String toString() {
        return "Stock{" +
                "symbol='" + symbol + ''' +
                ", rowtime='" + rowtime + ''' +
                ", price=" + price +
                ", tax=" + tax +
                '}';
    }
}

StockSerializerDeserializer class

import java.io.IOException;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
// Jackson classes bundled (shaded) with Flink; plain com.fasterxml.jackson imports also work if jackson-databind is added to the pom.
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

// Serialization and deserialization class for Stock, shared by the Kafka source and sink.
public class StockSerializerDeserializer implements SerializationSchema<Stock>, DeserializationSchema<Stock> {

    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public byte[] serialize(Stock stock) {
        try {
            return mapper.writeValueAsBytes(stock);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public Stock deserialize(byte[] bytes) throws IOException {
        return mapper.readValue(bytes, Stock.class);
    }

    @Override
    public boolean isEndOfStream(Stock secEvent) {
        return false;
    }

    @Override
    public TypeInformation<Stock> getProducedType() {
        return TypeExtractor.getForClass(Stock.class);
    }
}

CEPTest main program class

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

public class CEPTest {
    public static void main(String[] args) {
        // Set up the execution environment
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        properties.setProperty("group.id", "test");
        String inTopic = "demo6-cep-source";
        FlinkKafkaConsumer<Stock> consumer = new FlinkKafkaConsumer<>(inTopic, new StockSerializerDeserializer(), properties);
        consumer.setStartFromLatest();
        
        // Add the Kafka source
        DataStream<Stock> input = streamEnv.addSource(consumer);

        // Define the pattern to match: a dip in the stock price (above 10, then below 10, then back above 10)
        Pattern<Stock, ?> pattern = Pattern.<Stock>begin("start").where(
                new SimpleCondition<Stock>() {
                    @Override
                    public boolean filter(Stock stock) {
                        return stock.getPrice() > 10;
                    }
                }
        ).next("bottom").where(
                new SimpleCondition<Stock>() {
                    @Override
                    public boolean filter(Stock stock) {
                        return stock.getPrice() < 10;
                    }
                }
        ).next("up").where(
                new SimpleCondition<Stock>() {
                    @Override
                    public boolean filter(Stock stock) {
                        return stock.getPrice() > 10;
                    }
                }
        );

        DataStream<String> result = CEP.pattern(input, pattern)
                .inProcessingTime()
                .flatSelect(
                        (p, o) -> {
                            StringBuilder builder = new StringBuilder();
                            builder.append("
");
                            builder.append(p.get("start").get(0))
                                    .append(",
")
                                    .append(p.get("bottom").get(0))
                                    .append(",
")
                                    .append(p.get("up").get(0));

                            o.collect(builder.toString());
                        },
                        Types.STRING);

        String topicOut = "demo6-cep-dest";
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<String>(topicOut, new SimpleStringSchema(), properties);
      
        // Write the result to the Kafka topic
        result.addSink(producer);
        try {
            streamEnv.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
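A note on the pattern above: next() enforces strict contiguity, i.e. "bottom" must be the very next event after "start". If unrelated events may arrive in between (for example, records of other symbols), Flink CEP also offers relaxed contiguity via followedBy(). A minimal sketch of the first two steps of the same dip pattern, for illustration only:

// Relaxed contiguity: followedBy() skips non-matching events between "start" and "bottom".
Pattern<Stock, ?> relaxed = Pattern.<Stock>begin("start").where(
        new SimpleCondition<Stock>() {
            @Override
            public boolean filter(Stock stock) {
                return stock.getPrice() > 10;
            }
        }
).followedBy("bottom").where(
        new SimpleCondition<Stock>() {
            @Override
            public boolean filter(Stock stock) {
                return stock.getPrice() < 10;
            }
        }
);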

3. Package the project

Package with IDEA's built-in Build Artifacts tool or from the command line. Command-line packaging command:
mvn clean package

The jar built from the command line can be found in the project's target directory; with the assembly plugin configured above it is named DemoCEP-1.0-SNAPSHOT-jar-with-dependencies.jar.

Oceanus Job
1. Upload the dependency
In the Oceanus console, click "Dependency Management" on the left, then "New" at the top left to create a dependency and upload the local jar.
 
2. Create the job
In the Oceanus console, click "Job Management" on the left, then "New" at the top left to create a job; choose "JAR job" as the job type and click "Develop & Debug" to enter the job editing page. For "Main program package", select the dependency just uploaded and pick its latest version. Fill in the main class according to the pom.xml file, here com.demos.CEPTest.
 
3. Run the job
Click "Publish draft" to start the job. Runtime information is available from the TaskManager logs in the "Logs" panel or from the Flink UI.
 
4. Simulate data
Send data to the topic demo6-cep-source with a Kafka client. Commands:
[root@VM-3-centos ~]# cd /opt/kafka_2.11-2.4.1/bin/
[root@VM-3-centos bin]# bash kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic demo6-cep-source --producer.config ../config/producer.properties

Sample data:

{"symbol": "ACME", "rowtime": "2020-12-11 10:00:00", "price": 12, "tax":1}
{"symbol": "ACME", "rowtime": "2020-12-11 10:00:01", "price": 9, "tax":2}
{"symbol": "ACME", "rowtime": "2020-12-11 10:00:02", "price": 12, "tax":1}

Note: for more ways to connect, see "CKafka: sending and receiving messages" [5].

5. View the results
Check the data arriving in the topic demo6-cep-dest; it should match the expected output.
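The destination topic can be inspected with the console consumer from the same Kafka client directory. A sketch, assuming the same broker address as above:

[root@VM-3-centos bin]# bash kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic demo6-cep-dest --from-beginning --consumer.config ../config/consumer.properties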

[Figure: running results]

Summary
1. When using CEP with the DataStream API, the POJO class must implement equals() and hashCode(), because Flink CEP compares and matches events through these two methods.
2. To use CEP in Table SQL, see Pattern Detection [6].
3. There is no need to bundle the Flink core dependencies when packaging; the Oceanus platform already provides them, which is why they carry the provided scope in the pom.xml.
 
References
[1] Flink CEP (Complex Event Processing): https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/libs/cep/
[2] Oceanus console: https://console.cloud.tencent.com/oceanus/overview
[3] Creating a dedicated cluster: https://cloud.tencent.com/document/product/849/48298
[4] CKafka console: https://console.cloud.tencent.com/ckafka/index?rid=1
[5] CKafka: sending and receiving messages: https://cloud.tencent.com/document/product/597/54834
[6] Pattern Detection: https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/match_recognize/
