[GitHub] [kafka] showuon commented on pull request #12381: KAFKA-13474: Allow reconfiguration of SSL certs for broker to controller connection
showuon commented on PR #12381: URL: https://github.com/apache/kafka/pull/12381#issuecomment-1181370970 @divijvaidya, usually we backport to the previous version only, since there is usually only one more patch release. I just backported this back to the 3.1 branch, too. It took me some time to fix the conflicts and make sure the tests work. If you think it should be backported to versions >= 2.7, feel free to submit PRs against each version and I'll help review and merge them. Thanks.
[GitHub] [kafka] stan-confluent commented on pull request #12120: Add mini test
stan-confluent commented on PR #12120: URL: https://github.com/apache/kafka/pull/12120#issuecomment-1180978870 Passed: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/5019/
[GitHub] [kafka] mdedetrich commented on pull request #11478: KAFKA-13299: Accept duplicate listener on port for IPv4/IPv6
mdedetrich commented on PR #11478: URL: https://github.com/apache/kafka/pull/11478#issuecomment-1180951825 No worries, I am just a bit unfamiliar with the process for KIPs. Also, letting you know that the KIP was accepted into the 3.3 release, so I believe it needs to be backported as well?
[GitHub] [kafka] C0urante commented on pull request #11783: KAFKA-10000: System tests (KIP-618)
C0urante commented on PR #11783: URL: https://github.com/apache/kafka/pull/11783#issuecomment-1180923772 I ran the tests locally when I first wrote them. In the year since then, enough has changed (including getting a new laptop) that I'm no longer able to run them locally. Attempts to do so using Docker have led to some hung JVMs and appear to be due to environmental issues. If there's dedicated hardware out there to run these on, it'd be nice if we could leverage that for these tests. Otherwise, I can try to diagnose my local Docker issues and/or experiment with an alternative testing setup.
[jira] [Comment Edited] (KAFKA-14058) Replace EasyMock and PowerMock with Mockito in ExactlyOnceWorkerSourceTaskTest
[ https://issues.apache.org/jira/browse/KAFKA-14058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17565173#comment-17565173 ] Chris Egerton edited comment on KAFKA-14058 at 7/11/22 8:17 PM: Hi Christo! I assigned these three to myself because they touch on test classes that I've either written from scratch or heavily modified recently, and I anticipate that the changes will be fairly similar across the three. I think there are other opportunities to help if you're interested--even though every subtask in https://issues.apache.org/jira/browse/KAFKA-7438 is assigned, there are definitely some that seem to have gone stale. Perhaps you could check on some of those items first? was (Author: chrisegerton): Hi Christo! I assigned these three to myself because they touch on test classes that I've either written from scratch or heavily modified recently, and I anticipate that the changes will be fairly similar across the three. I think there are other opportunities to help if you're interested, though. Even though every subtask in https://issues.apache.org/jira/browse/KAFKA-7438 is assigned, there are definitely some that seem to have gone stale. Perhaps you could check on some of those items first? > Replace EasyMock and PowerMock with Mockito in ExactlyOnceWorkerSourceTaskTest > -- > > Key: KAFKA-14058 > URL: https://issues.apache.org/jira/browse/KAFKA-14058 > Project: Kafka > Issue Type: Sub-task > Components: KafkaConnect >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Minor >
[jira] [Comment Edited] (KAFKA-14058) Replace EasyMock and PowerMock with Mockito in ExactlyOnceWorkerSourceTaskTest
[ https://issues.apache.org/jira/browse/KAFKA-14058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17565173#comment-17565173 ] Chris Egerton edited comment on KAFKA-14058 at 7/11/22 8:16 PM: Hi Christo! I assigned these three to myself because they touch on test classes that I've either written from scratch or heavily modified recently, and I anticipate that the changes will be fairly similar across the three. I think there are other opportunities to help if you're interested, though. Even though every subtask in https://issues.apache.org/jira/browse/KAFKA-7438 is assigned, there are definitely some that seem to have gone stale. Perhaps you could check on some of those items first? was (Author: chrisegerton): Hi Chris! I assigned these three to myself because they touch on test classes that I've either written from scratch or heavily modified recently, and I anticipate that the changes will be fairly similar across the three. I think there are other opportunities to help if you're interested, though. Even though every subtask in https://issues.apache.org/jira/browse/KAFKA-7438 is assigned, there are definitely some that seem to have gone stale. Perhaps you could check on some of those items first? > Replace EasyMock and PowerMock with Mockito in ExactlyOnceWorkerSourceTaskTest > -- > > Key: KAFKA-14058 > URL: https://issues.apache.org/jira/browse/KAFKA-14058 > Project: Kafka > Issue Type: Sub-task > Components: KafkaConnect >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Minor >
[jira] [Commented] (KAFKA-14058) Replace EasyMock and PowerMock with Mockito in ExactlyOnceWorkerSourceTaskTest
[ https://issues.apache.org/jira/browse/KAFKA-14058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17565173#comment-17565173 ] Chris Egerton commented on KAFKA-14058: --- Hi Chris! I assigned these three to myself because they touch on test classes that I've either written from scratch or heavily modified recently, and I anticipate that the changes will be fairly similar across the three. I think there are other opportunities to help if you're interested, though. Even though every subtask in https://issues.apache.org/jira/browse/KAFKA-7438 is assigned, there are definitely some that seem to have gone stale. Perhaps you could check on some of those items first? > Replace EasyMock and PowerMock with Mockito in ExactlyOnceWorkerSourceTaskTest > -- > > Key: KAFKA-14058 > URL: https://issues.apache.org/jira/browse/KAFKA-14058 > Project: Kafka > Issue Type: Sub-task > Components: KafkaConnect >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Minor >
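For context on what these migrations involve, the mechanical difference between the two mocking styles looks roughly like the sketch below. The mocked `Producer` and the stubbed `send` call are illustrative only, not taken from ExactlyOnceWorkerSourceTaskTest:

```java
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Test;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

class MockMigrationSketchTest {

    @Test
    void sendIsStubbedAndVerified() {
        // EasyMock equivalent (before): expectations are recorded, replayed, then verified.
        //   Producer<byte[], byte[]> producer = EasyMock.createMock(Producer.class);
        //   EasyMock.expect(producer.send(EasyMock.anyObject(), EasyMock.anyObject())).andReturn(null);
        //   EasyMock.replay(producer);
        //   ... exercise the code under test ...
        //   EasyMock.verify(producer);

        // Mockito equivalent (after): no replay step; stubbing and verification are separate.
        @SuppressWarnings("unchecked")
        Producer<byte[], byte[]> producer = mock(Producer.class);
        when(producer.send(any(ProducerRecord.class), any(Callback.class))).thenReturn(null);

        producer.send(new ProducerRecord<>("topic", new byte[0]), (metadata, error) -> { });

        verify(producer).send(any(ProducerRecord.class), any(Callback.class));
    }
}
```

The main change is structural: Mockito drops EasyMock's record/replay/verify lifecycle, and PowerMock is usually no longer needed once static mocking is avoided.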
[GitHub] [kafka] mimaison commented on pull request #11478: KAFKA-13299: Accept duplicate listener on port for IPv4/IPv6
mimaison commented on PR #11478: URL: https://github.com/apache/kafka/pull/11478#issuecomment-1180809834 @mdedetrich Sorry for the delay. I hope to review your PR in the coming days. Please keep it against trunk; if we also want it in 3.3, we'll backport it.
[jira] [Commented] (KAFKA-14067) Sink connector override.consumer.group.id can conflict with worker group.id
[ https://issues.apache.org/jira/browse/KAFKA-14067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17565164#comment-17565164 ] Greg Harris commented on KAFKA-14067: - This bug is addressed by the fix in https://github.com/apache/kafka/pull/11369 > Sink connector override.consumer.group.id can conflict with worker group.id > --- > > Key: KAFKA-14067 > URL: https://issues.apache.org/jira/browse/KAFKA-14067 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.3.0 >Reporter: Greg Harris >Priority: Minor > > Currently there is a validation step for connector names which prevents sink > connector consumer groups from colliding with the worker group.id. > There is currently no such validation for consumer.override.group.id that > would prevent a conflicting connector from being configured, and so it is > possible to misconfigure a connector in a way that may be damaging to the > workers themselves. > Reproduction steps: > 1. Configure a connect distributed cluster with a certain group.id in the > worker config. > 2. Configure a sink connector with consumer.override.group.id having the same > value as in the worker config > Expected behavior: > 1. An error is returned indicating that the consumer.override.group.id is > invalid > 2. The connector is not created or started > Actual behavior: > 1. No error is returned, and the configuration is otherwise valid. > 2. The connector is created and starts running.
[jira] [Created] (KAFKA-14067) Sink connector override.consumer.group.id can conflict with worker group.id
Greg Harris created KAFKA-14067: --- Summary: Sink connector override.consumer.group.id can conflict with worker group.id Key: KAFKA-14067 URL: https://issues.apache.org/jira/browse/KAFKA-14067 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 2.3.0 Reporter: Greg Harris Currently there is a validation step for connector names which prevents sink connector consumer groups from colliding with the worker group.id. There is currently no such validation for consumer.override.group.id that would prevent a conflicting connector from being configured, and so it is possible to misconfigure a connector in a way that may be damaging to the workers themselves. Reproduction steps: 1. Configure a connect distributed cluster with a certain group.id in the worker config. 2. Configure a sink connector with consumer.override.group.id having the same value as in the worker config Expected behavior: 1. An error is returned indicating that the consumer.override.group.id is invalid 2. The connector is not created or started Actual behavior: 1. No error is returned, and the configuration is otherwise valid. 2. The connector is created and starts running.
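To make the expected behavior concrete, here is a minimal sketch of the kind of check the report asks for. The class, method, and error message are hypothetical; this is not Kafka Connect's actual validation code (the real fix is in the PR referenced earlier in this digest):

```java
import java.util.Map;
import org.apache.kafka.common.config.ConfigException;

// Hypothetical validation sketch: reject a sink connector whose consumer group
// override collides with the Connect worker's own group.id.
final class GroupIdValidation {
    private static final String CONSUMER_GROUP_OVERRIDE = "consumer.override.group.id";

    static void validateNoGroupIdConflict(Map<String, String> connectorConfig, String workerGroupId) {
        String override = connectorConfig.get(CONSUMER_GROUP_OVERRIDE);
        if (override != null && override.equals(workerGroupId)) {
            throw new ConfigException(CONSUMER_GROUP_OVERRIDE, override,
                "Consumer group for a sink connector must not match the worker group.id");
        }
    }
}
```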
[GitHub] [kafka] mdedetrich commented on pull request #11478: KAFKA-13299: Accept duplicate listener on port for IPv4/IPv6
mdedetrich commented on PR #11478: URL: https://github.com/apache/kafka/pull/11478#issuecomment-1180742598 @jsancio Now that you have created the 3.3.0 release branch, do I need to change the base branch of this PR to the new 3.3.0 branch or should it remain on trunk?
[jira] [Commented] (KAFKA-13953) kafka Console consumer fails with CorruptRecordException
[ https://issues.apache.org/jira/browse/KAFKA-13953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17565138#comment-17565138 ] Jun Rao commented on KAFKA-13953: - [~doguscan]: The broker verifies the batch level CRC before appending the batch to the log. So, it's more likely that the corruption happened at the storage level. Byte-wise, does the corrupted area have any patterns (e.g. consecutive 0s)? If this happens again, it would be useful to compare the bytes across replicas to see if the corrupted bytes are identical across replicas. > kafka Console consumer fails with CorruptRecordException > - > > Key: KAFKA-13953 > URL: https://issues.apache.org/jira/browse/KAFKA-13953 > Project: Kafka > Issue Type: Bug > Components: consumer, controller, core >Affects Versions: 2.7.0 >Reporter: Aldan Brito >Priority: Blocker > > Kafka consumer fails with corrupt record exception. > {code:java} > opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server *.*.*.*: > --topic BQR-PULL-DEFAULT --from-beginning > > /opt/nokia/kafka-zookeeper-clustering/kafka/topic-data/tmpsdh/dumptest > [2022-05-15 18:34:15,146] ERROR Error processing message, terminating > consumer process: (kafka.tools.ConsoleConsumer$) > org.apache.kafka.common.KafkaException: Received exception when fetching the > next record from BQR-PULL-DEFAULT-30. If needed, please seek past the record > to continue consumption. > at > org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1577) > at > org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access$1700(Fetcher.java:1432) > at > org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:684) > at > org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:635) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1276) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) > at > kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:438) > at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:104) > at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:78) > at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:55) > at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala) > Caused by: org.apache.kafka.common.errors.CorruptRecordException: Record size > 0 is less than the minimum record overhead (14) > Processed a total of 15765197 messages {code}
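The exception message itself points at the standard workaround: seek past the corrupt record. A sketch of that recovery loop, reusing the topic and partition from the report; the broker address is a placeholder, and note that seeking past the record skips (loses) it:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class SkipCorruptRecord {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        // Topic and partition taken from the report; adjust as needed.
        TopicPartition tp = new TopicPartition("BQR-PULL-DEFAULT", 30);
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(Collections.singletonList(tp));
            consumer.seekToBeginning(Collections.singletonList(tp));
            while (true) {
                try {
                    ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
                    records.forEach(r -> System.out.printf("offset=%d size=%d%n",
                        r.offset(), r.serializedValueSize()));
                } catch (KafkaException e) {
                    // The position did not advance past the corrupt record; skip it.
                    long badOffset = consumer.position(tp);
                    System.err.println("Skipping corrupt record at offset " + badOffset + ": " + e);
                    consumer.seek(tp, badOffset + 1); // this loses the skipped record
                }
            }
        }
    }
}
```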
[jira] [Created] (KAFKA-14066) local-confluent-platform fails to start: due to hobo-register-dns not found error
Taha Ismail created KAFKA-14066: --- Summary: local-confluent-platform fails to start: due to hobo-register-dns not found error Key: KAFKA-14066 URL: https://issues.apache.org/jira/browse/KAFKA-14066 Project: Kafka Issue Type: Bug Environment: attempted on M1 mac as well as ubuntu CVM Reporter: Taha Ismail local-confluent-platform fails to start up; the following happens when the kafka-up script is run: {noformat} tismail@tismail:~/indeed/local-confluent-platform$ sudo sh kafka-up.sh contact-us-modules ../contact-us-modules/local-kafka-init.sh kafka-up.sh: 28: hobo-host-ip: not found Prefixing service names with 'contact-us-modules-' contact-us-modules-zookeeper is up-to-date Starting contact-us-modules-broker ... done Starting contact-us-modules-schema-registry ... done Starting contact-us-modules-kafka-connect ... Starting contact-us-modules-kafka-connect ... done Starting contact-us-modules-rest-proxy ... Starting contact-us-modules-rest-proxy ... done Starting contact-us-modules-control-center ... done kafka-up.sh: 36: hobo-register-dns: not found kafka-up.sh: 37: hobo-register-dns: not found kafka-up.sh: 38: hobo-register-dns: not found kafka-up.sh: 39: hobo-register-dns: not found kafka-up.sh: 40: hobo-register-dns: not found kafka-up.sh: 41: hobo-register-dns: not found kafka-up.sh: 42: hobo-register-dns: not found Pausing 10 seconds before running init script(s) Running custom init script ../contact-us-modules/local-kafka-init.sh kafka-up.sh: 62: source: not found{noformat}
[GitHub] [kafka] fvaleri opened a new pull request, #12401: Minor: replace .kafka with .log in implementation documentation
fvaleri opened a new pull request, #12401: URL: https://github.com/apache/kafka/pull/12401 This is a minor change required to align the documentation with the current implementation.
[jira] [Commented] (KAFKA-13436) Omitted BrokerTopicMetrics metrics in the documentation
[ https://issues.apache.org/jira/browse/KAFKA-13436?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17565062#comment-17565062 ] ASF GitHub Bot commented on KAFKA-13436: mimaison merged PR #417: URL: https://github.com/apache/kafka-site/pull/417 > Omitted BrokerTopicMetrics metrics in the documentation > --- > > Key: KAFKA-13436 > URL: https://issues.apache.org/jira/browse/KAFKA-13436 > Project: Kafka > Issue Type: Bug > Components: documentation >Reporter: Dongjin Lee >Assignee: Dongjin Lee >Priority: Minor > Fix For: 3.3.0 > > > At present, there are 18 'kafka.server:type=BrokerTopicMetrics' metrics, but only > 13 of them are described in the documentation. > The omitted metrics are: > * kafka.server:type=BrokerTopicMetrics,name=TotalProduceRequestsPerSec > * kafka.server:type=BrokerTopicMetrics,name=TotalFetchRequestsPerSec > * kafka.server:type=BrokerTopicMetrics,name=FailedProduceRequestsPerSec > * kafka.server:type=BrokerTopicMetrics,name=FailedFetchRequestsPerSec > * kafka.server:type=BrokerTopicMetrics,name=BytesRejectedPerSec
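A quick way to cross-check the documentation against a live broker is to enumerate these MBeans over JMX. A sketch under assumptions: the broker was started with remote JMX enabled on port 9999, and `Count` is read as one attribute of these rate meters:

```java
import java.util.Set;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

// Lists the BrokerTopicMetrics MBeans a running broker actually exposes,
// so the docs can be checked against reality.
public class ListBrokerTopicMetrics {
    public static void main(String[] args) throws Exception {
        JMXServiceURL url = new JMXServiceURL(
            "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi"); // assumed JMX port
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection mbsc = connector.getMBeanServerConnection();
            ObjectName pattern = new ObjectName("kafka.server:type=BrokerTopicMetrics,*");
            Set<ObjectName> names = mbsc.queryNames(pattern, null);
            for (ObjectName name : names) {
                Object count = mbsc.getAttribute(name, "Count");
                System.out.println(name + " Count=" + count);
            }
        }
    }
}
```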
[GitHub] [kafka] mimaison commented on pull request #11442: KAFKA-7883 add schema.namespace support to SetSchemaMetadata SMT in Kafka Connect
mimaison commented on PR #11442: URL: https://github.com/apache/kafka/pull/11442#issuecomment-1180548955 @pjmagee Thanks for the contribution! As @rhauch mentioned above, SMTs are part of the API, so in order to add new configurations we need a KIP. The process is documented in https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals Considering it's a pretty small change, it should be a relatively straightforward KIP. Let me know if you have any questions.
[jira] [Resolved] (KAFKA-13821) Update Kafka Streams WordCount demo to new Processor API
[ https://issues.apache.org/jira/browse/KAFKA-13821?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Christo Lolov resolved KAFKA-13821. --- Resolution: Fixed > Update Kafka Streams WordCount demo to new Processor API > > > Key: KAFKA-13821 > URL: https://issues.apache.org/jira/browse/KAFKA-13821 > Project: Kafka > Issue Type: Task > Components: streams >Affects Versions: 3.3.0 >Reporter: Jorge Esteban Quilcate Otoya >Assignee: Chun-Hao Tang >Priority: Minor > > Once KIP-820 is merged and released, the WordCount[1] demo will be using > deprecated APIs: > [https://github.com/apache/kafka/pull/11993#discussion_r847744046] > [1] > https://github.com/apache/kafka/blob/0d518aaed158896ee9ee6949b8f38128d1d73634/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountTransformerDemo.java
[GitHub] [kafka] blcksrx opened a new pull request, #12400: KAFKA-13648: KRaft ClusterInstance does not allow for deferred start
blcksrx opened a new pull request, #12400: URL: https://github.com/apache/kafka/pull/12400 This issue happens because `cluster.startup()` is already invoked; the solution is to just check `clusterConfig.isAutoStart` in the `BeforeTestExecutionCallback`. In addition, I believe checking just the broker state is not sufficient and it would be better to invoke `cluster.waitForReadyBrokers()` instead, but I didn't change that since it was out of scope for this issue. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
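A minimal sketch of the shape of the proposed check, with hypothetical `ClusterConfig`/`ClusterInstance` types standing in for the real test-framework classes:

```java
import org.junit.jupiter.api.extension.BeforeTestExecutionCallback;
import org.junit.jupiter.api.extension.ExtensionContext;

// Hypothetical sketch: only start the cluster automatically when the test's
// configuration asks for it, leaving deferred starts to the test body itself.
public class ClusterStartupExtension implements BeforeTestExecutionCallback {
    private final ClusterConfig clusterConfig; // hypothetical config holder
    private final ClusterInstance cluster;     // hypothetical cluster handle

    public ClusterStartupExtension(ClusterConfig clusterConfig, ClusterInstance cluster) {
        this.clusterConfig = clusterConfig;
        this.cluster = cluster;
    }

    @Override
    public void beforeTestExecution(ExtensionContext context) {
        if (clusterConfig.isAutoStart()) {
            cluster.startup();
        }
        // else: the test is expected to call cluster.startup() itself
    }

    interface ClusterConfig { boolean isAutoStart(); }
    interface ClusterInstance { void startup(); }
}
```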
[jira] [Commented] (KAFKA-14058) Replace EasyMock and PowerMock with Mockito in ExactlyOnceWorkerSourceTaskTest
[ https://issues.apache.org/jira/browse/KAFKA-14058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17565049#comment-17565049 ] Christo Lolov commented on KAFKA-14058: --- Hello! I have been trying to move streams from JUnit 4 to JUnit 5 and there are multiple similar tasks which are in various stages of being carried out. Would it be a problem if I take on one of the other issues you have created (KAFKA-14059, KAFKA-14060) and help you in getting this to a conclusion? > Replace EasyMock and PowerMock with Mockito in ExactlyOnceWorkerSourceTaskTest > -- > > Key: KAFKA-14058 > URL: https://issues.apache.org/jira/browse/KAFKA-14058 > Project: Kafka > Issue Type: Sub-task > Components: KafkaConnect >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Minor >
[jira] [Comment Edited] (KAFKA-14058) Replace EasyMock and PowerMock with Mockito in ExactlyOnceWorkerSourceTaskTest
[ https://issues.apache.org/jira/browse/KAFKA-14058?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17565049#comment-17565049 ] Christo Lolov edited comment on KAFKA-14058 at 7/11/22 3:06 PM: Hello! I have been trying to move streams from JUnit 4 to JUnit 5 and there are multiple similar tasks which are in various stages of being carried out. Would it be a problem if I take on one of the other issues you have created (KAFKA-14059, KAFKA-14060) and help you in getting this to a conclusion? was (Author: christo_lolov): Hello! I have been trying to move streams from JUnit 4 to JUnit 5 and in there are are multiple similar tasks which are in various stages of being carried out. Would it be a problem if I take on of the other issues you have created (KAFKA-14059, KAFKA-14060) and help you in getting this to conclusion? > Replace EasyMock and PowerMock with Mockito in ExactlyOnceWorkerSourceTaskTest > -- > > Key: KAFKA-14058 > URL: https://issues.apache.org/jira/browse/KAFKA-14058 > Project: Kafka > Issue Type: Sub-task > Components: KafkaConnect >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Minor >
[jira] [Updated] (KAFKA-14065) kafka-clients version 2.3.0 can not process lz4 compression type
[ https://issues.apache.org/jira/browse/KAFKA-14065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] lifa updated KAFKA-14065: - Description: kafka-clients version 2.3.0 cannot process messages with the lz4 compression type *1. Set message compression type: lz4* *2. When consuming messages, this exception happened:* org.apache.kafka.common.KafkaException: Received exception when fetching the next record from xxx-2-0 (your topic and partition). If needed, please seek past the record to continue consumption. Caused by: org.apache.kafka.common.KafkaException: java.io.IOException: Stream frame descriptor corrupted Caused by: java.io.IOException: Stream frame descriptor corrupted at org.apache.kafka.common.record.KafkaLZ4BlockInputStream.readHeader(KafkaLZ4BlockInputStream.java:132) at org.apache.kafka.common.record.KafkaLZ4BlockInputStream.<init>(KafkaLZ4BlockInputStream.java:78) at org.apache.kafka.common.record.CompressionType$4.wrapForInput(CompressionType.java:110) ... 34 more was: kafka-clients version 2.3.0 can not process message with lz4 compression type set message compression type :lz4 exception : org.apache.kafka.common.KafkaException: Received exception when fetching the next record from xxx-2-0. If needed, please seek past the record to continue consumption. Caused by: org.apache.kafka.common.KafkaException: java.io.IOException: Stream frame descriptor corrupted Caused by: java.io.IOException: Stream frame descriptor corrupted at org.apache.kafka.common.record.KafkaLZ4BlockInputStream.readHeader(KafkaLZ4BlockInputStream.java:132) at org.apache.kafka.common.record.KafkaLZ4BlockInputStream.<init>(KafkaLZ4BlockInputStream.java:78) at org.apache.kafka.common.record.CompressionType$4.wrapForInput(CompressionType.java:110) ... 34 more > kafka-clients version 2.3.0 can not process lz4 compression type > - > > Key: KAFKA-14065 > URL: https://issues.apache.org/jira/browse/KAFKA-14065 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.3.0 >Reporter: lifa >Priority: Critical > > kafka-clients version 2.3.0 cannot process messages with the lz4 compression > type > *1. Set message compression type: lz4* > *2. When consuming messages, this exception happened:* > org.apache.kafka.common.KafkaException: Received exception when fetching the > next record from xxx-2-0 (your topic and partition). If needed, please seek > past the record to continue consumption. > Caused by: org.apache.kafka.common.KafkaException: java.io.IOException: > Stream frame descriptor corrupted > Caused by: java.io.IOException: Stream frame descriptor corrupted > at > org.apache.kafka.common.record.KafkaLZ4BlockInputStream.readHeader(KafkaLZ4BlockInputStream.java:132) > at > org.apache.kafka.common.record.KafkaLZ4BlockInputStream.<init>(KafkaLZ4BlockInputStream.java:78) > at > org.apache.kafka.common.record.CompressionType$4.wrapForInput(CompressionType.java:110) > ... 34 more
[jira] [Created] (KAFKA-14065) kafka-clients version 2.3.0 can not process lz4 compression type
lifa created KAFKA-14065: Summary: kafka-clients version 2.3.0 can not process lz4 compression type Key: KAFKA-14065 URL: https://issues.apache.org/jira/browse/KAFKA-14065 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 2.3.0 Reporter: lifa kafka-clients version 2.3.0 cannot process messages with the lz4 compression type. Set message compression type: lz4. Exception: org.apache.kafka.common.KafkaException: Received exception when fetching the next record from xxx-2-0. If needed, please seek past the record to continue consumption. Caused by: org.apache.kafka.common.KafkaException: java.io.IOException: Stream frame descriptor corrupted Caused by: java.io.IOException: Stream frame descriptor corrupted at org.apache.kafka.common.record.KafkaLZ4BlockInputStream.readHeader(KafkaLZ4BlockInputStream.java:132) at org.apache.kafka.common.record.KafkaLZ4BlockInputStream.<init>(KafkaLZ4BlockInputStream.java:78) at org.apache.kafka.common.record.CompressionType$4.wrapForInput(CompressionType.java:110) ... 34 more
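For step 1 of the report, a minimal producer that enables lz4 compression; the broker address and topic are placeholders:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// Step 1 of the report: produce with lz4 compression enabled.
public class Lz4ProducerRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test-topic", "key", "value")); // placeholder topic
        }
    }
}
```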
[jira] [Assigned] (KAFKA-14013) Limit the length of the `reason` field sent on the wire
[ https://issues.apache.org/jira/browse/KAFKA-14013?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Evgeny Tolbakov reassigned KAFKA-14013: --- Assignee: Evgeny Tolbakov (was: David Jacot) > Limit the length of the `reason` field sent on the wire > --- > > Key: KAFKA-14013 > URL: https://issues.apache.org/jira/browse/KAFKA-14013 > Project: Kafka > Issue Type: Improvement >Affects Versions: 3.2.0, 3.3.0 >Reporter: David Jacot >Assignee: Evgeny Tolbakov >Priority: Blocker > > KIP-800 added the `reason` field to the JoinGroupRequest and the > LeaveGroupRequest as a means to provide more information to the group > coordinator. In https://issues.apache.org/jira/browse/KAFKA-13998, we > discovered that the size of the field is limited to 32767 chars by our > serialization mechanism. At the moment, the field, whether provided directly by > the user or constructed internally, is set directly regardless of its length. > Given the purpose of this field, it seems acceptable to send only the first > 255 chars on the wire. That would prevent us from hitting that limit again > while ensuring that the number of bytes sent to the broker remains within an > acceptable range. > We should apply this to the JoinGroupRequest and to the LeaveGroupRequest.
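Based on the description and the review thread below, the fix amounts to a small truncation helper (the name `maybeTruncateReason` is the one settled on in the PR discussion). A sketch under those assumptions; the null guard is an extra defensive touch, and the exact merged code may differ:

```java
final class ReasonTruncation {
    private static final int MAX_REASON_LENGTH = 255;

    // Cap the reason at 255 chars before it goes on the wire in
    // JoinGroupRequest / LeaveGroupRequest.
    static String maybeTruncateReason(final String reason) {
        if (reason == null || reason.length() <= MAX_REASON_LENGTH) {
            return reason;
        }
        return reason.substring(0, MAX_REASON_LENGTH);
    }
}
```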
[GitHub] [kafka] etolbakov commented on pull request #12388: KAFKA-14013: Limit the length of the `reason` field sent on the wire
etolbakov commented on PR #12388: URL: https://github.com/apache/kafka/pull/12388#issuecomment-1180434338 @dajac great news! Thank you very much for your help & review, David! My Jira handle is `etolbakov`; I probably need some permissions to be able to assign tickets to myself.
[GitHub] [kafka] dajac commented on pull request #12388: KAFKA-14013: Limit the length of the `reason` field sent on the wire
dajac commented on PR #12388: URL: https://github.com/apache/kafka/pull/12388#issuecomment-1180420597 @etolbakov Could you assign https://issues.apache.org/jira/browse/KAFKA-14013 to yourself? If you don't have Jira set up, could you give me your userid and I will set it up for you.
[GitHub] [kafka] etolbakov commented on a diff in pull request #12388: KAFKA-14013: Limit the length of the `reason` field sent on the wire
etolbakov commented on code in PR #12388: URL: https://github.com/apache/kafka/pull/12388#discussion_r917938399 ## clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java: ## @@ -70,6 +70,21 @@ public static void validateGroupInstanceId(String id) { }); } +/** + * Ensures that the provided {@code reason} remains within a range of 255 chars. + * @param reason This is the reason that is sent to the broker over the wire + * as a part of {@code JoinGroupRequest}, {@code LeaveGroupRequest} + * or {@code RemoveMembersFromConsumerGroupOptions} messages. Review Comment: Though it looks like a straightforward change, I probably need to spend more time digesting it.
[GitHub] [kafka] showuon commented on pull request #11783: KAFKA-10000: System tests (KIP-618)
showuon commented on PR #11783: URL: https://github.com/apache/kafka/pull/11783#issuecomment-1180411188 @C0urante, I'll take a look this week. Before that, I'd like to know whether you have run these system tests locally.
[GitHub] [kafka] dajac commented on a diff in pull request #12308: KAFKA-14009: update rebalance timeout in memory when consumers use st…
dajac commented on code in PR #12308: URL: https://github.com/apache/kafka/pull/12308#discussion_r917915339 ## core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala: ## @@ -1300,7 +1304,9 @@ class GroupCoordinator(val brokerId: Int, completeAndScheduleNextHeartbeatExpiration(group, member) val knownStaticMember = group.get(newMemberId) -group.updateMember(knownStaticMember, protocols, responseCallback) +val oldRebalanceTimeoutMs = knownStaticMember.rebalanceTimeoutMs +val oldSessionTimeoutMs = knownStaticMember.sessionTimeoutMs +group.updateMember(knownStaticMember, protocols, rebalanceTimeoutMs, sessionTimeoutMs, responseCallback) Review Comment: @Stephan14 Could you add a unit test in `GroupCoordinatorTest`? If you have trouble with this, could you explain your issues? I can perhaps help you.
[GitHub] [kafka] dajac commented on a diff in pull request #12388: KAFKA-14013: Limit the length of the `reason` field sent on the wire
dajac commented on code in PR #12388: URL: https://github.com/apache/kafka/pull/12388#discussion_r917912085 ## clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java: ## @@ -70,6 +70,21 @@ public static void validateGroupInstanceId(String id) { }); } +/** + * Ensures that the provided {@code reason} remains within a range of 255 chars. + * @param reason This is the reason that is sent to the broker over the wire + * as a part of {@code JoinGroupRequest}, {@code LeaveGroupRequest} + * or {@code RemoveMembersFromConsumerGroupOptions} messages. Review Comment: nit: We can remove this line because, in the end, we also send a LeaveGroupRequest in this case.
[GitHub] [kafka] etolbakov commented on pull request #12388: KAFKA-14013: Limit the length of the `reason` field sent on the wire
etolbakov commented on PR #12388: URL: https://github.com/apache/kafka/pull/12388#issuecomment-1180388125 Thank you for the feedback, @dajac! Sorry for the indentation issues; I will re-read the contributor recommendations on that matter and make sure it won't happen again.
[GitHub] [kafka] etolbakov commented on a diff in pull request #12388: KAFKA-14013: Limit the length of the `reason` field sent on the wire
etolbakov commented on code in PR #12388: URL: https://github.com/apache/kafka/pull/12388#discussion_r917905764 ## clients/src/main/java/org/apache/kafka/common/utils/Utils.java: ## @@ -1432,4 +1432,17 @@ public static String[] enumOptions(Class> enumClass) { .toArray(String[]::new); } +/** + * Ensures that the provided {@code reason} remains within a range of 255 chars. + * @param reason This is the reason that is sent to the broker over the wire + * as a part of {@code JoinGroupRequest}, {@code LeaveGroupRequest} or {@code RemoveMembersFromConsumerGroupOptions} messages. + * @return a provided reason as is or truncated reason if it exceeds the 255 chars threshold. + */ +public static String truncateIfRequired(final String reason) { Review Comment: Thanks for the suggestions! Yeah, `maybeTruncateReason` is a better name for sure; I've also noticed there are a few method names that start with "maybe", so it will be consistent.
[GitHub] [kafka] divijvaidya commented on pull request #12381: KAFKA-13474: Allow reconfiguration of SSL certs for broker to controller connection
divijvaidya commented on PR #12381: URL: https://github.com/apache/kafka/pull/12381#issuecomment-1180373398 Hey @showuon, how do we decide which versions we want to backport a bug fix to? This bug exists in versions >= 2.7.
[GitHub] [kafka] rajinisivaram commented on pull request #10964: KAFKA-13043: Implement Admin APIs for offsetFetch batching
rajinisivaram commented on PR #10964: URL: https://github.com/apache/kafka/pull/10964#issuecomment-1180354994 After discussing with @skaundinya15 offline, I have rebased and made some changes to the PR to get it ready to merge into 3.3.
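For readers following along, the batching this PR implements surfaces in the Admin API as a map-keyed `listConsumerGroupOffsets` overload (KIP-709). A sketch assuming the 3.3 Admin API, with the broker address and group names as placeholders:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Fetch committed offsets for several groups in one Admin call, so the
// underlying OffsetFetch requests can be batched per broker.
public class BatchedOffsetFetch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        try (Admin admin = Admin.create(props)) {
            Map<String, ListConsumerGroupOffsetsSpec> specs = new HashMap<>();
            specs.put("group-a", new ListConsumerGroupOffsetsSpec()); // all partitions
            specs.put("group-b", new ListConsumerGroupOffsetsSpec());

            ListConsumerGroupOffsetsResult result = admin.listConsumerGroupOffsets(specs);
            Map<TopicPartition, OffsetAndMetadata> offsetsA =
                result.partitionsToOffsetAndMetadata("group-a").get();
            offsetsA.forEach((tp, om) -> System.out.println(tp + " -> " + om.offset()));
        }
    }
}
```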
[jira] [Commented] (KAFKA-12617) Convert MetadataRequestTest to use ClusterTest
[ https://issues.apache.org/jira/browse/KAFKA-12617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17564941#comment-17564941 ] Sayed Mohammad Hossein Torabi commented on KAFKA-12617: --- I would like to work on it > Convert MetadataRequestTest to use ClusterTest > -- > > Key: KAFKA-12617 > URL: https://issues.apache.org/jira/browse/KAFKA-12617 > Project: Kafka > Issue Type: Sub-task >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major >
[GitHub] [kafka] dajac commented on a diff in pull request #12388: KAFKA-14013: Limit the length of the `reason` field sent on the wire
dajac commented on code in PR #12388: URL: https://github.com/apache/kafka/pull/12388#discussion_r917780526 ## clients/src/main/java/org/apache/kafka/common/utils/Utils.java: ## @@ -1432,4 +1432,17 @@ public static String[] enumOptions(Class> enumClass) { .toArray(String[]::new); } +/** + * Ensures that the provided {@code reason} remains within a range of 255 chars. + * @param reason This is the reason that is sent to the broker over the wire + * as a part of {@code JoinGroupRequest}, {@code LeaveGroupRequest} or {@code RemoveMembersFromConsumerGroupOptions} messages. + * @return a provided reason as is or truncated reason if it exceeds the 255 chars threshold. + */ +public static String truncateIfRequired(final String reason) { Review Comment: nit: As this is tied to the reason, I would rather put it in `JoinGroupRequest`. Should we call it `maybeTruncateReason`? ## clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java: ## @@ -4084,6 +4084,13 @@ public void testRemoveMembersFromGroupReason() throws Exception { testRemoveMembersFromGroup("testing remove members reason", "testing remove members reason"); } +@Test +public void testRemoveMembersFromGroupReasonAndTruncateReason() throws Exception { Review Comment: nit: `testRemoveMembersFromGroupTruncatesReason`? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java: ## @@ -478,11 +478,12 @@ boolean joinGroupIfNeeded(final Timer timer) { resetJoinGroupFuture(); synchronized (AbstractCoordinator.this) { +final String simpleName = exception.getClass().getSimpleName(); final String shortReason = String.format("rebalance failed due to %s", -exception.getClass().getSimpleName()); +simpleName); Review Comment: nit: Could we put this one on the previous line? ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java: ## @@ -1193,6 +1215,12 @@ public void testHandleLeaveGroupResponseWithException() { } private RequestFuture setupLeaveGroup(LeaveGroupResponse leaveGroupResponse) { +return setupLeaveGroup(leaveGroupResponse, "test maybe leave group", "test maybe leave group"); +} + +private RequestFuture setupLeaveGroup(LeaveGroupResponse leaveGroupResponse, +String expectedLeaveReason, +String actualLeaveReason) { Review Comment: nit: I would put `actualLeaveReason` first. It is a bit more natural. Should we call it `leaveReason`? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java: ## @@ -478,11 +478,12 @@ boolean joinGroupIfNeeded(final Timer timer) { resetJoinGroupFuture(); synchronized (AbstractCoordinator.this) { +final String simpleName = exception.getClass().getSimpleName(); final String shortReason = String.format("rebalance failed due to %s", -exception.getClass().getSimpleName()); +simpleName); final String fullReason = String.format("rebalance failed due to '%s' (%s)", -exception.getMessage(), -exception.getClass().getSimpleName()); +exception.getMessage(), +simpleName); Review Comment: nit: Could we revert to the previous indentation? 
## clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java: ## @@ -1204,11 +1232,11 @@ private RequestFuture setupLeaveGroup(LeaveGroupResponse leaveGroupRespons } LeaveGroupRequestData leaveGroupRequest = ((LeaveGroupRequest) body).data(); return leaveGroupRequest.members().get(0).memberId().equals(memberId) && - leaveGroupRequest.members().get(0).reason().equals("test maybe leave group"); + leaveGroupRequest.members().get(0).reason().equals(expectedLeaveReason); Review Comment: nit: Could we remove that extra added space?
[GitHub] [kafka] clolov commented on pull request #12399: KAFKA-14063: Prevent malicious tiny payloads from causing OOMs with variably sized collections
clolov commented on PR #12399: URL: https://github.com/apache/kafka/pull/12399#issuecomment-1180186207 Hello! This looks like a very interesting find. Could you write a test that covers this?
[GitHub] [kafka] clolov commented on pull request #12302: KAFKA-14004: Migrate streams module to JUnit 5 - Part 3
clolov commented on PR #12302: URL: https://github.com/apache/kafka/pull/12302#issuecomment-1180175166 Politely bumping for a review @cadonna
[GitHub] [kafka] clolov commented on pull request #12285: KAFKA-14001: Migrate streams module to JUnit 5 - Part 1
clolov commented on PR #12285: URL: https://github.com/apache/kafka/pull/12285#issuecomment-1180174715 Politely bumping the review @cadonna
[GitHub] [kafka] clolov commented on pull request #12301: KAFKA-14003: Migrate streams module to JUnit 5 - Part 2
clolov commented on PR #12301: URL: https://github.com/apache/kafka/pull/12301#issuecomment-1180174990 Politely bumping for a review @cadonna
[jira] [Created] (KAFKA-14064) MirrorMaker2 stops task when record is too big
David Dufour created KAFKA-14064: Summary: MirrorMaker2 stops task when record is too big Key: KAFKA-14064 URL: https://issues.apache.org/jira/browse/KAFKA-14064 Project: Kafka Issue Type: Improvement Components: mirrormaker Affects Versions: 2.7.1 Reporter: David Dufour As MirrorMaker2 does not currently support shallow mirroring ([KIP-712: Shallow Mirroring|https://wiki.apache.org/confluence/display/KAFKA/KIP-712%3A+Shallow+Mirroring]), if a producer has produced using compression in one mirrored topic, MirrorMaker2 will get the message uncompressed at some point, and if not properly tuned (typically {{max.request.size}}), it may fail with a RecordTooLargeException: org.apache.kafka.connect.errors.ConnectException: Unrecoverable exception from producer send callback at org.apache.kafka.connect.runtime.WorkerSourceTask.maybeThrowProducerSendException(WorkerSourceTask.java:284) at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:338) at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:256) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:829) Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1049087 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.\n" worker_id: 'xxx.xxx.xxx.xxx:8083' The task is stopped and needs a manual restart. However, this seems to be a bit of an overkill because, amongst all partitions replicated by the task, only one is problematic. Stopping the replication on all partitions can have a severe impact. It would be better to 'suspend' the partition involved and keep replication working for all the remaining ones.
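As an interim mitigation, the producer limit named in the stack trace can be raised. A sketch of the underlying producer setting; in a Connect/MirrorMaker2 deployment this property is typically applied with a producer prefix in the worker or connector config (e.g. something like "producer.max.request.size"), and the exact prefix depends on the setup, so treat the wiring here as an assumption:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

// The stack trace points at the Connect worker's producer hitting the
// default 1 MiB max.request.size on an uncompressed record.
public class MirrorProducerTuning {
    public static void main(String[] args) {
        Properties producerProps = new Properties();
        // Raise the limit above the largest uncompressed record you expect.
        producerProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG,
            String.valueOf(10 * 1024 * 1024)); // 10 MiB, illustrative value
        System.out.println(producerProps);
    }
}
```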
[GitHub] [kafka] pch8388 commented on pull request #12389: MINOR: refactor result string
pch8388 commented on PR #12389: URL: https://github.com/apache/kafka/pull/12389#issuecomment-1180088134 Thanks for the good point
[GitHub] [kafka] showuon commented on pull request #12389: MINOR: Fix result string
showuon commented on PR #12389: URL: https://github.com/apache/kafka/pull/12389#issuecomment-1180087325 Good point. Updated.
[GitHub] [kafka] pch8388 commented on pull request #12389: MINOR: Fix result string
pch8388 commented on PR #12389: URL: https://github.com/apache/kafka/pull/12389#issuecomment-1180087281 That's my mistake. I should have said it's a simple refactoring. I'll edit the PR title.
[GitHub] [kafka] showuon commented on pull request #12347: KAFKA-13919: expose log recovery metrics
showuon commented on PR #12347: URL: https://github.com/apache/kafka/pull/12347#issuecomment-1180086966 @tombentley , I've updated the PR in this commit: https://github.com/apache/kafka/pull/12347/commits/1d04b6dc30f525cbbe6d3daf9c7c5e5a331896cb . Please take a look again. Thanks.
[GitHub] [kafka] showuon commented on a diff in pull request #12347: KAFKA-13919: expose log recovery metrics
showuon commented on code in PR #12347: URL: https://github.com/apache/kafka/pull/12347#discussion_r917648253 ## core/src/main/scala/kafka/log/LogManager.scala: ## @@ -410,12 +437,34 @@ class LogManager(logDirs: Seq[File], error(s"There was an error in one of the threads during logs loading: ${e.getCause}") throw e.getCause } finally { + removeLogRecoveryMetrics() threadPools.foreach(_.shutdown()) } info(s"Loaded $numTotalLogs logs in ${time.hiResClockMs() - startMs}ms.") } + private[log] def addLogRecoveryMetrics(): Unit = { +for (dir <- logDirs) { + newGauge("remainingLogsToRecover", () => numRemainingLogs.get(dir.getAbsolutePath), +Map("dir" -> dir.getAbsolutePath)) + for (i <- 0 until numRecoveryThreadsPerDataDir) { +val threadName = s"log-recovery-${dir.getAbsolutePath}-$i" +newGauge("remainingSegmentsToRecover", () => numRemainingSegments.get(threadName), + Map("dir" -> dir.getAbsolutePath, "threadNum" -> i.toString)) + } +} + } + + private[log] def removeLogRecoveryMetrics(): Unit = { +for (dir <- logDirs) { + removeMetric("remainingLogsToRecover", Map("dir" -> dir.getAbsolutePath)) + for (i <- 0 until numRecoveryThreadsPerDataDir) { Review Comment: You're right. I now pass in the current `numRecoveryThreadsPerDataDir` parameter to prevent this case.
[GitHub] [kafka] showuon commented on a diff in pull request #12347: KAFKA-13919: expose log recovery metrics
showuon commented on code in PR #12347: URL: https://github.com/apache/kafka/pull/12347#discussion_r917646894 ## core/src/main/scala/kafka/log/LogManager.scala: ## @@ -410,12 +437,34 @@ class LogManager(logDirs: Seq[File], error(s"There was an error in one of the threads during logs loading: ${e.getCause}") throw e.getCause } finally { + removeLogRecoveryMetrics() threadPools.foreach(_.shutdown()) } info(s"Loaded $numTotalLogs logs in ${time.hiResClockMs() - startMs}ms.") } + private[log] def addLogRecoveryMetrics(): Unit = { +for (dir <- logDirs) { + newGauge("remainingLogsToRecover", () => numRemainingLogs.get(dir.getAbsolutePath), +Map("dir" -> dir.getAbsolutePath)) + for (i <- 0 until numRecoveryThreadsPerDataDir) { +val threadName = s"log-recovery-${dir.getAbsolutePath}-$i" Review Comment: Good suggestion. Updated.
[GitHub] [kafka] showuon commented on a diff in pull request #12347: KAFKA-13919: expose log recovery metrics
showuon commented on code in PR #12347: URL: https://github.com/apache/kafka/pull/12347#discussion_r917646481 ## core/src/test/scala/unit/kafka/log/LogManagerTest.scala: ## @@ -638,6 +641,221 @@ class LogManagerTest { assertTrue(logManager.partitionsInitializing.isEmpty) } + private def appendRecordsToLog(time: MockTime, parentLogDir: File, partitionId: Int, brokerTopicStats: BrokerTopicStats, expectedSegmentsPerLog: Int): Unit = { +def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = time.milliseconds) +val tpFile = new File(parentLogDir, s"$name-$partitionId") + +val log = LogTestUtils.createLog(tpFile, logConfig, brokerTopicStats, time.scheduler, time, 0, 0, + 5 * 60 * 1000, 60 * 60 * 1000, LogManager.ProducerIdExpirationCheckIntervalMs) + +val numMessages = 20 +try { + for (_ <- 0 until numMessages) { +log.appendAsLeader(createRecords, leaderEpoch = 0) + } + + assertEquals(expectedSegmentsPerLog, log.numberOfSegments) +} finally { + log.close() +} + } + + private def verifyRemainingLogsToRecoverMetric(spyLogManager: LogManager, expectedParams: Map[String, Int]): Unit = { +val spyLogManagerClassName = spyLogManager.getClass().getSimpleName +// get all `remainingLogsToRecover` metrics +val logMetrics: ArrayBuffer[Gauge[Int]] = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala + .filter { case (metric, _) => metric.getType == s"$spyLogManagerClassName" && metric.getName == "remainingLogsToRecover" } + .map { case (_, gauge) => gauge } + .asInstanceOf[ArrayBuffer[Gauge[Int]]] + +assertEquals(expectedParams.size, logMetrics.size) + +val capturedPath: ArgumentCaptor[String] = ArgumentCaptor.forClass(classOf[String]) +val capturedNumRemainingLogs: ArgumentCaptor[Int] = ArgumentCaptor.forClass(classOf[Int]) + +// Since we'll update numRemainingLogs from totalLogs to 0 for each log dir, so we need to add 1 here +val expectedCallTimes = expectedParams.values.map( num => num + 1 ).sum +verify(spyLogManager, times(expectedCallTimes)).updateNumRemainingLogs(capturedPath.capture(), capturedNumRemainingLogs.capture()); + +val paths = capturedPath.getAllValues +val numRemainingLogs = capturedNumRemainingLogs.getAllValues + +// expected the end value is 0 +logMetrics.foreach { gauge => assertEquals(0, gauge.value()) } + +expectedParams.foreach { + case (path, totalLogs) => +// make sure we update the numRemainingLogs from totalLogs to 0 in order for each log dir +var expectedCurRemainingLogs = totalLogs + 1 +for (i <- 0 until paths.size()) { + if (paths.get(i).contains(path)) { +expectedCurRemainingLogs -= 1 +assertEquals(expectedCurRemainingLogs, numRemainingLogs.get(i)) + } +} +assertEquals(0, expectedCurRemainingLogs) +} + } + + private def verifyRemainingSegmentsToRecoverMetric(spyLogManager: LogManager, + logDirs: Seq[File], + recoveryThreadsPerDataDir: Int, + mockMap: ConcurrentHashMap[String, Int], + expectedParams: Map[String, Int]): Unit = { +val spyLogManagerClassName = spyLogManager.getClass().getSimpleName +// get all `remainingSegmentsToRecover` metrics +val logSegmentMetrics: ArrayBuffer[Gauge[Int]] = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala + .filter { case (metric, _) => metric.getType == s"$spyLogManagerClassName" && metric.getName == "remainingSegmentsToRecover" } + .map { case (_, gauge) => gauge } + .asInstanceOf[ArrayBuffer[Gauge[Int]]] + +// expected each log dir has 2 metrics for each thread +assertEquals(recoveryThreadsPerDataDir * logDirs.size, logSegmentMetrics.size) + +val capturedThreadName: ArgumentCaptor[String] = 
ArgumentCaptor.forClass(classOf[String]) +val capturedNumRemainingSegments: ArgumentCaptor[Int] = ArgumentCaptor.forClass(classOf[Int]) + +// Since we'll update numRemainingSegments from totalSegments to 0 for each thread, so we need to add 1 here +val expectedCallTimes = expectedParams.values.map( num => num + 1 ).sum +verify(mockMap, times(expectedCallTimes)).put(capturedThreadName.capture(), capturedNumRemainingSegments.capture()); + +// expected the end value is 0 +logSegmentMetrics.foreach { gauge => assertEquals(0, gauge.value()) } + +val threadNames = capturedThreadName.getAllValues +val numRemainingSegments = capturedNumRemainingSegments.getAllValues + +expectedParams.foreach { + case (threadName, totalSegments) => +// make sure we update the numRemainingSegments from totalSegments to 0 in order for each thread +var expectedCurRemaining
[GitHub] [kafka] showuon commented on a diff in pull request #12347: KAFKA-13919: expose log recovery metrics
showuon commented on code in PR #12347: URL: https://github.com/apache/kafka/pull/12347#discussion_r917645850 ## core/src/main/scala/kafka/log/LogManager.scala: ## @@ -410,12 +442,34 @@ class LogManager(logDirs: Seq[File], error(s"There was an error in one of the threads during logs loading: ${e.getCause}") throw e.getCause } finally { + removeLogRecoveryMetrics(curNumRecoveryThreadsPerDataDir) threadPools.foreach(_.shutdown()) } info(s"Loaded $numTotalLogs logs in ${time.hiResClockMs() - startMs}ms.") } + private[log] def addLogRecoveryMetrics(curNumRecoveryThreadsPerDataDir: Int): Unit = { +for (dir <- logDirs) { + newGauge("remainingLogsToRecover", () => numRemainingLogs.get(dir.getAbsolutePath), +Map("dir" -> dir.getAbsolutePath)) + for (i <- 0 until curNumRecoveryThreadsPerDataDir) { +val threadName = logRecoveryThreadName(dir.getAbsolutePath, i) +newGauge("remainingSegmentsToRecover", () => numRemainingSegments.get(threadName), + Map("dir" -> dir.getAbsolutePath, "threadNum" -> i.toString)) + } +} + } + + private[log] def removeLogRecoveryMetrics(curNumRecoveryThreadsPerDataDir: Int): Unit = { +for (dir <- logDirs) { + removeMetric("remainingLogsToRecover", Map("dir" -> dir.getAbsolutePath)) + for (i <- 0 until curNumRecoveryThreadsPerDataDir) { +removeMetric("remainingSegmentsToRecover", Map("dir" -> dir.getAbsolutePath, "threadNum" -> i.toString)) Review Comment: Pass the `curNumRecoveryThreadsPerDataDir` in so that it won't be affected by dynamic changes to the number of recovery threads.
[GitHub] [kafka] ijuma commented on pull request #12389: MINOR: Fix result string
ijuma commented on PR #12389: URL: https://github.com/apache/kafka/pull/12389#issuecomment-1180083712 The PR description says "Fix...". Are we saying it's not a fix, it's simply a refactoring? We should make it clear if so.
[GitHub] [kafka] showuon commented on pull request #12389: MINOR: Fix result string
showuon commented on PR #12389: URL: https://github.com/apache/kafka/pull/12389#issuecomment-1180078726 @ijuma, thanks for the reminder. But I've checked and confirmed there is already a unit test covering this change: `ConfigDefTest#testNiceMemoryUnits`. I should have mentioned it in the review comments. Thanks.
[GitHub] [kafka] pch8388 commented on pull request #12389: MINOR: Fix result string
pch8388 commented on PR #12389: URL: https://github.com/apache/kafka/pull/12389#issuecomment-1180070556 Thanks for the review. How do I add changes to a merged PR? Do I need to open a new PR?
[GitHub] [kafka] ijuma commented on pull request #12389: MINOR: Fix result string
ijuma commented on PR #12389: URL: https://github.com/apache/kafka/pull/12389#issuecomment-1180049917 @showuon Whenever reviewing PRs for fixes, we should generally include at least a unit test. There needs to be a strong reason to merge a fix without any test changes/additions.
[GitHub] [kafka] ijuma commented on pull request #12389: MINOR: Fix result string
ijuma commented on PR #12389: URL: https://github.com/apache/kafka/pull/12389#issuecomment-1180049398 Thanks for the PR. Can you please include a unit test for this fix? Also, `String.format` performs worse than string concatenation. Seems ok here, but worth keeping in mind for areas where performance is important.
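To make the trade-off concrete, a tiny illustration with a made-up method pair (this is not the ConfigDef code):

```java
final class ResultStrings {
    // String.format parses the format string and boxes the long on every call;
    // concatenation compiles down to a StringBuilder and is cheaper on hot paths.
    static String describeFormatted(long value, String unit) {
        return String.format("%d %s", value, unit);
    }

    static String describeConcat(long value, String unit) {
        return value + " " + unit;
    }
}
```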