Flink version: 1.10.0 (checkpointing is not enabled)
Kafka version: 0.10.2.1
The data source is Kafka. The code is as follows:
val topicHkList = List(HqKafkaTopic.KAFKA_TOPIC_HK_TRADE_TICKER,
  HqKafkaTopic.KAFKA_TOPIC_HK_INDEX)

val kafkaHkConsumer: FlinkKafkaConsumer[Tuple3[String, String, String]] =
  new FlinkKafkaConsumer(topicHkList, new CustomKafkaDeserializationSchema(), properties)

// Configure where the Kafka consumer starts reading from
kafkaHkConsumer.setStartFromLatest()

val sourseHk = env
  .addSource(kafkaHkConsumer).name("hk kafka source")
  .map(new HkKafkaDecodeMap)
  .map(new HkKafkaObj2HqMap)
  .map(new HkMsgPushDecodeMap)
  .filter(new HkMsgPushFilter)
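
For completeness, the `properties` object referenced above is built roughly like this (a sketch: the group id is a placeholder, the broker address is the one that shows up in the logs below):

import java.util.Properties

// Rough sketch of the consumer properties; "hq-hk-consumer" is a placeholder group id
val properties = new Properties()
properties.setProperty("bootstrap.servers", "192.168.91.85:9092")
properties.setProperty("group.id", "hq-hk-consumer")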

When consuming, no data comes through. The consumer's debug logs look like this:
consumer.internals.Fetcher  - Ignoring fetched records for hq.hk-index-topic-new-0 at offset 6921349 since the current position is 6921364
consumer.internals.Fetcher  - Sending fetch for partitions [hq.hk-tradeTicker-topic-new-0, hq.hk-index-topic-new-0] to broker 192.168.91.85:9092 (id: 0 rack: null)
consumer.internals.Fetcher  - Ignoring fetched records for hq.hk-tradeTicker-topic-new-0 at offset 12716799 since the current position is 12716919
consumer.internals.Fetcher  - Sending fetch for partitions [hq.hk-index-topic-new-0, hq.hk-tradeTicker-topic-new-0] to broker 192.168.91.85:9092 (id: 0 rack: null)
consumer.internals.Fetcher  - Ignoring fetched records for hq.hk-tradeTicker-topic-new-0 at offset 12716919 since the current position is 12717048
consumer.internals.Fetcher  - Sending fetch for partitions [hq.hk-index-topic-new-0, hq.hk-tradeTicker-topic-new-0] to broker 192.168.91.85:9092 (id: 0 rack: null)
consumer.internals.Fetcher  - Ignoring fetched records for hq.hk-tradeTicker-topic-new-0 at offset 12717048 since the current position is 12717071
consumer.internals.Fetcher  - Sending fetch for partitions [hq.hk-index-topic-new-0, hq.hk-tradeTicker-topic-new-0] to broker 192.168.91.85:9092 (id: 0 rack: null)

It seems to be related to offsets. What could be causing this, and do I need to configure something in my code?
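
For reference, these are the start-position setters I know of on FlinkKafkaConsumer; I am currently using setStartFromLatest(). Not sure whether I should be using one of the others:

// Start-position options on FlinkKafkaConsumer (Flink 1.10); I currently use setStartFromLatest()
kafkaHkConsumer.setStartFromLatest()        // start from the latest records
kafkaHkConsumer.setStartFromEarliest()      // start from the earliest available offsets
kafkaHkConsumer.setStartFromGroupOffsets()  // default: resume from the group's committed offsets
kafkaHkConsumer.setStartFromTimestamp(0L)   // start from records with timestamp >= the given epoch millis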




wch...@163.com
