要使用内置的Schemas需要添加如下依赖:
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-avro</artifactId>
- <version>1.7.0</version>
- </dependency>
(3) 读取位置配置
我们在消费Kafka数据时候,可能需要指定消费的位置,Apache Flink 的FlinkKafkaConsumer提供很多便利的位置设置,如下:
- consumer.setStartFromEarliest() - 从最早的记录开始;
- consumer.setStartFromLatest() - 从最新记录开始;
- consumer.setStartFromTimestamp(...); // 从指定的epoch时间戳(毫秒)开始;
- consumer.setStartFromGroupOffsets(); // 默认行为,从上次消费的偏移量进行继续消费。
上面的位置指定可以精确到每个分区,比如如下代码:
- Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
- specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L); // 第一个分区从23L开始
- specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);// 第二个分区从31L开始
- specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);// 第三个分区从43L开始
-
- consumer.setStartFromSpecificOffsets(specificStartOffsets);
对于没有指定的分区还是默认的setStartFromGroupOffsets方式。
(4) Topic发现
Kafka支持Topic自动发现,也就是用正则的方式创建FlinkKafkaConsumer,比如:
- // 创建消费者
- FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<String>( java.util.regex.Pattern.compile(sourceTopic.concat("-[0-9]")),
- new KafkaMsgSchema(),
- p);
在上面的示例中,当作业开始运行时,消费者将订阅名称与指定正则表达式匹配的所有Topic(以sourceTopic的值开头并以单个数字结尾)。
3. 定义Watermark(Window)
对Kafka Connector的应用不仅限于上面的简单数据提取,我们更多时候是期望对Kafka数据进行Event-time的窗口操作,那么就需要在Flink Kafka Source中定义Watermark。
要定义Event-time,首先是Kafka数据里面携带时间属性,假设我们数据是String#Long的格式,如only for test#1000。那么我们将Long作为时间列。
- KafkaWithTsMsgSchema - 完整代码
(编辑:西安站长网)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|