|
2019-05-17
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.demos</groupId> <artifactId>DemoCEP</artifactId> <version>1.0-SNAPSHOT</version> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>1.13.2</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>1.13.2</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.11</artifactId> <version>1.13.2</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.11</artifactId> <version>1.13.2</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-cep_2.11</artifactId> <version>1.13.2</version> </dependency> <!-- test --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.11</artifactId> <version>1.13.2</version> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>3.2.0</version> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <mainClass>com.demos.CEPTest</mainClass> </manifest> </archive> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project>
2. 代码编写
// POJO类用于模式匹配时,需实现 equals 和 hashCode 方法。 public class Stock { private String symbol; private String rowtime; private int price; private int tax; public Stock() { } public Stock(String symbol, String rowtime, int price, int tax) { this.symbol = symbol; this.rowtime = rowtime; this.price = price; this.tax = tax; } public String getSymbol() { return symbol; } public void setSymbol(String symbol) { this.symbol = symbol; } public String getRowtime() { return rowtime; } public void setRowtime(String rowtime) { this.rowtime = rowtime; } public int getPrice() { return price; } public void setPrice(int price) { this.price = price; } public int getTax() { return tax; } public void setTax(int tax) { this.tax = tax; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Stock stock = (Stock) o; return Objects.equals(symbol, stock.symbol) && Objects.equals(rowtime, stock.rowtime) && Objects.equals(price, stock.price) && Objects.equals(tax, stock.tax); } @Override public int hashCode() { return Objects.hash(symbol, rowtime, price, tax); } @Override public String toString() { return "Stock{" + "symbol='" + symbol + ''' + ", rowtime='" + rowtime + ''' + ", price=" + price + ", tax=" + tax + '}'; } }
StockSerializerDeserializer 类
// 序列化和反序列类 public class StockSerializerDeserializer implements SerializationSchema<Stock>, DeserializationSchema<Stock> { private final ObjectMapper mapper = new ObjectMapper(); @Override public byte[] serialize(Stock stock) { try { return mapper.writeValueAsBytes(stock); } catch (JsonProcessingException e) { throw new RuntimeException(e); } } @Override public Stock deserialize(byte[] bytes) throws IOException { return mapper.readValue(bytes, Stock.class); } @Override public boolean isEndOfStream(Stock secEvent) { return false; } @Override public TypeInformation<Stock> getProducedType() { return TypeExtractor.getForClass(Stock.class); } }
CEPTest 主程序类
public class CEPTest { public static void main(String[] args) { // 设置环境 StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "127.0.0.1:9092"); properties.setProperty("group.id", "test"); String intTopic = "demo6-cep-source"; FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<Stock>(intTopic, new StockSerializerDeserializer(), properties); consumer.setStartFromLatest(); // 添加数据源 DataStream<Stock> input = streamEnv.addSource(consumer); // 定义要匹配的模式。即股票的低点 Pattern<Stock, ?> pattern = Pattern.<Stock>begin("start").where( new SimpleCondition<Stock>() { @Override public boolean filter(Stock stock) { return stock.getPrice() > 10; } } ).next("bottom").where( new SimpleCondition<Stock>() { @Override public boolean filter(Stock stock) { return stock.getPrice() < 10; } } ).next("up").where( new SimpleCondition<Stock>() { @Override public boolean filter(Stock Stock) { return Stock.getPrice() > 10; } } ); DataStream<String> result = CEP.pattern(input, pattern) .inProcessingTime() .flatSelect( (p, o) -> { StringBuilder builder = new StringBuilder(); builder.append(" "); builder.append(p.get("start").get(0)) .append(", ") .append(p.get("bottom").get(0)) .append(", ") .append(p.get("up").get(0)); o.collect(builder.toString()); }, Types.STRING); String topicOut = "demo6-cep-dest"; FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<String>(topicOut, new SimpleStringSchema(), properties); // 输出到 Kafka Topic result.addSink(producer); try { streamEnv.execute(); } catch (Exception e) { e.printStackTrace(); } } }
mvn clean package
命令行打包后生成的 Jar 包可以在项目 target 目录下找到,Jar 名为 jar_demos-1.0-SNAPSHOT.jar。
[root@VM-3-centos ~]# cd /opt/kafka_2.11-2.4.1/bin/ [root@VM-3-centos bin]# bash kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic demo6-cep-source --producer.config ../config/producer.properties
模拟数据示例:
{"symbol": "ACME", "rowtime": "2020-12-11 10:00:00", "price": 12, "tax":1} {"symbol": "ACME", "rowtime": "2020-12-11 10:00:01", "price": 9, "tax":2} {"symbol": "ACME", "rowtime": "2020-12-11 10:00:02", "price": 12, "tax":1}
ps:更多接入方式请参考 CKafka 收发消息 [5]
[6] 模式检测:https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/match_recognize/
编辑:航网科技 来源:腾讯云 本文版权归原作者所有 转载请注明出处
微信扫一扫咨询客服
全国免费服务热线
0755-36300002