[ https://issues.apache.org/jira/browse/KAFKA-10875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yifei Gong updated KAFKA-10875:
-------------------------------
Description:
I am using spring-boot 2.2.11, spring-kafka 2.4.11, and Apache kafka-clients 2.4.1.
My consumer implements {{ConsumerAwareRebalanceListener}}, and inside its {{onPartitionsAssigned}} method I am trying to seek to the offsets after a certain timestamp by calling {{offsetsForTimes}}.
I found this strange behavior of {{offsetsForTimes}}:
When I look up an earlier timestamp, {{1607922415534L}} (GMT December 14, 2020, 5:06:55.534 AM), like this:
{code:java}
@Override
public void onPartitionsAssigned(Consumer<?, ?> consumer,
        Collection<TopicPartition> partitions) {
    // calling assignment() just to make sure the consumer is actually assigned the partitions
    Set<TopicPartition> tps = consumer.assignment();
    // look up the earliest offset at or after 1607922415534L for every assigned partition
    Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = new HashMap<>();
    offsetsForTimes.putAll(consumer.offsetsForTimes(partitions.stream()
            .collect(Collectors.toMap(tp -> tp, tp -> 1607922415534L))));
}
{code}
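The epoch-millisecond values passed to {{offsetsForTimes}} can be cross-checked against the GMT times quoted in this report with {{java.time.Instant}}. A minimal sketch (the class and method names are hypothetical, for illustration only):

{code:java}
import java.time.Instant;

// Hypothetical helper: converts between the epoch-millisecond values used as
// offsetsForTimes lookup timestamps and human-readable UTC instants.
public class TimestampCheck {

    // Epoch milliseconds for an ISO-8601 UTC instant such as "2020-12-14T05:06:55.534Z".
    static long toEpochMillis(String isoUtc) {
        return Instant.parse(isoUtc).toEpochMilli();
    }

    // ISO-8601 UTC rendering of an epoch-millisecond value.
    static String toIsoUtc(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).toString();
    }

    public static void main(String[] args) {
        // The two lookup timestamps used in this report.
        System.out.println(toIsoUtc(1607922415534L));                  // 2020-12-14T05:06:55.534Z
        System.out.println(toIsoUtc(1607941818423L));                  // 2020-12-14T10:30:18.423Z
        System.out.println(toEpochMillis("2020-12-14T05:06:55.534Z")); // 1607922415534
    }
}
{code}

{{Instant.parse}} expects an ISO-8601 instant with a trailing {{Z}}; the resulting {{long}} is in milliseconds since the epoch, which is the unit {{offsetsForTimes}} expects.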
With a breakpoint set, I can see that it returned the following map:
{noformat}
{TopicPartition@5492} "My.Data.Topic-1" -> {OffsetAndTimestamp@5493} "(timestamp=1607922521082, leaderEpoch=282, offset=22475886)"
{TopicPartition@5495} "My.Data.Topic-0" -> {OffsetAndTimestamp@5496} "(timestamp=1607922523035, leaderEpoch=328, offset=25587551)"
{TopicPartition@5498} "My.Data.Topic-5" -> null
{TopicPartition@5500} "My.Data.Topic-4" -> {OffsetAndTimestamp@5501} "(timestamp=1607924819752, leaderEpoch=323, offset=24578937)"
{TopicPartition@5503} "My.Data.Topic-3" -> {OffsetAndTimestamp@5504} "(timestamp=1607922522143, leaderEpoch=299, offset=23439914)"
{TopicPartition@5506} "My.Data.Topic-2" -> {OffsetAndTimestamp@5507} "(timestamp=1607938218461, leaderEpoch=318, offset=23415078)"
{TopicPartition@5509} "My.Data.Topic-9" -> {OffsetAndTimestamp@5510} "(timestamp=1607922521019, leaderEpoch=298, offset=22002124)"
{TopicPartition@5512} "My.Data.Topic-8" -> {OffsetAndTimestamp@5513} "(timestamp=1607922520780, leaderEpoch=332, offset=23406692)"
{TopicPartition@5515} "My.Data.Topic-7" -> {OffsetAndTimestamp@5516} "(timestamp=1607922522800, leaderEpoch=285, offset=22215781)"
{TopicPartition@5518} "My.Data.Topic-6" -> null
{noformat}
As you can see, it returned null for some of the partitions (5 and 6).
However, if I look up a more recent timestamp, such as {{1607941818423L}} (GMT December 14, 2020, 10:30:18.423 AM), I get offsets for all partitions:
{noformat}
{TopicPartition@5492} "My.Data.Topic-1" -> {OffsetAndTimestamp@5493} "(timestamp=1607942934371, leaderEpoch=282, offset=22568732)"
{TopicPartition@5495} "My.Data.Topic-0" -> {OffsetAndTimestamp@5496} "(timestamp=1607941818435, leaderEpoch=328, offset=25685999)"
{TopicPartition@5498} "My.Data.Topic-5" -> {OffsetAndTimestamp@5499} "(timestamp=1607941818424, leaderEpoch=309, offset=24333860)"
{TopicPartition@5501} "My.Data.Topic-4" -> {OffsetAndTimestamp@5502} "(timestamp=1607941818424, leaderEpoch=323, offset=24666385)"
{TopicPartition@5504} "My.Data.Topic-3" -> {OffsetAndTimestamp@5505} "(timestamp=1607941818433, leaderEpoch=299, offset=23529597)"
{TopicPartition@5507} "My.Data.Topic-2" -> {OffsetAndTimestamp@5508} "(timestamp=1607941818423, leaderEpoch=318, offset=23431817)"
{TopicPartition@5510} "My.Data.Topic-9" -> {OffsetAndTimestamp@5511} "(timestamp=1607941818517, leaderEpoch=298, offset=22082849)"
{TopicPartition@5513} "My.Data.Topic-8" -> {OffsetAndTimestamp@5514} "(timestamp=1607941818423, leaderEpoch=332, offset=23491462)"
{TopicPartition@5516} "My.Data.Topic-7" -> {OffsetAndTimestamp@5517} "(timestamp=1607942934371, leaderEpoch=285, offset=22306422)"
{TopicPartition@5519} "My.Data.Topic-6" -> {OffsetAndTimestamp@5520} "(timestamp=1607941818424, leaderEpoch=317, offset=24677423)"
{noformat}
So I am confused: why does looking up the older timestamp return null for partitions 5 and 6, when the second lookup shows that those partitions do contain messages with later timestamps?
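For reference, the {{KafkaConsumer#offsetsForTimes}} Javadoc says it returns, per partition, the earliest offset whose timestamp is greater than or equal to the given timestamp, and null if there is no such message. The toy model below (a hypothetical class, not Kafka's implementation, which uses time indexes on the broker) encodes that contract for a single in-memory partition and checks one consequence: if the partition does not change between calls, a lookup for an earlier timestamp cannot return null when a later timestamp finds a record, and it returns the same or a smaller offset. Partitions 5 and 6 above appear not to follow this.

{code:java}
import java.util.Arrays;
import java.util.List;

// Hypothetical toy model of the contract documented for KafkaConsumer#offsetsForTimes,
// for a single partition held in memory. It only illustrates the documented semantics;
// it is NOT how the broker implements the lookup.
public class OffsetsForTimesModel {

    // timestamps.get(i) is the timestamp of the record at offset baseOffset + i.
    // Returns the earliest offset whose timestamp is >= target, or null if none exists.
    static Long lookup(List<Long> timestamps, long baseOffset, long target) {
        for (int i = 0; i < timestamps.size(); i++) {
            if (timestamps.get(i) >= target) {
                return baseOffset + i;
            }
        }
        return null;
    }

    // For earlierTarget <= laterTarget on an unchanged partition, the contract implies:
    // if the later lookup finds a record, the earlier lookup finds one too,
    // at the same or a smaller offset.
    static boolean consistent(List<Long> timestamps, long baseOffset,
                              long earlierTarget, long laterTarget) {
        Long earlier = lookup(timestamps, baseOffset, earlierTarget);
        Long later = lookup(timestamps, baseOffset, laterTarget);
        if (later == null) {
            return true; // nothing found later, so nothing to compare against
        }
        return earlier != null && earlier <= later;
    }

    public static void main(String[] args) {
        // Timestamps need not be monotonic (e.g. producer-assigned CreateTime).
        List<Long> ts = Arrays.asList(300L, 100L, 400L);
        System.out.println(lookup(ts, 10L, 50L));            // 10
        System.out.println(lookup(ts, 10L, 350L));           // 12
        System.out.println(lookup(ts, 10L, 500L));           // null
        System.out.println(consistent(ts, 10L, 50L, 350L));  // true
    }
}
{code}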
> offsetsForTimes returns null for some partitions when it shouldn't?
> -------------------------------------------------------------------
>
> Key: KAFKA-10875
> URL: https://issues.apache.org/jira/browse/KAFKA-10875
> Project: Kafka
> Issue Type: Bug
> Reporter: Yifei Gong
> Priority: Minor
--
This message was sent by Atlassian Jira
(v8.3.4#803005)