hi

如果是要排查问题的话可以在消费 kafka 的时候通过 JSONKeyValueDeserializationSchema
来将数据的元数据(topic/parttion/offset)获取,这样可以排查你的数据到底来自哪些分区,这样你就不会再有困惑了。

eg:

      env.addSource(new FlinkKafkaConsumer011<>(
parameters.get("topic"),                new
JSONKeyValueDeserializationSchema(true),
buildKafkaProps(parameters)))                .flatMap(new
FlatMapFunction<ObjectNode, ObjectNode>() {
@Override                    public void flatMap(ObjectNode jsonNodes,
Collector<ObjectNode> collector) throws Exception {
    System.out.println(jsonNodes.get("value"));
System.out.println(jsonNodes.get("metadata").get("topic").asText());

System.out.println(jsonNodes.get("metadata").get("offset").asText());

System.out.println(jsonNodes.get("metadata").get("partition").asText());
                       collector.collect(jsonNodes);
    }                })                .print();

Best

zhisheng


Lynn Chen <alynnc...@163.com> 于2020年10月23日周五 上午12:13写道:

>
>
>
>
>
>
> hi,  Qijun Feng:
>
>
> 我也遇到了类似的问题, 请问您后来是怎么解决的哈?
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-04-03 09:27:52,"LakeShen" <shenleifight...@gmail.com> 写道:
> >Hi Qijun,
> >
> >看下 kafka 是不是所有分区都有数据呢,或者在这个时间截后:1585670400000L,后面是不是只有分区3写入数据,个人的想法。
> >
> >Best,
> >LakeShen
> >
> >Qijun Feng <jun1st.f...@gmail.com> 于2020年4月2日周四 下午5:44写道:
> >
> >> Dear All,
> >>
> >> 我的 Kafka cluster 有三个机器,topic 也分了三个 partition, 我用 Flink 读取 Kafka
> >> 消息的时候,感觉只从一个 partition 读取了东西, 一开始我的 bootstrap.servers 只写了一个地址,
> >>  现在改成了所有地址,也换了 group.id
> >>
> >>
> >> Properties properties = new Properties();
> >> properties.setProperty("bootstrap.servers", "10.216.85.201:9092,
> >> 10.216.77.170:9092,10.216.77.188:9092");
> >> properties.setProperty("group.id", "behavior-logs-aggregator");
> >>
> >> FlinkKafkaConsumer010<BehaviorLog> kafkaConsumer010 =
> >>        new FlinkKafkaConsumer010<BehaviorLog>("behavior-logs_dev", new
> >> BehaviorLogDeserializationSchema(), properties);
> >> kafkaConsumer010.setStartFromTimestamp(1585670400000L); //2020/04/01
> >>
> >> 处理完的数据,写到数据库里,看下了感觉少数据, 从 Log 里看到,也是。。,只有 partition=3, 没有 partiton=1,或者
> 2
> >> 的,
> >>
> >> 2020-04-02 14:54:58,532 INFO
> >> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase  -
> >> Consumer subtask 0 creating fetcher with offsets
> >> {KafkaTopicPartition{topic='behavior-logs_dev', partition=3}=38}.
> >>
> >>
> >> 是哪里有问题吗?
> >>
> >>
>

回复