[jira] [Created] (KAFKA-16792) Enable related unit tests that fail only for new consumer with poll(0)
Lianet Magrans created KAFKA-16792: -- Summary: Enable related unit tests that fail only for new consumer with poll(0) Key: KAFKA-16792 URL: https://issues.apache.org/jira/browse/KAFKA-16792 Project: Kafka Issue Type: Sub-task Components: consumer Reporter: Lianet Magrans Enable the following unit tests for the new async consumer in KafkaConsumerTest:
- testFetchStableOffsetThrowInPoll
- testCurrentLag
- testListOffsetShouldUpdateSubscriptions
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16786) New consumer subscribe should not require the deprecated partition.assignment.strategy
Lianet Magrans created KAFKA-16786: -- Summary: New consumer subscribe should not require the deprecated partition.assignment.strategy Key: KAFKA-16786 URL: https://issues.apache.org/jira/browse/KAFKA-16786 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 3.7.0 Reporter: Lianet Magrans The partition.assignment.strategy config is deprecated with the new consumer group protocol KIP-848. With the new protocol, server-side assignors are supported for now, defined in the property group.remote.assignor, with default values selected by the broker, so it is not even a required property. The new AsyncKafkaConsumer supports the new protocol only, but it currently throws an IllegalStateException if subscribe is called while the deprecated config partition.assignment.strategy is empty (see [throwIfNoAssignorsConfigured|https://github.com/apache/kafka/blob/056d232f4e28bf8e67e00f8ed2c103fdb0f3b78e/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1715]). We should remove the reference to ConsumerPartitionAssignor in the AsyncKafkaConsumer, along with its validation for non-empty assignors on subscribe (the only use it has).
[jira] [Created] (KAFKA-16777) New consumer should throw NoOffsetForPartitionException on continuous poll zero if no reset strategy
Lianet Magrans created KAFKA-16777: -- Summary: New consumer should throw NoOffsetForPartitionException on continuous poll zero if no reset strategy Key: KAFKA-16777 URL: https://issues.apache.org/jira/browse/KAFKA-16777 Project: Kafka Issue Type: Bug Components: consumer Reporter: Lianet Magrans If the consumer does not define an offset reset strategy, a call to poll should fail with NoOffsetForPartitionException. That works as expected on the new consumer when polling with a timeout > 0 (existing integration test [here|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/core/src/test/scala/integration/kafka/api/PlaintextConsumerFetchTest.scala#L36]), but fails when polling continuously with ZERO timeout. This can be easily reproduced with a new integration test like the following (it passes for the legacy consumer but fails for the new consumer). We should add it as part of the fix, for better coverage:
{code:scala}
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
def testNoOffsetForPartitionExceptionOnPollZero(quorum: String, groupProtocol: String): Unit = {
  this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
  val consumer = createConsumer(configOverrides = this.consumerConfig)
  consumer.assign(List(tp).asJava)
  // continuous poll should eventually fail because there is no offset reset strategy set
  // (fails only when resetting positions after the coordinator is known)
  TestUtils.tryUntilNoAssertionError() {
    assertThrows(classOf[NoOffsetForPartitionException], () => consumer.poll(Duration.ZERO))
  }
}
{code}
This is also covered in the unit test [KafkaConsumerTest.testMissingOffsetNoResetPolicy|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L915], which is currently enabled only for the LegacyConsumer.
After fixing this issue we should be able to enable it for the new consumer too. The issue seems to be that when poll is called with ZERO timeout, even continuously, the consumer is never able to initWithCommittedOffsetsIfNeeded, so updateFetchPositions never reaches [resetInitializingPositions|https://github.com/apache/kafka/blob/ba19eedb90fc3a3efc97714e3ea125647338fc66/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1663], where the exception is thrown.
[jira] [Created] (KAFKA-16766) New consumer offsetsForTimes timeout exception does not have the proper message
Lianet Magrans created KAFKA-16766: -- Summary: New consumer offsetsForTimes timeout exception does not have the proper message Key: KAFKA-16766 URL: https://issues.apache.org/jira/browse/KAFKA-16766 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 3.7.0 Reporter: Lianet Magrans Fix For: 3.8.0 If a call to consumer.offsetsForTimes times out, the new AsyncKafkaConsumer will throw an org.apache.kafka.common.errors.TimeoutException as expected, but with "java.util.concurrent.TimeoutException" as its message. We should provide a clearer message, and ideally keep the same message that the LegacyConsumer shows in this case, ex: "Failed to get offsets by times in 6ms". To fix this we should consider catching the timeout exception in the consumer when the offsetsForTimes result times out ([here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1115]), and propagating it with a message specific to offsetsForTimes. After the fix, we should be able to write a test like [testOffsetsForTimesTimeout|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java#L3246], which exists for the legacy consumer. Note that we would need a different test, given that the legacy consumer does not issue a FindCoordinator request in this case but the AsyncConsumer does, so the test would have to account for that when matching requests/responses.
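The propagation described above can be sketched in plain Java. This is a minimal, dependency-free sketch: the RuntimeException stands in for Kafka's own TimeoutException, and all names here are illustrative assumptions, not the actual AsyncKafkaConsumer internals.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class OffsetsForTimesTimeout {

    // Awaits an async result; on timeout, rethrows with an
    // operation-specific message instead of the bare
    // "java.util.concurrent.TimeoutException" text.
    static <T> T awaitResult(CompletableFuture<T> result, long timeoutMs) {
        try {
            return result.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new RuntimeException(
                "Failed to get offsets by times in " + timeoutMs + "ms", e);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Long> neverCompletes = new CompletableFuture<>();
        try {
            awaitResult(neverCompletes, 10);
        } catch (RuntimeException e) {
            System.out.println(e.getMessage()); // Failed to get offsets by times in 10ms
        }
    }
}
```

The original cause is kept as the wrapped exception so no stack-trace information is lost.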
[jira] [Created] (KAFKA-16764) New consumer should throw InvalidTopicException on poll when invalid topic in metadata
Lianet Magrans created KAFKA-16764: -- Summary: New consumer should throw InvalidTopicException on poll when invalid topic in metadata Key: KAFKA-16764 URL: https://issues.apache.org/jira/browse/KAFKA-16764 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 3.7.0 Reporter: Lianet Magrans A call to consumer.poll should throw InvalidTopicException if an invalid topic is discovered in metadata. This can be easily reproduced by calling subscribe("invalid topic") and then poll, for example. The new consumer does not throw the expected InvalidTopicException like the LegacyKafkaConsumer does. The legacy consumer achieves this by checking for metadata exceptions on every iteration of the ConsumerNetworkClient (see [here|https://github.com/apache/kafka/blob/0e023e1f736ea03568032cc282803df8e61eb451/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L315])
[jira] [Resolved] (KAFKA-16406) Split long-running consumer integration test
[ https://issues.apache.org/jira/browse/KAFKA-16406?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans resolved KAFKA-16406. Resolution: Fixed > Split long-running consumer integration test > > > Key: KAFKA-16406 > URL: https://issues.apache.org/jira/browse/KAFKA-16406 > Project: Kafka > Issue Type: Task >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Fix For: 3.8.0 > > > PlaintextConsumerTest contains integration tests for the consumer. Since the > introduction of the new consumer group protocol (KIP-848) and the new > KafkaConsumer, this test has been parametrized to run with multiple > combinations, making sure we test the logic for the old and new coordinator, > as well as for the legacy and new KafkaConsumer. > This led to this being one of the longest-running integration tests, so in > the aim of reducing the impact on the build times we could split it to allow > for parallelization. The test covers multiple areas of the consumer logic, > in a single file, so splitting based on the high-level features being tested > would be sensible and achieve the result wanted.
[jira] [Created] (KAFKA-16737) Clean up KafkaConsumerTest TODOs enabling tests for new consumer
Lianet Magrans created KAFKA-16737: -- Summary: Clean up KafkaConsumerTest TODOs enabling tests for new consumer Key: KAFKA-16737 URL: https://issues.apache.org/jira/browse/KAFKA-16737 Project: Kafka Issue Type: Task Components: consumer Reporter: Lianet Magrans KafkaConsumerTest.java contains lots of TODOs (50+) related to tests that are only enabled for the CLASSIC protocol and should be reviewed and enabled for the new CONSUMER group protocol when applicable. Some tests also have TODOs to enable them for the new consumer when certain features/bugs are addressed. The new protocol and consumer implementation have evolved a lot since those TODOs were added, so we should review them all, enable tests for the new protocol when applicable, and remove the TODOs from the code. Note that there is another AsyncKafkaConsumerTest.java, testing logic specific to the internals of the new consumer, but still many tests in the KafkaConsumerTest apply to both the new and legacy consumer, and we should enable them for both.
[jira] [Created] (KAFKA-16695) Improve expired poll interval logging by showing exceeded time
Lianet Magrans created KAFKA-16695: -- Summary: Improve expired poll interval logging by showing exceeded time Key: KAFKA-16695 URL: https://issues.apache.org/jira/browse/KAFKA-16695 Project: Kafka Issue Type: Task Components: clients, consumer Reporter: Lianet Magrans Assignee: Lianet Magrans When a consumer poll iteration takes longer than max.poll.interval.ms, the consumer logs a warning suggesting that the max.poll.interval.ms config was exceeded, and proactively leaves the group. The log suggests considering an adjustment of the max.poll.interval.ms config, which should help in cases of long processing times. We should consider adding the info of how much the interval was exceeded by, since it could help guide the user in effectively adjusting the config. Other clients already log this kind of message in this situation: {quote}Application maximum poll interval (30ms) exceeded by 255ms (adjust max.poll.interval.ms for long-running message processing): leaving group{quote}
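The suggested message could be produced as sketched below. This is an illustrative stand-alone helper, not the actual consumer logging code; the method and parameter names are assumptions.

```java
public class PollIntervalLogger {

    // Builds a warning that states both the configured interval and how far
    // it was exceeded, mirroring the message format quoted above.
    static String expiredIntervalWarning(long maxPollIntervalMs, long timeSinceLastPollMs) {
        long exceededMs = timeSinceLastPollMs - maxPollIntervalMs;
        return "Application maximum poll interval (" + maxPollIntervalMs + "ms) "
            + "exceeded by " + exceededMs + "ms "
            + "(adjust max.poll.interval.ms for long-running message processing): leaving group";
    }

    public static void main(String[] args) {
        // e.g. the poll loop took 300255ms against a 300000ms interval
        System.out.println(expiredIntervalWarning(300000, 300255));
    }
}
```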
[jira] [Created] (KAFKA-16675) Move rebalance callback test for positions to callbacks test file
Lianet Magrans created KAFKA-16675: -- Summary: Move rebalance callback test for positions to callbacks test file Key: KAFKA-16675 URL: https://issues.apache.org/jira/browse/KAFKA-16675 Project: Kafka Issue Type: Task Components: consumer Reporter: Lianet Magrans Assignee: Lianet Magrans Integration test testGetPositionOfNewlyAssignedPartitionFromPartitionsAssignedCallback was added to PlaintextConsumerTest.scala in this PR https://github.com/apache/kafka/pull/15856, as there was no specific file for testing callbacks at the time. Another PR is in-flight, adding the file for callback-related tests, https://github.com/apache/kafka/pull/15408. Once 15408 gets merged, we should move testGetPositionOfNewlyAssignedPartitionFromPartitionsAssignedCallback to it.
[jira] [Created] (KAFKA-16665) Fail to get partition's position from within onPartitionsAssigned callback in new consumer
Lianet Magrans created KAFKA-16665: -- Summary: Fail to get partition's position from within onPartitionsAssigned callback in new consumer Key: KAFKA-16665 URL: https://issues.apache.org/jira/browse/KAFKA-16665 Project: Kafka Issue Type: Task Components: clients, consumer Affects Versions: 3.7.0 Reporter: Lianet Magrans Assignee: Lianet Magrans Fix For: 3.8.0 If we attempt to call consumer.position(tp) from within the onPartitionsAssigned callback, the new consumer fails with a TimeoutException. The expectation is that we should be able to retrieve the position of newly assigned partitions, as happens with the legacy consumer, which allows this call. This is actually used from places within Kafka itself (ex. Connect [WorkerSinkTask|https://github.com/apache/kafka/blob/2c0b8b692071b6d436fa7e9dc90a1592476a6b17/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L715]) The failure in the new consumer is because the partitions that are assigned but awaiting the onPartitionsAssigned callback are excluded from the list of partitions that should initialize. We should allow the partitions to initialize their positions, without allowing fetching data from them (which is already achieved via the isFetchable flag in the subscription state).
[jira] [Created] (KAFKA-16628) Add system test for validating static consumer bounce and assignment when not eager
Lianet Magrans created KAFKA-16628: -- Summary: Add system test for validating static consumer bounce and assignment when not eager Key: KAFKA-16628 URL: https://issues.apache.org/jira/browse/KAFKA-16628 Project: Kafka Issue Type: Task Components: consumer, system tests Reporter: Lianet Magrans The existing system [test|https://github.com/apache/kafka/blob/e7792258df934a5c8470c2925c5d164c7d5a8e6c/tests/kafkatest/tests/client/consumer_test.py#L209] includes a test for validating that partitions are not re-assigned when a static member is bounced, but the test design and setup are intended for testing the Eager assignment strategy only (based on the eager protocol where all dynamic members revoke their partitions when a rebalance happens). We should consider adding a test that would ensure that partitions are not re-assigned when using the cooperative sticky assignor or the new consumer group protocol assignments.
[jira] [Created] (KAFKA-16566) Update static membership fencing system test to support new protocol
Lianet Magrans created KAFKA-16566: -- Summary: Update static membership fencing system test to support new protocol Key: KAFKA-16566 URL: https://issues.apache.org/jira/browse/KAFKA-16566 Project: Kafka Issue Type: Bug Affects Versions: 3.7.0 Reporter: Lianet Magrans Assignee: Lianet Magrans Fix For: 3.8.0 consumer_test.py contains OffsetValidationTest.test_fencing_static_consumer, which verifies the sequence in which static members join a group when using conflicting instance ids. This behaviour is different in the classic and consumer protocols, so the test should be updated to set the right expectations when running with the new consumer protocol. Note that what the test covers (params, setup) applies to both protocols; it is the expected results that differ. When static members conflict while joining a group: Classic protocol: all members join the group with the same group instance id, and then the first one will eventually receive a HB error with FencedInstanceIdException. Consumer protocol: a new member with an instance id already in use is not able to join, receiving an UnreleasedInstanceIdException in the response to the HB to join the group.
[jira] [Created] (KAFKA-16528) Reset member heartbeat interval when request sent
Lianet Magrans created KAFKA-16528: -- Summary: Reset member heartbeat interval when request sent Key: KAFKA-16528 URL: https://issues.apache.org/jira/browse/KAFKA-16528 Project: Kafka Issue Type: Task Components: clients, consumer Reporter: Lianet Magrans Assignee: Lianet Magrans Fix For: 3.8.0 The member should reset the heartbeat timer when the request is sent, rather than when a response is received. This ensures that we do not add any response delay to the interval. With this, we better respect the contract of members sending a HB on the interval to remain in the group, and avoid potential unwanted rebalances. Note that there is already logic in place to avoid sending a request if a response hasn't been received, so even with the interval reset on send, the next HB will only be sent once the response is received (it will be sent out on the next poll of the HB manager, respecting the minimal backoff for consecutive requests). This is also consistent with how the interval timing and in-flights are handled for auto-commits.
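The timing contract described above can be sketched as a small state holder: the interval restarts at send time, and an in-flight flag blocks a second send until the response arrives. This is an illustrative toy, not the HeartbeatRequestManager; all names are assumptions.

```java
public class HeartbeatTimer {
    private final long intervalMs;
    private long nextSendAtMs;
    private boolean inflight;

    HeartbeatTimer(long intervalMs) { this.intervalMs = intervalMs; }

    /** Returns true if a heartbeat should be sent at nowMs, and records the send. */
    boolean maybeSend(long nowMs) {
        if (inflight || nowMs < nextSendAtMs) return false;
        inflight = true;
        nextSendAtMs = nowMs + intervalMs; // reset on send, not on receive
        return true;
    }

    void onResponse() { inflight = false; }

    public static void main(String[] args) {
        HeartbeatTimer timer = new HeartbeatTimer(100);
        System.out.println(timer.maybeSend(0));   // true: first heartbeat sent at t=0
        System.out.println(timer.maybeSend(100)); // false: response still in flight
        timer.onResponse();                       // response arrives late, at t=150
        System.out.println(timer.maybeSend(150)); // true: interval measured from the send at t=0
    }
}
```

Because the deadline was set at send time (t=0), the delayed response does not push the next heartbeat past t=100 plus the delay; the member heartbeats again as soon as the response clears the in-flight flag.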
[jira] [Created] (KAFKA-16493) Avoid unneeded subscription regex check if metadata version unchanged
Lianet Magrans created KAFKA-16493: -- Summary: Avoid unneeded subscription regex check if metadata version unchanged Key: KAFKA-16493 URL: https://issues.apache.org/jira/browse/KAFKA-16493 Project: Kafka Issue Type: Task Components: clients, consumer Reporter: Lianet Magrans When using pattern subscription (java pattern), the new consumer regularly checks if the list of topics that match the regex has changed. This is done as part of the consumer poll loop, and it evaluates the regex using the latest cluster metadata. As an improvement, we should avoid evaluating the regex if the metadata version hasn't changed (similar to what the legacy coordinator does [here|https://github.com/apache/kafka/blob/f895ab5145077c5efa10a4a898628d901b01e2c2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L435C10-L435C41])
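The improvement amounts to caching the last evaluated metadata version, roughly as sketched below. This is an illustrative sketch with hypothetical names, not the consumer's actual subscription code.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

public class PatternSubscriptionCache {
    private final Pattern pattern;
    private int lastEvaluatedVersion = -1;
    private Set<String> matchedTopics = new TreeSet<>();

    PatternSubscriptionCache(Pattern pattern) { this.pattern = pattern; }

    /** Returns true if the regex was (re-)evaluated for this metadata version. */
    boolean maybeUpdateMatches(int metadataVersion, List<String> clusterTopics) {
        if (metadataVersion == lastEvaluatedVersion) return false; // metadata unchanged: skip
        lastEvaluatedVersion = metadataVersion;
        Set<String> matches = new TreeSet<>();
        for (String topic : clusterTopics)
            if (pattern.matcher(topic).matches()) matches.add(topic);
        matchedTopics = matches;
        return true;
    }

    Set<String> matchedTopics() { return matchedTopics; }

    public static void main(String[] args) {
        PatternSubscriptionCache cache = new PatternSubscriptionCache(Pattern.compile("events-.*"));
        System.out.println(cache.maybeUpdateMatches(1, List.of("events-a", "other"))); // true: first evaluation
        System.out.println(cache.maybeUpdateMatches(1, List.of("events-a", "other"))); // false: version unchanged
        System.out.println(cache.matchedTopics()); // [events-a]
    }
}
```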
[jira] [Created] (KAFKA-16418) Split long-running admin client integration tests
Lianet Magrans created KAFKA-16418: -- Summary: Split long-running admin client integration tests Key: KAFKA-16418 URL: https://issues.apache.org/jira/browse/KAFKA-16418 Project: Kafka Issue Type: Task Components: clients Reporter: Lianet Magrans Assignee: Lianet Magrans Review PlaintextAdminIntegrationTest and attempt to split it to allow for parallelization and improve build times. This test is the longest-running integration test in kafka.api, so a similar approach to what has been done with the consumer tests in PlaintextConsumerTest should be a good improvement.
[jira] [Resolved] (KAFKA-16375) Fix logic for discarding reconciliation if member rejoined
[ https://issues.apache.org/jira/browse/KAFKA-16375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans resolved KAFKA-16375. Resolution: Fixed > Fix logic for discarding reconciliation if member rejoined > -- > > Key: KAFKA-16375 > URL: https://issues.apache.org/jira/browse/KAFKA-16375 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Critical > Labels: client-transitions-issues, kip-848-client-support > Fix For: 3.8.0 > > > The current implementation of the new consumer discards the result of a > reconciliation if the member rejoined, based on a comparison of the member > epoch at the start and end of the reconciliation. If the epochs changed the > reconciliation is discarded. This is not right because the member epoch could > be incremented without an assignment change. This should be fixed to ensure > that the reconciliation is discarded if the member rejoined, probably based > on a flag that truly reflects that it went through a transition to joining > while reconciling.
[jira] [Created] (KAFKA-16406) Split long-running consumer integration test
Lianet Magrans created KAFKA-16406: -- Summary: Split long-running consumer integration test Key: KAFKA-16406 URL: https://issues.apache.org/jira/browse/KAFKA-16406 Project: Kafka Issue Type: Task Reporter: Lianet Magrans Assignee: Lianet Magrans PlaintextConsumerTest contains integration tests for the consumer. Since the introduction of the new consumer group protocol (KIP-848) and the new KafkaConsumer, this test has been parametrized to run with multiple combinations, making sure we test the logic for the old and new coordinator, as well as for the legacy and new KafkaConsumer. This led to this being one of the longest-running integration tests, so in the aim of reducing the impact on the build times we could split it to allow for parallelization. The test covers multiple areas of the consumer logic, in a single file, so splitting based on the high-level features being tested would be sensible and achieve the result wanted.
[jira] [Created] (KAFKA-16375) Fix logic for discarding reconciliation if member rejoined
Lianet Magrans created KAFKA-16375: -- Summary: Fix logic for discarding reconciliation if member rejoined Key: KAFKA-16375 URL: https://issues.apache.org/jira/browse/KAFKA-16375 Project: Kafka Issue Type: Sub-task Components: clients, consumer Reporter: Lianet Magrans Assignee: Lianet Magrans The current implementation of the new consumer discards the result of a reconciliation if the member rejoined, based on a comparison of the member epoch at the start and end of the reconciliation. If the epochs changed the reconciliation is discarded. This is not right because the member epoch could be incremented without an assignment change. This should be fixed to ensure that the reconciliation is discarded if the member rejoined, probably based on a flag that truly reflects that it went through a transition to joining. As a potential improvement, consider if the member could keep the reconciliation if it rejoined but got the same assignment.
[jira] [Created] (KAFKA-16301) Review fenced member unsubscribe/subscribe callbacks interaction
Lianet Magrans created KAFKA-16301: -- Summary: Review fenced member unsubscribe/subscribe callbacks interaction Key: KAFKA-16301 URL: https://issues.apache.org/jira/browse/KAFKA-16301 Project: Kafka Issue Type: Sub-task Components: clients, consumer Reporter: Lianet Magrans When a member gets fenced, it triggers the onPartitionsLost callback, if any, and then rejoins the group. If, while the callback is completing, the member attempts to leave the group (ex. unsubscribe), the leave operation detects that the member is already removed from the group (fenced), just aligns the client state with the current broker state, and marks the client as UNSUBSCRIBED (client-side state for not in group). This means that the member could attempt to rejoin the group if the user calls subscribe, get an assignment, and trigger onPartitionsAssigned before onPartitionsLost has completed. This approach keeps the client state machine simple, given that it does not need to block the new member (it is effectively a new member because the old one got fenced). The new member can rejoin, get an assignment and make progress. The downside is that it could allow overlapped callback executions (lost and assign) in the above edge case, which is not the behaviour of the old coordinator. Review and validate. The alternative would require more complex logic on the client to ensure that we do not allow a new member to rejoin until the fenced one completes the callback.
[jira] [Created] (KAFKA-16298) Ensure user callbacks exceptions are propagated to the user on consumer poll
Lianet Magrans created KAFKA-16298: -- Summary: Ensure user callbacks exceptions are propagated to the user on consumer poll Key: KAFKA-16298 URL: https://issues.apache.org/jira/browse/KAFKA-16298 Project: Kafka Issue Type: Sub-task Components: clients, consumer Affects Versions: 3.7.0 Reporter: Lianet Magrans When user-defined callbacks fail with an exception, the expectation is that the error should be propagated to the user as a KafkaException and break the poll loop (the behaviour in the legacy coordinator). The new coordinator executes callbacks in the application thread, and sends an event to the background with the callback result ([here|https://github.com/apache/kafka/blob/98a658f871fc2c533b16fb5fd567a5ceb1c340b7/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L252]), but does not seem to propagate the exception to the user.
[jira] [Created] (KAFKA-16258) Stale member should trigger onPartitionsLost when leaving group
Lianet Magrans created KAFKA-16258: -- Summary: Stale member should trigger onPartitionsLost when leaving group Key: KAFKA-16258 URL: https://issues.apache.org/jira/browse/KAFKA-16258 Project: Kafka Issue Type: Sub-task Components: clients, consumer Affects Versions: 3.7.0 Reporter: Lianet Magrans Fix For: 3.8.0 When the poll timer expires, the new consumer proactively leaves the group and clears its assignments, but it should also invoke the onPartitionsLost callback. The legacy coordinator does the following sequence on poll timer expiration: send leave group request ([here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1517]), invoke onPartitionsLost, and when it completes it clears the assignment (onJoinPrepare [here|https://github.com/apache/kafka/blob/e8c70fce26626ed2ab90f2728a45f6e55e907ec1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L779]).
[jira] [Created] (KAFKA-16251) Fenced member should not send heartbeats while waiting for onPartitionsLost to complete
Lianet Magrans created KAFKA-16251: -- Summary: Fenced member should not send heartbeats while waiting for onPartitionsLost to complete Key: KAFKA-16251 URL: https://issues.apache.org/jira/browse/KAFKA-16251 Project: Kafka Issue Type: Sub-task Components: clients, consumer Reporter: Lianet Magrans Assignee: Lianet Magrans When a member gets fenced, it transitions to FENCED state and triggers the onPartitionsLost callback to release its assignment. Members should stop sending heartbeats while FENCED, and resume only after completing the callback, when they transition to JOINING.
[jira] [Created] (KAFKA-16233) Review auto-commit continuously committing when no progress
Lianet Magrans created KAFKA-16233: -- Summary: Review auto-commit continuously committing when no progress Key: KAFKA-16233 URL: https://issues.apache.org/jira/browse/KAFKA-16233 Project: Kafka Issue Type: Task Components: clients, consumer Reporter: Lianet Magrans When auto-commit is enabled, the consumer (legacy and new) will continuously send commit requests with the current positions, even if no progress is made and positions remain unchanged. We could consider whether this is really needed for some reason, or whether we could improve it and only send the auto-commit on the interval if positions have moved, avoiding repeatedly sending the same commit request.
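The proposed improvement could look roughly like this: compare the current positions against the last committed snapshot and skip the request when nothing moved. An illustrative sketch with hypothetical names, not the actual commit manager.

```java
import java.util.HashMap;
import java.util.Map;

public class AutoCommitSkipper {
    // Snapshot of the positions included in the last commit request.
    private Map<String, Long> lastCommitted = new HashMap<>();

    /** Returns true if a commit request would be sent for these positions. */
    boolean maybeAutoCommit(Map<String, Long> currentPositions) {
        if (currentPositions.equals(lastCommitted)) return false; // no progress: skip
        lastCommitted = new HashMap<>(currentPositions);
        return true;
    }

    public static void main(String[] args) {
        AutoCommitSkipper skipper = new AutoCommitSkipper();
        System.out.println(skipper.maybeAutoCommit(Map.of("topic-0", 5L))); // true: positions changed
        System.out.println(skipper.maybeAutoCommit(Map.of("topic-0", 5L))); // false: no progress
        System.out.println(skipper.maybeAutoCommit(Map.of("topic-0", 7L))); // true: position advanced
    }
}
```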
[jira] [Resolved] (KAFKA-16133) Commits during reconciliation always time out
[ https://issues.apache.org/jira/browse/KAFKA-16133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans resolved KAFKA-16133. Fix Version/s: 3.7.0 (was: 3.8.0) Resolution: Fixed > Commits during reconciliation always time out > - > > Key: KAFKA-16133 > URL: https://issues.apache.org/jira/browse/KAFKA-16133 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.7.0 >Reporter: Lucas Brutschy >Assignee: Lianet Magrans >Priority: Critical > Labels: consumer-threading-refactor, reconciliation, timeout > Fix For: 3.7.0 > > > This only affects the AsyncKafkaConsumer, which is in Preview in 3.7. > In MembershipManagerImpl there is a confusion between timeouts and deadlines. > [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java#L836C38-L836C38] > This causes all autocommits during reconciliation to immediately time out.
[jira] [Created] (KAFKA-16224) Fix handling of deleted topic when auto-committing before revocation
Lianet Magrans created KAFKA-16224: -- Summary: Fix handling of deleted topic when auto-committing before revocation Key: KAFKA-16224 URL: https://issues.apache.org/jira/browse/KAFKA-16224 Project: Kafka Issue Type: Sub-task Components: clients, consumer Reporter: Lianet Magrans Assignee: Lianet Magrans Current logic for auto-committing offsets when partitions are revoked will retry continuously when getting UNKNOWN_TOPIC_OR_PARTITION, leading to the member not completing the revocation in time. We should consider this as an indication of the topic being deleted, and in the context of committing offsets to revoke partitions, we should abort the commit attempt and move on to complete and ack the revocation. While reviewing this, review the behaviour around this error for other commit operations as well in case a similar reasoning should be applied.
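The decision described above reduces to an error-to-action mapping during revocation: a likely-deleted topic should not block the revocation, while genuinely retriable errors can keep retrying within the deadline. The enums and names below are illustrative assumptions, not Kafka's actual error-handling code.

```java
public class RevocationCommit {
    enum CommitError { NONE, UNKNOWN_TOPIC_OR_PARTITION, COORDINATOR_NOT_AVAILABLE }
    enum Action { COMPLETE_REVOCATION, RETRY_COMMIT }

    // Maps the auto-commit result, in the context of a revocation, to the next step.
    static Action onCommitResult(CommitError error) {
        switch (error) {
            case NONE:
            case UNKNOWN_TOPIC_OR_PARTITION: // topic likely deleted: abort the commit, move on
                return Action.COMPLETE_REVOCATION;
            default:
                return Action.RETRY_COMMIT;  // retriable errors keep retrying within the deadline
        }
    }

    public static void main(String[] args) {
        System.out.println(onCommitResult(CommitError.UNKNOWN_TOPIC_OR_PARTITION)); // COMPLETE_REVOCATION
        System.out.println(onCommitResult(CommitError.COORDINATOR_NOT_AVAILABLE));  // RETRY_COMMIT
    }
}
```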
[jira] [Created] (KAFKA-16215) Consumer does not rejoin after fenced on delayed revocation
Lianet Magrans created KAFKA-16215: -- Summary: Consumer does not rejoin after fenced on delayed revocation Key: KAFKA-16215 URL: https://issues.apache.org/jira/browse/KAFKA-16215 Project: Kafka Issue Type: Sub-task Components: clients, consumer Reporter: Lianet Magrans Assignee: Lianet Magrans A consumer with partitions assigned for topic T1 gets stuck attempting to commit offsets when T1 is deleted, so the reconciliation does not complete. The consumer then gets fenced, but it does not attempt to rejoin as it should.
[jira] [Created] (KAFKA-16203) AutoCommit of empty offsets blocks following requests due to inflight flag
Lianet Magrans created KAFKA-16203: -- Summary: AutoCommit of empty offsets blocks following requests due to inflight flag Key: KAFKA-16203 URL: https://issues.apache.org/jira/browse/KAFKA-16203 Project: Kafka Issue Type: Bug Components: clients, consumer Affects Versions: 3.7.0 Reporter: Lianet Magrans Assignee: Lianet Magrans Fix For: 3.8.0 The logic for auto-committing offsets completes without generating a request, but mistakenly leaves the inflight request flag on. As a result, subsequent auto-commits will not generate requests, even if offsets have been consumed.
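The fixed behaviour can be sketched as follows: the in-flight flag is only set when a request is actually generated, so an empty offsets map leaves the flag untouched. This is an illustrative toy with assumed names, not the actual CommitRequestManager logic.

```java
import java.util.Map;

public class AutoCommitState {
    private boolean inflightCommit;

    /** Returns true if a commit request was generated. */
    boolean maybeAutoCommit(Map<String, Long> offsetsToCommit) {
        if (inflightCommit) return false;            // previous commit still in flight
        if (offsetsToCommit.isEmpty()) return false; // nothing consumed: no request, no flag
        inflightCommit = true;                       // flag set only alongside a real request
        return true;
    }

    void onCommitResponse() { inflightCommit = false; }

    public static void main(String[] args) {
        AutoCommitState state = new AutoCommitState();
        System.out.println(state.maybeAutoCommit(Map.of()));             // false: empty, flag stays off
        System.out.println(state.maybeAutoCommit(Map.of("t-0", 1L)));    // true: request generated
        System.out.println(state.maybeAutoCommit(Map.of("t-0", 2L)));    // false: blocked by in-flight
        state.onCommitResponse();
        System.out.println(state.maybeAutoCommit(Map.of("t-0", 2L)));    // true: flag cleared by response
    }
}
```

The bug in the issue corresponds to setting the flag on the empty-offsets path as well, which would make every later call return false.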
[jira] [Resolved] (KAFKA-16134) kafka.api.PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe(String, String).quorum=kraft+kip848.groupProtocol=consumer is flaky
[ https://issues.apache.org/jira/browse/KAFKA-16134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans resolved KAFKA-16134. Assignee: Lianet Magrans Resolution: Fixed Fix merged in https://github.com/apache/kafka/pull/15215 > kafka.api.PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe(String, > String).quorum=kraft+kip848.groupProtocol=consumer is flaky > -- > > Key: KAFKA-16134 > URL: https://issues.apache.org/jira/browse/KAFKA-16134 > Project: Kafka > Issue Type: Test > Components: clients, consumer >Reporter: Stanislav Kozlovski >Assignee: Lianet Magrans >Priority: Minor > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 3.8.0 > > > The following test is very flaky. It failed 3 times consecutively in Jenkins > runs for the 3.7 release candidate. > kafka.api.PlaintextConsumerTest.testPerPartitionLagMetricsCleanUpWithSubscribe(String, > String).quorum=kraft+kip848.groupProtocol=consumer
[jira] [Resolved] (KAFKA-16135) kafka.api.PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe(String, String).quorum=kraft+kip848.groupProtocol=consumer is flaky
[ https://issues.apache.org/jira/browse/KAFKA-16135?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans resolved KAFKA-16135. Assignee: Lianet Magrans Resolution: Fixed Fix merged in https://github.com/apache/kafka/pull/15215 > kafka.api.PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe(String, > String).quorum=kraft+kip848.groupProtocol=consumer is flaky > --- > > Key: KAFKA-16135 > URL: https://issues.apache.org/jira/browse/KAFKA-16135 > Project: Kafka > Issue Type: Test > Components: clients, consumer >Reporter: Stanislav Kozlovski >Assignee: Lianet Magrans >Priority: Minor > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 3.8.0 > > > The test > kafka.api.PlaintextConsumerTest.testPerPartitionLeadMetricsCleanUpWithSubscribe(String, > String).quorum=kraft+kip848.groupProtocol=consumer > is incredibly flaky - it failed 3 builds in a row for the 3.7 release > candidate, but with different JDK versions. Locally it also fails often and > requires a few retries to pass > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16190) Member should send full heartbeat when rejoining
Lianet Magrans created KAFKA-16190: -- Summary: Member should send full heartbeat when rejoining Key: KAFKA-16190 URL: https://issues.apache.org/jira/browse/KAFKA-16190 Project: Kafka Issue Type: Sub-task Components: clients, consumer Reporter: Lianet Magrans The heartbeat request builder should make sure that all fields are sent in the heartbeat request when the consumer rejoins (currently the HeartbeatRequestManager request builder is only reset on failure scenarios). This should fix the issue where a client that is subscribed to a topic and gets fenced does not rejoin with the same subscription it had. -- This message was sent by Atlassian Jira (v8.20.10#820010)
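The intended behavior can be sketched with a hypothetical builder (not the real HeartbeatRequestManager internals): heartbeat fields are delta-encoded and omitted when unchanged, so the sent-state must be reset not only on failures but also when the member rejoins, making the first heartbeat after rejoining a full one.

```java
import java.util.List;

// Hypothetical delta-encoding builder: reset() forces the next heartbeat
// to carry every field, including an unchanged subscription.
class HeartbeatFieldsSketch {
    private List<String> lastSentSubscription = null;

    /** Returns the subscription field to send, or null to omit it (unchanged). */
    List<String> subscriptionField(List<String> current) {
        if (current.equals(lastSentSubscription))
            return null;                 // unchanged: omitted from the request
        lastSentSubscription = current;
        return current;
    }

    /** Called on request failure AND when the member rejoins after fencing. */
    void reset() {
        lastSentSubscription = null;
    }
}
```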
[jira] [Created] (KAFKA-16185) Fix client reconciliation of same assignment received in different epochs
Lianet Magrans created KAFKA-16185: -- Summary: Fix client reconciliation of same assignment received in different epochs Key: KAFKA-16185 URL: https://issues.apache.org/jira/browse/KAFKA-16185 Project: Kafka Issue Type: Sub-task Components: clients, consumer Reporter: Lianet Magrans Assignee: Lianet Magrans Currently, the intention in the client state machine is that the client always reconciles whatever it has pending that has not been removed by the coordinator. There is still an edge case where this does not happen, and the client might get stuck JOINING/RECONCILING with a pending (delayed) reconciliation, when it receives the same assignment again in a new epoch (ex. after being FENCED). The first time it receives the assignment it takes no action, as it already has it pending to reconcile, but when the reconciliation completes it discards the result because the epoch changed. This is wrong. Note that after sending the assignment with the new epoch once, the broker continues to send null assignments. Here is a sample sequence leading to the client stuck JOINING:
- client joins, epoch 0
- client receives assignment tp1, stuck RECONCILING, epoch 1
- member gets FENCED on the coord, coord bumps epoch to 2
- client tries to rejoin (JOINING), epoch 0 provided by the client
- new member added to the group (group epoch bumped to 3), client receives the same assignment it is currently trying to reconcile (tp1), but with epoch 3
- the previous reconciliation completes, but the result is discarded because memberHasRejoined (memberEpochOnReconciliationStart != memberEpoch).
Client is stuck JOINING, with the server sending a null target assignment because it hasn't changed since the last one sent (tp1). (We should end up with a test similar to the existing #testDelayedReconciliationResultDiscardedIfMemberRejoins, but covering the case where the member receives the same assignment after being fenced and rejoining.) -- This message was sent by Atlassian Jira (v8.20.10#820010)
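The faulty check can be sketched with hypothetical field names: discarding a completed reconciliation purely because the member epoch changed loses exactly the case above, where the rejoined member was sent the same assignment again in a newer epoch. Comparing the result against the current target assignment instead would let it be applied.

```java
import java.util.Set;

// Hypothetical sketch: decide whether a delayed reconciliation result is
// still applicable. The buggy version compared epochs only:
//     return epochOnReconciliationStart == memberEpoch;
class ReconcileResultSketch {
    int memberEpoch;
    Set<String> currentTargetAssignment;

    boolean shouldApplyResult(Set<String> reconciled, int epochOnReconciliationStart) {
        // Epoch may have been bumped by fencing + rejoin; what matters is
        // whether the reconciled partitions still match the current target.
        return reconciled.equals(currentTargetAssignment);
    }
}
```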
[jira] [Created] (KAFKA-16165) Consumer invalid transition on expired poll interval
Lianet Magrans created KAFKA-16165: -- Summary: Consumer invalid transition on expired poll interval Key: KAFKA-16165 URL: https://issues.apache.org/jira/browse/KAFKA-16165 Project: Kafka Issue Type: Sub-task Components: clients, consumer Reporter: Lianet Magrans Assignee: Lianet Magrans Running system tests with the new async consumer revealed an invalid state transition when the consumer is not polled within the max poll interval in some scenario (possibly related to consumer close, since the transition is LEAVING->STALE). Log trace: [2024-01-17 19:45:07,379] WARN [Consumer clientId=consumer.6aa7cd1c-c83f-47e1-8f8f-b38a459a05d8-0, groupId=consumer-groups-test-2] consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. 
(org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager:188) [2024-01-17 19:45:07,379] ERROR [Consumer clientId=consumer.6aa7cd1c-c83f-47e1-8f8f-b38a459a05d8-0, groupId=consumer-groups-test-2] Unexpected error caught in consumer network thread (org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread:91) java.lang.IllegalStateException: Invalid state transition from LEAVING to STALE at org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.transitionTo(MembershipManagerImpl.java:303) at org.apache.kafka.clients.consumer.internals.MembershipManagerImpl.transitionToStale(MembershipManagerImpl.java:739) at org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager.poll(HeartbeatRequestManager.java:194) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.lambda$runOnce$0(ConsumerNetworkThread.java:137) at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197) at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:197) at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179) at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1708) at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509) at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499) at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921) at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.base/java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:657) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.runOnce(ConsumerNetworkThread.java:139) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread.run(ConsumerNetworkThread.java:88) -- This message was sent by Atlassian Jira (v8.20.10#820010)
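The IllegalStateException in the trace comes from a transition-table check. The abbreviated sketch below (hypothetical, not the full MembershipManagerImpl state machine) shows the shape of the problem: STALE is simply not an allowed successor of LEAVING, and the poll-timer-expiry path did not account for that.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Abbreviated, hypothetical transition table: transitionTo throws when the
// requested edge is missing, as in the LEAVING -> STALE trace above.
class MemberStateSketch {
    enum State { STABLE, LEAVING, STALE, UNSUBSCRIBED }

    private static final Map<State, Set<State>> ALLOWED = new EnumMap<>(Map.of(
            State.STABLE, EnumSet.of(State.LEAVING, State.STALE),
            State.LEAVING, EnumSet.of(State.UNSUBSCRIBED),   // STALE not allowed
            State.STALE, EnumSet.of(State.STABLE),
            State.UNSUBSCRIBED, EnumSet.noneOf(State.class)));

    State state = State.STABLE;

    void transitionTo(State next) {
        if (!ALLOWED.get(state).contains(next))
            throw new IllegalStateException(
                    "Invalid state transition from " + state + " to " + next);
        state = next;
    }
}
```

Fixing the bug means either adding the missing edge or not attempting to go STALE while LEAVING.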
[jira] [Created] (KAFKA-16156) System test failing on endOffsets with negative timestamps
Lianet Magrans created KAFKA-16156: -- Summary: System test failing on endOffsets with negative timestamps Key: KAFKA-16156 URL: https://issues.apache.org/jira/browse/KAFKA-16156 Project: Kafka Issue Type: Sub-task Components: clients Reporter: Lianet Magrans TransactionalMessageCopier run with 3.7 new consumer fails with "Invalid negative timestamp". Trace: [2024-01-15 07:42:33,933] ERROR Shutting down after unexpected error in event loop (org.apache.kafka.tools.TransactionalMessageCopier) org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: Invalid negative timestamp at org.apache.kafka.clients.consumer.internals.ConsumerUtils.maybeWrapAsKafkaException(ConsumerUtils.java:234) at org.apache.kafka.clients.consumer.internals.ConsumerUtils.getResult(ConsumerUtils.java:212) at org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent.get(CompletableApplicationEvent.java:44) at org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler.addAndGet(ApplicationEventHandler.java:113) at org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.beginningOrEndOffset(AsyncKafkaConsumer.java:1134) at org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.endOffsets(AsyncKafkaConsumer.java:1113) at org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer.endOffsets(AsyncKafkaConsumer.java:1108) at org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:1651) at org.apache.kafka.tools.TransactionalMessageCopier.messagesRemaining(TransactionalMessageCopier.java:246) at org.apache.kafka.tools.TransactionalMessageCopier.runEventLoop(TransactionalMessageCopier.java:342) at org.apache.kafka.tools.TransactionalMessageCopier.main(TransactionalMessageCopier.java:292) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Reopened] (KAFKA-15538) Client support for java regex based subscription
[ https://issues.apache.org/jira/browse/KAFKA-15538?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans reopened KAFKA-15538: > Client support for java regex based subscription > > > Key: KAFKA-15538 > URL: https://issues.apache.org/jira/browse/KAFKA-15538 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Phuc Hong Tran >Priority: Major > Labels: kip-848, kip-848-client-support > Fix For: 3.8.0 > > > When using subscribe with a java regex (Pattern), we need to resolve it on > the client side to send the broker a list of topic names to subscribe to. > Context: > The new consumer group protocol uses [Google > RE2/J|https://github.com/google/re2j] for regular expressions and introduces > new methods in the consumer API to subscribe using a `SubscriptionPattern`. The > subscribe using a java `Pattern` will still be supported for a while but > eventually removed. > * When the subscribe with SubscriptionPattern is used, the client should > just send the regex to the broker and it will be resolved on the server side. > * In the case of the subscribe with Pattern, the regex should be resolved on > the client side. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15866) Refactor OffsetFetchRequestState Error handling to be more consistent with OffsetCommitRequestState
[ https://issues.apache.org/jira/browse/KAFKA-15866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans resolved KAFKA-15866. Fix Version/s: 3.7.0 (was: 3.8.0) Assignee: (was: Lan Ding) Resolution: Fixed > Refactor OffsetFetchRequestState Error handling to be more consistent with > OffsetCommitRequestState > --- > > Key: KAFKA-15866 > URL: https://issues.apache.org/jira/browse/KAFKA-15866 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Philip Nee >Priority: Minor > Labels: consumer-threading-refactor > Fix For: 3.7.0 > > > The current OffsetFetchRequestState error handling uses nested if-else, which > is quite different, stylistically, to the OffsetCommitRequestState using a > switch statement. The latter is a bit more readable, so we should refactor the > error handling to use the same style and improve readability. > > A minor point: some of the error handling seems inconsistent with the commit. > The logic was carried over from the current implementation, so we should also review all > the error handling. For example, the current logic somehow doesn't mark the > coordinator unavailable when receiving COORDINATOR_NOT_AVAILABLE -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16107) Ensure consumer does not start fetching from added partitions until onPartitionsAssigned completes
Lianet Magrans created KAFKA-16107: -- Summary: Ensure consumer does not start fetching from added partitions until onPartitionsAssigned completes Key: KAFKA-16107 URL: https://issues.apache.org/jira/browse/KAFKA-16107 Project: Kafka Issue Type: Sub-task Components: clients, consumer Reporter: Lianet Magrans In the new consumer implementation, when new partitions are assigned, the subscription state is updated and then #onPartitionsAssigned is triggered. This sequence seems sensible, but we need to ensure that no data is fetched until onPartitionsAssigned completes (where the user could be setting the committed offsets it wants to start fetching from). We should pause the newly added partitions until onPartitionsAssigned completes, similar to how it's done on revocation, to avoid positions getting ahead of the committed offsets. -- This message was sent by Atlassian Jira (v8.20.10#820010)
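The proposed sequencing can be sketched with hypothetical names: newly assigned partitions are tracked as awaiting the callback when the assignment is applied, and only become fetchable once onPartitionsAssigned has completed, so the callback can seek to committed offsets before any records are fetched.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: fetch is blocked for partitions whose
// onPartitionsAssigned callback has not completed yet.
class AssignmentPauseSketch {
    private final Set<String> assigned = new HashSet<>();
    private final Set<String> awaitingCallback = new HashSet<>();

    void assign(Set<String> added) {
        assigned.addAll(added);
        awaitingCallback.addAll(added);    // fetch blocked for these
    }

    void onPartitionsAssignedCompleted(Set<String> added) {
        awaitingCallback.removeAll(added); // now safe to fetch
    }

    boolean isFetchable(String tp) {
        return assigned.contains(tp) && !awaitingCallback.contains(tp);
    }
}
```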
[jira] [Resolved] (KAFKA-15946) AsyncKafkaConsumer should retry commits on the application thread instead of auto-retry
[ https://issues.apache.org/jira/browse/KAFKA-15946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans resolved KAFKA-15946. Fix Version/s: 3.7.0 (was: 3.8.0) Assignee: Lianet Magrans (was: Kirk True) Resolution: Fixed 3.7 includes a fix to make sure that only sync commits are retried, with a timeout, and async commits are not (the failure is just passed to the callback). There is also a follow-up ticket: https://issues.apache.org/jira/browse/KAFKA-16033 > AsyncKafkaConsumer should retry commits on the application thread instead of > auto-retry > --- > > Key: KAFKA-15946 > URL: https://issues.apache.org/jira/browse/KAFKA-15946 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Philip Nee >Assignee: Lianet Magrans >Priority: Major > Labels: consumer-threading-refactor, kip-848-client-support > Fix For: 3.7.0 > > > The original design was that the network thread always completes the future, > whether it succeeds or fails. However, in the current patch, I mis-added > auto-retry functionality because commitSync wasn't retrying. What we should > be doing is: the commitSync API should catch RetriableExceptions and > resend the commit until it times out.
> {code:java}
> if (error.exception() instanceof RetriableException) {
>     log.warn("OffsetCommit failed on partition {} at offset {}: {}", tp, offset, error.message());
>     handleRetriableError(error, response);
>     retry(responseTime); // <--- We probably shouldn't do this.
>     return;
> }
> {code}
>
> {code:java}
> @Override
> public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout) {
>     acquireAndEnsureOpen();
>     long commitStart = time.nanoseconds();
>     try {
>         CompletableFuture<Void> commitFuture = commit(offsets, true); // <-- we probably should retry here
>         ConsumerUtils.getResult(commitFuture, time.timer(timeout));
>     } finally {
>         wakeupTrigger.clearTask();
>         kafkaConsumerMetrics.recordCommitSync(time.nanoseconds() - commitStart);
>         release();
>     }
> }
> {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
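The resolved behavior described above (retry sync commits on the application thread, bounded by the caller's timeout) can be sketched as follows. This is a hypothetical shape, not the actual AsyncKafkaConsumer code; commitAsync would simply hand the failure to the callback without entering such a loop.

```java
import java.util.function.Supplier;

// Hypothetical sketch: retry retriable commit failures on the calling
// thread until the attempt succeeds or the caller's timeout elapses.
class CommitRetrySketch {
    static class RetriableException extends RuntimeException {}

    /** Retries attempt.get() until it returns true or the deadline passes. */
    static boolean commitSync(Supplier<Boolean> attempt, long timeoutMs) {
        long deadlineMs = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() <= deadlineMs) {
            try {
                return attempt.get();
            } catch (RetriableException e) {
                // retriable: loop and resend instead of failing the caller
            }
        }
        return false; // caller's timeout expired
    }
}
```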
[jira] [Resolved] (KAFKA-15967) Fix revocation in reconcilation logic
[ https://issues.apache.org/jira/browse/KAFKA-15967?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans resolved KAFKA-15967. Fix Version/s: 3.7.0 (was: 3.8.0) Assignee: Lianet Magrans Resolution: Fixed > Fix revocation in reconcilation logic > - > > Key: KAFKA-15967 > URL: https://issues.apache.org/jira/browse/KAFKA-15967 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Reporter: Lucas Brutschy >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848, kip-848-client-support > Fix For: 3.7.0 > > > Looks like there is a problem in the reconciliation logic. > We are getting 6 partitions from an HB, we add them to > {{{}assignmentReadyToReconcile{}}}. Next HB we get only 4 partitions (2 are > revoked), we also add them to {{{}assignmentReadyToReconcile{}}}, but the 2 > partitions that were supposed to be removed from the assignment are never > removed because they are still in {{{}assignmentReadyToReconcile{}}}. > This was discovered during integration testing of > [https://github.com/apache/kafka/pull/14878] - part of the test > testRemoteAssignorRange was disabled and should be re-enabled once this is > fixed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15872) Investigate autocommit retry logic
[ https://issues.apache.org/jira/browse/KAFKA-15872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans resolved KAFKA-15872. Fix Version/s: (was: 3.8.0) Resolution: Duplicate > Investigate autocommit retry logic > -- > > Key: KAFKA-15872 > URL: https://issues.apache.org/jira/browse/KAFKA-15872 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Philip Nee >Priority: Minor > Labels: consumer-threading-refactor > > This is purely an investigation ticket. > Currently, we send an autocommit only if there isn't an inflight one; > however, this logic might not be correct because I think we should: > # expires the request if it is not completed in time > # always send an autocommit on the clock -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15455) Add support for OffsetCommit version 9 in consumer
[ https://issues.apache.org/jira/browse/KAFKA-15455?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans resolved KAFKA-15455. Fix Version/s: 3.7.0 (was: 3.8.0) Resolution: Fixed > Add support for OffsetCommit version 9 in consumer > -- > > Key: KAFKA-15455 > URL: https://issues.apache.org/jira/browse/KAFKA-15455 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: David Jacot >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848, kip-848-client-support, kip-848-e2e, > kip-848-preview > Fix For: 3.7.0 > > > We need to handle the new error codes as specified here: > [https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/OffsetCommitResponse.json#L46|https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/OffsetCommitRequest.json#L35] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-16099) Handle timeouts for AsyncKafkaConsumer.commitSync
[ https://issues.apache.org/jira/browse/KAFKA-16099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans resolved KAFKA-16099. Fix Version/s: 3.7.0 Resolution: Fixed > Handle timeouts for AsyncKafkaConsumer.commitSync > - > > Key: KAFKA-16099 > URL: https://issues.apache.org/jira/browse/KAFKA-16099 > Project: Kafka > Issue Type: Sub-task > Components: clients >Reporter: Andrew Schofield >Priority: Major > Fix For: 3.7.0 > > > The handling of synchronous offset commits in the background thread does not > observe the caller's timeout. In the situation that a commit request needs to > be retried, the retries should not extend beyond the caller's timeout. The > CommitApplicationEvent should contain the timeout and not continue beyond > that time. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16103) Review client logic for triggering offset commit callbacks
Lianet Magrans created KAFKA-16103: -- Summary: Review client logic for triggering offset commit callbacks Key: KAFKA-16103 URL: https://issues.apache.org/jira/browse/KAFKA-16103 Project: Kafka Issue Type: Sub-task Components: clients, consumer Reporter: Lianet Magrans Assignee: Lianet Magrans Review logic for triggering commit callbacks, ensuring that all callbacks are triggered before returning from commitSync -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15846) Review consumer leave group request best effort and response handling
[ https://issues.apache.org/jira/browse/KAFKA-15846?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans resolved KAFKA-15846. Fix Version/s: (was: 3.8.0) Resolution: Duplicate > Review consumer leave group request best effort and response handling > - > > Key: KAFKA-15846 > URL: https://issues.apache.org/jira/browse/KAFKA-15846 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Priority: Major > Labels: kip-848, kip-848-client-support > > The new consumer sends out a leave group request with a best-effort approach. > It transitions to LEAVING to indicate to the HB manager that the request must be > sent, but it does not do any response handling or retrying (note that the > response is still handled as any other HB response). After the first HB > manager poll iteration while on LEAVING, the consumer transitions into > UNSUBSCRIBE (no matter if the request was actually sent out or not, ex, due > to coordinator not known). Review if this is good enough as an effort to send > the request, and consider the effect of responses that may be received and > processed when they are no longer relevant -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15542) Release member assignments on errors
[ https://issues.apache.org/jira/browse/KAFKA-15542?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans resolved KAFKA-15542. Fix Version/s: 3.7.0 (was: 3.8.0) Resolution: Fixed Fixed in https://github.com/apache/kafka/pull/14690 > Release member assignments on errors > > > Key: KAFKA-15542 > URL: https://issues.apache.org/jira/browse/KAFKA-15542 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848, kip-848-client-support, kip-848-e2e, > kip-848-preview > Fix For: 3.7.0 > > > Member should release assignment by triggering the onPartitionsLost flow from > the HB manager when errors occur (both fencing and unrecoverable errors) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15515) Remove duplicated integration tests for new consumer
[ https://issues.apache.org/jira/browse/KAFKA-15515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans resolved KAFKA-15515. Fix Version/s: (was: 4.0.0) Resolution: Not A Problem PlaintextAsyncConsumer.scala was discarded before merging as it was not needed anymore (the existing PlaintextConsumer.scala was parametrized) > Remove duplicated integration tests for new consumer > > > Key: KAFKA-15515 > URL: https://issues.apache.org/jira/browse/KAFKA-15515 > Project: Kafka > Issue Type: Test > Components: clients, consumer, unit tests >Reporter: Lianet Magrans >Priority: Major > Labels: consumer-threading-refactor > > This task involves removing the temporary `PlaintextAsyncConsumer` file > containing duplicated integration tests for the new consumer. The copy was > generated to catch regressions and validate functionality in the new consumer > while in development. It should be deleted when the new consumer is fully > implemented and the existing integration tests (`PlaintextConsumerTest`) can > be executed for both implementations. > > Context: > > For the current KafkaConsumer, a set of integration tests exist in the file > PlaintextConsumerTest. Those tests cannot be executed as such for the new > consumer implementation for 2 main reasons: > - the new consumer is being developed as a new PrototypeAsyncConsumer class, > in parallel to the existing KafkaConsumer. > - the new consumer is under development, so it does not support all the > consumer functionality yet. > > In order to be able to run the subsets of tests that the new consumer > supports while the implementation completes, it was decided: > - to make a copy of the `PlaintextConsumerTest` class, named > PlaintextAsyncConsumer. > - to leave all the existing integration tests that cover the simple consumer > case unchanged, and disable the tests that are not yet supported by the new > consumer. Disabled tests will be enabled as the async consumer > evolves. 
> > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15325) Integrate topicId in OffsetFetch and OffsetCommit async consumer calls
[ https://issues.apache.org/jira/browse/KAFKA-15325?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans resolved KAFKA-15325. Resolution: Duplicate > Integrate topicId in OffsetFetch and OffsetCommit async consumer calls > -- > > Key: KAFKA-15325 > URL: https://issues.apache.org/jira/browse/KAFKA-15325 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848, kip-848-client-support > Fix For: 3.8.0 > > > KIP-848 introduces support for topicIds in the OffsetFetch and OffsetCommit > APIs. The consumer calls to those APIs should be updated to include topicIds > when available. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16033) Review retry logic of OffsetFetch and OffsetCommit responses
Lianet Magrans created KAFKA-16033: -- Summary: Review retry logic of OffsetFetch and OffsetCommit responses Key: KAFKA-16033 URL: https://issues.apache.org/jira/browse/KAFKA-16033 Project: Kafka Issue Type: Sub-task Components: clients, consumer Reporter: Lianet Magrans Assignee: Lianet Magrans The retry logic for OffsetFetch and OffsetCommit requests lives in the CommitRequestManager, and applies to requests issued from multiple components (AsyncKafkaConsumer for commitSync and commitAsync, CommitRequestManager for the regular auto-commits, MembershipManager for auto-commits before rebalance and before closing the consumer). While this approach helps to avoid having the retry logic in each caller, currently the CommitRequestManager has it in different places, and it ends up being rather hard to follow. This task aims at reviewing the retry logic from a high-level perspective (multiple callers, with retry needs that have similarities and differences at the same time). The review should assess the similarities vs differences, and then consider two options: 1. Keep the retry logic centralized in the CommitRequestManager, but make it consistent, applied the same way for all requests depending on the intention expressed by the caller. The advantage of this approach (the current approach, improved) is that callers requiring the same retry logic can reuse it, keeping it in a single place (ex. commitSync from the consumer retries in the same way as the auto-commit before rebalance). 2. Move the retry logic to the callers. This aligns with the way it was done in the legacy coordinator, but the main challenge seems to be avoiding duplicated retry logic in callers that require the same behavior. -- This message was sent by Atlassian Jira (v8.20.10#820010)
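Option 1 can be sketched with hypothetical names: each caller states its retry intent once when issuing the request, and a single policy in the manager decides whether to retry, instead of scattering that decision across code paths.

```java
// Hypothetical sketch of a caller-declared retry intent evaluated by one
// centralized policy (not the actual CommitRequestManager API).
class CommitRetryPolicySketch {
    enum RetryIntent { NO_RETRY, RETRY_UNTIL_DEADLINE }

    static boolean shouldRetry(RetryIntent intent, long deadlineMs,
                               long nowMs, boolean retriableError) {
        return retriableError
                && intent == RetryIntent.RETRY_UNTIL_DEADLINE
                && nowMs < deadlineMs;
    }
}
```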
[jira] [Created] (KAFKA-16032) Review inconsistent error handling of OffsetFetch and OffsetCommit responses
Lianet Magrans created KAFKA-16032: -- Summary: Review inconsistent error handling of OffsetFetch and OffsetCommit responses Key: KAFKA-16032 URL: https://issues.apache.org/jira/browse/KAFKA-16032 Project: Kafka Issue Type: Sub-task Components: clients, consumer Reporter: Lianet Magrans OffsetFetch and OffsetCommit handle errors separately. There are 2 issues to review around this: - The logic is duplicated for some errors that are treated similarly (ex. NOT_COORDINATOR) - Some errors are not handled similarly in both requests (ex. COORDINATOR_NOT_AVAILABLE is handled and retried for OffsetCommit but not for OffsetFetch). Note that the specific errors handled by each request were kept the same as in the legacy ConsumerCoordinator, but this should be reviewed so that the same errors are handled in the same way whenever possible. -- This message was sent by Atlassian Jira (v8.20.10#820010)
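One way to remove the duplication is to fold the coordinator-related cases into a routine shared by both response handlers. The error names below are real Kafka error codes; the handler shape itself is a hypothetical sketch.

```java
// Hypothetical shared handler for coordinator errors, usable by both the
// OffsetFetch and OffsetCommit response paths.
class SharedErrorHandlingSketch {
    enum Action { MARK_COORDINATOR_UNKNOWN_AND_RETRY, RETRY, FAIL }

    static Action handle(String errorCode) {
        switch (errorCode) {
            case "NOT_COORDINATOR":
            case "COORDINATOR_NOT_AVAILABLE":
                // one place to mark the coordinator unknown, then retry
                return Action.MARK_COORDINATOR_UNKNOWN_AND_RETRY;
            case "COORDINATOR_LOAD_IN_PROGRESS":
                return Action.RETRY;
            default:
                return Action.FAIL; // request-specific errors handled by the caller
        }
    }
}
```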
[jira] [Created] (KAFKA-16004) Review new consumer inflight offset commit logic
Lianet Magrans created KAFKA-16004: -- Summary: Review new consumer inflight offset commit logic Key: KAFKA-16004 URL: https://issues.apache.org/jira/browse/KAFKA-16004 Project: Kafka Issue Type: Task Components: clients, consumer Reporter: Lianet Magrans The new consumer logic for committing offsets handles inflight requests, validating that no commit request is sent if a previous one hasn't received a response. Review how that logic is currently applied to both sync and async commits, and validate it against the legacy coordinator, which seems to apply it only to async commits. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15991) Flaky new consumer test testGroupIdNotNullAndValid
Lianet Magrans created KAFKA-15991: -- Summary: Flaky new consumer test testGroupIdNotNullAndValid Key: KAFKA-15991 URL: https://issues.apache.org/jira/browse/KAFKA-15991 Project: Kafka Issue Type: Task Components: clients, consumer Reporter: Lianet Magrans Fails locally when run in a loop with its latest changes from https://github.com/apache/kafka/commit/6df192b6cb1397a6e6173835bbbd8a3acb7e3988. It failed the build, so it has been temporarily disabled. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15954) Review minimal effort approach on consumer last heartbeat on unsubscribe
Lianet Magrans created KAFKA-15954: -- Summary: Review minimal effort approach on consumer last heartbeat on unsubscribe Key: KAFKA-15954 URL: https://issues.apache.org/jira/browse/KAFKA-15954 Project: Kafka Issue Type: Task Components: clients, consumer Reporter: Lianet Magrans Currently, both the legacy and the new consumer follow a minimal-effort approach when sending a leave group (legacy) or last heartbeat request (new consumer): the request is sent without waiting for or handling any response. This behaviour applies when the consumer is being closed or when it unsubscribes. For the case when the consumer is being closed (which is a "terminal" state), it makes sense to follow a minimal-effort approach to "properly" leave the group. But for the case of unsubscribe, it might make sense to put more effort into making sure that the last heartbeat is sent and received by the broker. Note that unsubscribe could be a temporary state, where the consumer might want to re-join the group at any time. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15847) Allow to resolve client metadata for specific topics
Lianet Magrans created KAFKA-15847: -- Summary: Allow to resolve client metadata for specific topics Key: KAFKA-15847 URL: https://issues.apache.org/jira/browse/KAFKA-15847 Project: Kafka Issue Type: Bug Components: clients Reporter: Lianet Magrans Currently, metadata updates requested through the Metadata object fetch metadata for all topics. Consider allowing the partial updates that are already expressed as an intention in the Metadata class but not fully supported. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15846) Review consumer leave group request
Lianet Magrans created KAFKA-15846: -- Summary: Review consumer leave group request Key: KAFKA-15846 URL: https://issues.apache.org/jira/browse/KAFKA-15846 Project: Kafka Issue Type: Sub-task Components: clients, consumer Reporter: Lianet Magrans The new consumer sends out a leave group request with a best-effort approach. It transitions to LEAVING to indicate to the HB manager that the request must be sent, but it does not do any response handling or retrying. After the first HB manager poll iteration while on LEAVING, the consumer transitions into UNSUBSCRIBE (no matter if the request was actually sent out or not, ex, due to coordinator not known). Review if this is good enough as an effort to send the request. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15843) Review consumer onPartitionsAssigned called with empty partitions
Lianet Magrans created KAFKA-15843: -- Summary: Review consumer onPartitionsAssigned called with empty partitions Key: KAFKA-15843 URL: https://issues.apache.org/jira/browse/KAFKA-15843 Project: Kafka Issue Type: Sub-task Components: clients, consumer Reporter: Lianet Magrans Legacy coordinator triggers onPartitionsAssigned with empty assignment (which is not the case when triggering onPartitionsRevoked or Lost). This is the behaviour of the legacy coordinator, and the new consumer implementation maintains the same principle. We should review this to fully understand if it is really needed to call onPartitionsAssigned with empty assignment (or if it should behave consistently with the onRevoke/Lost) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15839) Review topic recreation handling in client reconciliation
Lianet Magrans created KAFKA-15839: -- Summary: Review topic recreation handling in client reconciliation Key: KAFKA-15839 URL: https://issues.apache.org/jira/browse/KAFKA-15839 Project: Kafka Issue Type: Sub-task Components: clients, consumer Reporter: Lianet Magrans Ensure that we properly handle topic re-creation (same name, different topic IDs) in the reconciliation process (assignment cache, same-assignment comparison, etc.). -- This message was sent by Atlassian Jira (v8.20.10#820010)
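The re-creation case above hinges on comparing topic IDs rather than just names. A minimal illustrative sketch of that comparison, using plain Strings instead of Kafka's Uuid/TopicIdPartition types (the class and field names here are hypothetical):

```java
import java.util.Objects;

// Sketch of why assignment comparison must include topic IDs: two assignments
// with the same topic name but different IDs (a re-created topic) must NOT be
// treated as the same assignment.
public class AssignedTopic {
    final String topicId; // a Uuid in the real client; a String here for simplicity
    final String name;

    AssignedTopic(String topicId, String name) {
        this.topicId = topicId;
        this.name = name;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof AssignedTopic)) return false;
        AssignedTopic other = (AssignedTopic) o;
        // Name equality alone would wrongly match a re-created topic.
        return topicId.equals(other.topicId) && name.equals(other.name);
    }

    @Override
    public int hashCode() { return Objects.hash(topicId, name); }
}
```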
[jira] [Created] (KAFKA-15835) Group commit/callbacks triggering logic
Lianet Magrans created KAFKA-15835: -- Summary: Group commit/callbacks triggering logic Key: KAFKA-15835 URL: https://issues.apache.org/jira/browse/KAFKA-15835 Project: Kafka Issue Type: Sub-task Components: clients, consumer Reporter: Lianet Magrans The new consumer reconciliation logic triggers a commit request, revocation callback and assignment callbacks sequentially to ensure that they are executed in that order. This means that we could require multiple iterations of the poll loop to complete reconciling an assignment. We could consider triggering them all together, to be executed in the same poll iteration, while still making sure that they are executed in the right order. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15832) Trigger reconciliation based on manager poll
Lianet Magrans created KAFKA-15832: -- Summary: Trigger reconciliation based on manager poll Key: KAFKA-15832 URL: https://issues.apache.org/jira/browse/KAFKA-15832 Project: Kafka Issue Type: Sub-task Components: clients, consumer Reporter: Lianet Magrans Currently the reconciliation logic on the client is triggered when a new target assignment is received and resolved, or when new unresolved target assignments are discovered in metadata. This could be improved by triggering the reconciliation logic on each poll iteration, to reconcile whatever is ready to be reconciled. This would require changes to support poll on the MembershipManager, and to integrate it with the current polling logic in the background thread. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15317) Fix for async consumer access to committed offsets with multiple consumers
[ https://issues.apache.org/jira/browse/KAFKA-15317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans resolved KAFKA-15317. Resolution: Fixed The failing test passes consistently after PR [https://github.com/apache/kafka/pull/14406] (potentially even before, given that the test was only failing in the integration branch) > Fix for async consumer access to committed offsets with multiple consumers > -- > > Key: KAFKA-15317 > URL: https://issues.apache.org/jira/browse/KAFKA-15317 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Priority: Minor > Labels: consumer-threading-refactor, kip-848-preview > > Access to the committed offsets via a call to the _committed_ API function works > as expected for a single async consumer, but it sometimes fails with a timeout > when trying to retrieve the committed offsets with another consumer in the > same group (test testConsumeFromCommittedOffsets on BaseAsynConsumerTest) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15539) Client should stop fetching while partitions being revoked
[ https://issues.apache.org/jira/browse/KAFKA-15539?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans resolved KAFKA-15539. Resolution: Duplicate > Client should stop fetching while partitions being revoked > -- > > Key: KAFKA-15539 > URL: https://issues.apache.org/jira/browse/KAFKA-15539 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848, kip-848-client-support, kip-848-preview > > When partitions are being revoked (client received revocation on heartbeat > and is in the process of invoking the callback), we need to make sure we do > not fetch from those partitions anymore: > * no new fetches should be sent out for the partitions being revoked > * no fetch responses should be handled for those partitions (case where a > fetch was already in-flight when the partition revocation started). > This does not seem to be handled in the current KafkaConsumer and the old > consumer protocol (only for the EAGER protocol). > Consider re-using the existing pendingRevocation logic that already exists in > the subscriptionState and is used by the fetcher to determine if a partition is > fetchable. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15573) Implement auto-commit on partition assignment revocation
[ https://issues.apache.org/jira/browse/KAFKA-15573?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans resolved KAFKA-15573. Resolution: Duplicate > Implement auto-commit on partition assignment revocation > > > Key: KAFKA-15573 > URL: https://issues.apache.org/jira/browse/KAFKA-15573 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Priority: Major > Labels: kip-848, kip-848-client-support, kip-848-e2e, > kip-848-preview > > When the group member's assignment changes and partitions are revoked and > auto-commit is enabled, we need to ensure that the commit request manager is > invoked to queue up the commits. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15679) Client support new consumer configs
Lianet Magrans created KAFKA-15679: -- Summary: Client support new consumer configs Key: KAFKA-15679 URL: https://issues.apache.org/jira/browse/KAFKA-15679 Project: Kafka Issue Type: Sub-task Components: clients, consumer Reporter: Lianet Magrans New consumer should support the new configs introduced by KIP-848: * group.protocol (enum, default: generic): a flag which indicates if the new protocol should be used or not. It could be: generic or consumer. * group.remote.assignor (string, default: null): the server-side assignor to use. It cannot be used in conjunction with group.local.assignor; null means that the choice of the assignor is left to the group coordinator. The protocol introduces a third property for client-side (local) assignors, but that will be introduced later on. -- This message was sent by Atlassian Jira (v8.20.10#820010)
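The two properties above could be wired into a consumer configuration as sketched below. The property names come from the ticket; the "uniform" assignor value is only an illustrative example, not something this ticket specifies:

```java
import java.util.Properties;

// Sketch of setting the KIP-848 configs described in the ticket.
// Property names are from the ticket table; the assignor value is illustrative.
public class NewProtocolConfigs {
    public static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // Opt in to the new consumer group protocol instead of "generic".
        props.put("group.protocol", "consumer");
        // Server-side assignor; leaving this unset lets the coordinator choose.
        props.put("group.remote.assignor", "uniform");
        return props;
    }
}
```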
[jira] [Created] (KAFKA-15631) Do not send new heartbeat request while another one in-flight
Lianet Magrans created KAFKA-15631: -- Summary: Do not send new heartbeat request while another one in-flight Key: KAFKA-15631 URL: https://issues.apache.org/jira/browse/KAFKA-15631 Project: Kafka Issue Type: Sub-task Components: clients, consumer Reporter: Lianet Magrans Client consumer should not send a new heartbeat request while there is a previous in-flight. If a HB is in-flight, we should wait for a response or timeout before sending a next one. -- This message was sent by Atlassian Jira (v8.20.10#820010)
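The rule described above can be sketched as a small in-flight guard. This is a hypothetical standalone class, not Kafka's actual HeartbeatRequestManager; the timeout handling stands in for "wait for a response or timeout":

```java
// Sketch of the "at most one heartbeat in flight" rule: a new HB may be sent
// only if none is in flight, or the previous one has timed out.
public class HeartbeatInFlightGuard {
    private boolean inFlight = false;
    private long sentAtMs = -1;
    private final long requestTimeoutMs;

    public HeartbeatInFlightGuard(long requestTimeoutMs) {
        this.requestTimeoutMs = requestTimeoutMs;
    }

    public boolean canSendHeartbeat(long nowMs) {
        return !inFlight || nowMs - sentAtMs >= requestTimeoutMs;
    }

    public void onHeartbeatSent(long nowMs) {
        inFlight = true;
        sentAtMs = nowMs;
    }

    public void onHeartbeatResponse() {
        inFlight = false; // response (or error) received, a new HB may be sent
    }
}
```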
[jira] [Resolved] (KAFKA-15540) Handle heartbeat and revocation when consumer leaves group
[ https://issues.apache.org/jira/browse/KAFKA-15540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans resolved KAFKA-15540. Resolution: Duplicate Duplicate of KAFKA-15548 > Handle heartbeat and revocation when consumer leaves group > -- > > Key: KAFKA-15540 > URL: https://issues.apache.org/jira/browse/KAFKA-15540 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Priority: Major > Labels: kip-848, kip-848-client-support, kip-848-e2e, > kip-848-preview > > When a consumer intentionally leaves a group we should: > * release assignment (revoke partitions) > * send a last Heartbeat request with epoch -1 (or -2 if static member) > Note that the revocation involves stopping fetching, committing offsets if > auto-commit is enabled, and invoking the onPartitionsRevoked callback. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15561) Client support for new SubscriptionPattern based subscription
Lianet Magrans created KAFKA-15561: -- Summary: Client support for new SubscriptionPattern based subscription Key: KAFKA-15561 URL: https://issues.apache.org/jira/browse/KAFKA-15561 Project: Kafka Issue Type: Sub-task Components: clients, consumer Reporter: Lianet Magrans New consumer should support subscribe with the new SubscriptionPattern introduced in the new consumer group protocol. When subscribing with this regex, the client should provide the regex in the HB request on the SubscribedTopicRegex field, delegating the resolution to the server. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15554) Update client state machine to align with protocol sending one assignment at a time
Lianet Magrans created KAFKA-15554: -- Summary: Update client state machine to align with protocol sending one assignment at a time Key: KAFKA-15554 URL: https://issues.apache.org/jira/browse/KAFKA-15554 Project: Kafka Issue Type: Sub-task Components: clients, consumer Reporter: Lianet Magrans Assignee: Lianet Magrans The new consumer group protocol sends one assignment at a time to the member, and waits for an ack or timeout before sending any other assignment. The client state machine will be updated so that it takes on a new target assignment only if there is no other in progress. If a new target assignment is received in that case, the member should fail with a fatal error, as this is not expected in the protocol. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15275) Implement consumer group membership state machine
[ https://issues.apache.org/jira/browse/KAFKA-15275?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans resolved KAFKA-15275. Resolution: Fixed > Implement consumer group membership state machine > - > > Key: KAFKA-15275 > URL: https://issues.apache.org/jira/browse/KAFKA-15275 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Kirk True >Assignee: Lianet Magrans >Priority: Major > Labels: kip-848, kip-848-client-support > > Provide the Java client support for the consumer group member state machine, > including: > * Define the states of the client member, based on the heartbeat > {{ConsumerGroupHeartbeat}} data structure & state transitions > * Determine the valid transitions between those states > * Provide functions to update state on successful and failed HB responses > The state machine won't do any error handling, it's just responsible for > doing the appropriate state transitions and keeping the member info (id, > epoch, assignment) > This task is part of the work to implement support for the new KIP-848 > consumer group protocol. -- This message was sent by Atlassian Jira (v8.20.10#820010)
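The transition-validation part of the state machine described above can be sketched with an enum plus a table of valid transitions. The state names below are a simplified, illustrative subset, not the client's exact MemberState enum:

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Illustrative member state machine: only transitions listed in VALID are allowed.
public class MemberStates {
    public enum State { UNSUBSCRIBED, JOINING, STABLE, RECONCILING, LEAVING, FENCED, FATAL }

    private static final Map<State, Set<State>> VALID = new EnumMap<>(State.class);
    static {
        VALID.put(State.UNSUBSCRIBED, EnumSet.of(State.JOINING));
        VALID.put(State.JOINING, EnumSet.of(State.STABLE, State.RECONCILING, State.FENCED, State.FATAL));
        VALID.put(State.STABLE, EnumSet.of(State.RECONCILING, State.LEAVING, State.FENCED, State.FATAL));
        VALID.put(State.RECONCILING, EnumSet.of(State.STABLE, State.LEAVING, State.FENCED, State.FATAL));
        VALID.put(State.LEAVING, EnumSet.of(State.UNSUBSCRIBED));
        VALID.put(State.FENCED, EnumSet.of(State.JOINING));
        VALID.put(State.FATAL, EnumSet.noneOf(State.class)); // terminal: no error handling here
    }

    public static boolean isValidTransition(State from, State to) {
        return VALID.getOrDefault(from, EnumSet.noneOf(State.class)).contains(to);
    }
}
```

Note how this matches the design in the ticket: the machine only answers "is this transition allowed", leaving error handling to the caller.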
[jira] [Created] (KAFKA-15550) OffsetsForTimes validation for negative timestamps in new consumer
Lianet Magrans created KAFKA-15550: -- Summary: OffsetsForTimes validation for negative timestamps in new consumer Key: KAFKA-15550 URL: https://issues.apache.org/jira/browse/KAFKA-15550 Project: Kafka Issue Type: Sub-task Components: clients, consumer Reporter: Lianet Magrans Assignee: Lianet Magrans The offsetsForTimes API call should fail with an `IllegalArgumentException` if negative timestamps are provided as arguments, keeping the current behaviour of the KafkaConsumer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
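The validation above amounts to a simple argument check before building the request. A minimal sketch, using plain Strings in place of Kafka's TopicPartition (message text is illustrative, not the client's exact wording):

```java
import java.util.Map;

// Sketch of the negative-timestamp check described in the ticket:
// reject the whole call if any target timestamp is negative.
public class OffsetsForTimesValidation {
    public static void validateTimestamps(Map<String, Long> timestampsToSearch) {
        for (Map.Entry<String, Long> e : timestampsToSearch.entrySet()) {
            if (e.getValue() < 0) {
                throw new IllegalArgumentException(
                    "The target time for partition " + e.getKey()
                        + " is " + e.getValue() + "; the target time cannot be negative.");
            }
        }
    }
}
```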
[jira] [Created] (KAFKA-15544) Enable existing client integration tests for new protocol
Lianet Magrans created KAFKA-15544: -- Summary: Enable existing client integration tests for new protocol Key: KAFKA-15544 URL: https://issues.apache.org/jira/browse/KAFKA-15544 Project: Kafka Issue Type: Sub-task Components: clients, consumer Reporter: Lianet Magrans Enable & validate integration tests defined in `PlaintextAsyncConsumerTest`, that are currently disabled waiting for the client to fully support the new consumer group protocol. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15543) Send HB request right after reconciliation completes
Lianet Magrans created KAFKA-15543: -- Summary: Send HB request right after reconciliation completes Key: KAFKA-15543 URL: https://issues.apache.org/jira/browse/KAFKA-15543 Project: Kafka Issue Type: Sub-task Components: clients, consumer Reporter: Lianet Magrans HeartbeatRequest manager should send HB request outside of the interval, right after the reconciliation process completes. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15542) Release member assignments on errors
Lianet Magrans created KAFKA-15542: -- Summary: Release member assignments on errors Key: KAFKA-15542 URL: https://issues.apache.org/jira/browse/KAFKA-15542 Project: Kafka Issue Type: Sub-task Components: clients, consumer Reporter: Lianet Magrans Member should release assignment by triggering the onPartitionsLost flow from the HB manager when errors occur (both fencing and unrecoverable errors) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15438) Review exception caching logic used for reset/validate positions in async consumer
[ https://issues.apache.org/jira/browse/KAFKA-15438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans resolved KAFKA-15438. Resolution: Fixed > Review exception caching logic used for reset/validate positions in async > consumer > -- > > Key: KAFKA-15438 > URL: https://issues.apache.org/jira/browse/KAFKA-15438 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Lianet Magrans >Priority: Minor > Labels: consumer-threading-refactor > > The refactored async consumer reuses part of the core logic required for > resetting and validating positions. That currently works on the principle of > async requests, that reset/validate positions when responses are received. If > the responses include errors, or if a validation verification fails (ex. log > truncation detected), exceptions are saved in-memory, to be thrown on the > next call to the reset/validate. Note that these functionalities are > periodically called as part of the poll loop to update fetch positions before > fetching records. > > As an initial implementation, the async consumer reuses this same caching > logic, as it has the async nature required. Keeping this caching logic ensures > that we maintain the timing of the exceptions thrown for reset/validate > (they are currently not thrown when discovered, instead they are thrown on > the next call to reset/validate). This task aims at reviewing the > implications of changing this behaviour, relying on the completion of the > Reset and Validate events instead, to propagate the errors found. Note that > this would happen closely intertwined with the continued poll loop, that may > have already issued a new reset/validate. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-15164) Extract reusable logic from OffsetsForLeaderEpochClient
[ https://issues.apache.org/jira/browse/KAFKA-15164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lianet Magrans resolved KAFKA-15164. Resolution: Fixed Included in PR https://github.com/apache/kafka/pull/14346 > Extract reusable logic from OffsetsForLeaderEpochClient > --- > > Key: KAFKA-15164 > URL: https://issues.apache.org/jira/browse/KAFKA-15164 > Project: Kafka > Issue Type: Task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lianet Magrans >Priority: Major > Labels: consumer-threading-refactor > > The OffsetsForLeaderEpochClient class is used for making asynchronous > requests to the OffsetsForLeaderEpoch API. It encapsulates the logic for: > * preparing the requests > * sending them over the network using the network client > * handling the response > The new KafkaConsumer implementation, based on a new threading model, > requires the same logic for preparing the requests and handling the > responses, with different behaviour for how the request is actually sent. > This task includes refactoring OffsetsForLeaderEpochClient by extracting out > the logic for preparing the requests and handling the responses. No changes > in the existing logic, just making the functionality available to be reused. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15540) Handle heartbeat and assignment release when consumer leaves group
Lianet Magrans created KAFKA-15540: -- Summary: Handle heartbeat and assignment release when consumer leaves group Key: KAFKA-15540 URL: https://issues.apache.org/jira/browse/KAFKA-15540 Project: Kafka Issue Type: Task Components: clients, consumer Reporter: Lianet Magrans When a consumer intentionally leaves a group we should: * release assignment (revoke partitions) * send a last Heartbeat request with epoch -1 (or -2 if static member) Note that the revocation involves stopping fetching, committing offsets if auto-commit is enabled, and invoking the onPartitionsRevoked callback. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15539) Stop fetching while partitions being revoked
Lianet Magrans created KAFKA-15539: -- Summary: Stop fetching while partitions being revoked Key: KAFKA-15539 URL: https://issues.apache.org/jira/browse/KAFKA-15539 Project: Kafka Issue Type: Task Components: clients, consumer Reporter: Lianet Magrans When partitions are being revoked (client received revocation on heartbeat and is in the process of invoking the callback), we need to make sure we do not fetch from those partitions anymore: * no new fetches should be sent out for the partitions being revoked * no fetch responses should be handled for those partitions (case where a fetch was already in-flight when the partition revocation started). This does not seem to be handled in the current KafkaConsumer and the old consumer protocol (only for the EAGER protocol). Consider re-using the existing pendingRevocation logic that already exists in the subscriptionState and is used by the fetcher to determine if a partition is fetchable. -- This message was sent by Atlassian Jira (v8.20.10#820010)
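The pendingRevocation-based fetchability check suggested above can be sketched as follows. Partitions are plain Strings here and the class is purely illustrative, not the client's SubscriptionState:

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of the "fetchable" check: a partition pending revocation is excluded
// from fetching even though it is still technically assigned.
public class FetchablePartitions {
    private final Set<String> assigned = new HashSet<>();
    private final Set<String> pendingRevocation = new HashSet<>();

    public void assign(String tp) { assigned.add(tp); }

    /** Called when revocation starts (e.g. while the callback is being invoked). */
    public void markPendingRevocation(String tp) { pendingRevocation.add(tp); }

    /** Gate used both for sending new fetches and for handling in-flight responses. */
    public boolean isFetchable(String tp) {
        return assigned.contains(tp) && !pendingRevocation.contains(tp);
    }
}
```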
[jira] [Created] (KAFKA-15538) Resolve regex on client side when using java regex in new consumer
Lianet Magrans created KAFKA-15538: -- Summary: Resolve regex on client side when using java regex in new consumer Key: KAFKA-15538 URL: https://issues.apache.org/jira/browse/KAFKA-15538 Project: Kafka Issue Type: Task Components: clients, consumer Reporter: Lianet Magrans We need to resolve a java Pattern regex on the client side to send the broker a list of topic names to subscribe to. Context: The new consumer group protocol uses [Google RE2/J|https://github.com/google/re2j] for regular expressions and introduces new methods in the consumer API to subscribe using a `SubscriptionPattern`. The subscribe using a java `Pattern` will still be supported for a while but eventually removed. * When the subscribe with SubscriptionPattern is used, the client should just send the regex to the broker and it will be resolved on the server side. * In the case of the subscribe with Pattern, the regex should be resolved on the client side. -- This message was sent by Atlassian Jira (v8.20.10#820010)
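For the legacy java Pattern path, client-side resolution is essentially filtering the topic names known to the client; a minimal sketch (in the real consumer the topic list would come from a metadata response, here it is passed in directly):

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Sketch of client-side regex resolution for the java.util.regex.Pattern
// subscribe path: match the pattern against known topic names and send the
// resulting explicit topic list to the broker.
public class ClientSideRegexResolution {
    public static List<String> resolve(Pattern pattern, List<String> topicsInMetadata) {
        return topicsInMetadata.stream()
            .filter(topic -> pattern.matcher(topic).matches())
            .collect(Collectors.toList());
    }
}
```

By contrast, the SubscriptionPattern path skips this step entirely: the regex string itself goes to the broker in the heartbeat and is resolved server-side.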
[jira] [Created] (KAFKA-15515) Remove duplicated integration tests for new consumer
Lianet Magrans created KAFKA-15515: -- Summary: Remove duplicated integration tests for new consumer Key: KAFKA-15515 URL: https://issues.apache.org/jira/browse/KAFKA-15515 Project: Kafka Issue Type: Task Components: clients, consumer Reporter: Lianet Magrans This task involves removing the temporary `PlaintextAsyncConsumer` file containing duplicated integration tests for the new consumer. The copy was generated to catch regressions and validate functionality in the new consumer while in development. It should be deleted when the new consumer is fully implemented and the existing integration tests (`PlaintextConsumerTest`) can be executed for both implementations. Context: For the current KafkaConsumer, a set of integration tests exist in the file PlaintextConsumerTest. Those tests cannot be executed as such for the new consumer implementation for two main reasons: - the new consumer is being developed as a new PrototypeAsyncConsumer class, in parallel to the existing KafkaConsumer. - the new consumer is under development, so it does not support all the consumer functionality yet. In order to be able to run the subsets of tests that the new consumer supports while the implementation completes, it was decided to: - make a copy of the `PlaintextConsumerTest` class, named PlaintextAsyncConsumer. - leave all the existing integration tests that cover the simple consumer case unchanged, and disable the tests that are not yet supported by the new consumer. Disabled tests will be enabled as the async consumer evolves. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15438) Review exception caching logic used for reset/validate positions in async consumer
Lianet Magrans created KAFKA-15438: -- Summary: Review exception caching logic used for reset/validate positions in async consumer Key: KAFKA-15438 URL: https://issues.apache.org/jira/browse/KAFKA-15438 Project: Kafka Issue Type: Task Components: clients, consumer Reporter: Lianet Magrans The refactored async consumer reuses part of the core logic required for resetting and validating positions. That currently works on the principle of async requests, that reset/validate positions when responses are received. If the responses include errors, or if a validation verification fails (ex. log truncation detected), exceptions are saved in-memory, to be thrown on the next call to the reset/validate. Note that these functionalities are periodically called as part of the poll loop to update fetch positions before fetching records. As an initial implementation, the async consumer reuses this same caching logic, as it has the async nature required. This task aims at reviewing the processing of `ResetApplicationEvent` and `ValidatePositionsApplicationEvent` to evaluate if they should rely on event completion instead, to propagate the errors found. It would align with how other application events manage async requests and responses/errors for the new async consumer (based on CompletableFutures), but with the trade-off of heavily changing a caching logic that is currently reused by the legacy and the new consumer in the OffsetFetcherUtils. -- This message was sent by Atlassian Jira (v8.20.10#820010)
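The caching behaviour described above (an error discovered asynchronously is thrown on the *next* call, not when discovered) can be sketched as a small holder; this is an illustrative standalone class, not the client's OffsetFetcherUtils:

```java
import java.util.concurrent.atomic.AtomicReference;

// Sketch of the exception-caching pattern: the async response path stores the
// error, and the next reset/validate call rethrows and clears it.
public class CachedExceptionHolder {
    private final AtomicReference<RuntimeException> cached = new AtomicReference<>();

    /** Called from the async response handling when an error is found. */
    public void store(RuntimeException e) { cached.set(e); }

    /** Called at the start of the next reset/validate; rethrows any cached error once. */
    public void maybeThrow() {
        RuntimeException e = cached.getAndSet(null);
        if (e != null) throw e;
    }
}
```

The alternative the ticket evaluates would drop this holder and instead complete the event's CompletableFuture exceptionally, propagating the error at completion time.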
[jira] [Created] (KAFKA-15327) Async consumer should commit offsets on close
Lianet Magrans created KAFKA-15327: -- Summary: Async consumer should commit offsets on close Key: KAFKA-15327 URL: https://issues.apache.org/jira/browse/KAFKA-15327 Project: Kafka Issue Type: Bug Components: clients, consumer Reporter: Lianet Magrans In the current implementation of the KafkaConsumer, the ConsumerCoordinator commits offsets before the consumer is closed, with a call to maybeAutoCommitOffsetsSync(timer). The async consumer should provide the same behaviour to commit offsets on close. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15325) Integrate topicId in OffsetFetch and OffsetCommit async consumer calls
Lianet Magrans created KAFKA-15325: -- Summary: Integrate topicId in OffsetFetch and OffsetCommit async consumer calls Key: KAFKA-15325 URL: https://issues.apache.org/jira/browse/KAFKA-15325 Project: Kafka Issue Type: Task Components: clients, consumer Reporter: Lianet Magrans Assignee: Lianet Magrans KIP-848 introduces support for topicIds in the OffsetFetch and OffsetCommit APIs. The consumer calls to those APIs should be updated to include topicIds when available. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15317) Fix for async consumer access to committed offsets with multiple consumers
Lianet Magrans created KAFKA-15317: -- Summary: Fix for async consumer access to committed offsets with multiple consumers Key: KAFKA-15317 URL: https://issues.apache.org/jira/browse/KAFKA-15317 Project: Kafka Issue Type: Task Components: clients, consumer Reporter: Lianet Magrans Access to the committed offsets via a call to the _committed_ API function works as expected for a single async consumer, but it sometimes fails with a timeout when trying to retrieve the committed offsets with another consumer in the same group. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15316) CommitRequestManager not calling RequestState callbacks
Lianet Magrans created KAFKA-15316: -- Summary: CommitRequestManager not calling RequestState callbacks Key: KAFKA-15316 URL: https://issues.apache.org/jira/browse/KAFKA-15316 Project: Kafka Issue Type: Bug Components: clients, consumer Reporter: Lianet Magrans CommitRequestManager is not triggering the RequestState callbacks that update {_}lastReceivedMs{_}, affecting the _canSendRequest_ verification of the RequestState -- This message was sent by Atlassian Jira (v8.20.10#820010)
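The bookkeeping the ticket refers to can be sketched as a simplified request state: if the response callback that updates lastReceivedMs is never invoked, canSendRequest keeps reporting the request as in flight. Field and method names here are illustrative, not the client's exact RequestState API:

```java
// Simplified sketch of RequestState-style bookkeeping: a request may be
// (re)sent only if nothing is in flight and the retry backoff has elapsed.
public class SimpleRequestState {
    private long lastSentMs = -1;
    private long lastReceivedMs = -1;
    private final long retryBackoffMs;

    public SimpleRequestState(long retryBackoffMs) {
        this.retryBackoffMs = retryBackoffMs;
    }

    public boolean canSendRequest(long nowMs) {
        if (lastSentMs == -1) return true;               // never sent
        if (lastReceivedMs < lastSentMs) return false;   // still in flight
        return nowMs - lastReceivedMs >= retryBackoffMs; // backoff elapsed
    }

    public void onSendAttempt(long nowMs) { lastSentMs = nowMs; }

    // The callback the ticket says was not being triggered: without it,
    // canSendRequest would report an in-flight request forever.
    public void onResponseReceived(long nowMs) { lastReceivedMs = nowMs; }
}
```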
[jira] [Created] (KAFKA-15306) Integrate committed offsets logic when updating fetching positions
Lianet Magrans created KAFKA-15306: -- Summary: Integrate committed offsets logic when updating fetching positions Key: KAFKA-15306 URL: https://issues.apache.org/jira/browse/KAFKA-15306 Project: Kafka Issue Type: Task Reporter: Lianet Magrans Assignee: Lianet Magrans Integrate refreshCommittedOffsets logic, currently performed by the coordinator, into the update fetch positions performed on every iteration of the consumer poll loop. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15270) Integration tests for AsyncConsumer simple consume case
Lianet Magrans created KAFKA-15270: -- Summary: Integration tests for AsyncConsumer simple consume case Key: KAFKA-15270 URL: https://issues.apache.org/jira/browse/KAFKA-15270 Project: Kafka Issue Type: Task Components: clients, consumer Reporter: Lianet Magrans This task involves writing integration tests for covering the simple consume functionality of the AsyncConsumer. This should include validation of the assign, fetch and positions logic. Not covering any committed offset functionality as part of this task. Integration tests should have a similar form as the existing PlaintextConsumerTest, but scoped to the simple consume flow. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15164) Extract reusable logic from OffsetsForLeaderEpochClient
Lianet Magrans created KAFKA-15164: -- Summary: Extract reusable logic from OffsetsForLeaderEpochClient Key: KAFKA-15164 URL: https://issues.apache.org/jira/browse/KAFKA-15164 Project: Kafka Issue Type: Task Components: clients, consumer Reporter: Lianet Magrans The OffsetsForLeaderEpochClient class is used for making asynchronous requests to the OffsetsForLeaderEpoch API. It encapsulates the logic for: * preparing the requests * sending them over the network using the network client * handling the response The new KafkaConsumer implementation, based on a new threading model, requires the same logic for preparing the requests and handling the responses, with different behaviour for how the request is actually sent. This task includes refactoring OffsetsForLeaderEpochClient by extracting out the logic for preparing the requests and handling the responses. No changes in the existing logic, just making the functionality available to be reused. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15163) Implement validatePositions functionality for new KafkaConsumer
Lianet Magrans created KAFKA-15163: -- Summary: Implement validatePositions functionality for new KafkaConsumer Key: KAFKA-15163 URL: https://issues.apache.org/jira/browse/KAFKA-15163 Project: Kafka Issue Type: Task Components: clients, consumer Reporter: Lianet Magrans Assignee: Lianet Magrans Introduce support for validating positions in the new ListOffsetsRequestManager. This task will include a new event for the validatePositions calls performed from the new consumer, and the logic for handling such events in the ListOffsetRequestManager. The validate positions implementation will keep the same behaviour as the one in the old consumer, but adapted to the new threading model. So it is based on a VALIDATE_POSITIONS event that is submitted to the background thread and then processed by the ApplicationEventProcessor. The processing itself is done by the ListOffsetRequestManager given that this will require an OffsetsForLeaderEpoch request for the partitions awaiting validation. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15115) Implement resetPositions functionality in ListOffsetRequestManager
Lianet Magrans created KAFKA-15115: -- Summary: Implement resetPositions functionality in ListOffsetRequestManager Key: KAFKA-15115 URL: https://issues.apache.org/jira/browse/KAFKA-15115 Project: Kafka Issue Type: Task Components: clients, consumer Reporter: Lianet Magrans Assignee: Lianet Magrans Introduce support for resetting positions in the new ListOffsetsRequestManager. This task will include a new event for the resetPositions calls performed from the new consumer, and the logic for handling such events in the ListOffsetRequestManager. The reset positions implementation will keep the same behaviour as the one in the old consumer, but adapted to the new threading model. So it is based on a RESET_POSITIONS event that is submitted to the background thread and then processed by the ApplicationEventProcessor. The processing itself is done by the ListOffsetRequestManager given that this will require a LIST_OFFSETS request for the partitions awaiting reset. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15081) Implement new consumer offsetsForTimes
Lianet Magrans created KAFKA-15081: -- Summary: Implement new consumer offsetsForTimes Key: KAFKA-15081 URL: https://issues.apache.org/jira/browse/KAFKA-15081 Project: Kafka Issue Type: Task Components: clients, consumer Reporter: Lianet Magrans Assignee: Lianet Magrans Implement offsetForTimes for the kafka consumer based on the new threading model, using the ListOffsetsRequestManager -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14966) Extract reusable common logic from OffsetFetcher
Lianet Magrans created KAFKA-14966: -- Summary: Extract reusable common logic from OffsetFetcher Key: KAFKA-14966 URL: https://issues.apache.org/jira/browse/KAFKA-14966 Project: Kafka Issue Type: Sub-task Components: clients, consumer Reporter: Lianet Magrans The OffsetFetcher is internally used by the KafkaConsumer to fetch offsets, validate and reset positions. For the new consumer based on a refactored threading model, similar functionality will be needed by the ListOffsetsRequestManager component. This task aims at identifying and extracting the OffsetFetcher functionality that can be reused by the new consumer implementation. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14965) Introduce ListOffsetsRequestManager to integrate ListOffsetsRequests into new consumer threading refactor
Lianet Magrans created KAFKA-14965: -- Summary: Introduce ListOffsetsRequestManager to integrate ListOffsetsRequests into new consumer threading refactor Key: KAFKA-14965 URL: https://issues.apache.org/jira/browse/KAFKA-14965 Project: Kafka Issue Type: Task Components: clients, consumer Reporter: Lianet Magrans This task introduces new functionality for handling ListOffsetsRequests for the new consumer implementation, as part of the ongoing work for the consumer threading model refactor. This task introduces a new class named ListOffsetsRequestManager, responsible for handling ListOffsets requests performed by the consumer to expose functionality like beginningOffsets, endOffsets and offsetsForTimes. The OffsetFetcher class is used internally by the KafkaConsumer to list offsets, so this task will be based on a refactored OffsetFetcher, reusing the fetching logic as much as possible. -- This message was sent by Atlassian Jira (v8.20.10#820010)