Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-04-08 Thread via GitHub


lucasbru merged PR #15525:
URL: https://github.com/apache/kafka/pull/15525


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-04-03 Thread via GitHub


lianetm commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1550191029


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -808,16 +808,55 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
   @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
-  def testSubscribeAndCommitSync(quorum: String, groupProtocol: String): Unit 
= {

Review Comment:
   All good then, just wanted to make sure we're not losing it






Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-04-03 Thread via GitHub


philipnee commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1550094782


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -808,16 +808,55 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
   @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
-  def testSubscribeAndCommitSync(quorum: String, groupProtocol: String): Unit 
= {

Review Comment:
   This appears in the PR because I didn't rebase correctly. You've actually
moved the test here:
https://github.com/apache/kafka/blob/21479a31bdff0e15cfe7ee0a4e509232ed064b41/core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala#L261






Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-04-03 Thread via GitHub


lianetm commented on PR #15525:
URL: https://github.com/apache/kafka/pull/15525#issuecomment-2034772501

   Hey @philipnee, thanks for the updates, just one minor comment left above. 





Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-04-03 Thread via GitHub


lianetm commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1549857291


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -808,16 +808,55 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
   @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
-  def testSubscribeAndCommitSync(quorum: String, groupProtocol: String): Unit 
= {

Review Comment:
   Is this one being removed intentionally? The suggestion was only to move it
to the `PlaintextConsumerCommitTest` file, where all tests related to committing
offsets are now. It's OK with me if you think it's not worth keeping; I just want
to make sure it's intentional.






Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-04-02 Thread via GitHub


philipnee commented on PR #15525:
URL: https://github.com/apache/kafka/pull/15525#issuecomment-2033505687

   Hi @lianetm, thanks very much for the reviews. I think I've addressed your
comments. LMK if there's anything more. cc @lucasbru





Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-04-02 Thread via GitHub


philipnee commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548891748


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -805,4 +805,73 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
 consumer2.close()
   }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testSubscribeAndCommitSync(quorum: String, groupProtocol: String): Unit 
= {

Review Comment:
   I think this is an error from the rebase, so this should be removed from the PR.
   Thanks for catching this.






Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-04-02 Thread via GitHub


philipnee commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548887021


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala:
##
@@ -304,6 +304,60 @@ class PlaintextConsumerCommitTest extends 
AbstractConsumerTest {
 consumeAndVerifyRecords(consumer = otherConsumer, numRecords = 1, 
startingOffset = 5, startingTimestamp = startingTimestamp)
   }
 
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testEndOffsets(quorum: String, groupProtocol: String): Unit = {

Review Comment:
   Sorry - wasn't looking carefully at it.  Putting things back to the original 
place.






Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-04-02 Thread via GitHub


lianetm commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548614429


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala:
##
@@ -304,6 +304,60 @@ class PlaintextConsumerCommitTest extends 
AbstractConsumerTest {
 consumeAndVerifyRecords(consumer = otherConsumer, numRecords = 1, 
startingOffset = 5, startingTimestamp = startingTimestamp)
   }
 
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testEndOffsets(quorum: String, groupProtocol: String): Unit = {

Review Comment:
   Oh, this one (and the one below) relate to partition offsets, not
committed offsets, so I would say they need to stay in `PlaintextConsumerTest`,
where you had them. (I was only suggesting moving
`testSubscribeAndCommitSync` here, because it relates to committed offsets.)






Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-04-02 Thread via GitHub


lianetm commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548603219


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetAndTimestampInternal.java:
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+
+import java.util.Optional;
+
+/**
+ * Internal representation of {@link OffsetAndTimestamp}.

Review Comment:
   Yes, agreed, that's the failure we noticed in the system tests. Conceptually,
though, we're creating a new `OffsetAndTimestampInternal` class that is the same as the
existing `OffsetAndTimestamp`, with the only difference being that the former does
not throw on negative offsets or negative timestamps, right? So it makes sense
to mention that in the class doc.
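
The distinction described in this comment can be illustrated with a minimal, self-contained sketch. This is not the code from the PR: `StrictOffsetAndTimestamp`, `LenientOffsetAndTimestamp`, and `toStrict` are hypothetical names chosen for illustration, and the validation simply mirrors the negative-value checks discussed in this thread.

```java
import java.util.Optional;

final class OffsetValidationSketch {

    // Hypothetical stand-in for a public type that rejects negative values on construction.
    static final class StrictOffsetAndTimestamp {
        final long offset;
        final long timestamp;

        StrictOffsetAndTimestamp(long offset, long timestamp) {
            if (offset < 0)
                throw new IllegalArgumentException("Invalid negative offset");
            if (timestamp < 0)
                throw new IllegalArgumentException("Invalid negative timestamp");
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }

    // Hypothetical stand-in for an internal type that stores whatever it is given.
    static final class LenientOffsetAndTimestamp {
        final long offset;
        final long timestamp;

        LenientOffsetAndTimestamp(long offset, long timestamp) {
            this.offset = offset;
            this.timestamp = timestamp;
        }

        // Convert to the strict type only when both values are non-negative.
        Optional<StrictOffsetAndTimestamp> toStrict() {
            if (offset < 0 || timestamp < 0)
                return Optional.empty();
            return Optional.of(new StrictOffsetAndTimestamp(offset, timestamp));
        }
    }

    public static void main(String[] args) {
        // A negative timestamp is accepted by the lenient type...
        LenientOffsetAndTimestamp raw = new LenientOffsetAndTimestamp(5L, -1L);
        // ...but is not converted into the strict type.
        System.out.println(raw.toStrict().isPresent()); // prints "false"
    }
}
```

Under these assumptions, the lenient type can carry a raw response value through internal code paths, and validation happens only at the point where a public-facing object is actually needed.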






Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-04-02 Thread via GitHub


philipnee commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548599057


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetAndTimestampInternal.java:
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+
+import java.util.Optional;
+
+/**
+ * Internal representation of {@link OffsetAndTimestamp}.

Review Comment:
   I think the problem is a negative timestamp in the response causing
`org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: 
Invalid negative timestamp`. More specifically, this is the part that was
complaining:
   ```
   if (timestamp < 0)
       throw new IllegalArgumentException("Invalid negative timestamp");
   ```






Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-04-02 Thread via GitHub


lianetm commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548596033


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetAndTimestampInternal.java:
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+
+import java.util.Optional;
+
+/**
+ * Internal representation of {@link OffsetAndTimestamp}.

Review Comment:
   it's actually both! he he, so let's maybe add _negative offsets and 
timestamps_






Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-04-02 Thread via GitHub


philipnee commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548593720


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -805,4 +805,73 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
 consumer2.close()
   }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testSubscribeAndCommitSync(quorum: String, groupProtocol: String): Unit 
= {
+// This test ensure that the member ID is propagated from the group 
coordinator when the
+// assignment is received into a subsequent offset commit
+val consumer = createConsumer()
+assertEquals(0, consumer.assignment.size)
+consumer.subscribe(List(topic).asJava)
+awaitAssignment(consumer, Set(tp, tp2))
+
+consumer.seek(tp, 0)
+
+consumer.commitSync()
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testEndOffsets(quorum: String, groupProtocol: String): Unit = {
+val producer = createProducer()
+val startingTimestamp = System.currentTimeMillis()
+val numRecords = 1
+ (0 until numRecords).map { i =>
+  val timestamp = startingTimestamp + i.toLong
+  val record = new ProducerRecord(tp.topic(), tp.partition(), timestamp, 
s"key $i".getBytes, s"value $i".getBytes)
+  producer.send(record)
+  record
+}
+producer.flush()
+
+val consumer = createConsumer()
+consumer.subscribe(List(topic).asJava)
+awaitAssignment(consumer, Set(tp, tp2))
+
+val endOffsets = consumer.endOffsets(Set(tp).asJava)
+assertEquals(numRecords, endOffsets.get(tp))
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testTimestampsToSearch(quorum: String, groupProtocol: String): Unit = {

Review Comment:
   Maybe `testFetchOffsetsForTime`, which already implies searching at given
timestamps.






Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-04-02 Thread via GitHub


philipnee commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548593720


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -805,4 +805,73 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
 consumer2.close()
   }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testSubscribeAndCommitSync(quorum: String, groupProtocol: String): Unit 
= {
+// This test ensure that the member ID is propagated from the group 
coordinator when the
+// assignment is received into a subsequent offset commit
+val consumer = createConsumer()
+assertEquals(0, consumer.assignment.size)
+consumer.subscribe(List(topic).asJava)
+awaitAssignment(consumer, Set(tp, tp2))
+
+consumer.seek(tp, 0)
+
+consumer.commitSync()
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testEndOffsets(quorum: String, groupProtocol: String): Unit = {
+val producer = createProducer()
+val startingTimestamp = System.currentTimeMillis()
+val numRecords = 1
+ (0 until numRecords).map { i =>
+  val timestamp = startingTimestamp + i.toLong
+  val record = new ProducerRecord(tp.topic(), tp.partition(), timestamp, 
s"key $i".getBytes, s"value $i".getBytes)
+  producer.send(record)
+  record
+}
+producer.flush()
+
+val consumer = createConsumer()
+consumer.subscribe(List(topic).asJava)
+awaitAssignment(consumer, Set(tp, tp2))
+
+val endOffsets = consumer.endOffsets(Set(tp).asJava)
+assertEquals(numRecords, endOffsets.get(tp))
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testTimestampsToSearch(quorum: String, groupProtocol: String): Unit = {

Review Comment:
   Maybe `testFindOffsetsForTime`, which already implies searching at given
timestamps.






Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-04-02 Thread via GitHub


lianetm commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548593248


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1141,21 +1146,29 @@ private Map 
beginningOrEndOffset(Collection timestampToSearch = partitions
-.stream()
-.collect(Collectors.toMap(Function.identity(), tp -> 
timestamp));
+.stream()
+.collect(Collectors.toMap(Function.identity(), tp -> 
timestamp));
 Timer timer = time.timer(timeout);
 ListOffsetsEvent listOffsetsEvent = new ListOffsetsEvent(
-timestampToSearch,
-false,
-timer);
-Map offsetAndTimestampMap = 
applicationEventHandler.addAndGet(
-listOffsetsEvent,
-timer);
-return offsetAndTimestampMap
-.entrySet()
-.stream()
-.collect(Collectors.toMap(Map.Entry::getKey, e -> 
e.getValue().offset()));
+timestampToSearch,
+timer,
+false);
+
+Map 
offsetAndTimestampMap;
+if (timeout.isZero()) {
+applicationEventHandler.add(listOffsetsEvent);

Review Comment:
   Thanks for the explanation! Totally ok to tackle it with that separate Jira. 






Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-04-02 Thread via GitHub


lianetm commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548591307


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetAndTimestampInternal.java:
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+
+import java.util.Optional;
+
+/**
+ * Internal representation of {@link OffsetAndTimestamp}.

Review Comment:
   Uhm... what `OffsetAndTimestamp` does not allow is negative offsets
[here](https://github.com/apache/kafka/blob/ee61bb721eecb0404929f125fe43392f3d024453/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndTimestamp.java#L35),
 and that's the requirement this new one is removing. Am I missing something?






Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-04-02 Thread via GitHub


philipnee commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548585054


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java:
##
@@ -240,20 +241,48 @@ Map getOffsetResetTimestamp() {
 return offsetResetTimestamps;
 }
 
-static Map 
buildOffsetsForTimesResult(final Map timestampsToSearch,
-   final 
Map fetchedOffsets) {
-HashMap offsetsByTimes = new 
HashMap<>(timestampsToSearch.size());
+static  Map buildListOffsetsResult(

Review Comment:
   good catch.






Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-04-02 Thread via GitHub


philipnee commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548581658


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetAndTimestampInternal.java:
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+
+import java.util.Optional;
+
+/**
+ * Internal representation of {@link OffsetAndTimestamp}.

Review Comment:
   Timestamps I assume.






Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-04-02 Thread via GitHub


lianetm commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548580872


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -805,4 +805,73 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
 consumer2.close()
   }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testSubscribeAndCommitSync(quorum: String, groupProtocol: String): Unit 
= {
+// This test ensure that the member ID is propagated from the group 
coordinator when the
+// assignment is received into a subsequent offset commit
+val consumer = createConsumer()
+assertEquals(0, consumer.assignment.size)
+consumer.subscribe(List(topic).asJava)
+awaitAssignment(consumer, Set(tp, tp2))
+
+consumer.seek(tp, 0)
+
+consumer.commitSync()
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testEndOffsets(quorum: String, groupProtocol: String): Unit = {
+val producer = createProducer()
+val startingTimestamp = System.currentTimeMillis()
+val numRecords = 1
+ (0 until numRecords).map { i =>
+  val timestamp = startingTimestamp + i.toLong
+  val record = new ProducerRecord(tp.topic(), tp.partition(), timestamp, 
s"key $i".getBytes, s"value $i".getBytes)
+  producer.send(record)
+  record
+}
+producer.flush()
+
+val consumer = createConsumer()
+consumer.subscribe(List(topic).asJava)
+awaitAssignment(consumer, Set(tp, tp2))
+
+val endOffsets = consumer.endOffsets(Set(tp).asJava)
+assertEquals(numRecords, endOffsets.get(tp))
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testTimestampsToSearch(quorum: String, groupProtocol: String): Unit = {

Review Comment:
   Including the func name we're testing (`offsetsAndTimestamps`) would 
probably make the test name clearer... maybe something around 
`testOffsetsAndTimestampsTargetTimestamps`?






Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-04-02 Thread via GitHub


lianetm commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548576809


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -805,4 +805,73 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
 consumer2.close()
   }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testSubscribeAndCommitSync(quorum: String, groupProtocol: String): Unit 
= {

Review Comment:
   We have split the consumer tests into separate files grouped by feature, and
there is now a `PlaintextConsumerCommitTest`. I would expect this test to
go there.






Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-04-02 Thread via GitHub


philipnee commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548574194


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1141,21 +1146,29 @@ private Map 
beginningOrEndOffset(Collection timestampToSearch = partitions
-.stream()
-.collect(Collectors.toMap(Function.identity(), tp -> 
timestamp));
+.stream()
+.collect(Collectors.toMap(Function.identity(), tp -> 
timestamp));
 Timer timer = time.timer(timeout);
 ListOffsetsEvent listOffsetsEvent = new ListOffsetsEvent(
-timestampToSearch,
-false,
-timer);
-Map offsetAndTimestampMap = 
applicationEventHandler.addAndGet(
-listOffsetsEvent,
-timer);
-return offsetAndTimestampMap
-.entrySet()
-.stream()
-.collect(Collectors.toMap(Map.Entry::getKey, e -> 
e.getValue().offset()));
+timestampToSearch,
+timer,
+false);
+
+Map 
offsetAndTimestampMap;
+if (timeout.isZero()) {
+applicationEventHandler.add(listOffsetsEvent);

Review Comment:
   Hi @lianetm, thanks for the comment. There's a ticket to align the behavior
of the two APIs per your suggestions there. The plan is to do that in a
separate PR: https://issues.apache.org/jira/browse/KAFKA-16433
   
   Back to your first comment: it is not immediately obvious why people would
use these two APIs with a zero timeout. The only sensible thing it does is
update the local high watermark, as you mentioned. I think it is worth
addressing this ambiguity after the 4.0 release, so I'll leave a comment per your
request.






Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-04-02 Thread via GitHub


lianetm commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548571273


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java:
##
@@ -240,20 +241,48 @@ Map getOffsetResetTimestamp() {
 return offsetResetTimestamps;
 }
 
-static Map 
buildOffsetsForTimesResult(final Map timestampsToSearch,
-   final 
Map fetchedOffsets) {
-HashMap offsetsByTimes = new 
HashMap<>(timestampsToSearch.size());
+static  Map buildListOffsetsResult(

Review Comment:
   This generic `buildListOffsetsResult` is currently only used from 
`buildOffsetsForTimesResult`. Was the intention to also use it from 
`buildOffsetsForTimeInternalResult`?






Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-04-02 Thread via GitHub


lianetm commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548554563


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetAndTimestampInternal.java:
##
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+
+import java.util.Optional;
+
+/**
+ * Internal representation of {@link OffsetAndTimestamp}.

Review Comment:
   I would add: Internal representation of {@link OffsetAndTimestamp} **that 
allows negative offsets**






Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-04-02 Thread via GitHub


lianetm commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1548547539


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1141,21 +1146,29 @@ private Map 
beginningOrEndOffset(Collection timestampToSearch = partitions
-.stream()
-.collect(Collectors.toMap(Function.identity(), tp -> 
timestamp));
+.stream()
+.collect(Collectors.toMap(Function.identity(), tp -> 
timestamp));
 Timer timer = time.timer(timeout);
 ListOffsetsEvent listOffsetsEvent = new ListOffsetsEvent(
-timestampToSearch,
-false,
-timer);
-Map offsetAndTimestampMap = 
applicationEventHandler.addAndGet(
-listOffsetsEvent,
-timer);
-return offsetAndTimestampMap
-.entrySet()
-.stream()
-.collect(Collectors.toMap(Map.Entry::getKey, e -> 
e.getValue().offset()));
+timestampToSearch,
+timer,
+false);
+
+Map 
offsetAndTimestampMap;
+if (timeout.isZero()) {
+applicationEventHandler.add(listOffsetsEvent);

Review Comment:
   So if I get it right, we are intentionally keeping this: generating an event 
to get offsets when, in the end, we return right away without waiting for a 
response? I do get that the old consumer does it, and I could be missing the 
purpose of it, but it seems to me an unneeded request, even considering the 
side effect of the onSuccess handler. The handler just updates the positions to 
reuse the offsets it just retrieved, and it does make sense to reuse the result 
when we do need to make a request, but I wouldn't say we need to generate an 
unneeded event/request just for that when the user requested offsets with 
max-time-to-wait=0.
   
   In any case, if we prefer to keep this, I would suggest 2 things:
   
   1. Add a comment explaining why (the handler), because adding the event and 
returning looks like odd overhead.
   2. Be consistent and also generate the event in the `offsetsForTimes` case 
before the early return (ln 1104). In the old consumer this is common logic, 
so both paths, `offsetsForTimes` and `beginning/endOffsets`, do the same 
request+return.









Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-03-28 Thread via GitHub


philipnee commented on PR #15525:
URL: https://github.com/apache/kafka/pull/15525#issuecomment-2026690869

   @lucasbru - Thanks for taking time reviewing this PR.  This PR is ready for 
another pass.





Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-03-28 Thread via GitHub


philipnee commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1544067091


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##
@@ -946,7 +956,7 @@ public void testOffsetsForTimesWithZeroTimeout() {
 @Test
 public void testWakeupCommitted() {
 consumer = newConsumer();
-final HashMap offsets = 
mockTopicPartitionOffset();

Review Comment:
   just cleaning up.






Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-03-28 Thread via GitHub


philipnee commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1544046037


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1141,21 +1141,27 @@ private Map 
beginningOrEndOffset(Collection timestampToSearch = partitions
 .stream()
 .collect(Collectors.toMap(Function.identity(), tp -> 
timestamp));
 Timer timer = time.timer(timeout);
 ListOffsetsEvent listOffsetsEvent = new ListOffsetsEvent(
 timestampToSearch,
-false,
 timer);
-Map offsetAndTimestampMap = 
applicationEventHandler.addAndGet(
+
+// shortcut the request if the timeout is zero.
+if (timeout.isZero()) {

Review Comment:
   Scratch the previous comment: `addAndGet` actually doesn't handle this. We 
will need to explicitly return an empty result. See the code change.






Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-03-28 Thread via GitHub


philipnee commented on PR #15525:
URL: https://github.com/apache/kafka/pull/15525#issuecomment-2025522451

   Hi @lucasbru - Let me address Lianet's comment in this PR and open a 
separate PR for the behavior inconsistency, as it does require some changes to 
the unit test.





Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-03-28 Thread via GitHub


lucasbru commented on PR #15525:
URL: https://github.com/apache/kafka/pull/15525#issuecomment-2024835245

   @philipnee Okay, thanks for creating the ticket. Not sure if it's blocker 
priority though. If it's a quick thing, you could address it in this PR.
   
   Are you going to implement Lianet's suggestion?





Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-03-27 Thread via GitHub


philipnee commented on PR #15525:
URL: https://github.com/apache/kafka/pull/15525#issuecomment-2023293507

   @lucasbru - If I'm not mistaken, the current implementations of both 
beginningOrEndOffsets and offsetsForTimes need to send out a request upon 
getting a ZERO duration. It seems both code paths invoke this logic:
   ```
   // if timeout is set to zero, do not try to poll the network client at all
   // and return empty immediately; otherwise try to get the results synchronously
   // and throw timeout exception if it cannot complete in time
   if (timer.timeoutMs() == 0L)
       return result;
   ```
   
   But offsetsForTimes seems to short-circuit it here:
   ```
   // If timeout is set to zero return empty immediately; otherwise try to get the results
   // and throw timeout exception if it cannot complete in time.
   if (timeout.toMillis() == 0L)
       return listOffsetsEvent.emptyResult();
   
   return applicationEventHandler.addAndGet(listOffsetsEvent, timer);
   ```
   
   I'll create a ticket to align the behavior of these two APIs in the new 
consumers.





Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-03-27 Thread via GitHub


lucasbru commented on PR #15525:
URL: https://github.com/apache/kafka/pull/15525#issuecomment-2022475971

   > @lucasbru - Thanks again for reviewing the PR. Sorry about the 
misinterpretation of the short-circuiting logic; I have updated the 
beginningOrEndOffsets API. It seems like the right thing to do here is to still 
send out the request but return immediately for a zero timeout (a bit strange 
because it does throw a timeout when time runs out, which seems inconsistent).
   
   Yes, the behavior of the existing consumer is a bit curious, but it's not 
the only place where a zero duration is treated differently from 0.01s. 
Either way, we probably have to do it this way for compatibility. This part 
looks good to me now.





Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-03-27 Thread via GitHub


lucasbru commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1540879732


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsEvent.java:
##
@@ -25,22 +25,15 @@
 import java.util.Map;
 
 /**
- * Event for retrieving partition offsets by performing a
+ * Application Event for retrieving partition offsets by performing a
  * {@link org.apache.kafka.common.requests.ListOffsetsRequest 
ListOffsetsRequest}.
- * This event is created with a map of {@link TopicPartition} and target 
timestamps to search
- * offsets for. It is completed with the map of {@link TopicPartition} and
- * {@link OffsetAndTimestamp} found (offset of the first message whose 
timestamp is greater than
- * or equals to the target timestamp)
  */
-public class ListOffsetsEvent extends 
CompletableApplicationEvent> {
-
+public class ListOffsetsEvent extends 
CompletableApplicationEvent> {

Review Comment:
   I'm personally not concerned about having two events, because they are very 
simple. The alternative is to have a common code-path that carries a 
`requiresTimestamp` boolean to differentiate behavior again, which isn't really 
any simpler. But I agree there is a certain amount of code duplication here 
that we could eliminate using your approach, @lianetm, so I'm not against it.






Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-03-26 Thread via GitHub


philipnee commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1540114887


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsEvent.java:
##
@@ -25,22 +25,15 @@
 import java.util.Map;
 
 /**
- * Event for retrieving partition offsets by performing a
+ * Application Event for retrieving partition offsets by performing a
  * {@link org.apache.kafka.common.requests.ListOffsetsRequest 
ListOffsetsRequest}.
- * This event is created with a map of {@link TopicPartition} and target 
timestamps to search
- * offsets for. It is completed with the map of {@link TopicPartition} and
- * {@link OffsetAndTimestamp} found (offset of the first message whose 
timestamp is greater than
- * or equals to the target timestamp)
  */
-public class ListOffsetsEvent extends 
CompletableApplicationEvent> {
-
+public class ListOffsetsEvent extends 
CompletableApplicationEvent> {

Review Comment:
   thanks, sounds like a good idea to me.






Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-03-26 Thread via GitHub


lianetm commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1539933946


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsEvent.java:
##
@@ -25,22 +25,15 @@
 import java.util.Map;
 
 /**
- * Event for retrieving partition offsets by performing a
+ * Application Event for retrieving partition offsets by performing a
  * {@link org.apache.kafka.common.requests.ListOffsetsRequest 
ListOffsetsRequest}.
- * This event is created with a map of {@link TopicPartition} and target 
timestamps to search
- * offsets for. It is completed with the map of {@link TopicPartition} and
- * {@link OffsetAndTimestamp} found (offset of the first message whose 
timestamp is greater than
- * or equals to the target timestamp)
  */
-public class ListOffsetsEvent extends 
CompletableApplicationEvent> {
-
+public class ListOffsetsEvent extends 
CompletableApplicationEvent> {

Review Comment:
   Kind of a general comment looking for simplification: couldn't we just have 
a new internal class `OffsetAndTimestampInternal` (better named) that allows 
negatives and knows how to build an `OffsetAndTimestamp`? That seems to solve 
the problem we have without having to split `ListOffsets` into 2 events, with 
separate paths for beginning/endOffsets and offsetsForTimes, when in reality 
they have everything in common except for the object we use to encapsulate the 
result (same result). These new split paths leak down to the OffsetsManager 
event, when in reality, at the request/response level the manager is 
responsible for, everything is the same for both paths. With this approach the 
change would only be at the API level, on the consumer, where the result of the 
event would build the map with Longs for beginning/end, or the map with 
OffsetAndTimestamp for offsetsForTimes (the data is the same, we just need to 
change how we return it).
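
A minimal sketch of the idea described above, not the code from the PR: an internal holder that skips validation and converts to the public type on demand. All names here (`OffsetResultSketch`, `PublicOffsetAndTimestamp`, `InternalOffsetAndTimestamp`) are hypothetical stand-ins, partitions are plain strings to keep the example self-contained, and mapping invalid entries to `null` is an assumption made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

public class OffsetResultSketch {

    /** Stand-in for the public result type, which rejects negative values. */
    public static final class PublicOffsetAndTimestamp {
        public final long offset;
        public final long timestamp;

        public PublicOffsetAndTimestamp(long offset, long timestamp) {
            if (offset < 0) throw new IllegalArgumentException("Invalid negative offset");
            if (timestamp < 0) throw new IllegalArgumentException("Invalid negative timestamp");
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }

    /** Hypothetical internal holder: no validation, so negative values are allowed. */
    public static final class InternalOffsetAndTimestamp {
        public final long offset;
        public final long timestamp;

        public InternalOffsetAndTimestamp(long offset, long timestamp) {
            this.offset = offset;
            this.timestamp = timestamp;
        }

        /** Builds the public type only when both values are valid for it. */
        public Optional<PublicOffsetAndTimestamp> toPublic() {
            if (offset < 0 || timestamp < 0) return Optional.empty();
            return Optional.of(new PublicOffsetAndTimestamp(offset, timestamp));
        }
    }

    /** View for beginning/end offsets: plain offsets only. */
    public static Map<String, Long> toOffsets(Map<String, InternalOffsetAndTimestamp> fetched) {
        Map<String, Long> out = new LinkedHashMap<>();
        fetched.forEach((tp, v) -> out.put(tp, v.offset));
        return out;
    }

    /** View for offsetsForTimes: public objects, null where conversion is not possible (assumption). */
    public static Map<String, PublicOffsetAndTimestamp> toOffsetsAndTimestamps(
            Map<String, InternalOffsetAndTimestamp> fetched) {
        Map<String, PublicOffsetAndTimestamp> out = new LinkedHashMap<>();
        fetched.forEach((tp, v) -> out.put(tp, v.toPublic().orElse(null)));
        return out;
    }
}
```

With this shape, a single event result type can feed both consumer APIs, and the difference between them is confined to the final mapping step.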






Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-03-26 Thread via GitHub


philipnee commented on PR #15525:
URL: https://github.com/apache/kafka/pull/15525#issuecomment-2020963772

   @lucasbru - Thanks again for reviewing the PR. Sorry about the 
misinterpretation of the short-circuiting logic; I have updated the 
beginningOrEndOffsets API. It seems like the right thing to do here is to 
still send out the request but return immediately for a zero timeout (a bit 
strange because it does throw a timeout when time runs out, which seems 
inconsistent).





Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-03-26 Thread via GitHub


philipnee commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1539524047


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1141,21 +1141,27 @@ private Map 
beginningOrEndOffset(Collection timestampToSearch = partitions
 .stream()
 .collect(Collectors.toMap(Function.identity(), tp -> 
timestamp));
 Timer timer = time.timer(timeout);
 ListOffsetsEvent listOffsetsEvent = new ListOffsetsEvent(
 timestampToSearch,
-false,
 timer);
-Map offsetAndTimestampMap = 
applicationEventHandler.addAndGet(
+
+// shortcut the request if the timeout is zero.
+if (timeout.isZero()) {

Review Comment:
   Hey @lucasbru - It seems like one of the handlers would also update the 
subscription state upon completion. See the snippet below:
   ```
   public void onSuccess(ListOffsetResult value) {
       synchronized (future) {
           result.fetchedOffsets.putAll(value.fetchedOffsets);
           remainingToSearch.keySet().retainAll(value.partitionsToRetry);

           offsetFetcherUtils.updateSubscriptionState(value.fetchedOffsets, isolationLevel);
       }
   }
   ```
   
   I think addAndGet seems to be sufficient to handle such logic so I'll revert 
this code.






Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-03-26 Thread via GitHub


lucasbru commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1538901757


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1141,21 +1141,27 @@ private Map 
beginningOrEndOffset(Collection timestampToSearch = partitions
 .stream()
 .collect(Collectors.toMap(Function.identity(), tp -> 
timestamp));
 Timer timer = time.timer(timeout);
 ListOffsetsEvent listOffsetsEvent = new ListOffsetsEvent(
 timestampToSearch,
-false,
 timer);
-Map offsetAndTimestampMap = 
applicationEventHandler.addAndGet(
+
+// shortcut the request if the timeout is zero.
+if (timeout.isZero()) {

Review Comment:
   I've tried to dig into this a bit. So short-cutting is definitely the right 
thing to do, since otherwise we'll run into a time-out exception. The old 
consumer is a bit weird in that it fires the list offset request, but never 
returns a result. But I also couldn't find a cache that is influenced by 
the list offsets request, so what's the point of sending the request?
   
   Replicating the old consumer behavior would mean creating the event for the 
background thread, but not waiting for the result. We can consider changing the 
behavior here, but let's make sure we do it consciously.
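
As a rough illustration of "creating the event for the background thread, but not waiting for the result", here is a self-contained sketch. It does not use the real consumer classes: `ZeroTimeoutSketch`, `OffsetsEvent`, and `QUEUE` are hypothetical stand-ins, and wrapping the timeout in an `IllegalStateException` is a simplification chosen for the example.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ZeroTimeoutSketch {

    /** Hypothetical event carrying the future that a background thread would complete. */
    public static final class OffsetsEvent {
        public final CompletableFuture<Map<String, Long>> future = new CompletableFuture<>();
    }

    /** Hypothetical stand-in for the queue drained by the background thread. */
    public static final Queue<OffsetsEvent> QUEUE = new ConcurrentLinkedQueue<>();

    public static Map<String, Long> listOffsets(Duration timeout) {
        OffsetsEvent event = new OffsetsEvent();
        // Always enqueue, so the request is still issued and any
        // completion-time side effects can still run.
        QUEUE.add(event);
        if (timeout.isZero()) {
            // Zero timeout: do not wait for the response; return empty right away.
            return Collections.emptyMap();
        }
        try {
            // Non-zero timeout: block until the future completes or time runs out.
            return event.future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new IllegalStateException("Timed out waiting for offsets", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for offsets", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Offsets lookup failed", e.getCause());
        }
    }
}
```

The sketch makes the asymmetry under discussion visible: a zero timeout yields an empty result without an error, while any small positive timeout that expires surfaces as a failure.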






Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-03-26 Thread via GitHub


lucasbru commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1538863396


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##
@@ -1141,21 +1141,27 @@ private Map 
beginningOrEndOffset(Collection timestampToSearch = partitions
 .stream()
 .collect(Collectors.toMap(Function.identity(), tp -> 
timestamp));
 Timer timer = time.timer(timeout);
 ListOffsetsEvent listOffsetsEvent = new ListOffsetsEvent(
 timestampToSearch,
-false,
 timer);
-Map offsetAndTimestampMap = 
applicationEventHandler.addAndGet(
+
+// shortcut the request if the timeout is zero.
+if (timeout.isZero()) {

Review Comment:
   Sorry, only noticed this now, but the original consumer seems to send a list 
offset request even if the timeout is 0, and you are specifically introducing 
code to avoid that. Isn't that going against what Kirk is trying to achieve? cc 
@kirktrue 






Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-03-25 Thread via GitHub


philipnee commented on PR #15525:
URL: https://github.com/apache/kafka/pull/15525#issuecomment-2018886221

   Hey @lucasbru - Thanks for taking the time to review this PR.  Let me know 
if there's anything to add to the PR.





Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-03-25 Thread via GitHub


philipnee commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1538195311


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.java:
##
@@ -423,11 +432,11 @@ private CompletableFuture 
sendListOffsetsRequestsAndResetPositions(
 });
 });
 
-if (unsentRequests.size() > 0) {
+if (unsentRequests.isEmpty()) {

Review Comment:
   @lucasbru - Switched the order as !__.isEmpty is rather difficult to read






Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-03-25 Thread via GitHub


philipnee commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1537804695


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -411,8 +411,8 @@ public int memberEpoch() {
 public void onHeartbeatSuccess(ConsumerGroupHeartbeatResponseData 
response) {
 if (response.errorCode() != Errors.NONE.code()) {
 String errorMessage = String.format(
-"Unexpected error in Heartbeat response. Expected no 
error, but received: %s",

Review Comment:
   Good call. I think it was the editor's auto-correction. Reverting it.






Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-03-25 Thread via GitHub


lucasbru commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1537350070


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsEvent.java:
##
@@ -32,8 +32,7 @@
  * {@link OffsetAndTimestamp} found (offset of the first message whose 
timestamp is greater than
  * or equals to the target timestamp)
  */
-public class ListOffsetsEvent extends 
CompletableApplicationEvent> {
-
+public class ListOffsetsEvent extends 
CompletableApplicationEvent> {

Review Comment:
   How about using two separate events `ListOffsets` and 
`ListOffsetsWithTimestamps`? We could save the "requireTimestamps" boolean and 
the use of generics here, which I think would simplify the code. That would 
also get rid of some `unchecked` warnings, that you are currently suppressing, 
I'd expect.
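
To make the suggestion concrete, here is a hedged sketch of what two separate event types could look like. It is an illustration only: `SeparateEventsSketch`, `CompletableEvent`, and the `process` overloads are hypothetical, partitions are plain strings, and an `{offset, timestamp}` pair is modeled as a `long[]` to avoid depending on Kafka classes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class SeparateEventsSketch {

    /** Hypothetical base class: an event completed with a result of type T. */
    public abstract static class CompletableEvent<T> {
        public final CompletableFuture<T> future = new CompletableFuture<>();
    }

    /** Completed with plain offsets (beginning/end offsets). */
    public static final class ListOffsets extends CompletableEvent<Map<String, Long>> { }

    /** Completed with {offset, timestamp} pairs (offsets for times). */
    public static final class ListOffsetsWithTimestamps extends CompletableEvent<Map<String, long[]>> { }

    /** One overload per event type, so no boolean flag and no unchecked cast is needed. */
    public static void process(ListOffsets event, Map<String, long[]> fetched) {
        Map<String, Long> offsets = new HashMap<>();
        fetched.forEach((tp, pair) -> offsets.put(tp, pair[0]));
        event.future.complete(offsets);
    }

    public static void process(ListOffsetsWithTimestamps event, Map<String, long[]> fetched) {
        event.future.complete(fetched);
    }
}
```

The trade-off is some duplication between the two event types in exchange for result types that the compiler can check without suppressed warnings.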



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##
@@ -411,8 +411,8 @@ public int memberEpoch() {
 public void onHeartbeatSuccess(ConsumerGroupHeartbeatResponseData 
response) {
 if (response.errorCode() != Errors.NONE.code()) {
 String errorMessage = String.format(
-"Unexpected error in Heartbeat response. Expected no 
error, but received: %s",

Review Comment:
   nit: White space change in a file you aren't otherwise changing, I'd avoid 
it.






Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-03-24 Thread via GitHub


philipnee commented on PR #15525:
URL: https://github.com/apache/kafka/pull/15525#issuecomment-2017309149

   Hey @lucasbru - Would it be possible to ask you to review this PR? Thanks!





Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-03-21 Thread via GitHub


philipnee commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1535039670


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java:
##
@@ -480,11 +475,16 @@ public void testRequestFails_AuthenticationException() {
 // Response received with auth error
 NetworkClientDelegate.UnsentRequest unsentRequest = res.unsentRequests.get(0);
 ClientResponse clientResponse =
-buildClientResponseWithAuthenticationException(unsentRequest);

Review Comment:
   the function was only used once...






Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-03-21 Thread via GitHub


philipnee commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1535038917


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java:
##
@@ -131,13 +131,11 @@ public void testListOffsetsRequest_Success() throws ExecutionException, Interrup
 ListOffsetsRequest.EARLIEST_TIMESTAMP);
 
 mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1));
-CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> result = requestManager.fetchOffsets(
-timestampsToSearch,
-false);
+CompletableFuture<Map<TopicPartition, Long>> result = requestManager.beginningOrEndOffset(timestampsToSearch);
 assertEquals(1, requestManager.requestsToSend());
 assertEquals(0, requestManager.requestsToRetry());
 
-Map<TopicPartition, OffsetAndTimestamp> expectedOffsets = Collections.singletonMap(TEST_PARTITION_1, new OffsetAndTimestamp(5L, 1L));
+Map<TopicPartition, Long> expectedOffsets = Collections.singletonMap(TEST_PARTITION_1, 5L);

Review Comment:
   Note: A lot of mock results are converted to `Long` because most tests have
the `requireTimestamps` field set to false, which implies they are only
invoking `beginningOrEndOffsets`.
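
   To illustrate the difference between the two result shapes, here is a
minimal sketch. It assumes a hypothetical `RawOffset` lookup result and a
`String` partition key; none of these names come from the Kafka code base.
The offsets-only path returns plain `Long` values and never allocates the
offset-and-timestamp wrapper, which is why the expected values in these tests
become `5L` rather than a wrapper object.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; the nested types are stand-ins, not Kafka classes.
final class OffsetResultSketch {
    // Assumed raw per-partition result of a list-offsets lookup.
    static final class RawOffset {
        final long offset;
        final long timestamp;

        RawOffset(long offset, long timestamp) {
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }

    // Stand-in for an offset paired with its timestamp.
    static final class OffsetAndTs {
        final long offset;
        final long timestamp;

        OffsetAndTs(long offset, long timestamp) {
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }

    // Offsets-only path: no wrapper object is built per partition.
    static Map<String, Long> toOffsets(Map<String, RawOffset> raw) {
        Map<String, Long> out = new HashMap<>();
        raw.forEach((partition, r) -> out.put(partition, r.offset));
        return out;
    }

    // Timestamp path: builds the wrapper because callers need both fields.
    static Map<String, OffsetAndTs> toOffsetsAndTimestamps(Map<String, RawOffset> raw) {
        Map<String, OffsetAndTs> out = new HashMap<>();
        raw.forEach((partition, r) -> out.put(partition, new OffsetAndTs(r.offset, r.timestamp)));
        return out;
    }

    public static void main(String[] args) {
        Map<String, RawOffset> raw = new HashMap<>();
        raw.put("topic-0", new RawOffset(5L, 1L));
        System.out.println(toOffsets(raw)); // prints {topic-0=5}
    }
}
```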






Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-03-21 Thread via GitHub


philipnee commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1535038917


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetsRequestManagerTest.java:
##
@@ -131,13 +131,11 @@ public void testListOffsetsRequest_Success() throws ExecutionException, Interrup
 ListOffsetsRequest.EARLIEST_TIMESTAMP);
 
 mockSuccessfulRequest(Collections.singletonMap(TEST_PARTITION_1, LEADER_1));
-CompletableFuture<Map<TopicPartition, OffsetAndTimestamp>> result = requestManager.fetchOffsets(
-timestampsToSearch,
-false);
+CompletableFuture<Map<TopicPartition, Long>> result = requestManager.beginningOrEndOffset(timestampsToSearch);
 assertEquals(1, requestManager.requestsToSend());
 assertEquals(0, requestManager.requestsToRetry());
 
-Map<TopicPartition, OffsetAndTimestamp> expectedOffsets = Collections.singletonMap(TEST_PARTITION_1, new OffsetAndTimestamp(5L, 1L));
+Map<TopicPartition, Long> expectedOffsets = Collections.singletonMap(TEST_PARTITION_1, 5L);

Review Comment:
   Note: A lot of mock results are converted to Long because most tests have 
the `requireTimestamps` field marked false.


