[jira] [Updated] (KAFKA-16777) New consumer should throw NoOffsetForPartitionException on continuous poll zero if no reset strategy
[ https://issues.apache.org/jira/browse/KAFKA-16777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lianet Magrans updated KAFKA-16777:
---
Description:

If the consumer does not define an offset reset strategy, a call to poll should fail with NoOffsetForPartitionException. That works as expected on the new consumer when polling with a timeout > 0 (existing integration test [here|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/core/src/test/scala/integration/kafka/api/PlaintextConsumerFetchTest.scala#L36]), but fails when polling continuously with ZERO timeout.

This can be easily reproduced with a new integration test like this (passes for the legacy consumer but fails for the new consumer). We should add it as part of the fix, for better coverage:

{code:java}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testNoOffsetForPartitionExceptionOnPollZero(quorum: String, groupProtocol: String): Unit = {
  this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
  val consumer = createConsumer(configOverrides = this.consumerConfig)
  consumer.assign(List(tp).asJava)

  // continuous poll should eventually fail because there is no offset reset strategy set (fail only when resetting positions after coordinator is known)
  TestUtils.tryUntilNoAssertionError() {
    assertThrows(classOf[NoOffsetForPartitionException], () => consumer.poll(Duration.ZERO))
  }
}
{code}

Also this is covered in the unit test [KafkaConsumerTest.testMissingOffsetNoResetPolicy|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L915], that is currently enabled only for the LegacyConsumer. After fixing this issue we should be able to enable it for the new consumer too.

The issue seems to be around calling poll with ZERO timeout, that even when called continuously, the consumer is not able to initWithCommittedOffsetsIfNeeded, so the updateFetchPositions never makes it to [resetInitializingPositions|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1663], where the exception is thrown.

There is the related issue https://issues.apache.org/jira/browse/KAFKA-16637, but filing this one to provide more context and point out the test failures and suggested new tests. All fail even with the current patch in KAFKA-16637 so needs investigation.

was:

If the consumer does not define an offset reset strategy, a call to poll should fail with NoOffsetForPartitionException. That works as expected on the new consumer when polling with a timeout > 0 (existing integration test [here|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/core/src/test/scala/integration/kafka/api/PlaintextConsumerFetchTest.scala#L36]), but fails when polling continuously with ZERO timeout.

This can be easily reproduced with a new integration test like this (passes for the legacy consumer but fails for the new consumer). We should add it as part of the fix, for better coverage:

{code:java}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testNoOffsetForPartitionExceptionOnPollZero(quorum: String, groupProtocol: String): Unit = {
  this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
  val consumer = createConsumer(configOverrides = this.consumerConfig)
  consumer.assign(List(tp).asJava)

  // continuous poll should eventually fail because there is no offset reset strategy set (fail only when resetting positions after coordinator is known)
  TestUtils.tryUntilNoAssertionError() {
    assertThrows(classOf[NoOffsetForPartitionException], () => consumer.poll(Duration.ZERO))
  }
}
{code}

Also this is covered in the unit test [KafkaConsumerTest.testMissingOffsetNoResetPolicy|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L915], that is currently enabled only for the LegacyConsumer. After fixing this issue we should be able to enable it for the new consumer too and KafkaConsumerTest.testResetToCommittedOffset (also failing due to fetch committed offsets never completing when polling with ZERO)

The issue seems to be around calling poll with ZERO timeout, that even when called continuously, the consumer is not able to initWithCommittedOffsetsIfNeeded, so the updateFetchPositions never makes it to [resetInitializingPositions|https://github.com/apache/kafka/blob/ba19eedb90fc3a3e
[jira] [Updated] (KAFKA-16777) New consumer should throw NoOffsetForPartitionException on continuous poll zero if no reset strategy
[ https://issues.apache.org/jira/browse/KAFKA-16777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lianet Magrans updated KAFKA-16777:
---
Description:

If the consumer does not define an offset reset strategy, a call to poll should fail with NoOffsetForPartitionException. That works as expected on the new consumer when polling with a timeout > 0 (existing integration test [here|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/core/src/test/scala/integration/kafka/api/PlaintextConsumerFetchTest.scala#L36]), but fails when polling continuously with ZERO timeout.

This can be easily reproduced with a new integration test like this (passes for the legacy consumer but fails for the new consumer). We should add it as part of the fix, for better coverage:

{code:java}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testNoOffsetForPartitionExceptionOnPollZero(quorum: String, groupProtocol: String): Unit = {
  this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
  val consumer = createConsumer(configOverrides = this.consumerConfig)
  consumer.assign(List(tp).asJava)

  // continuous poll should eventually fail because there is no offset reset strategy set (fail only when resetting positions after coordinator is known)
  TestUtils.tryUntilNoAssertionError() {
    assertThrows(classOf[NoOffsetForPartitionException], () => consumer.poll(Duration.ZERO))
  }
}
{code}

Also this is covered in the unit test [KafkaConsumerTest.testMissingOffsetNoResetPolicy|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L915], that is currently enabled only for the LegacyConsumer. After fixing this issue we should be able to enable it for the new consumer too and KafkaConsumerTest.testResetToCommittedOffset (also failing due to fetch committed offsets never completing when polling with ZERO)

The issue seems to be around calling poll with ZERO timeout, that even when called continuously, the consumer is not able to initWithCommittedOffsetsIfNeeded, so the updateFetchPositions never makes it to [resetInitializingPositions|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1663], where the exception is thrown.

There is the related issue https://issues.apache.org/jira/browse/KAFKA-16637, but filing this one to provide more context and point out the test failures and suggested new tests. All fail even with the current patch in KAFKA-16637 so needs investigation.

was:

If the consumer does not define an offset reset strategy, a call to poll should fail with NoOffsetForPartitionException. That works as expected on the new consumer when polling with a timeout > 0 (existing integration test [here|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/core/src/test/scala/integration/kafka/api/PlaintextConsumerFetchTest.scala#L36]), but fails when polling continuously with ZERO timeout.

This can be easily reproduced with a new integration test like this (passes for the legacy consumer but fails for the new consumer). We should add it as part of the fix, for better coverage:

{code:java}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testNoOffsetForPartitionExceptionOnPollZero(quorum: String, groupProtocol: String): Unit = {
  this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
  val consumer = createConsumer(configOverrides = this.consumerConfig)
  consumer.assign(List(tp).asJava)

  // continuous poll should eventually fail because there is no offset reset strategy set (fail only when resetting positions after coordinator is known)
  TestUtils.tryUntilNoAssertionError() {
    assertThrows(classOf[NoOffsetForPartitionException], () => consumer.poll(Duration.ZERO))
  }
}
{code}

Also this is covered in the unit test [KafkaConsumerTest.testMissingOffsetNoResetPolicy|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L915], that is currently enabled only for the LegacyConsumer. After fixing this issue we should be able to enable it for the new consumer too.

The issue seems to be around calling poll with ZERO timeout, that even when called continuously, the consumer is not able to initWithCommittedOffsetsIfNeeded, so the updateFetchPositions never makes it to [resetInitializingPositions|https://github.com/apache/kafka/blob/ba19eedb90fc3a3e
[jira] [Updated] (KAFKA-16777) New consumer should throw NoOffsetForPartitionException on continuous poll zero if no reset strategy
[ https://issues.apache.org/jira/browse/KAFKA-16777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lianet Magrans updated KAFKA-16777:
---
Priority: Blocker (was: Major)

> New consumer should throw NoOffsetForPartitionException on continuous poll zero if no reset strategy
>
> Key: KAFKA-16777
> URL: https://issues.apache.org/jira/browse/KAFKA-16777
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Reporter: Lianet Magrans
> Priority: Blocker
> Labels: kip-848-client-support
> Fix For: 3.8.0
>
> If the consumer does not define an offset reset strategy, a call to poll should fail with NoOffsetForPartitionException. That works as expected on the new consumer when polling with a timeout > 0 (existing integration test [here|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/core/src/test/scala/integration/kafka/api/PlaintextConsumerFetchTest.scala#L36]), but fails when polling continuously with ZERO timeout.
> This can be easily reproduced with a new integration test like this (passes for the legacy consumer but fails for the new consumer). We should add it as part of the fix, for better coverage:
> {code:java}
> @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
> @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
> def testNoOffsetForPartitionExceptionOnPollZero(quorum: String, groupProtocol: String): Unit = {
>   this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
>   val consumer = createConsumer(configOverrides = this.consumerConfig)
>   consumer.assign(List(tp).asJava)
>
>   // continuous poll should eventually fail because there is no offset reset strategy set (fail only when resetting positions after coordinator is known)
>   TestUtils.tryUntilNoAssertionError() {
>     assertThrows(classOf[NoOffsetForPartitionException], () => consumer.poll(Duration.ZERO))
>   }
> }
> {code}
> Also this is covered in the unit test [KafkaConsumerTest.testMissingOffsetNoResetPolicy|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L915], that is currently enabled only for the LegacyConsumer. After fixing this issue we should be able to enable it for the new consumer too.
> The issue seems to be around calling poll with ZERO timeout, that even when called continuously, the consumer is not able to initWithCommittedOffsetsIfNeeded, so the updateFetchPositions never makes it to [resetInitializingPositions|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1663], where the exception is thrown.
>
> There is the related issue https://issues.apache.org/jira/browse/KAFKA-16637, but filing this one to provide more context and point out the test failures and suggested new tests. All fail even with the current patch in KAFKA-16637 so needs investigation.
-- This message was sent by Atlassian Jira (v8.20.10#820010)
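Editorial sketch, not part of the ticket: the description says the committed-offset fetch never completes when poll is called with a ZERO timeout, even continuously. One plausible mechanism, assumed here and not confirmed by this ticket, is that each zero-timeout poll starts a fresh request and discards it before it can finish. The toy model below uses only the JDK and is not Kafka code; `PendingFetch`, `NoOffsetException`, `pollZero`, and `ROUND_TRIPS_NEEDED` are hypothetical names invented for illustration.

```java
/** Toy model, not Kafka code: a request re-created on every zero-timeout poll can never finish. */
public class ZeroTimeoutPollSketch {

    /** Hypothetical stand-in for an in-flight "fetch committed offsets" request. */
    static final class PendingFetch {
        private int roundTripsLeft;

        PendingFetch(int roundTripsNeeded) {
            this.roundTripsLeft = roundTripsNeeded;
        }

        /** Simulates one unit of background progress. */
        void advance() {
            if (roundTripsLeft > 0) {
                roundTripsLeft--;
            }
        }

        boolean isDone() {
            return roundTripsLeft == 0;
        }
    }

    /** Hypothetical stand-in for NoOffsetForPartitionException. */
    static final class NoOffsetException extends RuntimeException {
        NoOffsetException() {
            super("no committed offset and no reset strategy");
        }
    }

    // Assumption for the model: a fetch needs three units of progress to complete.
    private static final int ROUND_TRIPS_NEEDED = 3;

    private final boolean reusePendingFetch;
    private PendingFetch pending;

    ZeroTimeoutPollSketch(boolean reusePendingFetch) {
        this.reusePendingFetch = reusePendingFetch;
    }

    /** One zero-timeout poll: the request gets one unit of progress and the call never blocks. */
    void pollZero() {
        if (pending == null || !reusePendingFetch) {
            // Fresh request with no progress; any earlier request is dropped.
            pending = new PendingFetch(ROUND_TRIPS_NEEDED);
        }
        pending.advance();
        if (pending.isDone()) {
            // Only once the fetch has finished can the missing reset strategy be reported.
            throw new NoOffsetException();
        }
    }

    /** Polls up to maxPolls times; returns the 1-based poll that threw, or -1 if none did. */
    static int pollsUntilFailure(boolean reusePendingFetch, int maxPolls) {
        ZeroTimeoutPollSketch consumer = new ZeroTimeoutPollSketch(reusePendingFetch);
        for (int i = 1; i <= maxPolls; i++) {
            try {
                consumer.pollZero();
            } catch (NoOffsetException e) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println("discarding each poll: " + pollsUntilFailure(false, 1000)); // -1
        System.out.println("reusing across polls: " + pollsUntilFailure(true, 1000));  // 3
    }
}
```

In this model the discarding variant never raises the exception no matter how often it polls, which matches the symptom reported above; keeping the pending request across polls lets it finish on the third call. Whether the real consumer behaves this way is exactly what the ticket asks to be investigated.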
[jira] [Updated] (KAFKA-16777) New consumer should throw NoOffsetForPartitionException on continuous poll zero if no reset strategy
[ https://issues.apache.org/jira/browse/KAFKA-16777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-16777:
--
Fix Version/s: 3.8.0

> New consumer should throw NoOffsetForPartitionException on continuous poll zero if no reset strategy
>
> Key: KAFKA-16777
> URL: https://issues.apache.org/jira/browse/KAFKA-16777
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Reporter: Lianet Magrans
> Priority: Major
> Labels: kip-848-client-support
> Fix For: 3.8.0
>
> If the consumer does not define an offset reset strategy, a call to poll should fail with NoOffsetForPartitionException. That works as expected on the new consumer when polling with a timeout > 0 (existing integration test [here|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/core/src/test/scala/integration/kafka/api/PlaintextConsumerFetchTest.scala#L36]), but fails when polling continuously with ZERO timeout.
> This can be easily reproduced with a new integration test like this (passes for the legacy consumer but fails for the new consumer). We should add it as part of the fix, for better coverage:
> {code:java}
> @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
> @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
> def testNoOffsetForPartitionExceptionOnPollZero(quorum: String, groupProtocol: String): Unit = {
>   this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
>   val consumer = createConsumer(configOverrides = this.consumerConfig)
>   consumer.assign(List(tp).asJava)
>
>   // continuous poll should eventually fail because there is no offset reset strategy set (fail only when resetting positions after coordinator is known)
>   TestUtils.tryUntilNoAssertionError() {
>     assertThrows(classOf[NoOffsetForPartitionException], () => consumer.poll(Duration.ZERO))
>   }
> }
> {code}
> Also this is covered in the unit test [KafkaConsumerTest.testMissingOffsetNoResetPolicy|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L915], that is currently enabled only for the LegacyConsumer. After fixing this issue we should be able to enable it for the new consumer too.
> The issue seems to be around calling poll with ZERO timeout, that even when called continuously, the consumer is not able to initWithCommittedOffsetsIfNeeded, so the updateFetchPositions never makes it to [resetInitializingPositions|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1663], where the exception is thrown.
>
> There is the related issue https://issues.apache.org/jira/browse/KAFKA-16637, but filing this one to provide more context and point out the test failures and suggested new tests. All fail even with the current patch in KAFKA-16637 so needs investigation.
[jira] [Updated] (KAFKA-16777) New consumer should throw NoOffsetForPartitionException on continuous poll zero if no reset strategy
[ https://issues.apache.org/jira/browse/KAFKA-16777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lianet Magrans updated KAFKA-16777:
---
Description:

If the consumer does not define an offset reset strategy, a call to poll should fail with NoOffsetForPartitionException. That works as expected on the new consumer when polling with a timeout > 0 (existing integration test [here|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/core/src/test/scala/integration/kafka/api/PlaintextConsumerFetchTest.scala#L36]), but fails when polling continuously with ZERO timeout.

This can be easily reproduced with a new integration test like this (passes for the legacy consumer but fails for the new consumer). We should add it as part of the fix, for better coverage:

{code:java}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testNoOffsetForPartitionExceptionOnPollZero(quorum: String, groupProtocol: String): Unit = {
  this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
  val consumer = createConsumer(configOverrides = this.consumerConfig)
  consumer.assign(List(tp).asJava)

  // continuous poll should eventually fail because there is no offset reset strategy set (fail only when resetting positions after coordinator is known)
  TestUtils.tryUntilNoAssertionError() {
    assertThrows(classOf[NoOffsetForPartitionException], () => consumer.poll(Duration.ZERO))
  }
}
{code}

Also this is covered in the unit test [KafkaConsumerTest.testMissingOffsetNoResetPolicy|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L915], that is currently enabled only for the LegacyConsumer. After fixing this issue we should be able to enable it for the new consumer too.

The issue seems to be around calling poll with ZERO timeout, that even when called continuously, the consumer is not able to initWithCommittedOffsetsIfNeeded, so the updateFetchPositions never makes it to [resetInitializingPositions|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1663], where the exception is thrown.

There is the related issue https://issues.apache.org/jira/browse/KAFKA-16637, but filing this one to provide more context and point out the test failures and suggested new tests. All fail even with the current patch in KAFKA-16637 so needs investigation.

was:

If the consumer does not define an offset reset strategy, a call to poll should fail with NoOffsetForPartitionException. That works as expected on the new consumer when polling with a timeout > 0 (existing integration test [here|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/core/src/test/scala/integration/kafka/api/PlaintextConsumerFetchTest.scala#L36]), but fails when polling continuously with ZERO timeout.

This can be easily reproduced with a new integration test like this (passes for the legacy consumer but fails for the new consumer). We should add it as part of the fix, for better coverage:

{code:java}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testNoOffsetForPartitionExceptionOnPollZero(quorum: String, groupProtocol: String): Unit = {
  this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
  val consumer = createConsumer(configOverrides = this.consumerConfig)
  consumer.assign(List(tp).asJava)

  // continuous poll should eventually fail because there is no offset reset strategy set (fail only when resetting positions after coordinator is known)
  TestUtils.tryUntilNoAssertionError() {
    assertThrows(classOf[NoOffsetForPartitionException], () => consumer.poll(Duration.ZERO))
  }
}
{code}

Also this is covered in the unit test [KafkaConsumerTest.testMissingOffsetNoResetPolicy|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L915], that is currently enabled only for the LegacyConsumer. After fixing this issue we should be able to enable it for the new consumer too.

The issue seems to be around calling poll with ZERO timeout, that even when continuously, the consumer is not able to initWithCommittedOffsetsIfNeeded, so the updateFetchPositions never makes it to [resetInitializingPositions|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1663], where the exception
[jira] [Updated] (KAFKA-16777) New consumer should throw NoOffsetForPartitionException on continuous poll zero if no reset strategy
[ https://issues.apache.org/jira/browse/KAFKA-16777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kirk True updated KAFKA-16777:
--
Component/s: clients

> New consumer should throw NoOffsetForPartitionException on continuous poll zero if no reset strategy
>
> Key: KAFKA-16777
> URL: https://issues.apache.org/jira/browse/KAFKA-16777
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Reporter: Lianet Magrans
> Priority: Major
> Labels: kip-848-client-support
>
> If the consumer does not define an offset reset strategy, a call to poll should fail with NoOffsetForPartitionException. That works as expected on the new consumer when polling with a timeout > 0 (existing integration test [here|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/core/src/test/scala/integration/kafka/api/PlaintextConsumerFetchTest.scala#L36]), but fails when polling continuously with ZERO timeout.
> This can be easily reproduced with a new integration test like this (passes for the legacy consumer but fails for the new consumer). We should add it as part of the fix, for better coverage:
> {code:java}
> @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
> @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
> def testNoOffsetForPartitionExceptionOnPollZero(quorum: String, groupProtocol: String): Unit = {
>   this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
>   val consumer = createConsumer(configOverrides = this.consumerConfig)
>   consumer.assign(List(tp).asJava)
>
>   // continuous poll should eventually fail because there is no offset reset strategy set (fail only when resetting positions after coordinator is known)
>   TestUtils.tryUntilNoAssertionError() {
>     assertThrows(classOf[NoOffsetForPartitionException], () => consumer.poll(Duration.ZERO))
>   }
> }
> {code}
> Also this is covered in the unit test [KafkaConsumerTest.testMissingOffsetNoResetPolicy|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L915], that is currently enabled only for the LegacyConsumer. After fixing this issue we should be able to enable it for the new consumer too.
> The issue seems to be around calling poll with ZERO timeout, that even when continuously, the consumer is not able to initWithCommittedOffsetsIfNeeded, so the updateFetchPositions never makes it to [resetInitializingPositions|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1663], where the exception is thrown.
>
> There is the related issue https://issues.apache.org/jira/browse/KAFKA-16637, but filing this one to provide more context and point out the test failures and suggested new tests. All fail even with the current patch in KAFKA-16637 so needs investigation.
[jira] [Updated] (KAFKA-16777) New consumer should throw NoOffsetForPartitionException on continuous poll zero if no reset strategy
[ https://issues.apache.org/jira/browse/KAFKA-16777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lianet Magrans updated KAFKA-16777:
---
Description:

If the consumer does not define an offset reset strategy, a call to poll should fail with NoOffsetForPartitionException. That works as expected on the new consumer when polling with a timeout > 0 (existing integration test [here|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/core/src/test/scala/integration/kafka/api/PlaintextConsumerFetchTest.scala#L36]), but fails when polling continuously with ZERO timeout.

This can be easily reproduced with a new integration test like this (passes for the legacy consumer but fails for the new consumer). We should add it as part of the fix, for better coverage:

{code:java}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testNoOffsetForPartitionExceptionOnPollZero(quorum: String, groupProtocol: String): Unit = {
  this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
  val consumer = createConsumer(configOverrides = this.consumerConfig)
  consumer.assign(List(tp).asJava)

  // continuous poll should eventually fail because there is no offset reset strategy set (fail only when resetting positions after coordinator is known)
  TestUtils.tryUntilNoAssertionError() {
    assertThrows(classOf[NoOffsetForPartitionException], () => consumer.poll(Duration.ZERO))
  }
}
{code}

Also this is covered in the unit test [KafkaConsumerTest.testMissingOffsetNoResetPolicy|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L915], that is currently enabled only for the LegacyConsumer. After fixing this issue we should be able to enable it for the new consumer too.

The issue seems to be around calling poll with ZERO timeout, that even when continuously, the consumer is not able to initWithCommittedOffsetsIfNeeded, so the updateFetchPositions never makes it to [resetInitializingPositions|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1663], where the exception is thrown.

There is the related issue https://issues.apache.org/jira/browse/KAFKA-16637, but filing this one to provide more context and point out the test failures and suggested new tests. All fail even with the current patch in KAFKA-16637 so needs investigation.

was:

If the consumer does not define an offset reset strategy, a call to poll should fail with NoOffsetForPartitionException. That works as expected on the new consumer when polling with a timeout > 0 (existing integration test [here|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/core/src/test/scala/integration/kafka/api/PlaintextConsumerFetchTest.scala#L36]), but fails when polling continuously with ZERO timeout.

This can be easily reproduced with a new integration test like this (passes for the legacy consumer but fails for the new consumer). We should add it as part of the fix, for better coverage:

{code:java}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testNoOffsetForPartitionExceptionOnPollZero(quorum: String, groupProtocol: String): Unit = {
  this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
  val consumer = createConsumer(configOverrides = this.consumerConfig)
  consumer.assign(List(tp).asJava)

  // continuous poll should eventually fail because there is no offset reset strategy set (fail only when resetting positions after coordinator is known)
  TestUtils.tryUntilNoAssertionError() {
    assertThrows(classOf[NoOffsetForPartitionException], () => consumer.poll(Duration.ZERO))
  }
}
{code}

Also this is covered in the unit test [KafkaConsumerTest.testMissingOffsetNoResetPolicy|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L915], that is currently enabled only for the LegacyConsumer. After fixing this issue we should be able to enable it for the new consumer too.

The issue seems to be around calling poll with ZERO timeout, that even when continuously, the consumer is not able to initWithCommittedOffsetsIfNeeded, so the updateFetchPositions never makes it to [resetInitializingPositions|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1663], where the exception i