[GitHub] [kafka] vinothchandar commented on issue #8462: KAFKA-9846: Filter active tasks for running state in KafkaStreams#allLocalStorePartitionLags()
vinothchandar commented on issue #8462:
URL: https://github.com/apache/kafka/pull/8462#issuecomment-617567564

@guozhangwang I still need to add a test case for this specific scenario: tasks stuck in the created state. Creating that scenario seems to need some engineering (Sophie gave me some pointers, which I have yet to try). If we just need to fix the test issue, I can open a simpler PR with only the test fix. That's probably better?

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vinothchandar commented on a change in pull request #8462: KAFKA-9846: Filter active tasks for running state in KafkaStreams#allLocalStorePartitionLags()
vinothchandar commented on a change in pull request #8462:
URL: https://github.com/apache/kafka/pull/8462#discussion_r412690139

## File path: streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java ##
@@ -295,9 +295,10 @@ public void onRestoreEnd(final TopicPartition topicPartition, final String store
         }
     });

-    restartedStreams.start();
+    // Wait till the restarted instance reaches running, after restoring
+    startApplicationAndWaitUntilRunning(Collections.singletonList(restartedStreams), Duration.ofSeconds(60));

Review comment:
this is the actual fix for flaky test.
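The change above swaps a bare start() for a helper that blocks until the instance reports the running state. The general pattern can be sketched with the JDK alone. Everything in this block (FakeApp, State, startAndWaitUntilRunning) is a hypothetical stand-in for illustration and is not Kafka's test utility; the only assumption carried over from the diff is that the test should not proceed until the running state has been observed or a timeout expires.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class WaitUntilRunningSketch {
    enum State { CREATED, REBALANCING, RUNNING }

    // Hypothetical application that reports state changes from a background thread.
    static final class FakeApp {
        private volatile Consumer<State> listener = state -> { };

        void setStateListener(Consumer<State> listener) {
            this.listener = listener;
        }

        void start() {
            Thread worker = new Thread(() -> {
                listener.accept(State.REBALANCING);
                listener.accept(State.RUNNING);
            });
            worker.setDaemon(true);
            worker.start();
        }
    }

    // Registers the listener before starting, then blocks until RUNNING or timeout.
    // Returns true only if RUNNING was observed within the timeout.
    static boolean startAndWaitUntilRunning(FakeApp app, Duration timeout) {
        CountDownLatch running = new CountDownLatch(1);
        app.setStateListener(state -> {
            if (state == State.RUNNING) {
                running.countDown();
            }
        });
        app.start();
        try {
            return running.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Registering the listener before calling start() matters: if the order were reversed, the transition to the running state could be missed and the wait would only end at the timeout.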
[jira] [Created] (KAFKA-9901) TimeoutError: Never saw message indicating StreamsTest finished startup on ducker@ducker07
jiamei xie created KAFKA-9901:

Summary: TimeoutError: Never saw message indicating StreamsTest finished startup on ducker@ducker07
Key: KAFKA-9901
URL: https://issues.apache.org/jira/browse/KAFKA-9901
Project: Kafka
Issue Type: Bug
Components: streams, system tests
Reporter: jiamei xie
Assignee: jiamei xie

When running

_DUCKTAPE_OPTIONS="--debug" TC_PATHS="tests/kafkatest/tests/streams/streams_broker_bounce_test.py::StreamsBrokerBounceTest.test_all_brokers_bounce" bash tests/docker/run_tests.sh | tee debug_logs.txt

the test failed with the error below:

TimeoutError: Never saw message indicating StreamsTest finished startup on ducker@ducker07

https://github.com/apache/kafka/pull/8443 updated the constructor of StreamsSmokeTestJobRunnerService, but the call in streams_broker_bounce_test was not updated to match.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ijuma commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem
ijuma commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r412680581

## File path: core/src/main/scala/kafka/utils/DelayedItem.scala ##
@@ -21,24 +21,19 @@ import java.util.concurrent._
 import org.apache.kafka.common.utils.Time

-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {
+  def this(delayMs: Long) = this(delayMs, Time.SYSTEM)

Review comment:
Do we need this? Can we not pass the `time` from every caller?
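The review question is about whether the clock should always be injected rather than defaulted. A minimal sketch of a delayed item built on a monotonic clock, written in Java against the JDK only, shows why injection is convenient for tests. MonotonicDelayedItem and its LongSupplier clock are hypothetical names for illustration; this is not the DelayedItem from the pull request, and it assumes a nanosecond clock in the style of System.nanoTime().

```java
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

final class MonotonicDelayedItem implements Delayed {
    private final LongSupplier nanoClock; // monotonic clock, e.g. System::nanoTime
    private final long dueNs;             // clock reading at which the delay has elapsed

    MonotonicDelayedItem(long delayMs, LongSupplier nanoClock) {
        this.nanoClock = nanoClock;
        this.dueNs = nanoClock.getAsLong() + TimeUnit.MILLISECONDS.toNanos(delayMs);
    }

    // Convenience constructor that defaults the clock, mirroring the one questioned in the review.
    MonotonicDelayedItem(long delayMs) {
        this(delayMs, System::nanoTime);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // Subtracting two readings of the same monotonic clock is immune to wall-clock jumps.
        long remainingNs = Math.max(dueNs - nanoClock.getAsLong(), 0L);
        return unit.convert(remainingNs, TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}
```

With the clock passed in, a test can advance a fake clock by hand and assert on the remaining delay without sleeping.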
[GitHub] [kafka] d8tltanc opened a new pull request #8528: Changed the system tests for --zookeeper flag removal
d8tltanc opened a new pull request #8528:
URL: https://github.com/apache/kafka/pull/8528

* Remove the --zookeeper flags for node versions supporting --bootstrap-server for TopicCommand. For the SCRAM credential related code, switched to using --bootstrap-server even though it is not supported yet.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] ConcurrencyPractitioner commented on issue #7884: [KAFKA-8522] Streamline tombstone and transaction marker removal
ConcurrencyPractitioner commented on issue #7884:
URL: https://github.com/apache/kafka/pull/7884#issuecomment-617537261

test this please
[GitHub] [kafka] ConcurrencyPractitioner commented on issue #7884: [KAFKA-8522] Streamline tombstone and transaction marker removal
ConcurrencyPractitioner commented on issue #7884:
URL: https://github.com/apache/kafka/pull/7884#issuecomment-617537213

ok to test
[GitHub] [kafka] senthilm-ms commented on issue #8103: KAFKA-7061: KIP-280 Enhanced log compaction
senthilm-ms commented on issue #8103:
URL: https://github.com/apache/kafka/pull/8103#issuecomment-617535197

@junrao @guozhangwang Can you please review and let me know if you have any more comments? I would like to close this and move on.
[jira] [Updated] (KAFKA-9900) Expired Sensor can not be remove,and it fills all the old generation,can't be GC.
[ https://issues.apache.org/jira/browse/KAFKA-9900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chenxin updated KAFKA-9900:
    Attachment: leak3.jpg

> Expired Sensor can not be remove,and it fills all the old generation,can't be GC.
>
> Key: KAFKA-9900
> URL: https://issues.apache.org/jira/browse/KAFKA-9900
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 2.1.0, 2.1.1
> Reporter: Chenxin
> Priority: Critical
> Attachments: Leak1.jpg, leak2.jpg, leak3.jpg
>
> I tried kafka-clients 2.1.0 and 2.1.1, and the problem exists in both.
> I found a large number of Sensor and Metrics objects in memory. Because the set is so big, it ends up in the old generation and cannot be garbage collected.
> I want to disable the metrics collector, but I did not find a way. Is this a bug, or just incorrect usage on my part?
>
> I am Chinese and not very good at English.
>
> Thanks.
>
> !leak2.jpg|width=612,height=227!
> !Leak1.jpg|width=354,height=286!
[jira] [Updated] (KAFKA-9900) Expired Sensor can not be remove,and it fills all the old generation,can't be GC.
[ https://issues.apache.org/jira/browse/KAFKA-9900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chenxin updated KAFKA-9900:
    Description:
I tried kafka-clients 2.1.0 and 2.1.1, and the problem exists in both.
I found a large number of Sensor and Metrics objects in memory. Because the set is so big, it ends up in the old generation and cannot be garbage collected.
I want to disable the metrics collector, but I did not find a way. Is this a bug, or just incorrect usage on my part?

I am Chinese and not very good at English.

Thanks.

!leak2.jpg|width=612,height=227!
!Leak1.jpg|width=354,height=286!

Here is another screenshot, from jvisualvm.
!leak3.jpg|width=1998,height=85!

  was:
I tried kafka-clients 2.1.0 and 2.1.1, and the problem exists in both.
I found a large number of Sensor and Metrics objects in memory. Because the set is so big, it ends up in the old generation and cannot be garbage collected.
I want to disable the metrics collector, but I did not find a way. Is this a bug, or just incorrect usage on my part?

I am Chinese and not very good at English.

Thanks.

!leak2.jpg|width=612,height=227!
!Leak1.jpg|width=354,height=286!

> Expired Sensor can not be remove,and it fills all the old generation,can't be GC.
>
> Key: KAFKA-9900
> URL: https://issues.apache.org/jira/browse/KAFKA-9900
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 2.1.0, 2.1.1
> Reporter: Chenxin
> Priority: Critical
> Attachments: Leak1.jpg, leak2.jpg, leak3.jpg
>
> I tried kafka-clients 2.1.0 and 2.1.1, and the problem exists in both.
> I found a large number of Sensor and Metrics objects in memory. Because the set is so big, it ends up in the old generation and cannot be garbage collected.
> I want to disable the metrics collector, but I did not find a way. Is this a bug, or just incorrect usage on my part?
>
> I am Chinese and not very good at English.
>
> Thanks.
>
> !leak2.jpg|width=612,height=227!
> !Leak1.jpg|width=354,height=286!
>
> Here is another screenshot, from jvisualvm.
> !leak3.jpg|width=1998,height=85!
[GitHub] [kafka] d8tltanc opened a new pull request #8527: Splitted unit tests for --zookeeper flag removal
d8tltanc opened a new pull request #8527:
URL: https://github.com/apache/kafka/pull/8527

*More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.*

*Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.*

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] showuon commented on issue #8482: KAFKA-9863: update the deprecated --zookeeper option in the documentation into --bootstrap-server
showuon commented on issue #8482:
URL: https://github.com/apache/kafka/pull/8482#issuecomment-617529478

@junrao @omkreddy, could you please help review this PR? This is a small and straightforward change to update the documentation only, but I think it's important to users. Thank you very much.
[GitHub] [kafka] apovzner commented on issue #8509: KAFKA-9839: Broker should accept control requests with newer broker epoch
apovzner commented on issue #8509:
URL: https://github.com/apache/kafka/pull/8509#issuecomment-617525619

@dajac Sounds good! I added unit tests to KafkaApisTest as you suggested.
[jira] [Updated] (KAFKA-9900) Expired Sensor can not be remove,and it fills all the old generation,can't be GC.
[ https://issues.apache.org/jira/browse/KAFKA-9900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chenxin updated KAFKA-9900:
    Description:
I tried kafka-clients 2.1.0 and 2.1.1, and the problem exists in both.
I found a large number of Sensor and Metrics objects in memory. Because the set is so big, it ends up in the old generation and cannot be garbage collected.
I want to disable the metrics collector, but I did not find a way. Is this a bug, or just incorrect usage on my part?

I am Chinese and not very good at English.

Thanks.

!leak2.jpg|width=612,height=227!
!Leak1.jpg|width=354,height=286!

  was:
!leak2.jpg|width=612,height=227!
!Leak1.jpg|width=354,height=286!

> Expired Sensor can not be remove,and it fills all the old generation,can't be GC.
>
> Key: KAFKA-9900
> URL: https://issues.apache.org/jira/browse/KAFKA-9900
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 2.1.0, 2.1.1
> Reporter: Chenxin
> Priority: Critical
> Attachments: Leak1.jpg, leak2.jpg
>
> I tried kafka-clients 2.1.0 and 2.1.1, and the problem exists in both.
> I found a large number of Sensor and Metrics objects in memory. Because the set is so big, it ends up in the old generation and cannot be garbage collected.
> I want to disable the metrics collector, but I did not find a way. Is this a bug, or just incorrect usage on my part?
>
> I am Chinese and not very good at English.
>
> Thanks.
>
> !leak2.jpg|width=612,height=227!
> !Leak1.jpg|width=354,height=286!
[jira] [Updated] (KAFKA-9900) Expired Sensor can not be remove,and it fills all the old generation,can't be GC.
[ https://issues.apache.org/jira/browse/KAFKA-9900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chenxin updated KAFKA-9900:
    Attachment: leak2.jpg

> Expired Sensor can not be remove,and it fills all the old generation,can't be GC.
>
> Key: KAFKA-9900
> URL: https://issues.apache.org/jira/browse/KAFKA-9900
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 2.1.0, 2.1.1
> Reporter: Chenxin
> Priority: Critical
> Attachments: Leak1.jpg, leak2.jpg
[jira] [Updated] (KAFKA-9900) Expired Sensor can not be remove,and it fills all the old generation,can't be GC.
[ https://issues.apache.org/jira/browse/KAFKA-9900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chenxin updated KAFKA-9900:
    Description:
!leak2.jpg|width=612,height=227!
!Leak1.jpg|width=354,height=286!

> Expired Sensor can not be remove,and it fills all the old generation,can't be GC.
>
> Key: KAFKA-9900
> URL: https://issues.apache.org/jira/browse/KAFKA-9900
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 2.1.0, 2.1.1
> Reporter: Chenxin
> Priority: Critical
> Attachments: Leak1.jpg, leak2.jpg
>
> !leak2.jpg|width=612,height=227!
> !Leak1.jpg|width=354,height=286!
[jira] [Updated] (KAFKA-9900) Expired Sensor can not be remove,and it fills all the old generation,can't be GC.
[ https://issues.apache.org/jira/browse/KAFKA-9900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chenxin updated KAFKA-9900:
    Attachment: Leak1.jpg

> Expired Sensor can not be remove,and it fills all the old generation,can't be GC.
>
> Key: KAFKA-9900
> URL: https://issues.apache.org/jira/browse/KAFKA-9900
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 2.1.0, 2.1.1
> Reporter: Chenxin
> Priority: Critical
> Attachments: Leak1.jpg, leak2.jpg
[jira] [Created] (KAFKA-9900) Expired Sensor can not be remove,and it fills all the old generation,can't be GC.
Chenxin created KAFKA-9900:

Summary: Expired Sensor can not be remove,and it fills all the old generation,can't be GC.
Key: KAFKA-9900
URL: https://issues.apache.org/jira/browse/KAFKA-9900
Project: Kafka
Issue Type: Bug
Components: clients
Affects Versions: 2.1.1, 2.1.0
Reporter: Chenxin
Attachments: Leak1.jpg
[GitHub] [kafka] abbccdda commented on a change in pull request #8486: KAFKA-9840: Skip End Offset validation when the leader epoch is not reliable
abbccdda commented on a change in pull request #8486:
URL: https://github.com/apache/kafka/pull/8486#discussion_r412633489

## File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java ##
@@ -547,17 +547,15 @@ public void testMaybeCompleteValidationAfterOffsetReset() {
         int initialOffsetEpoch = 5;

         SubscriptionState.FetchPosition initialPosition = new SubscriptionState.FetchPosition(initialOffset,
-Optional.of(initialOffsetEpoch), new Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(currentEpoch)));
+Optional.of(initialOffsetEpoch), new Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(currentEpoch)));

Review comment:
Only side cleanups
[GitHub] [kafka] abbccdda commented on a change in pull request #8486: KAFKA-9840: Skip End Offset validation when the leader epoch is not reliable
abbccdda commented on a change in pull request #8486:
URL: https://github.com/apache/kafka/pull/8486#discussion_r412633349

## File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java ##
@@ -199,19 +200,21 @@ public void testIgnoreLeaderEpochInOlderMetadataResponse() {
             MetadataResponse response = new MetadataResponse(struct, version);
             assertFalse(response.hasReliableLeaderEpochs());
             metadata.updateWithCurrentRequestVersion(response, false, 100);
+            assertFalse(metadata.hasReliableLeaderEpochs());

Review comment:
Added test coverage at L203 and L214; the other changes in this file are just side cleanups and signature refactoring.
[GitHub] [kafka] abbccdda commented on a change in pull request #8486: KAFKA-9840: Skip End Offset validation when the leader epoch is not reliable
abbccdda commented on a change in pull request #8486:
URL: https://github.com/apache/kafka/pull/8486#discussion_r412632269

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java ##
@@ -85,10 +85,6 @@ protected OffsetForEpochResult handleResponse(
                     case KAFKA_STORAGE_ERROR:
                     case OFFSET_NOT_AVAILABLE:
                     case LEADER_NOT_AVAILABLE:
-                        logger().debug("Attempt to fetch offsets for partition {} failed due to {}, retrying.",

Review comment:
Side cleanup
[GitHub] [kafka] abbccdda commented on a change in pull request #8486: KAFKA-9840: Skip End Offset validation when the leader epoch is not reliable
abbccdda commented on a change in pull request #8486: URL: https://github.com/apache/kafka/pull/8486#discussion_r412632493 ## File path: clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java ## @@ -464,22 +466,41 @@ public static MetadataResponse prepareResponse(int throttleTimeMs, Collection
[GitHub] [kafka] abbccdda commented on a change in pull request #8486: KAFKA-9840: Skip End Offset validation when the leader epoch is not reliable
abbccdda commented on a change in pull request #8486:
URL: https://github.com/apache/kafka/pull/8486#discussion_r412632092

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ##
@@ -498,6 +498,104 @@ public void validateOffsetsIfNeeded() {
         validateOffsetsAsync(partitionsToValidate);
     }

+    /**
+     * For each partition which needs validation, make an asynchronous request to get the end-offsets for the partition
+     * with the epoch less than or equal to the epoch the partition last saw.
+     *
+     * Requests are grouped by Node for efficiency.
+     */
+    private void validateOffsetsAsync(Map partitionsToValidate) {
+        final Map> regrouped =
+            regroupFetchPositionsByLeader(partitionsToValidate);
+
+        regrouped.forEach((node, fetchPositions) -> {
+            if (node.isEmpty()) {
+                metadata.requestUpdate();
+                return;
+            }
+
+            NodeApiVersions nodeApiVersions = apiVersions.get(node.idString());
+            if (nodeApiVersions == null) {
+                client.tryConnect(node);
+                return;
+            }
+
+            if (!hasUsableOffsetForLeaderEpochVersion(nodeApiVersions)) {
+                log.debug("Skipping validation of fetch offsets for partitions {} since the broker does not " +
+                              "support the required protocol version (introduced in Kafka 2.3)",
+                    fetchPositions.keySet());
+                completeAllValidations(fetchPositions);
+                return;
+            }
+
+            // We need to get the client epoch state before sending out the leader epoch request, and use it to
+            // decide whether we need to validate offsets.
+            if (!metadata.hasReliableLeaderEpochs()) {
+                log.debug("Skipping validation of fetch offsets for partitions {} since the provided leader broker " +
+                              "is not reliable", fetchPositions.keySet());
+                completeAllValidations(fetchPositions);
+                return;
+            }
+
+            subscriptions.setNextAllowedRetry(fetchPositions.keySet(), time.milliseconds() + requestTimeoutMs);
+
+            RequestFuture future =
+                offsetsForLeaderEpochClient.sendAsyncRequest(node, fetchPositions);
+
+            future.addListener(new RequestFutureListener() {
+                @Override
+                public void onSuccess(OffsetsForLeaderEpochClient.OffsetForEpochResult offsetsResult) {
+                    Map truncationWithoutResetPolicy = new HashMap<>();
+                    if (!offsetsResult.partitionsToRetry().isEmpty()) {
+                        subscriptions.setNextAllowedRetry(offsetsResult.partitionsToRetry(), time.milliseconds() + retryBackoffMs);
+                        metadata.requestUpdate();
+                    }
+
+                    // For each OffsetsForLeader response, check if the end-offset is lower than our current offset
+                    // for the partition. If so, it means we have experienced log truncation and need to reposition
+                    // that partition's offset.
+                    //
+                    // In addition, check whether the returned offset and epoch are valid. If not, then we should treat
+                    // it as out of range and update metadata for rediscovery.
+                    offsetsResult.endOffsets().forEach((respTopicPartition, respEndOffset) -> {
+                        if (respEndOffset.hasUndefinedEpochOrOffset()) {

Review comment:
Change two: do not complete the validation as the returned epoch or offset is invalid.
[GitHub] [kafka] abbccdda commented on a change in pull request #8486: KAFKA-9840: Skip End Offset validation when the leader epoch is not reliable
abbccdda commented on a change in pull request #8486:
URL: https://github.com/apache/kafka/pull/8486#discussion_r412631698

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ##
@@ -498,6 +498,104 @@ public void validateOffsetsIfNeeded() {
         validateOffsetsAsync(partitionsToValidate);
     }

+    /**
+     * For each partition which needs validation, make an asynchronous request to get the end-offsets for the partition
+     * with the epoch less than or equal to the epoch the partition last saw.
+     *
+     * Requests are grouped by Node for efficiency.
+     */
+    private void validateOffsetsAsync(Map partitionsToValidate) {

Review comment:
Move the function closer to its caller.
[GitHub] [kafka] abbccdda commented on a change in pull request #8486: KAFKA-9840: Skip End Offset validation when the leader epoch is not reliable
abbccdda commented on a change in pull request #8486:
URL: https://github.com/apache/kafka/pull/8486#discussion_r412631879

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ##
@@ -498,6 +498,104 @@ public void validateOffsetsIfNeeded() {
         validateOffsetsAsync(partitionsToValidate);
     }

+    /**
+     * For each partition which needs validation, make an asynchronous request to get the end-offsets for the partition
+     * with the epoch less than or equal to the epoch the partition last saw.
+     *
+     * Requests are grouped by Node for efficiency.
+     */
+    private void validateOffsetsAsync(Map partitionsToValidate) {
+        final Map> regrouped =
+            regroupFetchPositionsByLeader(partitionsToValidate);
+
+        regrouped.forEach((node, fetchPositions) -> {
+            if (node.isEmpty()) {
+                metadata.requestUpdate();
+                return;
+            }
+
+            NodeApiVersions nodeApiVersions = apiVersions.get(node.idString());
+            if (nodeApiVersions == null) {
+                client.tryConnect(node);
+                return;
+            }
+
+            if (!hasUsableOffsetForLeaderEpochVersion(nodeApiVersions)) {
+                log.debug("Skipping validation of fetch offsets for partitions {} since the broker does not " +
+                              "support the required protocol version (introduced in Kafka 2.3)",
+                    fetchPositions.keySet());
+                completeAllValidations(fetchPositions);
+                return;
+            }
+
+            // We need to get the client epoch state before sending out the leader epoch request, and use it to
+            // decide whether we need to validate offsets.
+            if (!metadata.hasReliableLeaderEpochs()) {

Review comment:
Change one: immediately complete the validation when the leader epoch is not reliable.
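The per-node checks in the quoted diff form a short decision chain, with "Change one" adding the last early exit. The sketch below restates that chain as a pure function so the order of the checks is easy to see. ValidationDecisionSketch, Action and the boolean parameters are hypothetical names introduced here for illustration; they summarize the conditions visible in the diff and are not part of the Fetcher class.

```java
final class ValidationDecisionSketch {
    enum Action {
        REQUEST_METADATA_UPDATE,      // leader node is unknown
        TRY_CONNECT,                  // API versions of the node are not known yet
        COMPLETE_WITHOUT_VALIDATION,  // validation is skipped and marked complete
        SEND_EPOCH_REQUEST            // ask the leader for end offsets by epoch
    }

    // Mirrors the order of the early returns in the quoted validateOffsetsAsync diff.
    static Action decide(boolean leaderKnown,
                         boolean apiVersionsKnown,
                         boolean brokerSupportsEpochLookup,
                         boolean leaderEpochsReliable) {
        if (!leaderKnown) {
            return Action.REQUEST_METADATA_UPDATE;
        }
        if (!apiVersionsKnown) {
            return Action.TRY_CONNECT;
        }
        if (!brokerSupportsEpochLookup) {
            return Action.COMPLETE_WITHOUT_VALIDATION;
        }
        if (!leaderEpochsReliable) {
            // "Change one" from the review: complete immediately instead of sending a request.
            return Action.COMPLETE_WITHOUT_VALIDATION;
        }
        return Action.SEND_EPOCH_REQUEST;
    }
}
```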
[jira] [Updated] (KAFKA-2419) Allow certain Sensors to be garbage collected after inactivity
[ https://issues.apache.org/jira/browse/KAFKA-2419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chenxin updated KAFKA-2419:
    Attachment: leak2.jpg

> Allow certain Sensors to be garbage collected after inactivity
>
> Key: KAFKA-2419
> URL: https://issues.apache.org/jira/browse/KAFKA-2419
> Project: Kafka
> Issue Type: Improvement
> Affects Versions: 0.9.0.0
> Reporter: Aditya Auradkar
> Assignee: Aditya Auradkar
> Priority: Blocker
> Labels: quotas
> Fix For: 0.9.0.0
> Attachments: Leak1, Leak1.jpg, leak2.jpg
>
> Currently, metrics cannot be removed once registered.
> Implement a feature to remove certain sensors after a certain period of inactivity (perhaps configurable).
[jira] [Commented] (KAFKA-2419) Allow certain Sensors to be garbage collected after inactivity
[ https://issues.apache.org/jira/browse/KAFKA-2419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17089231#comment-17089231 ]

Chenxin commented on KAFKA-2419:

Excuse me, has this bug been fixed? I see an ExpireSensorTask in Metrics, but I cannot find any way to use it. I see more than 800,000 sensors in my application; they sit in the old generation and cannot be garbage collected. I use kafka-clients version 2.1.1.

!Leak1.jpg|width=217,height=175!
!leak2.jpg|width=782,height=290!

> Allow certain Sensors to be garbage collected after inactivity
>
> Key: KAFKA-2419
> URL: https://issues.apache.org/jira/browse/KAFKA-2419
> Project: Kafka
> Issue Type: Improvement
> Affects Versions: 0.9.0.0
> Reporter: Aditya Auradkar
> Assignee: Aditya Auradkar
> Priority: Blocker
> Labels: quotas
> Fix For: 0.9.0.0
> Attachments: Leak1, Leak1.jpg, leak2.jpg
>
> Currently, metrics cannot be removed once registered.
> Implement a feature to remove certain sensors after a certain period of inactivity (perhaps configurable).
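The feature described in KAFKA-2419, removing entries after a period of inactivity, can be illustrated with a small registry that records a last-used timestamp per entry and purges entries whose idle time exceeds their expiration. This is a JDK-only sketch under stated assumptions: ExpiringRegistrySketch, record and purgeExpired are hypothetical names, the caller supplies the current time, and nothing here reflects how Kafka's Metrics class is actually implemented.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class ExpiringRegistrySketch {
    private static final class Entry {
        final long expirationMs;   // allowed idle time before the entry may be purged
        volatile long lastUsedMs;  // timestamp of the most recent record() call

        Entry(long expirationMs, long nowMs) {
            this.expirationMs = expirationMs;
            this.lastUsedMs = nowMs;
        }
    }

    private final Map<String, Entry> entries = new ConcurrentHashMap<>();

    // Creates the entry on first use and refreshes its last-used timestamp on every call.
    void record(String name, long expirationMs, long nowMs) {
        entries.compute(name, (key, existing) -> {
            Entry entry = existing != null ? existing : new Entry(expirationMs, nowMs);
            entry.lastUsedMs = nowMs;
            return entry;
        });
    }

    // Removes entries idle for longer than their expiration; returns how many were removed.
    int purgeExpired(long nowMs) {
        int before = entries.size();
        entries.values().removeIf(entry -> nowMs - entry.lastUsedMs > entry.expirationMs);
        return before - entries.size();
    }

    int size() {
        return entries.size();
    }
}
```

In a design like this, entries are only reclaimed if an expiration is set when they are registered and something calls the purge periodically; entries registered without an expiration would stay reachable indefinitely.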
[jira] [Updated] (KAFKA-2419) Allow certain Sensors to be garbage collected after inactivity
[ https://issues.apache.org/jira/browse/KAFKA-2419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chenxin updated KAFKA-2419:
    Attachment: Leak1.jpg

> Allow certain Sensors to be garbage collected after inactivity
>
> Key: KAFKA-2419
> URL: https://issues.apache.org/jira/browse/KAFKA-2419
> Project: Kafka
> Issue Type: Improvement
> Affects Versions: 0.9.0.0
> Reporter: Aditya Auradkar
> Assignee: Aditya Auradkar
> Priority: Blocker
> Labels: quotas
> Fix For: 0.9.0.0
> Attachments: Leak1, Leak1.jpg, leak2.jpg
>
> Currently, metrics cannot be removed once registered.
> Implement a feature to remove certain sensors after a certain period of inactivity (perhaps configurable).
[jira] [Updated] (KAFKA-2419) Allow certain Sensors to be garbage collected after inactivity
[ https://issues.apache.org/jira/browse/KAFKA-2419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chenxin updated KAFKA-2419:
    Attachment: Leak1

> Allow certain Sensors to be garbage collected after inactivity
>
> Key: KAFKA-2419
> URL: https://issues.apache.org/jira/browse/KAFKA-2419
> Project: Kafka
> Issue Type: Improvement
> Affects Versions: 0.9.0.0
> Reporter: Aditya Auradkar
> Assignee: Aditya Auradkar
> Priority: Blocker
> Labels: quotas
> Fix For: 0.9.0.0
> Attachments: Leak1
>
> Currently, metrics cannot be removed once registered.
> Implement a feature to remove certain sensors after a certain period of inactivity (perhaps configurable).
[GitHub] [kafka] iceChen8123 commented on issue #233: KAFKA-2419; Garbage collect unused sensors
iceChen8123 commented on issue #233:
URL: https://github.com/apache/kafka/pull/233#issuecomment-617512892

Excuse me, has this bug been fixed? I see an ExpireSensorTask in Metrics, but I cannot find any way to use it. I see more than 800,000 sensors in my application; they sit in the old generation and cannot be garbage collected. I use kafka-clients version 2.1.1.
[jira] [Commented] (KAFKA-8924) Default grace period (-1) of TimeWindows causes suppress to emit events after 24h
[ https://issues.apache.org/jira/browse/KAFKA-8924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17089214#comment-17089214 ]

Matthias J. Sax commented on KAFKA-8924:

{quote}there is no sensible default grace period{quote}

This is of course subjective, but I would personally be fine with a default of zero. Even if it is not very common, there are use cases for which data is actually ordered (in fact, whenever one uses wall-clock time); thus, I don't consider it "crazy" to use a default of zero. Furthermore, for simple demos and when people start to build their first apps, it seems desirable to have as few mandatory parameters as possible, to make it easier to get started.

I see your argument that for a real production use case one would most likely want to set a custom grace period, and thus I could be convinced to make it mandatory. It's just a slight preference of mine to use a default of zero.

What I like most about the proposal is that we could just fix it in 2.7. We should have thought about this earlier and should have fixed it in the same release in which `suppress()` was introduced. Well, better a little delayed than even more delayed (i.e., to 3.0).

It would be great to get a KIP for this and get it into the 2.7 release.

> Default grace period (-1) of TimeWindows causes suppress to emit events after 24h
>
> Key: KAFKA-8924
> URL: https://issues.apache.org/jira/browse/KAFKA-8924
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 2.3.0
> Reporter: Michał
> Assignee: Michał
> Priority: Major
> Labels: needs-kip
>
> h2. Problem
> The default creation of TimeWindows, like
> {code}
> TimeWindows.of(ofMillis(xxx))
> {code}
> calls an internal constructor
> {code}
> return new TimeWindows(sizeMs, sizeMs, -1, DEFAULT_RETENTION_MS);
> {code}
> The *-1* parameter is the default grace period, which I think is here for backward compatibility:
> {code}
> @SuppressWarnings("deprecation") // continuing to support Windows#maintainMs/segmentInterval in fallback mode
> @Override
> public long gracePeriodMs() {
>     // NOTE: in the future, when we remove maintainMs,
>     // we should default the grace period to 24h to maintain the default behavior,
>     // or we can default to (24h - size) if you want to be super accurate.
>     return graceMs != -1 ? graceMs : maintainMs() - size();
> }
> {code}
> The problem is that if you use a TimeWindows with a gracePeriod of *-1* together with suppress *untilWindowCloses*, it never emits an event.
> You can check the Suppress tests (SuppressScenarioTest.shouldSupportFinalResultsForTimeWindows), where [~vvcephei] was (maybe) aware of that and all the scenarios specify the gracePeriod.
> I will add a test without it on my branch and it will fail.
> The test: https://github.com/atais/kafka/commit/221a04dc40d636ffe93a0ad95dfc6bcad653f4db
>
> h2. Now what can be done
> One easy fix would be to change the default value to 0, which works fine for me in my project. However, I am not aware of the impact it would have, given the fallback logic in the *gracePeriodMs* method mentioned above.
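The arithmetic behind the 24h delay in the issue title follows from the fallback quoted in the description. The sketch below replays that expression with plain numbers. GracePeriodSketch and its methods are hypothetical names, and the 24-hour value for DEFAULT_RETENTION_MS is an assumption taken from the NOTE in the quoted code rather than from the Kafka sources.

```java
import java.time.Duration;

final class GracePeriodSketch {
    // Assumed default retention of 24h, as suggested by the NOTE in the quoted code.
    static final long DEFAULT_RETENTION_MS = Duration.ofDays(1).toMillis();

    // Same expression as the quoted gracePeriodMs(): -1 means "not set, fall back".
    static long gracePeriodMs(long sizeMs, long graceMs, long maintainMs) {
        return graceMs != -1 ? graceMs : maintainMs - sizeMs;
    }

    // A window closes once stream time passes its end plus the grace period.
    static long windowCloseMs(long windowStartMs, long sizeMs, long graceMs) {
        return windowStartMs + sizeMs + gracePeriodMs(sizeMs, graceMs, DEFAULT_RETENTION_MS);
    }
}
```

For a 5-minute window with the grace period left at -1, the fallback yields 24h minus 5 minutes, so the window closes a full 24 hours after its start; with an explicit grace period of 0 it closes after 5 minutes. A suppress operator that waits for window close would therefore appear to emit nothing until stream time has advanced by a day.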
[jira] [Updated] (KAFKA-9898) Flaky Test StoreQueryIntegrationTest.shouldQueryAllStalePartitionStores
[ https://issues.apache.org/jira/browse/KAFKA-9898?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-9898: --- Component/s: unit tests > Flaky Test StoreQueryIntegrationTest.shouldQueryAllStalePartitionStores > --- > > Key: KAFKA-9898 > URL: https://issues.apache.org/jira/browse/KAFKA-9898 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Affects Versions: 2.6.0 >Reporter: Sophie Blee-Goldman >Priority: Major > Labels: flaky-test, unit-test > > h3. Stacktrace > java.lang.AssertionError: Expected: is but: was at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) at > org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryAllStalePartitionStores(StoreQueryIntegrationTest.java:233) > > [https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/5900/testReport/junit/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/shouldQueryAllStalePartitionStores/] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] guozhangwang commented on issue #8462: KAFKA-9846: Filter active tasks for running state in KafkaStreams#allLocalStorePartitionLags()
guozhangwang commented on issue #8462: URL: https://github.com/apache/kafka/pull/8462#issuecomment-617488326 test this please
[GitHub] [kafka] andrewchoi5 commented on issue #8479: KAFKA-9769: Finish operations for leaderEpoch-updated partitions up to point ZK Exception
andrewchoi5 commented on issue #8479: URL: https://github.com/apache/kafka/pull/8479#issuecomment-617479063 Thanks for referring Matthias. Would appreciate your review @hachikuji @cmccabe
[jira] [Created] (KAFKA-9899) LogCleaner Tries To Clean Single Partition Over 1000x/Minute
Jeff Nadler created KAFKA-9899: -- Summary: LogCleaner Tries To Clean Single Partition Over 1000x/Minute Key: KAFKA-9899 URL: https://issues.apache.org/jira/browse/KAFKA-9899 Project: Kafka Issue Type: Bug Components: log cleaner Affects Versions: 2.4.1 Environment: ubuntu bionic, openjdk11.0.6, kafka 2.4.1 Reporter: Jeff Nadler Attachments: CPU-Usage.png I had previously believed this to be the same issue as KAFKA-8764, but I took a closer look when it persisted after upgrading to 2.4.1 and now believe this is a different bug. For a topic that is a very low traffic, compact topic the log cleaner will sometimes - for a period of usually 2 hours or longer - get stuck in a loop where it tries to clean the same partition for the same offset range nonstop, and the log cleaner thread consumes 100% of a single core during this time. h4. 1396 attempts in a single minute: {{root@stage-obs-kafka01:/var/log/kafka# cat log-cleaner.log | grep 22:22: | grep "offset range" | wc -l}} {{1396}} h4. All 1396 of these are looking at the same partition and same offset range: {{[2020-04-21 22:22:59,862] INFO Cleaner 0: Building offset map for log elauneind-firebolt-messages-sfo-0 for 0 segments in offset range [22943108, 22912825). (kafka.log.LogCleaner)}} These attempts are separated by on average only 30ms. This is a small 3 node cluster, note that the CPU graph attached is very clearly bimodal for each node: low when the log cleaner is not "stuck", and much higher when it is. Eventually the log cleaner appears to find a segment to clean (because enough traffic has arrived?) and the loop is broken... for a time. Note that it finds "1 segments" and then finally moves on to check other topic-partitions. {{...tens of thousands of this first one then}} {{[2020-04-21 20:06:02,531] INFO Cleaner 0: Building offset map for log elauneind-firebolt-messages-sfo-0 for 0 segments in *offset range* [23591841, 23575583). 
(kafka.log.LogCleaner)}}{{[2020-04-21 20:06:02,567] INFO Cleaner 0: Building offset map for log elauneind-firebolt-messages-sfo-0 for 1 segments in *offset range* [23591841, 23621641). (kafka.log.LogCleaner)}}{{[2020-04-21 20:43:04,309] INFO Cleaner 0: Building offset map for log elauneind-firebolt-messages-s2r1-0 for 1 segments in *offset range* [2687968, 2732498). (kafka.log.LogCleaner)}} h4. The topic gets about 100 messages/minute, and its config is: {{Topic: elauneind-firebolt-messages-sfo PartitionCount: 1 ReplicationFactor: 3 Configs: min.insync.replicas=1,cleanup.policy=compact,delete,segment.bytes=10240,retention.ms=90,message.format.version=2.3-IV1,min.compaction.lag.ms=30,min.cleanable.dirty.ratio=0.2,unclean.leader.election.enable=true,retention.bytes=1073741824}}{{ Topic: elauneind-firebolt-messages-sfo Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,1,2}} -- This message was sent by Atlassian Jira (v8.3.4#803005)
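One detail visible in the log lines quoted in this report: the "0 segments" entries all carry an offset range whose end is smaller than its start, while the "1 segments" entries do not. The following is a small sketch for flagging such lines when scanning `log-cleaner.log`. The class and method names are hypothetical, and the check says nothing about the root cause; it only makes the pattern easy to count.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for scanning log-cleaner.log; not part of Kafka.
class OffsetRangeCheck {
    // Matches "offset range [start, end)"; the optional '*' tolerates the
    // Jira bold markup present in some of the quoted lines.
    private static final Pattern RANGE =
        Pattern.compile("offset range\\*? \\[(\\d+), (\\d+)\\)");

    // True when the line contains a range whose end is not greater than its start.
    static boolean isEmptyOrInverted(String logLine) {
        Matcher m = RANGE.matcher(logLine);
        if (!m.find()) {
            return false; // no offset range on this line
        }
        long start = Long.parseLong(m.group(1));
        long end = Long.parseLong(m.group(2));
        return end <= start;
    }

    public static void main(String[] args) {
        String stuck = "INFO Cleaner 0: Building offset map for log "
            + "elauneind-firebolt-messages-sfo-0 for 0 segments in "
            + "offset range [22943108, 22912825). (kafka.log.LogCleaner)";
        System.out.println(isEmptyOrInverted(stuck));
    }
}
```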
[jira] [Commented] (KAFKA-7965) Flaky Test ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
[ https://issues.apache.org/jira/browse/KAFKA-7965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17089107#comment-17089107 ] Sophie Blee-Goldman commented on KAFKA-7965: [https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/5900/testReport/junit/kafka.api/ConsumerBounceTest/testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup/] > Flaky Test > ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup > > > Key: KAFKA-7965 > URL: https://issues.apache.org/jira/browse/KAFKA-7965 > Project: Kafka > Issue Type: Bug > Components: clients, consumer, unit tests >Affects Versions: 1.1.1, 2.2.0, 2.3.0 >Reporter: Matthias J. Sax >Assignee: David Jacot >Priority: Critical > Labels: flaky-test > Fix For: 2.3.0 > > > To get stable nightly builds for `2.2` release, I create tickets for all > observed test failures. > [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/21/] > {quote}java.lang.AssertionError: Received 0, expected at least 68 at > org.junit.Assert.fail(Assert.java:88) at > org.junit.Assert.assertTrue(Assert.java:41) at > kafka.api.ConsumerBounceTest.receiveAndCommit(ConsumerBounceTest.scala:557) > at > kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1(ConsumerBounceTest.scala:320) > at > kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1$adapted(ConsumerBounceTest.scala:319) > at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) > at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at > kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:319){quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-9898) Flaky Test StoreQueryIntegrationTest.shouldQueryAllStalePartitionStores
Sophie Blee-Goldman created KAFKA-9898: -- Summary: Flaky Test StoreQueryIntegrationTest.shouldQueryAllStalePartitionStores Key: KAFKA-9898 URL: https://issues.apache.org/jira/browse/KAFKA-9898 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 2.6.0 Reporter: Sophie Blee-Goldman h3. Stacktrace java.lang.AssertionError: Expected: is but: was at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) at org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryAllStalePartitionStores(StoreQueryIntegrationTest.java:233) [https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/5900/testReport/junit/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/shouldQueryAllStalePartitionStores/] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] vvcephei commented on issue #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one
vvcephei commented on issue #8497: URL: https://github.com/apache/kafka/pull/8497#issuecomment-617439325 Unrelated java 11 failures: kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryAllStalePartitionStores
[jira] [Commented] (KAFKA-9335) java.lang.IllegalArgumentException: Number of partitions must be at least 1.
[ https://issues.apache.org/jira/browse/KAFKA-9335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17089096#comment-17089096 ] Yuexi Liu commented on KAFKA-9335: -- [~vveeramani] I tested it; 2.4.1 fixed it > java.lang.IllegalArgumentException: Number of partitions must be at least 1. > > > Key: KAFKA-9335 > URL: https://issues.apache.org/jira/browse/KAFKA-9335 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.4.0 >Reporter: Nitay Kufert >Assignee: Boyang Chen >Priority: Blocker > Labels: bug > Fix For: 2.4.1 > > > Hey, > When trying to upgrade our Kafka streams client to 2.4.0 (from 2.3.1) we > encountered the following exception: > {code:java} > java.lang.IllegalArgumentException: Number of partitions must be at least 1. > {code} > It's important to notice that the exact same code works just fine at 2.3.1. > > I have created a "toy" example which reproduces this exception: > [https://gist.github.com/nitayk/50da33b7bcce19ad0a7f8244d309cb8f] > and I would love to get some insight regarding why it's happening / ways to > get around it > > Thanks -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-8924) Default grace period (-1) of TimeWindows causes suppress to emit events after 24h
[ https://issues.apache.org/jira/browse/KAFKA-8924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17089090#comment-17089090 ] John Roesler commented on KAFKA-8924: - Thanks for the comment, [~cadonna] , I agree that it's not as elegant-looking this way, but the point is that if we make it two separate methods, then we _have to_ select a default. I'm leaning toward the viewpoint that there is no sensible default grace period, therefore it's got to be a required parameter, which means that it must be present in the "root" factory method arguments. I can see now why other stream processing systems attempt to adaptively "learn" the grace period by observing the input stream's lateness, but I'm still on the fence about whether that's really a good "default" either. > Default grace period (-1) of TimeWindows causes suppress to emit events after > 24h > - > > Key: KAFKA-8924 > URL: https://issues.apache.org/jira/browse/KAFKA-8924 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.3.0 >Reporter: Michał >Assignee: Michał >Priority: Major > Labels: needs-kip > > h2. Problem > The default creation of TimeWindows, like > {code} > TimeWindows.of(ofMillis(xxx)) > {code} > calls an internal constructor > {code} > return new TimeWindows(sizeMs, sizeMs, -1, DEFAULT_RETENTION_MS); > {code} > And the *-1* parameter is the default grace period which I think is here for > backward compatibility > {code} > @SuppressWarnings("deprecation") // continuing to support > Windows#maintainMs/segmentInterval in fallback mode > @Override > public long gracePeriodMs() { > // NOTE: in the future, when we remove maintainMs, > // we should default the grace period to 24h to maintain the default > behavior, > // or we can default to (24h - size) if you want to be super accurate. > return graceMs != -1 ? 
graceMs : maintainMs() - size(); > } > {code} > The problem is that if you use a TimeWindows with gracePeriod of *-1* > together with suppress *untilWindowCloses*, it never emits an event. > You can check the Suppress tests > (SuppressScenarioTest.shouldSupportFinalResultsForTimeWindows), where > [~vvcephei] was (maybe) aware of that and all the scenarios specify the > gracePeriod. > I will add a test without it on my branch and it will fail. > The test: > https://github.com/atais/kafka/commit/221a04dc40d636ffe93a0ad95dfc6bcad653f4db > > h2. Now what can be done > One easy fix would be to change the default value to 0, which works fine for > me in my project, however, I am not aware of the impact it would have done > due to the changes in the *gracePeriodMs* method mentioned before. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] vvcephei commented on issue #8254: KIP-557: Add Emit On Change Support
vvcephei commented on issue #8254: URL: https://github.com/apache/kafka/pull/8254#issuecomment-617423885 I hope I didn't step on your toes, @ConcurrencyPractitioner , but I just wanted to make sure that you're unblocked to finish up this PR. Figuring out what's exactly wrong with those tests and whether it's ok can be a bit subtle.
[GitHub] [kafka] vvcephei commented on issue #8254: KIP-557: Add Emit On Change Support
vvcephei commented on issue #8254: URL: https://github.com/apache/kafka/pull/8254#issuecomment-617423139 Ok, last one. The TransformValuesTest is just another case where the test input data is now considered idempotent, which is fine:
```diff
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
@@ -398,8 +398,8 @@ public class KTableTransformValuesTest {
 driver.createInputTopic(INPUT_TOPIC, new StringSerializer(), new StringSerializer());
 inputTopic.pipeInput("A", "ignored", 5L);
-inputTopic.pipeInput("A", "ignored", 15L);
-inputTopic.pipeInput("A", "ignored", 10L);
+inputTopic.pipeInput("A", "ignored1", 15L);
+inputTopic.pipeInput("A", "ignored2", 10L);
 assertThat(output(), hasItems(new KeyValueTimestamp<>("A", "1", 5),
 new KeyValueTimestamp<>("A", "0", 15),
```
[GitHub] [kafka] vvcephei commented on issue #8254: KIP-557: Add Emit On Change Support
vvcephei commented on issue #8254: URL: https://github.com/apache/kafka/pull/8254#issuecomment-617418517 The source topic restart integration test was actually just failing because the tests were polluting each other's topics. This is one way to fix it:
```diff
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
index 3ec239fab9..b42a5852a5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
@@ -100,6 +100,7 @@ public class KTableSourceTopicRestartIntegrationTest {
 @After
 public void after() throws Exception {
 IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
+CLUSTER.deleteAllTopicsAndWait(60_000L);
 }
 @Test
```
[jira] [Commented] (KAFKA-9897) Flaky Test StoreQueryIntegrationTest#shouldQuerySpecificActivePartitionStores
[ https://issues.apache.org/jira/browse/KAFKA-9897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17089052#comment-17089052 ] Matthias J. Sax commented on KAFKA-9897: [~NaviBrar] – would you be interested to look into this one? You should have some context about the test. Would be nice if we could find and fix the root cause of the flakiness. > Flaky Test StoreQueryIntegrationTest#shouldQuerySpecificActivePartitionStores > - > > Key: KAFKA-9897 > URL: https://issues.apache.org/jira/browse/KAFKA-9897 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Affects Versions: 2.6.0 >Reporter: Matthias J. Sax >Priority: Critical > Labels: flaky-test > > [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/22/testReport/junit/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/shouldQuerySpecificActivePartitionStores/] > {quote}org.apache.kafka.streams.errors.InvalidStateStoreException: Cannot get > state store source-table because the stream thread is PARTITIONS_ASSIGNED, > not RUNNING at > org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:85) > at > org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:61) > at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1183) at > org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQuerySpecificActivePartitionStores(StoreQueryIntegrationTest.java:178){quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] vvcephei commented on issue #8254: KIP-557: Add Emit On Change Support
vvcephei commented on issue #8254: URL: https://github.com/apache/kafka/pull/8254#issuecomment-617392150 I also took a look at the foreign-key join test, which is actually telling us something awesome: your feature allows us to drop _unnecessary_ tombstones that we'd otherwise send under some conditions. Anyway, it's complicated, so here's a fix for the test:
```diff
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
@@ -48,6 +48,7 @@ import java.util.Properties;
 import java.util.function.Function;
 import static java.util.Collections.emptyMap;
+import static java.util.Collections.singletonMap;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkProperties;
@@ -371,12 +372,16 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
 // Deleting a non-joining record produces an unnecessary tombstone for inner joins, because
 // it's not possible to know whether a result was previously emitted.
+// HOWEVER, when the final join result is materialized (either explicitly or
+// implicitly by a subsequent join), we _can_ detect that the tombstone is unnecessary and drop it.
 // For the left join, the tombstone is necessary.
 left.pipeInput("lhs1", (String) null);
 {
 assertThat(
 outputTopic.readKeyValuesToMap(),
-is(mkMap(mkEntry("lhs1", null)))
+is(leftJoin || !(materialized || rejoin)
+    ? mkMap(mkEntry("lhs1", null))
+    : emptyMap())
 );
 if (materialized) {
 assertThat(
@@ -452,12 +457,15 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
 }
 // "moving" our subscription to another non-existent FK results in an unnecessary tombstone for inner join,
 // since it impossible to know whether the prior FK existed or not (and thus whether any results have
-// previously been emitted)
+// previously been emitted). HOWEVER, when the final join result is materialized (either explicitly or
+// implicitly by a subsequent join), we _can_ detect that the tombstone is unnecessary and drop it.
 // The left join emits a _necessary_ update (since the lhs record has actually changed)
 left.pipeInput("lhs1", "lhsValue1|rhs2");
 assertThat(
 outputTopic.readKeyValuesToMap(),
-is(mkMap(mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs2,null)" : null)))
+is(leftJoin
+    ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs2,null)"))
+    : (materialized || rejoin) ? emptyMap() : singletonMap("lhs1", null))
 );
 if (materialized) {
 assertThat(
@@ -469,7 +477,9 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
 left.pipeInput("lhs1", "lhsValue1|rhs3");
 assertThat(
 outputTopic.readKeyValuesToMap(),
-is(mkMap(mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs3,null)" : null)))
+is(leftJoin
+    ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs3,null)"))
+    : (materialized || rejoin) ? emptyMap() : singletonMap("lhs1", null))
 );
 if (materialized) {
 assertThat(
```
[jira] [Commented] (KAFKA-9882) Add Block getAssignments()
[ https://issues.apache.org/jira/browse/KAFKA-9882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17089016#comment-17089016 ] Jesse Anderson commented on KAFKA-9882: --- Seeking each time was just an example. Adding a config wouldn't handle all cases. For example, it wouldn't handle seeking to a specific offset based on which partition it was assigned. > Add Block getAssignments() > -- > > Key: KAFKA-9882 > URL: https://issues.apache.org/jira/browse/KAFKA-9882 > Project: Kafka > Issue Type: New Feature > Components: clients >Affects Versions: 2.5.0 >Reporter: Jesse Anderson >Priority: Critical > > In 2.0, the KafkaConsumer poll(long) was deprecated and replaced with a > poll(Duration). The poll(Duration) does not block for consumer assignments. > Now, there isn't a blocking method that can get consumer assignments. > A new KafkaConsumer method needs to be added that blocks while getting > consumer assignments. > The current workaround is to poll for a short amount of time in a while loop > and check the size of assignment(). This isn't a great method of verifying > the consumer assignment. -- This message was sent by Atlassian Jira (v8.3.4#803005)
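The workaround described in the issue (poll briefly in a loop and check `assignment()`) could be sketched roughly as below. To keep the example self-contained it does not depend on the Kafka client: `pollOnce` stands in for something like `consumer.poll(Duration.ofMillis(100))` and `assignment` for `consumer::assignment`. The helper name and its signature are assumptions, not an existing API.

```java
import java.time.Duration;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical helper illustrating the polling workaround; not a Kafka API.
class AssignmentWait {
    // Calls pollOnce until assignment is non-empty or the timeout elapses.
    // Returns the last observed assignment, which is empty on timeout.
    static <T> Set<T> awaitAssignment(Runnable pollOnce,
                                      Supplier<Set<T>> assignment,
                                      Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        Set<T> current = assignment.get();
        while (current.isEmpty() && System.nanoTime() - deadline < 0) {
            pollOnce.run();             // drives the group join / rebalance
            current = assignment.get(); // re-check after each poll
        }
        return current;
    }
}
```

With a real consumer, the call might look like `awaitAssignment(() -> consumer.poll(Duration.ofMillis(100)), consumer::assignment, Duration.ofSeconds(30))`, keeping in mind that records returned by those polls are discarded in this sketch. Registering a `ConsumerRebalanceListener` when subscribing and seeking inside `onPartitionsAssigned` is another option for the per-partition seek case mentioned in the comment.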
[GitHub] [kafka] vvcephei commented on issue #8520: Add explicit grace period to tumbling window example
vvcephei commented on issue #8520: URL: https://github.com/apache/kafka/pull/8520#issuecomment-617376010 Gah, I forgot to add the reviewers to the merge commit.
[GitHub] [kafka] surabhidixit opened a new pull request #8526: KAFKA-6867: corrected the typos in upgrade.html
surabhidixit opened a new pull request #8526: URL: https://github.com/apache/kafka/pull/8526
[GitHub] [kafka] vvcephei commented on issue #8254: KIP-557: Add Emit On Change Support
vvcephei commented on issue #8254: URL: https://github.com/apache/kafka/pull/8254#issuecomment-617372772 Hey @ConcurrencyPractitioner , sorry it took so long. It's just again because the test happened to expect idempotent updates to flow through regularly, but not for anything important. Just changing the value of the "tick" record the second time fixes it without breaking anything about the test. Here's my diff:
```diff
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
@@ -615,11 +615,11 @@ public class SuppressScenarioTest {
 );
-inputTopicRight.pipeInput("tick", "tick", 21L);
+inputTopicRight.pipeInput("tick", "tick1", 21L);
 verify(
 drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
 asList(
-new KeyValueTimestamp<>("tick", "(null,tick)", 21), // just a testing artifact
+new KeyValueTimestamp<>("tick", "(null,tick1)", 21), // just a testing artifact
 new KeyValueTimestamp<>("A", "(b,2)", 13L)
 )
 );
@@ -703,11 +703,11 @@ public class SuppressScenarioTest {
 );
-inputTopicLeft.pipeInput("tick", "tick", 21L);
+inputTopicLeft.pipeInput("tick", "tick1", 21L);
 verify(
 drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
 asList(
-new KeyValueTimestamp<>("tick", "(tick,null)", 21), // just a testing artifact
+new KeyValueTimestamp<>("tick", "(tick1,null)", 21), // just a testing artifact
 new KeyValueTimestamp<>("A", "(2,b)", 13L)
 )
 );
```
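For readers following the KIP-557 thread, this is a simplified model of why the second identical "tick" record stopped producing output once emit-on-change was in place. It is a toy table in plain Java with hypothetical names; how the actual PR compares values (for example on serialized bytes or timestamps) is not shown in this thread and is not modelled here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Toy model of emit-on-change semantics; not the Kafka Streams implementation.
class EmitOnChangeSketch {
    private final Map<String, String> store = new HashMap<>();
    private final List<String> emitted = new ArrayList<>();

    // Stores the value and forwards it downstream only when it differs
    // from the value already held for that key.
    void put(String key, String value) {
        if (store.containsKey(key) && Objects.equals(store.get(key), value)) {
            return; // idempotent update: nothing changed, so nothing is emitted
        }
        store.put(key, value);
        emitted.add(key + "=" + value);
    }

    List<String> emitted() {
        return emitted;
    }
}
```

In this model, piping ("tick", "tick") twice emits once, while changing the second value to "tick1" emits again, which matches the shape of the test fix in the diff.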
[jira] [Commented] (KAFKA-9882) Add Block getAssignments()
[ https://issues.apache.org/jira/browse/KAFKA-9882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17089006#comment-17089006 ] Boyang Chen commented on KAFKA-9882: So you are trying to reset offset every time when you restart the consumer? If that's the case, we could get a config like `init.offset.reset` which enforces the reset every time we restart the consumer. > Add Block getAssignments() > -- > > Key: KAFKA-9882 > URL: https://issues.apache.org/jira/browse/KAFKA-9882 > Project: Kafka > Issue Type: New Feature > Components: clients >Affects Versions: 2.5.0 >Reporter: Jesse Anderson >Priority: Critical > > In 2.0, the KafkaConsumer poll(long) was deprecated and replaced with a > poll(Duration). The poll(Duration) does not block for consumer assignments. > Now, there isn't a blocking method that can get consumer assignments. > A new KafkaConsumer method needs to be added that blocks while getting > consumer assignments. > The current workaround is to poll for a short amount of time in a while loop > and check the size of assignment(). This isn't a great method of verifying > the consumer assignment. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] vvcephei commented on issue #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one
vvcephei commented on issue #8497: URL: https://github.com/apache/kafka/pull/8497#issuecomment-617364803 test this please
[GitHub] [kafka] vvcephei commented on issue #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one
vvcephei commented on issue #8497: URL: https://github.com/apache/kafka/pull/8497#issuecomment-617364180 test this please
[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one
ableegoldman commented on a change in pull request #8497: URL: https://github.com/apache/kafka/pull/8497#discussion_r412424861 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ValidClientsByTaskLoadQueue.java ## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.processor.internals.assignment; + +import java.util.Collection; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.UUID; +import java.util.function.BiFunction; +import org.apache.kafka.streams.processor.TaskId; + +/** + * Wraps a priority queue of clients and returns the next valid candidate(s) based on the current task assignment + */ +class ValidClientsByTaskLoadQueue { +private final PriorityQueue clientsByTaskLoad; +private final BiFunction validClientCriteria; +private final Set uniqueClients = new HashSet<>(); + +ValidClientsByTaskLoadQueue(final Map clientStates, +final BiFunction validClientCriteria) { +clientsByTaskLoad = getClientPriorityQueueByTaskLoad(clientStates); +this.validClientCriteria = validClientCriteria; +} + +/** += * @return the next least loaded client that satisfies the given criteria, or null if none do + */ +UUID poll(final TaskId task) { +final List validClient = poll(task, 1); +return validClient.isEmpty() ? 
null : validClient.get(0); +} + +/** + * @return the next N <= {@code numClientsPerTask} clients in the underlying priority queue that are valid + * candidates for the given task + */ +List poll(final TaskId task, final int numClients) { +final List nextLeastLoadedValidClients = new LinkedList<>(); +final Set invalidPolledClients = new HashSet<>(); +while (nextLeastLoadedValidClients.size() < numClients) { +UUID candidateClient; +while (true) { +candidateClient = clientsByTaskLoad.poll(); +if (candidateClient == null) { +offerAll(invalidPolledClients); +return nextLeastLoadedValidClients; +} + +if (validClientCriteria.apply(candidateClient, task)) { +nextLeastLoadedValidClients.add(candidateClient); +break; +} else { +invalidPolledClients.add(candidateClient); +} +} +} +offerAll(invalidPolledClients); +return nextLeastLoadedValidClients; +} + +void offerAll(final Collection clients) { +for (final UUID client : clients) { +offer(client); +} +} + +void offer(final UUID client) { +if (uniqueClients.contains(client)) { Review comment: @cadonna you're right, I forgot to remove from `uniqueClients` in poll. Good catch This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
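The bug acknowledged in this review comment (the `uniqueClients` set not being updated in `poll`) can be illustrated with a reduced sketch. This is not the class under review: integers stand in for the UUID clients, natural ordering stands in for ordering by task load, and all names are hypothetical. The point is only that a membership set guarding `offer` has to be updated whenever an element leaves the queue.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.function.Predicate;

// Reduced illustration of a priority queue guarded by a membership set.
class DedupQueueSketch {
    private final PriorityQueue<Integer> queue = new PriorityQueue<>();
    private final Set<Integer> members = new HashSet<>();

    // Adds the client unless it is already queued.
    void offer(int client) {
        if (members.add(client)) {
            queue.offer(client);
        }
    }

    // Returns up to n queued clients accepted by `valid`, lowest first.
    // Rejected candidates are put back once polling is finished.
    List<Integer> poll(int n, Predicate<Integer> valid) {
        List<Integer> accepted = new ArrayList<>();
        List<Integer> rejected = new ArrayList<>();
        while (accepted.size() < n) {
            Integer candidate = queue.poll();
            if (candidate == null) {
                break; // queue exhausted
            }
            // Keep the set in sync with the queue; without this line a polled
            // client could never be offered again.
            members.remove(candidate);
            if (valid.test(candidate)) {
                accepted.add(candidate);
            } else {
                rejected.add(candidate);
            }
        }
        for (int client : rejected) {
            offer(client);
        }
        return accepted;
    }
}
```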
[GitHub] [kafka] vvcephei commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one
vvcephei commented on a change in pull request #8497: URL: https://github.com/apache/kafka/pull/8497#discussion_r412424445 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ValidClientsByTaskLoadQueue.java ## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.processor.internals.assignment; + +import java.util.Collection; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.UUID; +import java.util.function.BiFunction; +import org.apache.kafka.streams.processor.TaskId; + +/** + * Wraps a priority queue of clients and returns the next valid candidate(s) based on the current task assignment + */ +class ValidClientsByTaskLoadQueue { +private final PriorityQueue clientsByTaskLoad; +private final BiFunction validClientCriteria; +private final Set uniqueClients = new HashSet<>(); + +ValidClientsByTaskLoadQueue(final Map clientStates, +final BiFunction validClientCriteria) { +clientsByTaskLoad = getClientPriorityQueueByTaskLoad(clientStates); +this.validClientCriteria = validClientCriteria; +} + +/** += * @return the next least loaded client that satisfies the given criteria, or null if none do + */ +UUID poll(final TaskId task) { +final List validClient = poll(task, 1); +return validClient.isEmpty() ? 
null : validClient.get(0); +} + +/** + * @return the next N <= {@code numClientsPerTask} clients in the underlying priority queue that are valid + * candidates for the given task + */ +List poll(final TaskId task, final int numClients) { +final List nextLeastLoadedValidClients = new LinkedList<>(); +final Set invalidPolledClients = new HashSet<>(); +while (nextLeastLoadedValidClients.size() < numClients) { +UUID candidateClient; +while (true) { +candidateClient = clientsByTaskLoad.poll(); +if (candidateClient == null) { +offerAll(invalidPolledClients); +return nextLeastLoadedValidClients; +} + +if (validClientCriteria.apply(candidateClient, task)) { +nextLeastLoadedValidClients.add(candidateClient); +break; +} else { +invalidPolledClients.add(candidateClient); +} +} +} +offerAll(invalidPolledClients); +return nextLeastLoadedValidClients; +} + +void offerAll(final Collection clients) { +for (final UUID client : clients) { +offer(client); +} +} + +void offer(final UUID client) { +if (uniqueClients.contains(client)) { Review comment: Gah! You're right. We should also _remove_ the client from `uniqueClients` when we `poll`.
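The fix agreed on above (drop the client from `uniqueClients` whenever it is polled) could be sketched roughly as follows. This is a minimal, generic stand-in and not the actual `ValidClientsByTaskLoadQueue`: the class name `UniqueQueueSketch` and its methods are hypothetical, and the validity criteria and task-load comparator from the PR are left out.

```java
import java.util.Comparator;
import java.util.HashSet;
import java.util.PriorityQueue;
import java.util.Set;

// Hypothetical stand-in for the queue under review: a priority queue that
// refuses duplicates by tracking its current members in a side set.
class UniqueQueueSketch<T> {
    private final PriorityQueue<T> queue;
    private final Set<T> uniqueElements = new HashSet<>();

    UniqueQueueSketch(final Comparator<T> comparator) {
        this.queue = new PriorityQueue<>(comparator);
    }

    // Returns false (and leaves the queue unchanged) if the element is already queued.
    boolean offer(final T element) {
        if (!uniqueElements.add(element)) {
            return false;
        }
        return queue.offer(element);
    }

    // Removes the head from BOTH structures; forgetting the set removal is the
    // bug discussed in the review, because the element could never be re-offered.
    T poll() {
        final T head = queue.poll();
        if (head != null) {
            uniqueElements.remove(head);
        }
        return head;
    }

    int size() {
        return queue.size();
    }

    public static void main(final String[] args) {
        final UniqueQueueSketch<Integer> q = new UniqueQueueSketch<>(Comparator.<Integer>naturalOrder());
        q.offer(2);
        q.offer(1);
        System.out.println(q.offer(2)); // duplicate while still queued
        System.out.println(q.poll());
        System.out.println(q.offer(1)); // accepted again after being polled
    }
}
```

Without the `uniqueElements.remove(head)` call, the last `offer` in `main` would be rejected even though the element is no longer in the queue.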
[GitHub] [kafka] vvcephei commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one
vvcephei commented on a change in pull request #8497: URL: https://github.com/apache/kafka/pull/8497#discussion_r412423011 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ValidClientsByTaskLoadQueue.java ## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals.assignment; + +import java.util.Collection; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.UUID; +import java.util.function.BiFunction; +import org.apache.kafka.streams.processor.TaskId; + +/** + * Wraps a priority queue of clients and returns the next valid candidate(s) based on the current task assignment + */ +class ValidClientsByTaskLoadQueue { +private final PriorityQueue clientsByTaskLoad; +private final BiFunction validClientCriteria; + +ValidClientsByTaskLoadQueue(final Map clientStates, +final BiFunction validClientCriteria) { Review comment: Ah, sorry about that @ableegoldman ; I wasn't able (or was too lazy) to follow the `git praise` trail through the class movement. Well, kudos to you, then. 
:)
[GitHub] [kafka] vvcephei commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one
vvcephei commented on a change in pull request #8497: URL: https://github.com/apache/kafka/pull/8497#discussion_r412422082 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java ## @@ -16,128 +16,103 @@ */ package org.apache.kafka.streams.processor.internals.assignment; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClientOrNoCaughtUpClientsExist; + import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.Set; import java.util.SortedSet; +import java.util.TreeSet; import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.kafka.streams.processor.TaskId; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class TaskMovement { -private static final Logger log = LoggerFactory.getLogger(TaskMovement.class); -final TaskId task; -final UUID source; -final UUID destination; +class TaskMovement { +private final TaskId task; +private final UUID destination; +private final SortedSet caughtUpClients; -TaskMovement(final TaskId task, final UUID source, final UUID destination) { +private TaskMovement(final TaskId task, final UUID destination, final SortedSet caughtUpClients) { this.task = task; -this.source = source; this.destination = destination; -} +this.caughtUpClients = caughtUpClients; -@Override -public boolean equals(final Object o) { -if (this == o) { -return true; -} -if (o == null || getClass() != o.getClass()) { -return false; +if (caughtUpClients == null || caughtUpClients.isEmpty()) { +throw new IllegalStateException("Should not attempt to move a task if no caught up clients exist"); } -final TaskMovement movement = (TaskMovement) o; -return Objects.equals(task, movement.task) && - Objects.equals(source, movement.source) && - Objects.equals(destination, movement.destination); -} - 
-@Override -public int hashCode() { -return Objects.hash(task, source, destination); } /** - * Computes the movement of tasks from the state constrained to the balanced assignment, up to the configured - * {@code max.warmup.replicas}. A movement corresponds to a warmup replica on the destination client, with - * a few exceptional cases: - * - * 1. Tasks whose destination clients are caught-up, or whose source clients are not caught-up, will be moved - * immediately from the source to the destination in the state constrained assignment - * 2. Tasks whose destination client previously had this task as a standby will not be counted towards the total - * {@code max.warmup.replicas}. Instead they will be counted against that task's total {@code num.standby.replicas}. - * - * @param statefulActiveTaskAssignment the initial, state constrained assignment, with the source clients - * @param balancedStatefulActiveTaskAssignment the final, balanced assignment, with the destination clients - * @return list of the task movements from statefulActiveTaskAssignment to balancedStatefulActiveTaskAssignment + * @return whether any warmup replicas were assigned */ -static List getMovements(final Map> statefulActiveTaskAssignment, - final Map> balancedStatefulActiveTaskAssignment, - final Map> tasksToCaughtUpClients, - final Map clientStates, - final Map tasksToRemainingStandbys, - final int maxWarmupReplicas) { -if (statefulActiveTaskAssignment.size() != balancedStatefulActiveTaskAssignment.size()) { -throw new IllegalStateException("Tried to compute movements but assignments differ in size."); -} +static boolean assignTaskMovements(final Map> statefulActiveTaskAssignment, + final Map> tasksToCaughtUpClients, + final Map clientStates, + final Map tasksToRemainingStandbys, + final int maxWarmupReplicas) { +boolean warmupReplicasAssigned = false; + +final ValidClientsByTaskLoadQueue clientsByTaskLoad = new ValidClientsByTaskLoadQueue( +clientStates, +(client, task) -> 
taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, tasksToCaughtUpClients) +); -final Map taskToDestinationClient = new HashMap<>(); -for (final Map.Entry> clientEntry :
[GitHub] [kafka] vvcephei commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one
vvcephei commented on a change in pull request #8497: URL: https://github.com/apache/kafka/pull/8497#discussion_r412419973 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java ## @@ -16,128 +16,94 @@ */ package org.apache.kafka.streams.processor.internals.assignment; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient; + import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.Set; import java.util.SortedSet; +import java.util.TreeSet; import java.util.UUID; import org.apache.kafka.streams.processor.TaskId; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class TaskMovement { -private static final Logger log = LoggerFactory.getLogger(TaskMovement.class); - final TaskId task; -final UUID source; -final UUID destination; +private final UUID destination; -TaskMovement(final TaskId task, final UUID source, final UUID destination) { +TaskMovement(final TaskId task, final UUID destination) { this.task = task; -this.source = source; this.destination = destination; } -@Override -public boolean equals(final Object o) { -if (this == o) { -return true; -} -if (o == null || getClass() != o.getClass()) { -return false; -} -final TaskMovement movement = (TaskMovement) o; -return Objects.equals(task, movement.task) && - Objects.equals(source, movement.source) && - Objects.equals(destination, movement.destination); -} - -@Override -public int hashCode() { -return Objects.hash(task, source, destination); -} - /** - * Computes the movement of tasks from the state constrained to the balanced assignment, up to the configured - * {@code max.warmup.replicas}. A movement corresponds to a warmup replica on the destination client, with - * a few exceptional cases: - * - * 1. 
Tasks whose destination clients are caught-up, or whose source clients are not caught-up, will be moved - * immediately from the source to the destination in the state constrained assignment - * 2. Tasks whose destination client previously had this task as a standby will not be counted towards the total - * {@code max.warmup.replicas}. Instead they will be counted against that task's total {@code num.standby.replicas}. - * - * @param statefulActiveTaskAssignment the initial, state constrained assignment, with the source clients - * @param balancedStatefulActiveTaskAssignment the final, balanced assignment, with the destination clients - * @return list of the task movements from statefulActiveTaskAssignment to balancedStatefulActiveTaskAssignment + * @return whether any warmup replicas were assigned */ -static List getMovements(final Map> statefulActiveTaskAssignment, - final Map> balancedStatefulActiveTaskAssignment, - final Map> tasksToCaughtUpClients, - final Map clientStates, - final Map tasksToRemainingStandbys, - final int maxWarmupReplicas) { -if (statefulActiveTaskAssignment.size() != balancedStatefulActiveTaskAssignment.size()) { -throw new IllegalStateException("Tried to compute movements but assignments differ in size."); -} +static boolean assignTaskMovements(final Map> statefulActiveTaskAssignment, + final Map> tasksToCaughtUpClients, + final Map clientStates, + final Map tasksToRemainingStandbys, + final int maxWarmupReplicas) { +boolean warmupReplicasAssigned = false; + +final ValidClientsByTaskLoadQueue clientsByTaskLoad = +new ValidClientsByTaskLoadQueue( +clientStates, +(client, task) -> taskIsCaughtUpOnClient(task, client, tasksToCaughtUpClients) +); -final Map taskToDestinationClient = new HashMap<>(); -for (final Map.Entry> clientEntry : balancedStatefulActiveTaskAssignment.entrySet()) { -final UUID destination = clientEntry.getKey(); -for (final TaskId task : clientEntry.getValue()) { -taskToDestinationClient.put(task, destination); +final 
SortedSet taskMovements = new TreeSet<>( +(movement, other) -> { +final int numCaughtUpClients = tasksToCaughtUpClients.get(movement.task).size(); +final int
[jira] [Commented] (KAFKA-9882) Add Block getAssignments()
[ https://issues.apache.org/jira/browse/KAFKA-9882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17088922#comment-17088922 ]

Jesse Anderson commented on KAFKA-9882:
---

{code:java}
// Create KafkaConsumer and subscribe
// Call blocking getter for partition assignment
java.util.Set assignment = consumer.getAssignment();
// Seek to end of topic
consumer.seekToEnd(assignment);
// Seek to beginning of topic
consumer.seekToBeginning(assignment);
// Start polling
{code}

> Add Block getAssignments()
> --
>
> Key: KAFKA-9882
> URL: https://issues.apache.org/jira/browse/KAFKA-9882
> Project: Kafka
> Issue Type: New Feature
> Components: clients
> Affects Versions: 2.5.0
> Reporter: Jesse Anderson
> Priority: Critical
>
> In 2.0, the KafkaConsumer poll(long) was deprecated and replaced with a
> poll(Duration). The poll(Duration) does not block for consumer assignments.
> Now, there isn't a blocking method that can get consumer assignments.
> A new KafkaConsumer method needs to be added that blocks while getting
> consumer assignments.
> The current workaround is to poll for a short amount of time in a while loop
> and check the size of assignment(). This isn't a great method of verifying
> the consumer assignment.
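For reference, the workaround described in the issue (poll briefly in a loop and check the size of `assignment()`) might look roughly like the sketch below. It is deliberately written against a hypothetical `AssignmentSource` interface rather than the real `KafkaConsumer`, so it runs without a broker; the interface, the `awaitAssignment` helper, and the partition names are all assumptions made for illustration.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Set;

class AwaitAssignmentSketch {
    // Hypothetical stand-in for the two consumer calls the workaround relies on.
    interface AssignmentSource {
        void poll(Duration timeout);
        Set<String> assignment();
    }

    // Polls in short intervals until an assignment shows up or maxWait elapses.
    // Returns the assignment seen last, which is empty on timeout.
    static Set<String> awaitAssignment(final AssignmentSource consumer,
                                       final Duration pollInterval,
                                       final Duration maxWait) {
        final long deadlineNs = System.nanoTime() + maxWait.toNanos();
        while (consumer.assignment().isEmpty() && System.nanoTime() - deadlineNs < 0) {
            consumer.poll(pollInterval);
        }
        return consumer.assignment();
    }

    public static void main(final String[] args) {
        final int[] polls = {0};
        // Fake source that only reports an assignment after the third poll.
        final AssignmentSource fake = new AssignmentSource() {
            @Override
            public void poll(final Duration timeout) {
                polls[0]++;
            }

            @Override
            public Set<String> assignment() {
                return polls[0] >= 3 ? Collections.singleton("topic-0") : Collections.<String>emptySet();
            }
        };
        System.out.println(awaitAssignment(fake, Duration.ofMillis(1), Duration.ofSeconds(5)));
    }
}
```

With a real consumer, each `poll(Duration)` in such a loop can also return records, so the caller has to decide what to do with them; that is part of why the issue calls the workaround unsatisfying.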
[jira] [Commented] (KAFKA-9882) Add Block getAssignments()
[ https://issues.apache.org/jira/browse/KAFKA-9882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17088917#comment-17088917 ] Boyang Chen commented on KAFKA-9882: I see, to make the discussion more effective, could you draft a sample code skeleton assuming we have introduced a new blocking API for the above use case?
[jira] [Commented] (KAFKA-9882) Add Block getAssignments()
[ https://issues.apache.org/jira/browse/KAFKA-9882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17088908#comment-17088908 ] Jesse Anderson commented on KAFKA-9882: --- subscribe() doesn't get a partition assignment until poll() is called. I double-checked this just now. Using this method would lead to quite a bit of complicated code. The listener code would have to maintain a boolean to check if this is the first time the onPartitionsAssigned method was called. It would have to flip the first time it was called. I think a more straightforward approach to getting partition assignments would result in more readable and less buggy code.
[jira] [Commented] (KAFKA-9882) Add Block getAssignments()
[ https://issues.apache.org/jira/browse/KAFKA-9882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1708#comment-1708 ] Boyang Chen commented on KAFKA-9882: I checked back on the original KIP [https://cwiki.apache.org/confluence/display/KAFKA/KIP-266%3A+Fix+consumer+indefinite+blocking+behavior] So it seems the original intention was to resolve the indefinite blocking. For your case, have you considered using `public void subscribe(Pattern pattern, ConsumerRebalanceListener listener)` which can plug in a user-callback to wait for assignment result?
[GitHub] [kafka] lbradstreet commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem
lbradstreet commented on a change in pull request #8517: URL: https://github.com/apache/kafka/pull/8517#discussion_r412313524 ## File path: core/src/main/scala/kafka/utils/DelayedItem.scala ## @@ -21,24 +21,23 @@ import java.util.concurrent._ import org.apache.kafka.common.utils.Time -import scala.math._ +class DelayedItem(val delayMs: Long, val time: Time) extends Logging { + def this(delayMs: Long) = this(delayMs, Time.SYSTEM) -class DelayedItem(val delayMs: Long) extends Delayed with Logging { - - private val dueMs = Time.SYSTEM.milliseconds + delayMs - - def this(delay: Long, unit: TimeUnit) = this(unit.toMillis(delay)) + private val dueNs = time.nanoseconds + TimeUnit.MILLISECONDS.toNanos(delayMs) /** - * The remaining delay time + * true if the item is still delayed */ - def getDelay(unit: TimeUnit): Long = { -unit.convert(max(dueMs - Time.SYSTEM.milliseconds, 0), TimeUnit.MILLISECONDS) + def isDelayed: Boolean = { +time.nanoseconds < dueNs } - def compareTo(d: Delayed): Int = { -val other = d.asInstanceOf[DelayedItem] -java.lang.Long.compare(dueMs, other.dueMs) + def compareTo(d: DelayedItem): Int = { Review comment: Done
[GitHub] [kafka] ijuma commented on a change in pull request #8417: KAFKA-8955: Add an AbstractResponse#errorCounts(Stream) and tidy
ijuma commented on a change in pull request #8417: URL: https://github.com/apache/kafka/pull/8417#discussion_r412309101 ## File path: clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java ## @@ -57,6 +59,10 @@ public ByteBuffer serialize(ApiKeys apiKey, short version, int correlationId) { return Collections.singletonMap(error, 1); } +protected Map errorCounts(Stream errors) { +return errors.collect(Collectors.groupingBy(e -> e, Collectors.summingInt(e -> 1))); Review comment: Thanks! I think the way you have it now is both fast and concise, not worth changing it to use `forEach` given how this is normally used.
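As a standalone illustration of the collector used in the diff above, the following sketch counts occurrences per error code with `Collectors.groupingBy` and `Collectors.summingInt`. The `Err` enum is a hypothetical stand-in for Kafka's `Errors`, and `ErrorCountsSketch` is not part of the PR.

```java
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ErrorCountsSketch {
    // Hypothetical stand-in for org.apache.kafka.common.protocol.Errors.
    enum Err { NONE, NOT_LEADER, UNKNOWN }

    // Groups equal error codes together and adds 1 per occurrence,
    // so the resulting map only has keys for errors that actually appeared.
    static Map<Err, Integer> errorCounts(final Stream<Err> errors) {
        return errors.collect(Collectors.groupingBy(e -> e, Collectors.summingInt(e -> 1)));
    }

    public static void main(final String[] args) {
        System.out.println(errorCounts(Stream.of(Err.NONE, Err.NONE, Err.UNKNOWN)));
    }
}
```

Because the stream is consumed in a single pass, the caller does not need to materialise an intermediate collection of errors before counting them.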
[GitHub] [kafka] lbradstreet commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem
lbradstreet commented on a change in pull request #8517: URL: https://github.com/apache/kafka/pull/8517#discussion_r412305902 ## File path: core/src/main/scala/kafka/utils/DelayedItem.scala ## @@ -21,24 +21,23 @@ import java.util.concurrent._ import org.apache.kafka.common.utils.Time -import scala.math._ +class DelayedItem(val delayMs: Long, val time: Time) extends Logging { + def this(delayMs: Long) = this(delayMs, Time.SYSTEM) -class DelayedItem(val delayMs: Long) extends Delayed with Logging { - - private val dueMs = Time.SYSTEM.milliseconds + delayMs - - def this(delay: Long, unit: TimeUnit) = this(unit.toMillis(delay)) + private val dueNs = time.nanoseconds + TimeUnit.MILLISECONDS.toNanos(delayMs) /** - * The remaining delay time + * true if the item is still delayed */ - def getDelay(unit: TimeUnit): Long = { -unit.convert(max(dueMs - Time.SYSTEM.milliseconds, 0), TimeUnit.MILLISECONDS) + def isDelayed: Boolean = { +time.nanoseconds < dueNs } - def compareTo(d: Delayed): Int = { -val other = d.asInstanceOf[DelayedItem] -java.lang.Long.compare(dueMs, other.dueMs) + def compareTo(d: DelayedItem): Int = { Review comment: Makes sense. I'll remove this.
[GitHub] [kafka] mjsax commented on issue #8508: MINOR: Improve usage of LogCaptureAppender
mjsax commented on issue #8508: URL: https://github.com/apache/kafka/pull/8508#issuecomment-617266065 Java 8 and Java 11 passed. Java 14: failed with unknown error...
[GitHub] [kafka] vvcephei commented on issue #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one
vvcephei commented on issue #8497: URL: https://github.com/apache/kafka/pull/8497#issuecomment-617266153 Test this, please.
[GitHub] [kafka] vvcephei commented on issue #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one
vvcephei commented on issue #8497: URL: https://github.com/apache/kafka/pull/8497#issuecomment-617265836 Test this, please.
[GitHub] [kafka] vvcephei commented on issue #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one
vvcephei commented on issue #8497: URL: https://github.com/apache/kafka/pull/8497#issuecomment-617264248 Test this, please.
[jira] [Assigned] (KAFKA-9850) Move KStream#repartition operator validation during Topology build process
[ https://issues.apache.org/jira/browse/KAFKA-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] HaiyuanZhao reassigned KAFKA-9850: -- Assignee: HaiyuanZhao > Move KStream#repartition operator validation during Topology build process > --- > > Key: KAFKA-9850 > URL: https://issues.apache.org/jira/browse/KAFKA-9850 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Levani Kokhreidze >Assignee: HaiyuanZhao >Priority: Major > Labels: help-wanted, newbie, newbie++ > > `KStream#repartition` operation performs most of its validation regarding > joining, co-partitioning, etc after starting Kafka Streams instance. Some > parts of this validation can be detected much earlier, specifically during > topology `build()`.
[GitHub] [kafka] tombentley commented on a change in pull request #8417: KAFKA-8955: Add an AbstractResponse#errorCounts(Stream) and tidy
tombentley commented on a change in pull request #8417: URL: https://github.com/apache/kafka/pull/8417#discussion_r412287986 ## File path: clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java ## @@ -57,6 +59,10 @@ public ByteBuffer serialize(ApiKeys apiKey, short version, int correlationId) { return Collections.singletonMap(error, 1); } +protected Map errorCounts(Stream errors) { +return errors.collect(Collectors.groupingBy(e -> e, Collectors.summingInt(e -> 1))); Review comment:

@ijuma I wrote a small microbenchmark (committed and reverted if you want to take a look) to compare performance. I picked `TxnOffsetCommitResponse` (more or less at random, but since it has two levels of nesting for topics and partitions it has a double loop) with an unrepresentatively large number of topics and partitions.

Using the old code (`errorCounts(errors())`), I got this test run:

```
{NONE=2} run 0, times=2 took 50172790515ns, 398.62243647820753ops/s
{NONE=2} run 1, times=2 took 49210843555ns, 406.4144923191004ops/s
{NONE=2} run 2, times=2 took 49366208092ns, 405.1354311582437ops/s
{NONE=2} run 3, times=2 took 48628565963ns, 411.2808922890589ops/s
{NONE=2} run 4, times=2 took 48727847017ns, 410.4429237971969ops/s
```

I aborted early because it was pretty slow. You can see the JIT is improving the performance a little over time.

Using the streaming approach I got this test run:

```
{NONE=2} run 0, times=2 took 6984797524ns, 2863.36145482805ops/s
{NONE=2} run 1, times=2 took 6566254988ns, 3045.8762318171493ops/s
{NONE=2} run 2, times=2 took 6553362923ns, 3051.868214074797ops/s
{NONE=2} run 3, times=2 took 6259904961ns, 3194.936684279159ops/s
{NONE=2} run 4, times=2 took 6675450385ns, 2996.052527772626ops/s
{NONE=2} run 5, times=2 took 6949088789ns, 2878.075184714696ops/s
{NONE=2} run 6, times=2 took 6045899635ns, 3308.02712704972ops/s
{NONE=2} run 7, times=2 took 5845348664ns, 3421.5238730197325ops/s
{NONE=2} run 8, times=2 took 6370088159ns, 3139.6739732311135ops/s
{NONE=2} run 9, times=2 took 6799792822ns, 2941.2660831800854ops/s
{NONE=2} run 10, times=2 took 6641092713ns, 3011.5525959831602ops/s
{NONE=2} run 11, times=2 took 6621610314ns, 3020.4133211696576ops/s
{NONE=2} run 12, times=2 took 6339235087ns, 3154.9547738045576ops/s
{NONE=2} run 13, times=2 took 6461046814ns, 3095.473624593366ops/s
{NONE=2} run 14, times=2 took 6585386195ns, 3037.027656052296ops/s
{NONE=2} run 15, times=2 took 6565973868ns, 3046.0066484ops/s
{NONE=2} run 16, times=2 took 6585253169ns, 3037.0890058031114ops/s
{NONE=2} run 17, times=2 took 6618664562ns, 3021.7576087518905ops/s
{NONE=2} run 18, times=2 took 6592603829ns, 3033.7026945290754ops/s
{NONE=2} run 19, times=2 took 6567525693ns, 3045.2869063484604ops/s
```

This is about 7½ times faster.

Out of interest I also rewrote the `TxnOffsetCommitResponse.errorCounts()` to use a `forEach()`:

```
{NONE=2} run 0, times=2 took 6038137472ns, 3312.279671131012ops/s
{NONE=2} run 1, times=2 took 5642135982ns, 3544.7568197231726ops/s
{NONE=2} run 2, times=2 took 5551109425ns, 3602.883400195268ops/s
{NONE=2} run 3, times=2 took 5511950192ns, 3628.4798126492215ops/s
{NONE=2} run 4, times=2 took 5180664883ns, 3860.5083423999577ops/s
{NONE=2} run 5, times=2 took 4571569172ns, 4374.865444997799ops/s
{NONE=2} run 6, times=2 took 5472660241ns, 3654.529811692726ops/s
{NONE=2} run 7, times=2 took 5499370051ns, 3636.780179279483ops/s
{NONE=2} run 8, times=2 took 5523721146ns, 3620.7475850736946ops/s
{NONE=2} run 9, times=2 took 4691001711ns, 4263.481710761627ops/s
{NONE=2} run 10, times=2 took 5495174831ns, 3639.5566319698773ops/s
{NONE=2} run 11, times=2 took 5676661773ns, 3523.1974001210237ops/s
{NONE=2} run 12, times=2 took 5605106974ns, 3568.174540249194ops/s
{NONE=2} run 13, times=2 took 5577604479ns, 3585.768778568137ops/s
{NONE=2} run 14, times=2 took 5544332242ns, 3607.287429222572ops/s
{NONE=2} run 15, times=2 took 5502312660ns, 3634.835247621134ops/s
{NONE=2} run 16, times=2 took 5528323376ns, 3617.7333776865516ops/s
{NONE=2} run 17, times=2 took 5528944581ns, 3617.3269069704934ops/s
```
[GitHub] [kafka] chia7712 commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem
chia7712 commented on a change in pull request #8517: URL: https://github.com/apache/kafka/pull/8517#discussion_r412284404 ## File path: core/src/main/scala/kafka/utils/DelayedItem.scala ## @@ -21,24 +21,23 @@ import java.util.concurrent._ import org.apache.kafka.common.utils.Time -import scala.math._ +class DelayedItem(val delayMs: Long, val time: Time) extends Logging { + def this(delayMs: Long) = this(delayMs, Time.SYSTEM) -class DelayedItem(val delayMs: Long) extends Delayed with Logging { - - private val dueMs = Time.SYSTEM.milliseconds + delayMs - - def this(delay: Long, unit: TimeUnit) = this(unit.toMillis(delay)) + private val dueNs = time.nanoseconds + TimeUnit.MILLISECONDS.toNanos(delayMs) /** - * The remaining delay time + * true if the item is still delayed */ - def getDelay(unit: TimeUnit): Long = { -unit.convert(max(dueMs - Time.SYSTEM.milliseconds, 0), TimeUnit.MILLISECONDS) + def isDelayed: Boolean = { +time.nanoseconds < dueNs } - def compareTo(d: Delayed): Int = { -val other = d.asInstanceOf[DelayedItem] -java.lang.Long.compare(dueMs, other.dueMs) + def compareTo(d: DelayedItem): Int = { Review comment: This method was from ```Delayed``` so it seems to me it is ok to remove this method if this class does not extend ```Delayed``` anymore
[GitHub] [kafka] chia7712 commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem
chia7712 commented on a change in pull request #8517: URL: https://github.com/apache/kafka/pull/8517#discussion_r412279368 ## File path: core/src/main/scala/kafka/utils/DelayedItem.scala ## @@ -21,24 +21,24 @@ import java.util.concurrent._ import org.apache.kafka.common.utils.Time -import scala.math._ +class DelayedItem(val delayMs: Long, val time: Time) extends Logging { + def this(delayMs: Long) = this(delayMs, Time.SYSTEM) -class DelayedItem(val delayMs: Long) extends Delayed with Logging { - - private val dueMs = Time.SYSTEM.milliseconds + delayMs - - def this(delay: Long, unit: TimeUnit) = this(unit.toMillis(delay)) + private val dueNs = time.nanoseconds + TimeUnit.MILLISECONDS.toNanos(delayMs) Review comment: > I think it's very unlikely we'll ever hit it given our normal delays. you are right :) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
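For readers following the `DelayedItem` change, here is a minimal Java sketch of the same idea: compute the due time once from a monotonic nanosecond clock and then only ask whether the item is still delayed. The class name `MonotonicDelay` and the injectable `LongSupplier` clock are assumptions made for illustration (the diff itself is Scala and injects Kafka's `Time`). Note that the sketch compares by subtraction, the form suggested in the `System.nanoTime` Javadoc, which differs from the direct `<` comparison in the diff.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

/** Illustrative only: a delay marker driven by a monotonic nanosecond clock. */
final class MonotonicDelay {
    private final LongSupplier nanoClock; // hypothetical stand-in for an injected clock
    private final long dueNs;

    MonotonicDelay(long delayMs, LongSupplier nanoClock) {
        this.nanoClock = nanoClock;
        // The due time is fixed at construction, in nanoseconds.
        this.dueNs = nanoClock.getAsLong() + TimeUnit.MILLISECONDS.toNanos(delayMs);
    }

    MonotonicDelay(long delayMs) {
        this(delayMs, System::nanoTime);
    }

    /** True while the due time has not been reached yet. */
    boolean isDelayed() {
        // Subtraction-based comparison stays correct even if the counter wraps.
        return nanoClock.getAsLong() - dueNs < 0;
    }

    public static void main(String[] args) {
        MonotonicDelay delay = new MonotonicDelay(50);
        System.out.println("delayed right after creation: " + delay.isDelayed());
    }
}
```

Injecting the clock keeps the class testable without sleeping, which is presumably also why the diff adds a `time` constructor parameter.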
[GitHub] [kafka] avalsa commented on issue #8221: KAFKA-9561: update task input partitions after rebalance
avalsa commented on issue #8221: URL: https://github.com/apache/kafka/pull/8221#issuecomment-61720 > I saw that the stream time update codepath is missing, could we add it back? I was thinking about a simpler approach here: since we could not decrement the stream-time, no matter we are adding or removing partitions, we shall just iterate over all the result partitions and find the min partition time. If that min partition time is larger than current stream time, we advance it; otherwise we do nothing, WDYT? @guozhangwang @avalsa I think it is as in the code: if we can skip updating streamTime without violating any requirements, that is the easiest thing we can do, and it seems it will not impact anybody much, since streamTime will be updated on the first request for the next record. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
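The approach proposed in the quoted review comment (take the minimum partition time over the resulting partitions and advance stream time only if that minimum is larger) can be sketched roughly as follows. This is not the code from the PR; `StreamTimeSketch` and `advanceStreamTime` are hypothetical names, and partition times are modelled as plain `long` timestamps.

```java
import java.util.Arrays;
import java.util.Collection;

/** Illustrative only: stream time never moves backwards. */
final class StreamTimeSketch {

    static long advanceStreamTime(long currentStreamTime, Collection<Long> partitionTimes) {
        if (partitionTimes.isEmpty()) {
            return currentStreamTime; // nothing to compare against
        }
        long minPartitionTime = Long.MAX_VALUE;
        for (long partitionTime : partitionTimes) {
            minPartitionTime = Math.min(minPartitionTime, partitionTime);
        }
        // Advance only if the minimum is ahead of the current stream time.
        return Math.max(currentStreamTime, minPartitionTime);
    }

    public static void main(String[] args) {
        System.out.println(advanceStreamTime(10L, Arrays.asList(15L, 20L)));
        System.out.println(advanceStreamTime(10L, Arrays.asList(5L, 20L)));
    }
}
```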
[GitHub] [kafka] dajac commented on a change in pull request #8238: KAFKA-9130: KIP-518 Allow listing consumer groups per state
dajac commented on a change in pull request #8238: URL: https://github.com/apache/kafka/pull/8238#discussion_r412182105 ## File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala ## @@ -1553,6 +1554,50 @@ class KafkaApisTest { assertEquals(Errors.INVALID_REQUEST, response.error) } + @Test + def testListGroupsRequest(): Unit = { +val overviews = List( + GroupOverview("group1", "protocol1", "Stable"), + GroupOverview("goupp2", "qwerty", "Empty") +) +val response = listGroupRequest(Option.empty, overviews) +assertEquals(2, response.data.groups.size) +assertEquals("", response.data.groups.get(0).groupState) +assertEquals("", response.data.groups.get(1).groupState) + } + + @Test + def testListGroupsRequestWithState(): Unit = { +val overviews = List( + GroupOverview("group1", "protocol1", "Stable") +) +val response = listGroupRequest(Option.apply("Stable"), overviews) +assertEquals(1, response.data.groups.size) +assertEquals("Stable", response.data.groups.get(0).groupState) + } + + private def listGroupRequest(state: Option[String], overviews: List[GroupOverview]): ListGroupsResponse = { +EasyMock.reset(groupCoordinator, clientRequestQuotaManager, requestChannel) + +val data = new ListGroupsRequestData() +if (state.isDefined) + data.setStates(Collections.singletonList(state.get)) +val listGroupsRequest = new ListGroupsRequest.Builder(data).build() +val requestChannelRequest = buildRequest(listGroupsRequest) + +val capturedResponse = expectNoThrottling() +val expectedStates = if (state.isDefined) List(state.get) else List() +EasyMock.expect(groupCoordinator.handleListGroups(expectedStates)) + .andReturn((Errors.NONE, overviews)) +EasyMock.replay(groupCoordinator, clientRequestQuotaManager, requestChannel) + +createKafkaApis().handleListGroupsRequest(requestChannelRequest) + +val response = readResponse(ApiKeys.LIST_GROUPS, listGroupsRequest, capturedResponse).asInstanceOf[ListGroupsResponse] +assertEquals(Errors.NONE.code, response.data.errorCode) +return 
response Review comment: nit: `return` can be omitted here. ## File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala ## @@ -1553,6 +1554,50 @@ class KafkaApisTest { assertEquals(Errors.INVALID_REQUEST, response.error) } + @Test + def testListGroupsRequest(): Unit = { +val overviews = List( + GroupOverview("group1", "protocol1", "Stable"), + GroupOverview("goupp2", "qwerty", "Empty") +) +val response = listGroupRequest(Option.empty, overviews) Review comment: nit: We tend to use `None` instead of `Option.empty`. ## File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala ## @@ -1024,6 +1071,11 @@ object ConsumerGroupCommand extends Logging { if (!options.has(groupOpt) && !options.has(allGroupsOpt)) CommandLineUtils.printUsageAndDie(parser, s"Option $describeOpt takes one of these options: ${allGroupSelectionScopeOpts.mkString(", ")}") +val mutuallyExclusiveOpts: Set[OptionSpec[_]] = Set(membersOpt, offsetsOpt, stateOpt) +if (mutuallyExclusiveOpts.toList.map(o => if (options.has(o)) 1 else 0).sum > 1) { + CommandLineUtils.printUsageAndDie(parser, +s"Option $describeOpt takes at most one of these options: $mutuallyExclusiveOpts") Review comment: We should build a string here: `mutuallyExclusiveOpts.mkString(", ")` ## File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala ## @@ -1553,6 +1554,50 @@ class KafkaApisTest { assertEquals(Errors.INVALID_REQUEST, response.error) } + @Test + def testListGroupsRequest(): Unit = { +val overviews = List( + GroupOverview("group1", "protocol1", "Stable"), + GroupOverview("goupp2", "qwerty", "Empty") +) +val response = listGroupRequest(Option.empty, overviews) +assertEquals(2, response.data.groups.size) +assertEquals("", response.data.groups.get(0).groupState) +assertEquals("", response.data.groups.get(1).groupState) Review comment: When I see this, I do wonder if it wouldn't be better to make the `GroupState` field in the response `nullable` and to set it to `null` when the state is not 
provided. Having to handle empty strings is a bit annoying. What do you think? ## File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala ## @@ -1553,6 +1554,50 @@ class KafkaApisTest { assertEquals(Errors.INVALID_REQUEST, response.error) } + @Test + def testListGroupsRequest(): Unit = { +val overviews = List( + GroupOverview("group1", "protocol1", "Stable"), + GroupOverview("goupp2", "qwerty", "Empty") +) +val response = listGroupRequest(Option.empty, overviews) +assertEquals(2, response.data.groups.size) +assertEquals("",
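To illustrate the review point about building a readable string for the "at most one of these options" message, here is a small Java sketch. It is not the `ConsumerGroupCommand` code (which is Scala and uses joptsimple); `ExclusiveOptionsSketch`, `check`, and the option names used in `main` are assumptions for illustration only.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Illustrative only: validate that at most one option of a group is present. */
final class ExclusiveOptionsSketch {

    /** Returns an error message when more than one option is set, otherwise empty. */
    static Optional<String> check(String command, Map<String, Boolean> optionPresence) {
        long given = optionPresence.values().stream().filter(Boolean::booleanValue).count();
        if (given <= 1) {
            return Optional.empty();
        }
        // Join the option names explicitly instead of relying on the collection's toString().
        String names = String.join(", ", optionPresence.keySet());
        return Optional.of("Option " + command + " takes at most one of these options: " + names);
    }

    public static void main(String[] args) {
        Map<String, Boolean> presence = new LinkedHashMap<>(); // keeps insertion order for the message
        presence.put("members", true);
        presence.put("offsets", true);
        presence.put("state", false);
        System.out.println(check("describe", presence).orElse("ok"));
    }
}
```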
[GitHub] [kafka] avalsa commented on a change in pull request #8221: KAFKA-9561: update task input partitions after rebalance
avalsa commented on a change in pull request #8221: URL: https://github.com/apache/kafka/pull/8221#discussion_r412178952 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java ## @@ -179,6 +179,44 @@ public void subscribe(final Pattern topics, final ConsumerRebalanceListener list TestUtils.waitForCondition(() -> assignedTopics.equals(expectedSecondAssignment), STREAM_TASKS_NOT_UPDATED); +CLUSTER.deleteTopicsAndWait("TEST-TOPIC-1", "TEST-TOPIC-2"); +} + +@Test +public void testRegexRecordsAreProcessedAfterReassignment() throws Exception { Review comment: Yes, it fails on trunk. I found the reason: I tried to isolate the tests, so that one test can create a topic and delete it when done, and the next test can do the same with the same topic, and so on. But after each test, `streams.close()` is called in the `teardown` method. I guess it tries to do something with the removed topics and gets a `TimeoutException` because it can't. That may be an issue, but it probably only happens on close, and it seems unrelated to this task. @abbccdda What do you think? I fixed it the easy way (maybe not the best approach): call `streams.close()` in the test method body before deleting the topics. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-9617) Replica Fetcher can mark partition as failed when max.message.bytes is changed
[ https://issues.apache.org/jira/browse/KAFKA-9617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17088692#comment-17088692 ] Stanislav Kozlovski commented on KAFKA-9617: Feel free to assign yourself [~showuon]! > Replica Fetcher can mark partition as failed when max.message.bytes is changed > -- > > Key: KAFKA-9617 > URL: https://issues.apache.org/jira/browse/KAFKA-9617 > Project: Kafka > Issue Type: Bug >Reporter: Stanislav Kozlovski >Priority: Major > Labels: newbie > > There exists a race condition when changing the dynamic max.message.bytes > config for a topic. A follower replica can replicate a message that is over > that size after it processes the config change. When this happens, the > replica fetcher catches the unexpected exception, marks the partition as > failed and stops replicating it. > {code:java} > 06:38:46.596 Processing override for entityPath: topics/partition-1 with > config: Map(max.message.bytes -> 512) > 06:38:46.597 [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] > Unexpected error occurred while processing data for partition partition-1 at > offset 20964 > org.apache.kafka.common.errors.RecordTooLargeException: The record batch size > in the append to partition-1 is 3349 bytes which exceeds the maximum > configured value of 512. > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9850) Move KStream#repartition operator validation during Topology build process
[ https://issues.apache.org/jira/browse/KAFKA-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17088647#comment-17088647 ] HaiyuanZhao commented on KAFKA-9850: [~bchen225242] I want to pick this up. Can u assign this to me? :) > Move KStream#repartition operator validation during Topology build process > --- > > Key: KAFKA-9850 > URL: https://issues.apache.org/jira/browse/KAFKA-9850 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Levani Kokhreidze >Priority: Major > Labels: help-wanted, newbie, newbie++ > > `KStream#repartition` operation performs most of its validation regarding > joining, co-partitioning, etc after starting Kafka Streams instance. Some > parts of this validation can be detected much earlier, specifically during > topology `build()`. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] dajac commented on issue #8509: KAFKA-9839: Broker should accept control requests with newer broker epoch
dajac commented on issue #8509: URL: https://github.com/apache/kafka/pull/8509#issuecomment-617137684 @apovzner Yeah, you're right. It does not increase the overall coverage. I spent quite some time in the `KafkaApis` recently and I noticed that its unit test coverage is quite low. If I'm not mistaken, we have literally zero unit tests for the `LeaderAndIsr` and `UpdateMetadata` requests over there. I believe that having reasonable unit test coverage will help us catch regressions earlier in the development process. When one touches the `KafkaApis`, the first reflex is to run its unit tests to verify that things are not broken. I do agree that adding a test which verifies that `IllegalStateException` is not thrown does not bring much. What do you think about adding a few unit tests which verify, respectively, that a stale broker epoch is rejected and a valid broker epoch is accepted for the `LeaderAndIsr`, the `StopReplica` and the `UpdateMetadata` requests? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
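As a rough illustration of what such tests would exercise, the epoch check can be reduced to a single comparison. The sketch below is an assumption based on the PR title (control requests with a newer broker epoch should be accepted) and on the comment above; `BrokerEpochSketch` and `isStale` are hypothetical names, not the `KafkaApis` API.

```java
/** Illustrative only: a control request is stale when its epoch is older than the broker's. */
final class BrokerEpochSketch {

    static boolean isStale(long brokerEpochInRequest, long currentBrokerEpoch) {
        // Equal or newer epochs are accepted; only strictly older ones are rejected.
        return brokerEpochInRequest < currentBrokerEpoch;
    }

    public static void main(String[] args) {
        long current = 5L;
        System.out.println("older epoch stale: " + isStale(4L, current));
        System.out.println("same epoch stale: " + isStale(5L, current));
        System.out.println("newer epoch stale: " + isStale(6L, current));
    }
}
```

A unit test per request type would then assert the rejected case and the accepted case against the real request handlers.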
[GitHub] [kafka] avalsa commented on a change in pull request #8221: KAFKA-9561: update task input partitions after rebalance
avalsa commented on a change in pull request #8221: URL: https://github.com/apache/kafka/pull/8221#discussion_r412099165 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java ## @@ -115,7 +115,7 @@ public void suspend() { } @Override -public void resume() { +public void resume(final boolean requiresUpdate) { Review comment: agree. beautified it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one
cadonna commented on a change in pull request #8497: URL: https://github.com/apache/kafka/pull/8497#discussion_r412047332 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ValidClientsByTaskLoadQueue.java ## @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.processor.internals.assignment; + +import java.util.Collection; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.UUID; +import java.util.function.BiFunction; +import org.apache.kafka.streams.processor.TaskId; + +/** + * Wraps a priority queue of clients and returns the next valid candidate(s) based on the current task assignment + */ +class ValidClientsByTaskLoadQueue { + +private final PriorityQueue clientsByTaskLoad; +private final BiFunction validClientCriteria; +private final Set uniqueClients = new HashSet<>(); + +ValidClientsByTaskLoadQueue(final Map clientStates, +final BiFunction validClientCriteria) { +this.validClientCriteria = validClientCriteria; + +clientsByTaskLoad = new PriorityQueue<>( +(client, other) -> { +final double clientTaskLoad = clientStates.get(client).taskLoad(); +final double otherTaskLoad = clientStates.get(other).taskLoad(); +if (clientTaskLoad < otherTaskLoad) { +return -1; +} else if (clientTaskLoad > otherTaskLoad) { +return 1; +} else { +return client.compareTo(other); +} +}); +} + +/** + * @return the next least loaded client that satisfies the given criteria, or null if none do + */ +UUID poll(final TaskId task) { +final List validClient = poll(task, 1); +return validClient.isEmpty() ? 
null : validClient.get(0); +} + +/** + * @return the next N <= {@code numClientsPerTask} clients in the underlying priority queue that are valid candidates for the given task + */ +List poll(final TaskId task, final int numClients) { +final List nextLeastLoadedValidClients = new LinkedList<>(); +final Set invalidPolledClients = new HashSet<>(); +while (nextLeastLoadedValidClients.size() < numClients) { +UUID candidateClient; +while (true) { +candidateClient = clientsByTaskLoad.poll(); +if (candidateClient == null) { +offerAll(invalidPolledClients); +return nextLeastLoadedValidClients; +} + +if (validClientCriteria.apply(candidateClient, task)) { +nextLeastLoadedValidClients.add(candidateClient); +break; +} else { +invalidPolledClients.add(candidateClient); +} +} +} +offerAll(invalidPolledClients); +return nextLeastLoadedValidClients; +} + +void offerAll(final Collection clients) { +for (final UUID client : clients) { +offer(client); +} +} + +void offer(final UUID client) { +if (uniqueClients.contains(client)) { +clientsByTaskLoad.remove(client); +} +clientsByTaskLoad.offer(client); +uniqueClients.add(client); Review comment: prop: I would not add the client if it is already contained in the set. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java ## @@ -16,128 +16,103 @@ */ package org.apache.kafka.streams.processor.internals.assignment; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; +import static org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClientOrNoCaughtUpClientsExist; + import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.Set; import java.util.SortedSet; +import
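The suggestion about not re-adding a client that is already in the set can be expressed compactly with the return value of `Set.add`. The following is a simplified, generic sketch rather than the reviewed class: `UniqueOfferQueue` is a hypothetical name, it orders elements by their natural ordering instead of task load, and unlike the quoted code it also removes polled elements from the set.

```java
import java.util.HashSet;
import java.util.PriorityQueue;
import java.util.Set;

/** Illustrative only: a priority queue that holds each element at most once. */
final class UniqueOfferQueue<T extends Comparable<T>> {
    private final PriorityQueue<T> queue = new PriorityQueue<>();
    private final Set<T> members = new HashSet<>();

    void offer(T item) {
        // Set.add returns false if the item was already present, so the set is
        // only modified for new items.
        if (!members.add(item)) {
            queue.remove(item); // drop the old entry so it is re-inserted at its current priority
        }
        queue.offer(item);
    }

    T poll() {
        T head = queue.poll();
        if (head != null) {
            members.remove(head);
        }
        return head;
    }

    int size() {
        return queue.size();
    }

    public static void main(String[] args) {
        UniqueOfferQueue<String> q = new UniqueOfferQueue<>();
        q.offer("b");
        q.offer("a");
        q.offer("b"); // duplicate: replaced, not added twice
        System.out.println(q.size());
    }
}
```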
[GitHub] [kafka] avalsa commented on a change in pull request #8221: KAFKA-9561: update task input partitions after rebalance
avalsa commented on a change in pull request #8221: URL: https://github.com/apache/kafka/pull/8221#discussion_r412055008 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ## @@ -305,6 +297,12 @@ public void resume() { default: throw new IllegalStateException("Illegal state " + state() + " while resuming active task " + id); } +if (requiresUpdate) { +partitionGroup.updatePartitions(inputPartitions(), recordQueueCreator::createQueue); +if (state() != State.RESTORING) { // if task is RESTORING then topology will be initialized in completeRestoration Review comment: While working on this task, some tests failed that ensure `initializeTopology` is called only once here. I thought that might be important, so I decided to keep supporting it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] avalsa commented on a change in pull request #8221: KAFKA-9561: update task input partitions after rebalance
avalsa commented on a change in pull request #8221: URL: https://github.com/apache/kafka/pull/8221#discussion_r412049922 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java ## @@ -1127,7 +1127,7 @@ public void shouldReadCommittedOffsetAndRethrowTimeoutWhenCompleteRestoration() } @Test -public void shouldNotReInitializeTopologyWhenResuming() throws IOException { +public void shouldNotReInitializeTopologyWhenResumingWithFalseFlag() throws IOException { Review comment: fixed This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] tombentley commented on issue #8311: KAFKA-9434: automated protocol for alterReplicaLogDirs
tombentley commented on issue #8311: URL: https://github.com/apache/kafka/pull/8311#issuecomment-617069226 @dajac thanks for the review, I've addressed all your comments. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] avalsa commented on a change in pull request #8221: KAFKA-9561: update task input partitions after rebalance
avalsa commented on a change in pull request #8221: URL: https://github.com/apache/kafka/pull/8221#discussion_r412020510 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java ## @@ -93,6 +96,29 @@ long partitionTimestamp(final TopicPartition partition) { return queue.partitionTime(); } +// creates queues for new partitions, removes old queues, saves cached records for previously assigned partitions +void updatePartitions(final Set newInputPartitions, final Function recordQueueCreator) { +final Set removedPartitions = new HashSet<>(); +final Iterator> queuesIterator = partitionQueues.entrySet().iterator(); +while (queuesIterator.hasNext()) { +final Map.Entry queueEntry = queuesIterator.next(); +final TopicPartition topicPartition = queueEntry.getKey(); +if (newInputPartitions.contains(topicPartition)) { Review comment: Yes, I can rephrase it as you suggest. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
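The behaviour described in the code comment of the diff (create queues for new partitions, remove old queues, keep buffered records for partitions that stay assigned) can be sketched independently of Kafka types. This is a simplification, not the PR's implementation: partitions and records are plain strings, `PartitionQueuesSketch` is a hypothetical name, and `retainAll` replaces the explicit iterator loop shown in the diff.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.function.Function;

/** Illustrative only: keep per-partition queues in sync with a new assignment. */
final class PartitionQueuesSketch {
    private final Map<String, Queue<String>> queues = new HashMap<>();

    void updatePartitions(Set<String> newPartitions, Function<String, Queue<String>> queueCreator) {
        // Drop queues of partitions that are no longer assigned.
        queues.keySet().retainAll(newPartitions);
        // Create queues only for newly assigned partitions; existing queues
        // (and the records buffered in them) are left untouched.
        for (String partition : newPartitions) {
            queues.computeIfAbsent(partition, queueCreator);
        }
    }

    Queue<String> queue(String partition) {
        return queues.get(partition);
    }

    Set<String> partitions() {
        return queues.keySet();
    }

    public static void main(String[] args) {
        PartitionQueuesSketch group = new PartitionQueuesSketch();
        group.updatePartitions(new HashSet<>(Arrays.asList("t-0", "t-1")), p -> new ArrayDeque<>());
        group.queue("t-1").add("buffered-record");
        group.updatePartitions(new HashSet<>(Arrays.asList("t-1", "t-2")), p -> new ArrayDeque<>());
        System.out.println(group.partitions() + " " + group.queue("t-1"));
    }
}
```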
[GitHub] [kafka] avalsa commented on a change in pull request #8221: KAFKA-9561: update task input partitions after rebalance
avalsa commented on a change in pull request #8221: URL: https://github.com/apache/kafka/pull/8221#discussion_r412015236 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java ## @@ -50,7 +53,7 @@ */ public class PartitionGroup { -private final Map partitionQueues; +private Map partitionQueues; Review comment: agree This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] stanislavkozlovski commented on a change in pull request #8524: KAFKA-9866: Avoid election for topics where preferred leader is not in ISR
stanislavkozlovski commented on a change in pull request #8524: URL: https://github.com/apache/kafka/pull/8524#discussion_r412014026 ## File path: core/src/main/scala/kafka/controller/KafkaController.scala ## @@ -1066,6 +1066,7 @@ class KafkaController(val config: KafkaConfig, // do this check only if the broker is live and there are no partitions being reassigned currently // and preferred replica election is not in progress val candidatePartitions = topicsNotInPreferredReplica.keys.filter(tp => controllerContext.isReplicaOnline(leaderBroker, tp) && + controllerContext.partitionLeadershipInfo(tp).leaderAndIsr.isr.contains(leaderBroker) && Review comment: I think we should do this check last. We also want to use an option to avoid any potential NPEs. e.g: ``` controllerContext.partitionLeadershipInfo.get(partition).forall(...) ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
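For readers more at home in Java, the Option-based lookup suggested above roughly corresponds to an `Optional` chain. The sketch below is only an analogue under stated assumptions: `IsrCheckSketch` and its method are hypothetical, the ISR is modelled as a set of broker ids keyed by a partition name, and `orElse(true)` mirrors Scala's `Option.forall`, which is vacuously true when the option is empty. Whether a missing entry should count as true or false is a design decision for the real code.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/** Illustrative only: null-safe ISR membership check. */
final class IsrCheckSketch {

    static boolean leaderInIsrIfKnown(Map<String, Set<Integer>> isrByPartition, String partition, int leader) {
        return Optional.ofNullable(isrByPartition.get(partition)) // no exception when the entry is missing
                .map(isr -> isr.contains(leader))
                .orElse(true); // same result as Option.forall on an empty option
    }

    public static void main(String[] args) {
        Map<String, Set<Integer>> isrByPartition = new HashMap<>();
        isrByPartition.put("tp-0", new HashSet<>(Arrays.asList(1, 2)));
        System.out.println(leaderInIsrIfKnown(isrByPartition, "tp-0", 1));
        System.out.println(leaderInIsrIfKnown(isrByPartition, "tp-0", 3));
        System.out.println(leaderInIsrIfKnown(isrByPartition, "tp-9", 1));
    }
}
```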
[GitHub] [kafka] dajac opened a new pull request #8525: [WIP] KAFKA-9885; Evict last members of a group when the maximum allowed is reached
dajac opened a new pull request #8525: URL: https://github.com/apache/kafka/pull/8525 WIP ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-9617) Replica Fetcher can mark partition as failed when max.message.bytes is changed
[ https://issues.apache.org/jira/browse/KAFKA-9617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wang Ge reassigned KAFKA-9617: -- Assignee: (was: Wang Ge) > Replica Fetcher can mark partition as failed when max.message.bytes is changed > -- > > Key: KAFKA-9617 > URL: https://issues.apache.org/jira/browse/KAFKA-9617 > Project: Kafka > Issue Type: Bug >Reporter: Stanislav Kozlovski >Priority: Major > Labels: newbie > > There exists a race condition when changing the dynamic max.message.bytes > config for a topic. A follower replica can replicate a message that is over > that size after it processes the config change. When this happens, the > replica fetcher catches the unexpected exception, marks the partition as > failed and stops replicating it. > {code:java} > 06:38:46.596 Processing override for entityPath: topics/partition-1 with > config: Map(max.message.bytes -> 512) > 06:38:46.597 [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] > Unexpected error occurred while processing data for partition partition-1 at > offset 20964 > org.apache.kafka.common.errors.RecordTooLargeException: The record batch size > in the append to partition-1 is 3349 bytes which exceeds the maximum > configured value of 512. > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-9617) Replica Fetcher can mark partition as failed when max.message.bytes is changed
[ https://issues.apache.org/jira/browse/KAFKA-9617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wang Ge reassigned KAFKA-9617: -- Assignee: Wang Ge > Replica Fetcher can mark partition as failed when max.message.bytes is changed > -- > > Key: KAFKA-9617 > URL: https://issues.apache.org/jira/browse/KAFKA-9617 > Project: Kafka > Issue Type: Bug >Reporter: Stanislav Kozlovski >Assignee: Wang Ge >Priority: Major > Labels: newbie > > There exists a race condition when changing the dynamic max.message.bytes > config for a topic. A follower replica can replicate a message that is over > that size after it processes the config change. When this happens, the > replica fetcher catches the unexpected exception, marks the partition as > failed and stops replicating it. > {code:java} > 06:38:46.596 Processing override for entityPath: topics/partition-1 with > config: Map(max.message.bytes -> 512) > 06:38:46.597 [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] > Unexpected error occurred while processing data for partition partition-1 at > offset 20964 > org.apache.kafka.common.errors.RecordTooLargeException: The record batch size > in the append to partition-1 is 3349 bytes which exceeds the maximum > configured value of 512. > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] LiamClarkeNZ commented on a change in pull request #8520: Add explicit grace period to tumbling window example
LiamClarkeNZ commented on a change in pull request #8520: URL: https://github.com/apache/kafka/pull/8520#discussion_r412007274 ## File path: docs/streams/developer-guide/dsl-api.html ## @@ -3262,12 +3262,15 @@ KTable-KTable Foreign-Key import org.apache.kafka.streams.kstream.TimeWindows; // A tumbling time window with a size of 5 minutes (and, by definition, an implicit -// advance interval of 5 minutes). +// advance interval of 5 minutes). Note the explicit grace period, as the current +// default value is 24 hours, which may be larger than needed for smaller windows. +// Note that this default may change in future major version releases. Review comment: Kia ora @vvcephei I have removed the comment in my latest commit :) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-8924) Default grace period (-1) of TimeWindows causes suppress to emit events after 24h
[ https://issues.apache.org/jira/browse/KAFKA-8924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17088442#comment-17088442 ] Bruno Cadonna commented on KAFKA-8924: -- [~vvcephei] I like your proposal {code:java} public static TimeWindows ofSize(final Duration size) {code} because it makes the meaning of the method more explicit. I am undecided about {code:java} public static TimeWindows ofSizeAndGracePeriod(final Duration size, final Duration grace) {code} I see your point, but I find the API more elegant (i.e. better readable) with two distinct methods for size and grace. But that's my personal opinion and not a technical reason. > Default grace period (-1) of TimeWindows causes suppress to emit events after > 24h > - > > Key: KAFKA-8924 > URL: https://issues.apache.org/jira/browse/KAFKA-8924 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.3.0 >Reporter: Michał >Assignee: Michał >Priority: Major > Labels: needs-kip > > h2. Problem > The default creation of TimeWindows, like > {code} > TimeWindows.of(ofMillis(xxx)) > {code} > calls an internal constructor > {code} > return new TimeWindows(sizeMs, sizeMs, -1, DEFAULT_RETENTION_MS); > {code} > And the *-1* parameter is the default grace period which I think is here for > backward compatibility > {code} > @SuppressWarnings("deprecation") // continuing to support > Windows#maintainMs/segmentInterval in fallback mode > @Override > public long gracePeriodMs() { > // NOTE: in the future, when we remove maintainMs, > // we should default the grace period to 24h to maintain the default > behavior, > // or we can default to (24h - size) if you want to be super accurate. > return graceMs != -1 ? graceMs : maintainMs() - size(); > } > {code} > The problem is that if you use a TimeWindows with gracePeriod of *-1* > together with suppress *untilWindowCloses*, it never emits an event. 
> You can check the Suppress tests > (SuppressScenarioTest.shouldSupportFinalResultsForTimeWindows), where > [~vvcephei] was (maybe) aware of that and all the scenarios specify the > gracePeriod. > I will add a test without it on my branch and it will fail. > The test: > https://github.com/atais/kafka/commit/221a04dc40d636ffe93a0ad95dfc6bcad653f4db > > h2. Now what can be done > One easy fix would be to change the default value to 0, which works fine for > me in my project, however, I am not aware of the impact it would have done > due to the changes in the *gracePeriodMs* method mentioned before.
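The fallback quoted in the issue can be illustrated with a small stand-alone model. This is a sketch, not Kafka's TimeWindows class: GraceModel, effectiveGraceMs and windowCloseMs are hypothetical names, and the 24-hour retention default is assumed from the discussion above. It shows why a 5-minute window with the default grace of -1 only closes about 24 hours after it starts, which is when a suppress operator waiting for window close could first emit.

```java
import java.time.Duration;

// Stand-alone model of the grace-period fallback quoted in the issue.
// GraceModel and its methods are hypothetical; this is not Kafka's TimeWindows.
public class GraceModel {
    // Assumed default retention (maintainMs) of 24 hours.
    static final long DEFAULT_RETENTION_MS = Duration.ofDays(1).toMillis();

    // Mirrors the quoted expression: graceMs != -1 ? graceMs : maintainMs() - size()
    static long effectiveGraceMs(long sizeMs, long graceMs, long maintainMs) {
        return graceMs != -1 ? graceMs : maintainMs - sizeMs;
    }

    // A window [start, start + size) closes once stream time passes end + grace.
    static long windowCloseMs(long startMs, long sizeMs, long graceMs) {
        return startMs + sizeMs + effectiveGraceMs(sizeMs, graceMs, DEFAULT_RETENTION_MS);
    }

    public static void main(String[] args) {
        long size = Duration.ofMinutes(5).toMillis();
        // Default grace (-1): the window closes 24 hours after it starts.
        System.out.println(Duration.ofMillis(windowCloseMs(0, size, -1)));
        // Explicit grace of 0: the window closes as soon as it ends.
        System.out.println(Duration.ofMillis(windowCloseMs(0, size, 0)));
    }
}
```

In application code, the explicit form discussed in this thread is to set the grace period on the window definition, for example with TimeWindows.of(size).grace(grace), rather than relying on the -1 default.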
[GitHub] [kafka] leonardge opened a new pull request #8524: Avoid starting election for topics where preferred leader is not in s…
leonardge opened a new pull request #8524: URL: https://github.com/apache/kafka/pull/8524 …ync.
[jira] [Updated] (KAFKA-9781) TimestampConverter / Allow to specify a time zone when converting unix epoch to string
[ https://issues.apache.org/jira/browse/KAFKA-9781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] fml2 updated KAFKA-9781: Description: TimestampConverter can convert a unix epoch value (long; number of milliseconds since Jan 01 1970 00:00 GMT) to string. However, when doing such conversion, the string result depends on the time zone used. TimestampConverter uses UTC (i.e. GMT) for the conversion and does not allow to change it. But I would need this in order to get the date/time representation in my local time zone. So I propose to introduce another config parameter (optional) for "target.type=string": *timeZone* (use java name as the value for the parameter). If no time zone is specified, UTC should be used, so that the change is backwards compatible. was: TimestampConverter can convert a unix epoch value (long; number of milliseconds since Jan 01 1970 00:00 GMT) to string. However, when doing such conversion, the string result depends on the time zone used. TimestampConverter uses UTC (i.e. GMT) for the conversion and does not allow to change it. But I would need this in order to get the date/time representation in my local time zone. So I propose to introduce another config parameter (optional) for "target.type=string": *timeZone* (use java name for that). If no time zone is specified, UTC should be used. > TimestampConverter / Allow to specify a time zone when converting unix epoch > to string > -- > > Key: KAFKA-9781 > URL: https://issues.apache.org/jira/browse/KAFKA-9781 > Project: Kafka > Issue Type: Wish > Components: KafkaConnect >Reporter: fml2 >Priority: Major > > TimestampConverter can convert a unix epoch value (long; number of > milliseconds since Jan 01 1970 00:00 GMT) to string. However, when doing such > conversion, the string result depends on the time zone used. > TimestampConverter uses UTC (i.e. GMT) for the conversion and does not allow > to change it. 
But I would need this in order to get the date/time > representation in my local time zone. > So I propose to introduce another config parameter (optional) for > "target.type=string": *timeZone* (use java name as the value for the > parameter). If no time zone is specified, UTC should be used, so that the > change is backwards compatible.
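The effect described in the request, that the same epoch value renders as different strings depending on the zone, can be sketched with java.time. This is an illustration only and not TimestampConverter code: EpochFormat and its format method are hypothetical names, and the null-or-empty fallback to UTC is an assumption modelled on the proposed backwards-compatible default.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Illustration only: the same epoch value formatted in two zones.
// EpochFormat and format(...) are hypothetical, not part of Kafka Connect.
public class EpochFormat {
    static String format(long epochMs, String pattern, String zoneName) {
        // Fall back to UTC when no zone is given, as the request proposes.
        ZoneId zone = (zoneName == null || zoneName.isEmpty())
                ? ZoneId.of("UTC")
                : ZoneId.of(zoneName);
        return DateTimeFormatter.ofPattern(pattern)
                .withZone(zone)
                .format(Instant.ofEpochMilli(epochMs));
    }

    public static void main(String[] args) {
        long epochMs = 0L; // Jan 01 1970 00:00 UTC
        System.out.println(format(epochMs, "yyyy-MM-dd HH:mm", null));
        System.out.println(format(epochMs, "yyyy-MM-dd HH:mm", "Asia/Tokyo"));
    }
}
```

The zone name here is a Java zone ID such as "Asia/Tokyo", which matches the "java name" wording in the request.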
[jira] [Assigned] (KAFKA-9866) Do not attempt to elect preferred leader replicas which are outside ISR
[ https://issues.apache.org/jira/browse/KAFKA-9866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wang Ge reassigned KAFKA-9866: -- Assignee: Wang Ge > Do not attempt to elect preferred leader replicas which are outside ISR > --- > > Key: KAFKA-9866 > URL: https://issues.apache.org/jira/browse/KAFKA-9866 > Project: Kafka > Issue Type: Improvement >Reporter: Stanislav Kozlovski >Assignee: Wang Ge >Priority: Minor > > The controller automatically triggers a preferred leader election every N > minutes. It tries to elect all preferred leaders of partitions without doing > some basic checks like whether the leader is in sync. > This leads to a multitude of errors which cause confusion: > {code:java} > April 14th 2020, 17:01:11.015 [Controller id=0] Partition TOPIC-9 failed to > complete preferred replica leader election to 1. Leader is still 0{code} > {code:java} > April 14th 2020, 17:01:11.002 [Controller id=0] Error completing replica > leader election (PREFERRED) for partition TOPIC-9 > kafka.common.StateChangeFailedException: Failed to elect leader for partition > TOPIC-9 under strategy PreferredReplicaPartitionLeaderElectionStrategy {code} > It would be better if the Controller filtered out some of these elections, > not attempt them at all and maybe log an aggregate INFO-level log
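The filtering proposed in the issue could look roughly like the following. This is a minimal sketch under stated assumptions, not controller code: PreferredLeaderFilter, PartitionState and electable are hypothetical names, and the only Kafka fact relied on is that the first replica in a partition's assignment is its preferred leader. TOPIC-3 is an invented second partition for contrast.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative sketch only; none of these types exist in the Kafka controller.
public class PreferredLeaderFilter {

    // Hypothetical snapshot of one partition's replica state.
    static final class PartitionState {
        final List<Integer> replicas; // assignment order; first entry is the preferred leader
        final Set<Integer> isr;       // brokers currently in sync
        final int leader;             // current leader

        PartitionState(List<Integer> replicas, Set<Integer> isr, int leader) {
            this.replicas = replicas;
            this.isr = isr;
            this.leader = leader;
        }
    }

    // Keep only partitions whose preferred leader is not already leading
    // and is in the ISR, so the election has a chance of succeeding.
    static List<String> electable(Map<String, PartitionState> partitions) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, PartitionState> e : partitions.entrySet()) {
            PartitionState s = e.getValue();
            int preferred = s.replicas.get(0);
            if (preferred != s.leader && s.isr.contains(preferred)) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, PartitionState> partitions = new LinkedHashMap<>();
        // Preferred leader 1 is out of sync: not attempted.
        partitions.put("TOPIC-9", new PartitionState(
                Arrays.asList(1, 0), new HashSet<>(Arrays.asList(0)), 0));
        // Preferred leader 1 is in sync: eligible.
        partitions.put("TOPIC-3", new PartitionState(
                Arrays.asList(1, 0), new HashSet<>(Arrays.asList(0, 1)), 0));

        List<String> eligible = electable(partitions);
        int notAttempted = partitions.size() - eligible.size();
        // One aggregate line instead of one error per failed election.
        System.out.println("Electing preferred leaders for " + eligible
                + "; not attempting " + notAttempted + " partition(s)");
    }
}
```

With a filter of this shape, the per-partition StateChangeFailedException entries shown above would be replaced by a single summary line per election round.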
[jira] [Commented] (KAFKA-9895) Truncation request on broker start up may cause OffsetOutOfRangeException
[ https://issues.apache.org/jira/browse/KAFKA-9895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17088354#comment-17088354 ] Boquan Tang commented on KAFKA-9895: [~ijuma] We only recently upgraded from 2.2.1 to 2.4.0. We will upgrade to a newer version in the near future, but to avoid too much overhead we don't upgrade with every Kafka release. Is this issue documented in another ticket and fixed in 2.4.1/2.5.0? If so, please feel free to close this ticket as already fixed. > Truncation request on broker start up may cause OffsetOutOfRangeException > - > > Key: KAFKA-9895 > URL: https://issues.apache.org/jira/browse/KAFKA-9895 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.4.0 >Reporter: Boquan Tang >Priority: Major > > We have a 4 broker cluster running version 2.4.0. > Upon broker restart, we frequently observe issue like this: > {code} > [2020-04-20 20:36:37,827] ERROR [ReplicaFetcher replicaId=4, leaderId=1, > fetcherId=0] Unexpected error occurred during truncation for topic-name-10 at > offset 632111354 (kafka.server.ReplicaFetcherThread) > org.apache.kafka.common.errors.OffsetOutOfRangeException: Received request > for offset 632111355 for partition active-ads-10, but we only have log > segments in the range 0 to 632111354. > {code} > The partition experiencing this issue seems random. Could we actually ignore > this kind of error and not put this partition to offline? From what the error > log describes, I think once the start up finishes, and the partition catches > up with leader, it should be OK to put it back to ISR. Please help me if I'm > understanding it incorrectly. > This happens after we updated to 2.4.0, so I'm wondering if it has anything > to do with this specific version or not.
[jira] [Commented] (KAFKA-9895) Truncation request on broker start up may cause OffsetOutOfRangeException
[ https://issues.apache.org/jira/browse/KAFKA-9895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17088352#comment-17088352 ] Ismael Juma commented on KAFKA-9895: Thanks for the report. Is there a reason you are using 2.4.0 instead of 2.4.1 or 2.5.0? There are some important fixes in the newer versions.