flink版本 1.10.0 没有使用checkpoint Kafka version : 0.10.2.1 数据源为kafka 代码如下: val topicHkList = List(HqKafkaTopic.KAFKA_TOPIC_HK_TRADE_TICKER, HqKafkaTopic.KAFKA_TOPIC_HK_INDEX) val kafkaHkConsumer: FlinkKafkaConsumer[Tuple3[String, String, String]] = new FlinkKafkaConsumer(topicHkList, new CustomKafkaDeserializationSchema(), properties)
// 配置 Kafka Consumer 开始消费的位置 kafkaHkConsumer.setStartFromLatest() val sourseHk = env .addSource(kafkaHkConsumer).name("hk kafka source") .map(new HkKafkaDecodeMap) .map(new HkKafkaObj2HqMap) .map(new HkMsgPushDecodeMap) .filter(new HkMsgPushFilter) 消费数据的时候, 发现数据出不来, 打印debug日志: consumer.internals.Fetcher - Ignoring fetched records for hq.hk-index-topic-new-0 at offset 6921349 since the current position is 6921364 consumer.internals.Fetcher - Sending fetch for partitions [hq.hk-tradeTicker-topic-new-0, hq.hk-index-topic-new-0] to broker 192.168.91.85:9092 (id: 0 rack: null) consumer.internals.Fetcher - Ignoring fetched records for hq.hk-tradeTicker-topic-new-0 at offset 12716799 since the current position is 12716919 consumer.internals.Fetcher - Sending fetch for partitions [hq.hk-index-topic-new-0, hq.hk-tradeTicker-topic-new-0] to broker 192.168.91.85:9092 (id: 0 rack: null) consumer.internals.Fetcher - Ignoring fetched records for hq.hk-tradeTicker-topic-new-0 at offset 12716919 since the current position is 12717048 consumer.internals.Fetcher - Sending fetch for partitions [hq.hk-index-topic-new-0, hq.hk-tradeTicker-topic-new-0] to broker 192.168.91.85:9092 (id: 0 rack: null) consumer.internals.Fetcher - Ignoring fetched records for hq.hk-tradeTicker-topic-new-0 at offset 12717048 since the current position is 12717071 consumer.internals.Fetcher - Sending fetch for partitions [hq.hk-index-topic-new-0, hq.hk-tradeTicker-topic-new-0] to broker 192.168.91.85:9092 (id: 0 rack: null) 好像是跟offset有关, 请问这个是什么原因呢? 我这边代码需要设置什么吗? wch...@163.com