[jira] [Commented] (KAFKA-6794) Support for incremental replica reassignment
[ https://issues.apache.org/jira/browse/KAFKA-6794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16776694#comment-16776694 ]

Viktor Somogyi-Vass commented on KAFKA-6794:
--------------------------------------------

Hey [~sql_consulting], thanks for sharing this. I think it is also a good approach, and frankly, at the moment it is the only way one could run reassignments incrementally by hand. How would you build this queue? Do you have an algorithm for choosing the next replica to drop and to add?

> Support for incremental replica reassignment
> --------------------------------------------
>
>                 Key: KAFKA-6794
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6794
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Jason Gustafson
>            Assignee: Viktor Somogyi-Vass
>            Priority: Major
>
> Say you have a replication factor of 4 and you trigger a reassignment which
> moves all replicas to new brokers. Now 8 replicas are fetching at the same
> time, which means you need to account for 8 times the current producer load
> plus the catch-up replication. To make matters worse, the replicas won't all
> become in-sync at the same time; in the worst case, you could have 7 replicas
> in-sync while one is still catching up. Currently, the old replicas won't be
> disabled until all new replicas are in-sync. This makes configuring the
> throttle tricky, since ISR traffic is not subject to it.
> Rather than trying to bring all 4 new replicas online at the same time, a
> friendlier approach would be to do it incrementally: bring one replica
> online, bring it in-sync, then remove one of the old replicas. Repeat until
> all replicas have been changed. This would reduce the impact of a
> reassignment and make configuring the throttle easier, at the cost of a
> slower overall reassignment.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
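The incremental scheme described in the issue (add one replica, wait until it joins the ISR, drop one old replica, repeat) can be sketched as follows. This is only an illustration of the idea, not Kafka code: `bring_in_sync` is a hypothetical callback standing in for the controller waiting on ISR membership.

```python
def incremental_reassign(current, target, bring_in_sync):
    """Move a partition's replica set from `current` to `target` one
    replica at a time, so at most one extra replica is ever catching up."""
    current = list(current)
    to_add = [r for r in target if r not in current]
    to_drop = [r for r in current if r not in target]
    for new, old in zip(to_add, to_drop):
        current.append(new)   # start the new replica fetching
        bring_in_sync(new)    # block until it has caught up (joined the ISR)
        current.remove(old)   # only now retire one old replica
    return current

# Moving all 4 replicas: the set grows to at most 5 at any moment,
# instead of 8 replicas fetching at once.
result = incremental_reassign([1, 2, 3, 4], [5, 6, 7, 8], lambda r: None)
```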
[jira] [Created] (KAFKA-7996) KafkaStreams does not pass timeout when closing Producer
Patrik Kleindl created KAFKA-7996:
-------------------------------------

             Summary: KafkaStreams does not pass timeout when closing Producer
                 Key: KAFKA-7996
                 URL: https://issues.apache.org/jira/browse/KAFKA-7996
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.1.0
            Reporter: Patrik Kleindl

[https://confluentcommunity.slack.com/messages/C48AHTCUQ/convo/C48AHTCUQ-1550831721.026100/]

We are running 2.1 and have a case where the shutdown of a streams application takes several minutes. I noticed that although we call streams.close with a timeout of 30 seconds, the log says:

[Producer clientId=…-8be49feb-8a2e-4088-bdd7-3c197f6107bb-StreamThread-1-producer] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.

Matthias J Sax [3 days ago]
I just checked the code, and yes, we don't provide a timeout for the producer on close()...
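The `9223372036854775807` in the log is `Long.MAX_VALUE`, i.e. an effectively unbounded close. Conceptually, the fix is to propagate the caller's remaining deadline into each nested close() call. A minimal language-agnostic sketch of that pattern — `MockProducer` and `close_with_deadline` are illustrative names, not Kafka APIs:

```python
import time

class MockProducer:
    """Stand-in for an internal client whose close() takes a timeout in ms."""
    def __init__(self):
        self.close_timeout_ms = None  # records what close() was given

    def close(self, timeout_ms):
        self.close_timeout_ms = timeout_ms

def close_with_deadline(clients, total_timeout_ms):
    """Share one overall deadline across nested close() calls, instead of
    passing an unbounded timeout (the Long.MAX_VALUE seen in the log)."""
    deadline = time.monotonic() + total_timeout_ms / 1000.0
    for c in clients:
        remaining_ms = max(0, int((deadline - time.monotonic()) * 1000))
        c.close(remaining_ms)

producers = [MockProducer(), MockProducer()]
close_with_deadline(producers, 30_000)  # e.g. streams.close with 30 seconds
```

Each client is given only the time left on the overall budget, so the total shutdown cannot exceed the timeout the caller asked for.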
[jira] [Created] (KAFKA-7997) Replace SaslAuthenticate request/response with automated protocol
Mickael Maison created KAFKA-7997:
-------------------------------------

             Summary: Replace SaslAuthenticate request/response with automated protocol
                 Key: KAFKA-7997
                 URL: https://issues.apache.org/jira/browse/KAFKA-7997
             Project: Kafka
          Issue Type: Sub-task
            Reporter: Mickael Maison
            Assignee: Mickael Maison
[jira] [Commented] (KAFKA-7268) Review Handling Crypto Rules and update ECCN page if needed
[ https://issues.apache.org/jira/browse/KAFKA-7268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16776780#comment-16776780 ]

Rajini Sivaram commented on KAFKA-7268:
---------------------------------------

Thanks [~bayard], I have updated the source URLs to use gitbox. Can you publish once again? Sorry about that.

> Review Handling Crypto Rules and update ECCN page if needed
> -----------------------------------------------------------
>
>                 Key: KAFKA-7268
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7268
>             Project: Kafka
>          Issue Type: Task
>            Reporter: Henri Yandell
>            Assignee: Rajini Sivaram
>            Priority: Blocker
>
> It is suggested in LEGAL-358 that Kafka contains/uses cryptographic
> functions and does not have an entry on the ECCN page
> ([http://www.apache.org/licenses/exports/]).
> See [http://www.apache.org/dev/crypto.html] to review and confirm whether you
> should add something to the ECCN page, and if needed, please do so.
> The text in LEGAL-358 was:
> [~zznate] added a comment - 18/Jan/18 16:59
> [~gregSwilliam] I think [~gshapira_impala_35cc] worked on some of that (def.
> on kafka client SSL stuff) and is on the Kafka PMC. She can probably help.
[jira] [Commented] (KAFKA-5453) Controller may miss requests sent to the broker when zk session timeout happens.
[ https://issues.apache.org/jira/browse/KAFKA-5453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16776957#comment-16776957 ]

Viktor Somogyi-Vass commented on KAFKA-5453:
--------------------------------------------

Just wanted to add: I'll pursue the proposal in the description, so either the broker asks the controller for the latest replica assignment or the controller provides it automatically. I had a quick idea to re-enqueue the StopReplicaRequest, but I think this is not as robust as the other idea.

> Controller may miss requests sent to the broker when zk session timeout
> happens.
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-5453
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5453
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.11.0.0
>            Reporter: Jiangjie Qin
>            Assignee: Viktor Somogyi-Vass
>            Priority: Major
>             Fix For: 2.3.0
>
> The issue I encountered was the following:
> 1. Partition reassignment was in progress; one replica of a partition was
> being reassigned from broker 1 to broker 2.
> 2. Controller received an ISR change notification which indicated broker 2
> had caught up.
> 3. Controller was sending StopReplicaRequest to broker 1.
> 4. Broker 1 zk session timeout occurred. Controller removed broker 1 from the
> cluster and cleaned up the queue, i.e. the StopReplicaRequest was removed
> from the ControllerChannelManager.
> 5. Broker 1 reconnected to zk and acted as if it were still a follower replica
> of the partition.
> 6. Broker 1 will always receive an exception from the leader because it is not
> in the replica list.
> Not sure what the correct fix is here. It seems that broker 1 in this case
> should ask the controller for the latest replica assignment.
> There are two related bugs:
> 1. When a {{NotAssignedReplicaException}} is thrown from
> {{Partition.updateReplicaLogReadResult()}}, the other partitions in the same
> request will fail to update the fetch timestamp and offset and thus also
> drop out of the ISR.
> 2. The {{NotAssignedReplicaException}} is not properly returned to the
> replicas; instead, an UnknownServerException is returned.
[jira] [Commented] (KAFKA-7922) Returned authorized operations in describe responses (KIP-430)
[ https://issues.apache.org/jira/browse/KAFKA-7922?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16777005#comment-16777005 ]

ASF GitHub Bot commented on KAFKA-7922:
---------------------------------------

omkreddy commented on pull request #6322: KAFKA-7922: Return authorized operations in describe consumer group responses (KIP-430) [WIP]
URL: https://github.com/apache/kafka/pull/6322

- Use automatic RPC generation in DescribeGroups Request/Response classes
- https://cwiki.apache.org/confluence/display/KAFKA/KIP-430+-+Return+Authorized+Operations+in+Describe+Responses

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Returned authorized operations in describe responses (KIP-430)
> --------------------------------------------------------------
>
>                 Key: KAFKA-7922
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7922
>             Project: Kafka
>          Issue Type: New Feature
>          Components: core
>            Reporter: Rajini Sivaram
>            Assignee: Manikumar
>            Priority: Major
>
> Add an option to request authorized operations on resources when describing
> resources (topics, consumer groups and cluster).
> See https://cwiki.apache.org/confluence/display/KAFKA/KIP-430+-+Return+Authorized+Operations+in+Describe+Responses
> for details.
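Conceptually, KIP-430 has the broker intersect the operations a resource type supports with what the requester's ACLs allow, and return that set in the describe response. A simplified sketch of that intersection — the tuple-based ACL model here is illustrative and not Kafka's actual `Authorizer` interface:

```python
# Operations supported on a consumer group (a subset, for illustration).
GROUP_OPERATIONS = {"DESCRIBE", "READ", "DELETE"}

def authorized_operations(acls, principal, group_id):
    """Return the subset of group operations this principal may perform,
    as a describe-groups response could report it under KIP-430."""
    allowed = {op for (p, res, op) in acls
               if p == principal and res == group_id}
    return GROUP_OPERATIONS & allowed

acls = [
    ("User:alice", "my-group", "DESCRIBE"),
    ("User:alice", "my-group", "READ"),
    ("User:bob",   "my-group", "DESCRIBE"),
]
ops = authorized_operations(acls, "User:alice", "my-group")
```

Returning this set alongside the describe result saves clients a separate round of ACL lookups to learn what they may do with each resource.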
[jira] [Resolved] (KAFKA-7268) Review Handling Crypto Rules and update ECCN page if needed
[ https://issues.apache.org/jira/browse/KAFKA-7268?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Henri Yandell resolved KAFKA-7268.
----------------------------------
    Resolution: Fixed

> Review Handling Crypto Rules and update ECCN page if needed
> -----------------------------------------------------------
>
>                 Key: KAFKA-7268
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7268
>             Project: Kafka
>          Issue Type: Task
>            Reporter: Henri Yandell
>            Assignee: Rajini Sivaram
>            Priority: Blocker
>
[jira] [Commented] (KAFKA-7268) Review Handling Crypto Rules and update ECCN page if needed
[ https://issues.apache.org/jira/browse/KAFKA-7268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16777033#comment-16777033 ]

Henri Yandell commented on KAFKA-7268:
--------------------------------------

Thanks - published.

> Review Handling Crypto Rules and update ECCN page if needed
> -----------------------------------------------------------
>
>                 Key: KAFKA-7268
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7268
>             Project: Kafka
>          Issue Type: Task
>            Reporter: Henri Yandell
>            Assignee: Rajini Sivaram
>            Priority: Blocker
>
[jira] [Updated] (KAFKA-7996) KafkaStreams does not pass timeout when closing Producer
[ https://issues.apache.org/jira/browse/KAFKA-7996?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lee Dongjin updated KAFKA-7996:
-------------------------------
    Labels: needs-kip  (was: )

> KafkaStreams does not pass timeout when closing Producer
> --------------------------------------------------------
>
>                 Key: KAFKA-7996
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7996
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.1.0
>            Reporter: Patrik Kleindl
>            Priority: Major
>              Labels: needs-kip
>
[jira] [Assigned] (KAFKA-7996) KafkaStreams does not pass timeout when closing Producer
[ https://issues.apache.org/jira/browse/KAFKA-7996?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lee Dongjin reassigned KAFKA-7996:
----------------------------------
    Assignee: Lee Dongjin

> KafkaStreams does not pass timeout when closing Producer
> --------------------------------------------------------
>
>                 Key: KAFKA-7996
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7996
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.1.0
>            Reporter: Patrik Kleindl
>            Assignee: Lee Dongjin
>            Priority: Major
>              Labels: needs-kip
>
[jira] [Commented] (KAFKA-7996) KafkaStreams does not pass timeout when closing Producer
[ https://issues.apache.org/jira/browse/KAFKA-7996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16777037#comment-16777037 ]

ASF GitHub Bot commented on KAFKA-7996:
---------------------------------------

dongjinleekr commented on pull request #6323: KAFKA-7996: KafkaStreams does not pass timeout when closing Producer
URL: https://github.com/apache/kafka/pull/6323

Here is the draft fix. The approach is simple - it adds a new parameter to `StreamThread.TaskCreator` and `RecordCollectorImpl` to denote the close wait duration. I found two `Producer#close()` calls in the streams module, but if I have missed one, don't hesitate to leave a comment. Since it introduces a new config, `close.wait.ms`, it needs a KIP, doesn't it? cc/ @mjsax @bbejeck

> KafkaStreams does not pass timeout when closing Producer
> --------------------------------------------------------
>
>                 Key: KAFKA-7996
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7996
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.1.0
>            Reporter: Patrik Kleindl
>            Priority: Major
>
[jira] [Created] (KAFKA-7998) Windows Quickstart script fails
Dieter De Paepe created KAFKA-7998:
--------------------------------------

             Summary: Windows Quickstart script fails
                 Key: KAFKA-7998
                 URL: https://issues.apache.org/jira/browse/KAFKA-7998
             Project: Kafka
          Issue Type: Bug
    Affects Versions: 2.1.0
            Reporter: Dieter De Paepe

Following the [Quickstart|http://kafka.apache.org/quickstart] guide on Windows, I received an error in the script to start Zookeeper:
{noformat}
The input line is too long.
The syntax of the command is incorrect.
{noformat}
The cause is the long CLASSPATH being constructed, resulting in a very long string:
{noformat}
for %%i in ("%BASE_DIR%\libs\*") do (
  call :concat "%%i"
)
...
:concat
IF not defined CLASSPATH (
  set CLASSPATH="%~1"
) ELSE (
  set CLASSPATH=%CLASSPATH%;"%~1"
)
{noformat}
A simple fix is to change "kafka-run-class.bat" as follows (for all similar loops):
{noformat}
for %%i in ("%BASE_DIR%\libs\*") do (
  call :concat "%%i"
)
{noformat}
should become
{noformat}
call :concat "%BASE_DIR%\libs\*"
{noformat}
[jira] [Commented] (KAFKA-7268) Review Handling Crypto Rules and update ECCN page if needed
[ https://issues.apache.org/jira/browse/KAFKA-7268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16777089#comment-16777089 ]

Rajini Sivaram commented on KAFKA-7268:
---------------------------------------

[~bayard] Thank you!

> Review Handling Crypto Rules and update ECCN page if needed
> -----------------------------------------------------------
>
>                 Key: KAFKA-7268
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7268
>             Project: Kafka
>          Issue Type: Task
>            Reporter: Henri Yandell
>            Assignee: Rajini Sivaram
>            Priority: Blocker
>
[jira] [Commented] (KAFKA-7996) KafkaStreams does not pass timeout when closing Producer
[ https://issues.apache.org/jira/browse/KAFKA-7996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16777147#comment-16777147 ]

Matthias J. Sax commented on KAFKA-7996:
----------------------------------------

[~guozhang] This is marked as "needs-kip", but I am actually not sure if we need a KIP. Thoughts?

> KafkaStreams does not pass timeout when closing Producer
> --------------------------------------------------------
>
>                 Key: KAFKA-7996
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7996
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.1.0
>            Reporter: Patrik Kleindl
>            Assignee: Lee Dongjin
>            Priority: Major
>              Labels: needs-kip
>
[jira] [Commented] (KAFKA-7997) Replace SaslAuthenticate request/response with automated protocol
[ https://issues.apache.org/jira/browse/KAFKA-7997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16777151#comment-16777151 ]

ASF GitHub Bot commented on KAFKA-7997:
---------------------------------------

mimaison commented on pull request #6324: KAFKA-7997: Use automatic RPC generation in SaslAuthenticate
URL: https://github.com/apache/kafka/pull/6324

> Replace SaslAuthenticate request/response with automated protocol
> -----------------------------------------------------------------
>
>                 Key: KAFKA-7997
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7997
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Mickael Maison
>            Assignee: Mickael Maison
>            Priority: Major
>
[jira] [Commented] (KAFKA-6794) Support for incremental replica reassignment
[ https://issues.apache.org/jira/browse/KAFKA-6794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16777149#comment-16777149 ]

GEORGE LI commented on KAFKA-6794:
----------------------------------

Hi [~viktorsomogyi],

When I rebalance the whole cluster, I generate the reassignment plan JSON with a list of topic/partitions with their new_replicas/original_replicas, and sort them by size, then try to group them into batches of similar total size for execution, so that the reassignments in a batch are expected to complete in about the same amount of time. Say there are 1000 reassignments and 50 per batch; that will be at least 20 batches/buckets to put in for execution.

Comparing the new_replicas vs. original_replicas sets, the algorithm can detect whether there is more than one new replica in new_replicas; if yes, it breaks the move up and puts the pieces into different batches/buckets. There are other considerations for the reassignments within the same batch: e.g. across different topic/partitions, try to spread the load and not overwhelm a leader; e.g. the leadership bytes within the same batch should be balanced across all brokers/leaders in the cluster as much as possible. I think this (optimal execution of reassignment plans in batches) can only be achieved outside of Kafka.

> Support for incremental replica reassignment
> --------------------------------------------
>
>                 Key: KAFKA-6794
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6794
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Jason Gustafson
>            Assignee: Viktor Somogyi-Vass
>            Priority: Major
>
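The batching heuristic sketched in the comment above — break any move that introduces more than one new replica into single-replica steps, then chunk the steps into fixed-size batches — might look like the following. Sorting by partition size and leader-load balancing are omitted; all names are illustrative, not part of any Kafka tool.

```python
def split_into_steps(topic_partition, original, new):
    """If a move adds more than one new replica, break it into steps
    that each swap in a single replica."""
    added = [r for r in new if r not in original]
    dropped = [r for r in original if r not in new]
    if len(added) <= 1:
        return [(topic_partition, original, new)]
    steps, current = [], list(original)
    for add, drop in zip(added, dropped):
        nxt = [add if r == drop else r for r in current]
        steps.append((topic_partition, current, nxt))
        current = nxt
    return steps

def batch_plan(plans, batch_size):
    """Expand all moves into single-replica steps, then chunk into batches.
    Steps for the same partition stay in order, so earlier batches must
    complete before later ones run."""
    steps = [s for plan in plans for s in split_into_steps(*plan)]
    return [steps[i:i + batch_size] for i in range(0, len(steps), batch_size)]

# One partition moving both replicas becomes two single-replica steps.
batches = batch_plan([("t-0", [1, 2], [3, 4])], batch_size=1)
```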
[jira] [Created] (KAFKA-7999) Flaky Test ExampleConnectIntegrationTest#testProduceConsumeConnector
Matthias J. Sax created KAFKA-7999:
--------------------------------------

             Summary: Flaky Test ExampleConnectIntegrationTest#testProduceConsumeConnector
                 Key: KAFKA-7999
                 URL: https://issues.apache.org/jira/browse/KAFKA-7999
             Project: Kafka
          Issue Type: Bug
          Components: KafkaConnect, unit tests
    Affects Versions: 2.2.0
            Reporter: Matthias J. Sax
             Fix For: 2.2.1

To get stable nightly builds for `2.2` release, I create tickets for all observed test failures.

[https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/30/]

{quote}org.apache.kafka.common.KafkaException: Could not produce message to topic=test-topic
at org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster.produce(EmbeddedKafkaCluster.java:257)
at org.apache.kafka.connect.integration.ExampleConnectIntegrationTest.testProduceConsumeConnector(ExampleConnectIntegrationTest.java:129){quote}
[jira] [Assigned] (KAFKA-1) The log4j appender still uses the SyncProducer API
[ https://issues.apache.org/jira/browse/KAFKA-1?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax reassigned KAFKA-1:
-----------------------------------
    Assignee: Matthias J. Sax

> The log4j appender still uses the SyncProducer API
> --------------------------------------------------
>
>                 Key: KAFKA-1
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1
>             Project: Kafka
>          Issue Type: Improvement
>    Affects Versions: 0.6
>            Assignee: Matthias J. Sax
>            Priority: Major
>
> The log4j appender still uses the SyncProducer API. Change it to use the
> Producer API using the StringEncoder instead.
[jira] [Assigned] (KAFKA-1) The log4j appender still uses the SyncProducer API
[ https://issues.apache.org/jira/browse/KAFKA-1?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax reassigned KAFKA-1:
-----------------------------------
    Assignee: (was: Matthias J. Sax)

> The log4j appender still uses the SyncProducer API
> --------------------------------------------------
>
>                 Key: KAFKA-1
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1
>             Project: Kafka
>          Issue Type: Improvement
>    Affects Versions: 0.6
>            Priority: Major
>
[jira] [Updated] (KAFKA-7957) Flaky Test DynamicBrokerReconfigurationTest#testMetricsReporterUpdate
[ https://issues.apache.org/jira/browse/KAFKA-7957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-7957:
-----------------------------------
    Summary: Flaky Test DynamicBrokerReconfigurationTest#testMetricsReporterUpdate  (was: Flaky Test DynamicBrokerReconfigurationTest #testMetricsReporterUpdate)

> Flaky Test DynamicBrokerReconfigurationTest#testMetricsReporterUpdate
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-7957
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7957
>             Project: Kafka
>          Issue Type: Bug
>          Components: core, unit tests
>    Affects Versions: 2.2.0
>            Reporter: Matthias J. Sax
>            Priority: Critical
>             Fix For: 2.3.0, 2.2.1
>
> To get stable nightly builds for `2.2` release, I create tickets for all
> observed test failures.
> [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/18/]
> {quote}java.lang.AssertionError: Messages not sent
> at kafka.utils.TestUtils$.fail(TestUtils.scala:356)
> at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:766)
> at kafka.server.DynamicBrokerReconfigurationTest.startProduceConsume(DynamicBrokerReconfigurationTest.scala:1270)
> at kafka.server.DynamicBrokerReconfigurationTest.testMetricsReporterUpdate(DynamicBrokerReconfigurationTest.scala:650){quote}
[jira] [Commented] (KAFKA-7988) Flaky Test DynamicBrokerReconfigurationTest#testThreadPoolResize
[ https://issues.apache.org/jira/browse/KAFKA-7988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16777176#comment-16777176 ]

Matthias J. Sax commented on KAFKA-7988:
----------------------------------------

Failed again: [https://builds.apache.org/blue/rest/organizations/jenkins/pipelines/kafka-2.2-jdk8/runs/30/log/?start=0]

{quote}kafka.server.DynamicBrokerReconfigurationTest > testThreadPoolResize FAILED
java.lang.AssertionError: Invalid threads: expected 6, got 5: List(ReplicaFetcherThread-0-0, ReplicaFetcherThread-0-1, ReplicaFetcherThread-0-0, ReplicaFetcherThread-0-2, ReplicaFetcherThread-0-1)
at org.junit.Assert.fail(Assert.java:88)
at org.junit.Assert.assertTrue(Assert.java:41)
at kafka.server.DynamicBrokerReconfigurationTest.verifyThreads(DynamicBrokerReconfigurationTest.scala:1260)
at kafka.server.DynamicBrokerReconfigurationTest.maybeVerifyThreadPoolSize$1(DynamicBrokerReconfigurationTest.scala:531)
at kafka.server.DynamicBrokerReconfigurationTest.resizeThreadPool$1(DynamicBrokerReconfigurationTest.scala:550)
at kafka.server.DynamicBrokerReconfigurationTest.reducePoolSize$1(DynamicBrokerReconfigurationTest.scala:536)
at kafka.server.DynamicBrokerReconfigurationTest.$anonfun$testThreadPoolResize$3(DynamicBrokerReconfigurationTest.scala:559)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
at kafka.server.DynamicBrokerReconfigurationTest.verifyThreadPoolResize$1(DynamicBrokerReconfigurationTest.scala:558)
at kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize(DynamicBrokerReconfigurationTest.scala:572){quote}

> Flaky Test DynamicBrokerReconfigurationTest#testThreadPoolResize
> ----------------------------------------------------------------
>
>                 Key: KAFKA-7988
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7988
>             Project: Kafka
>          Issue Type: Bug
>          Components: core, unit tests
>    Affects Versions: 2.2.0
>            Reporter: Matthias J. Sax
>            Priority: Critical
>             Fix For: 2.3.0, 2.2.1
>
> To get stable nightly builds for `2.2` release, I create tickets for all
> observed test failures.
> [https://builds.apache.org/blue/organizations/jenkins/kafka-2.2-jdk8/detail/kafka-2.2-jdk8/30/]
[jira] [Updated] (KAFKA-6824) Flaky Test DynamicBrokerReconfigurationTest#testAddRemoveSslListener
[ https://issues.apache.org/jira/browse/KAFKA-6824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-6824:
-----------------------------------
    Summary: Flaky Test DynamicBrokerReconfigurationTest#testAddRemoveSslListener  (was: Flaky Test DynamicBrokerReconfigurationTest.testAddRemoveSslListener)

> Flaky Test DynamicBrokerReconfigurationTest#testAddRemoveSslListener
> --------------------------------------------------------------------
>
>                 Key: KAFKA-6824
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6824
>             Project: Kafka
>          Issue Type: Bug
>          Components: core, unit tests
>    Affects Versions: 2.2.0
>            Reporter: Anna Povzner
>            Assignee: Rajini Sivaram
>            Priority: Critical
>             Fix For: 2.3.0, 2.2.1
>
> Observed two failures of this test (both in PR builds) :(
>
> *Failure #1: (JDK 7 and Scala 2.11)*
> *17:20:49* kafka.server.DynamicBrokerReconfigurationTest > testAddRemoveSslListener FAILED
> *17:20:49* java.lang.AssertionError: expected:<10> but was:<12>
> *17:20:49* at org.junit.Assert.fail(Assert.java:88)
> *17:20:49* at org.junit.Assert.failNotEquals(Assert.java:834)
> *17:20:49* at org.junit.Assert.assertEquals(Assert.java:645)
> *17:20:49* at org.junit.Assert.assertEquals(Assert.java:631)
> *17:20:49* at kafka.server.DynamicBrokerReconfigurationTest.verifyProduceConsume(DynamicBrokerReconfigurationTest.scala:959)
> *17:20:49* at kafka.server.DynamicBrokerReconfigurationTest.verifyRemoveListener(DynamicBrokerReconfigurationTest.scala:784)
> *17:20:49* at kafka.server.DynamicBrokerReconfigurationTest.testAddRemoveSslListener(DynamicBrokerReconfigurationTest.scala:705)
>
> *Failure #2: (JDK 8)*
> *18:46:23* kafka.server.DynamicBrokerReconfigurationTest > testAddRemoveSslListener FAILED
> *18:46:23* java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
> *18:46:23* at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:94)
> *18:46:23* at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:77)
> *18:46:23* at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29)
> *18:46:23* at kafka.server.DynamicBrokerReconfigurationTest.$anonfun$verifyProduceConsume$3(DynamicBrokerReconfigurationTest.scala:953)
> *18:46:23* at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:234)
> *18:46:23* at scala.collection.Iterator.foreach(Iterator.scala:929)
> *18:46:23* at scala.collection.Iterator.foreach$(Iterator.scala:929)
> *18:46:23* at scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
> *18:46:23* at scala.collection.IterableLike.foreach(IterableLike.scala:71)
> *18:46:23* at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
> *18:46:23* at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> *18:46:23* at scala.collection.TraversableLike.map(TraversableLike.scala:234)
> *18:46:23* at scala.collection.TraversableLike.map$(TraversableLike.scala:227)
> *18:46:23* at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> *18:46:23* at kafka.server.DynamicBrokerReconfigurationTest.verifyProduceConsume(DynamicBrokerReconfigurationTest.scala:953)
> *18:46:23* at kafka.server.DynamicBrokerReconfigurationTest.verifyRemoveListener(DynamicBrokerReconfigurationTest.scala:816)
> *18:46:23* at kafka.server.DynamicBrokerReconfigurationTest.testAddRemoveSslListener(DynamicBrokerReconfigurationTest.scala:705)
>
> *18:46:23* Caused by:
> *18:46:23* org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
[jira] [Updated] (KAFKA-7967) Kafka Streams: some values in statestore rollback to old value
[ https://issues.apache.org/jira/browse/KAFKA-7967?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-7967: - Component/s: streams > Kafka Streams: some values in statestore rollback to old value > -- > > Key: KAFKA-7967 > URL: https://issues.apache.org/jira/browse/KAFKA-7967 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.1.0 >Reporter: Ziming Dong >Priority: Critical > > We are using kafka streams 2.1.0, we use both persistentKeyValueStore > statestore and persistentWindowStore statestore. We found that sometimes both > types of statestore could `fetch` old values instead of newly updated values. > We didn't find any logs except INFO level logs, no instance restart in the > period, also there is no rebalance log, which indicates it's not a rebalance > bug. The bug happened no more than one time each week, but many records were > affected each time, and we didn't find a way to reproduce it manually. > For example, the issue may happen like this, note the changelog contains all > the `update`s: > # got value 1 from key 1 > # update value 2 to key 1 > # got value 2 from key 1 > # update value 3 to key 1 > # got value 1 from key 1 (something wrong!!) > # update value 2 to key 1 > there is only one type of log, as follows: > > {code:java} > 2019-02-19x14:20:00x xx INFO > [org.apache.kafka.clients.FetchSessionHandler] > [xxx-streams-xx-xxx--xxx-xx-StreamThread-1] [Consumer > clientId=x--xxx-xxx--x-StreamThread-1-consumer, > groupId=x] Node 2 was unable to process the fetch request with > (sessionId=1998942517, epoch=4357): INVALID_FETCH_SESSION_EPOCH. > {code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7895) Ktable supress operator emitting more than one record for the same key per window
[ https://issues.apache.org/jira/browse/KAFKA-7895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16777234#comment-16777234 ] ASF GitHub Bot commented on KAFKA-7895: --- vvcephei commented on pull request #6325: KAFKA-7895: fix stream-time reckoning for Suppress (2.2) (#6286) URL: https://github.com/apache/kafka/pull/6325 Even within a Task, different Processors have different perceptions of time, due to record caching on stores and in suppression itself, and in general, due to any processor logic that may hold onto records arbitrarily and emit them later. Thanks to this, we can't rely on the whole task existing in the same "instant" of stream-time. The solution is for each processor node that cares about stream-time to track it independently. On the side, backporting some internally-facing code maintainability updates ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Ktable supress operator emitting more than one record for the same key per > window > - > > Key: KAFKA-7895 > URL: https://issues.apache.org/jira/browse/KAFKA-7895 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.1.0, 2.1.1 >Reporter: prasanthi >Assignee: John Roesler >Priority: Major > Fix For: 2.2.0 > > > Hi, We are using kstreams to get the aggregated counts per vendor(key) within > a specified window. > Here's how we configured the suppress operator to emit one final record per > key/window. 
> {code:java} > KTable<Windowed<Integer>, Long> windowedCount = groupedStream > .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(ofMillis(5L))) > .count(Materialized.with(Serdes.Integer(),Serdes.Long())) > .suppress(Suppressed.untilWindowCloses(unbounded())); > {code} > But we are getting more than one record for the same key/window as shown > below. > {code:java} > [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1039 > [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1162 > [KTABLE-TOSTREAM-10]: [9@154906704/154906710], 6584 > [KTABLE-TOSTREAM-10]: [88@154906704/154906710], 107 > [KTABLE-TOSTREAM-10]: [108@154906704/154906710], 315 > [KTABLE-TOSTREAM-10]: [119@154906704/154906710], 119 > [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 746 > [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 809{code} > Could you please take a look? > Thanks > > > Added by John: > Acceptance Criteria: > * add suppress to system tests, such that it's exercised with crash/shutdown > recovery, rebalance, etc. > ** [https://github.com/apache/kafka/pull/6278] > * make sure that there's some system test coverage with caching disabled. > ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7943 > * test with tighter time bounds with windows of say 30 seconds and use > system time without adding any extra time for verification > ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7944 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-6794) Support for incremental replica reassignment
[ https://issues.apache.org/jira/browse/KAFKA-6794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16777149#comment-16777149 ] GEORGE LI edited comment on KAFKA-6794 at 2/25/19 7:56 PM: --- Hi, [~viktorsomogyi], When I rebalance the whole cluster, I generate the reassignment plan json with a list of topic/partitions with their new_replicas/original_replicas, and sort them by size to try to group them into batches of similar sizes for execution, so that each batch is expected to complete reassignment in about the same amount of time. Say there are 1000 reassignments, and 50 per batch. That will be at least 20 batches/buckets to put in for execution. There could be > 20 batches because a reassignment like (1,2,3,4) => (5,6,7,8) can be split into 4 reassignments in 4 batches. A batch will be submitted, and an execution program will keep checking the existence of /admin/reassign_partitions before submitting the next batch. Comparing the new_replicas vs. original_replicas set, the algorithm can detect whether there is more than 1 new replica in the new_replicas; if yes, it breaks the reassignment up and puts the pieces in different batches/buckets. There are other considerations for the reassignments in the same batch: e.g. for different topic/partitions, try to spread the load and not overwhelm a Leader; the Leadership bytes within the same batch should be balanced across all brokers/leaders in the cluster as much as possible. Same for new followers (spread across the cluster so as not to overwhelm a particular follower). I think this (optimal execution of reassignment plans in batches) can be achieved outside of Kafka. 
was (Author: sql_consulting): Hi, [~viktorsomogyi], When I rebalance the whole cluster, I generate the reassignment plan json with a list of topic/partitions with its new_replicas/original_replicas, and sort them by their size, so try to group them in batches of similar sizes for execution, so that they are expected to complete reassignment using about the same amount of time. Say there are 1000 reassignments, and 50 per batch. That will be at least 20 batches/buckets to put in for execution. Comparing the new_replicas Vs. original_replicas set, the algorithm can detect if there is more than 1 new replica in the new_replicas, if yes, then break it and put in different batch/bucket.There are other considerations of the reassignments in the same batch: e.g. for different topic/partition, try to spread the load and not to overwhelm a Leader. e.g. the Leadership bytes within the same batch for reassignments should be balanced across all brokers/leaders in the cluster as much as possible. I think this (optimal executions of reassignment plans in batches) can only be achieved outside of Kafka. > Support for incremental replica reassignment > > > Key: KAFKA-6794 > URL: https://issues.apache.org/jira/browse/KAFKA-6794 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Assignee: Viktor Somogyi-Vass >Priority: Major > > Say you have a replication factor of 4 and you trigger a reassignment which > moves all replicas to new brokers. Now 8 replicas are fetching at the same > time which means you need to account for 8 times the current producer load > plus the catch-up replication. To make matters worse, the replicas won't all > become in-sync at the same time; in the worst case, you could have 7 replicas > in-sync while one is still catching up. Currently, the old replicas won't be > disabled until all new replicas are in-sync. This makes configuring the > throttle tricky since ISR traffic is not subject to it. 
> Rather than trying to bring all 4 new replicas online at the same time, a > friendlier approach would be to do it incrementally: bring one replica > online, bring it in-sync, then remove one of the old replicas. Repeat until > all replicas have been changed. This would reduce the impact of a > reassignment and make configuring the throttle easier at the cost of a slower > overall reassignment. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
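The batching scheme George describes above can be sketched as follows. This is a minimal sketch with hypothetical names (`ReassignmentBatcher`, `Step`); as he notes, such tooling would live outside Kafka, and this version assumes the replication factor stays constant so removals and additions pair up one-to-one.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class ReassignmentBatcher {
    // One single-replica move for a topic-partition: replace broker 'remove' with broker 'add'.
    record Step(String topicPartition, int remove, int add, long sizeBytes) {}

    // Split a multi-replica move such as (1,2,3,4) => (5,6,7,8) into single-replica steps.
    static List<Step> split(String tp, List<Integer> current, List<Integer> target, long sizeBytes) {
        List<Integer> toRemove = new ArrayList<>(current);
        toRemove.removeAll(target);
        List<Integer> toAdd = new ArrayList<>(target);
        toAdd.removeAll(current);
        List<Step> steps = new ArrayList<>();
        for (int i = 0; i < toAdd.size(); i++) {
            steps.add(new Step(tp, toRemove.get(i), toAdd.get(i), sizeBytes));
        }
        return steps;
    }

    // Sort steps by size (largest first) and greedily fill batches of at most batchSize,
    // never placing two steps for the same partition in one batch: those must run in
    // separate rounds of /admin/reassign_partitions.
    static List<List<Step>> batches(List<Step> steps, int batchSize) {
        List<Step> sorted = new ArrayList<>(steps);
        sorted.sort(Comparator.comparingLong(Step::sizeBytes).reversed());
        List<List<Step>> out = new ArrayList<>();
        for (Step s : sorted) {
            List<Step> chosen = null;
            for (List<Step> b : out) {
                boolean samePartition = b.stream()
                        .anyMatch(x -> x.topicPartition().equals(s.topicPartition()));
                if (b.size() < batchSize && !samePartition) {
                    chosen = b;
                    break;
                }
            }
            if (chosen == null) {
                chosen = new ArrayList<>();
                out.add(chosen);
            }
            chosen.add(s);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Step> steps = split("t-0", List.of(1, 2, 3, 4), List.of(5, 6, 7, 8), 100L);
        System.out.println(batches(steps, 50).size() + " batches for " + steps.size() + " steps");
    }
}
```

An execution program would then submit one batch, poll until /admin/reassign_partitions disappears, and submit the next; balancing leadership bytes per batch, as the comment suggests, would need per-broker accounting on top of this.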
[jira] [Commented] (KAFKA-6794) Support for incremental replica reassignment
[ https://issues.apache.org/jira/browse/KAFKA-6794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16777257#comment-16777257 ] GEORGE LI commented on KAFKA-6794: -- Hi [~viktorsomogyi], I think this "Incremental Reassignment" is different from KIP-236 "Planned Future Change" section. That one is basically trying to overcome the current limitation that only one batch of reassignments can be run in /admin/reassign_partitions. e.g. 50 reassignments are submitted in a batch and 49 complete, but one long-running reassignment is still pending in /admin/reassign_partitions. Currently, a new batch cannot be submitted until everything in /admin/reassign_partitions is completed and the node is removed from ZK. If the cluster is pretty much idle, this wastes resources because no new reassignments can be submitted. The proposal is to enable submitting a new batch to a queue (ZK node) and merging the new assignments into /admin/reassign_partitions. This will try to use the Cancel Reassignments mechanism if there is a conflict (same topic/partition) between the new queue and the current /admin/reassign_partitions. > Support for incremental replica reassignment > > > Key: KAFKA-6794 > URL: https://issues.apache.org/jira/browse/KAFKA-6794 > Project: Kafka > Issue Type: Improvement >Reporter: Jason Gustafson >Assignee: Viktor Somogyi-Vass >Priority: Major > > Say you have a replication factor of 4 and you trigger a reassignment which > moves all replicas to new brokers. Now 8 replicas are fetching at the same > time which means you need to account for 8 times the current producer load > plus the catch-up replication. To make matters worse, the replicas won't all > become in-sync at the same time; in the worst case, you could have 7 replicas > in-sync while one is still catching up. Currently, the old replicas won't be > disabled until all new replicas are in-sync. This makes configuring the > throttle tricky since ISR traffic is not subject to it. 
> Rather than trying to bring all 4 new replicas online at the same time, a > friendlier approach would be to do it incrementally: bring one replica > online, bring it in-sync, then remove one of the old replicas. Repeat until > all replicas have been changed. This would reduce the impact of a > reassignment and make configuring the throttle easier at the cost of a slower > overall reassignment. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-7976) Flaky Test DynamicBrokerReconfigurationTest#testUncleanLeaderElectionEnable
[ https://issues.apache.org/jira/browse/KAFKA-7976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski reassigned KAFKA-7976: -- Assignee: Stanislav Kozlovski > Flaky Test DynamicBrokerReconfigurationTest#testUncleanLeaderElectionEnable > --- > > Key: KAFKA-7976 > URL: https://issues.apache.org/jira/browse/KAFKA-7976 > Project: Kafka > Issue Type: Bug > Components: core, unit tests >Affects Versions: 2.2.0 >Reporter: Matthias J. Sax >Assignee: Stanislav Kozlovski >Priority: Critical > Fix For: 2.3.0, 2.2.1 > > > To get stable nightly builds for `2.2` release, I create tickets for all > observed test failures. > [https://builds.apache.org/blue/organizations/jenkins/kafka-2.2-jdk8/detail/kafka-2.2-jdk8/28/] > {quote}java.lang.AssertionError: Unclean leader not elected > at org.junit.Assert.fail(Assert.java:88) > at org.junit.Assert.assertTrue(Assert.java:41) > at > kafka.server.DynamicBrokerReconfigurationTest.testUncleanLeaderElectionEnable(DynamicBrokerReconfigurationTest.scala:488){quote} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7996) KafkaStreams does not pass timeout when closing Producer
[ https://issues.apache.org/jira/browse/KAFKA-7996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16777298#comment-16777298 ] Patrik Kleindl commented on KAFKA-7996: --- From a "user" perspective, if something is called a timeout then the expectation is that whatever is called returns a response no later than the given timeout. As the current behaviour does not conform to this, it should be corrected, not "improved". I tried to look into the code and just want to verify my understanding (and maybe provide a simpler solution): KafkaStreams.close(final long timeoutMs) starts a thread which tries to: # shutdown all StreamThreads, each one does sequentially ## Shutdown TaskManager ### Close Active Tasks -> Close StreamTask -> Close RecordCollector -> Close Producer with Long.MAX_VALUE ### Close Standby Tasks ### Close TaskCreator -> Close ThreadProducer -> Default is Long.MAX_VALUE ## Close Consumer with DEFAULT_CLOSE_TIMEOUT_MS = 30 Seconds ## Close Restore Consumer with DEFAULT_CLOSE_TIMEOUT_MS = 30 Seconds ## Does *not* close the producer of the StreamThread (this is done via the ThreadProducer in 1.1.3.1 it seems) # shutdown the GlobalStreamThread -> Tries to close the GlobalConsumer with DEFAULT_CLOSE_TIMEOUT_MS = 30 Seconds # Close the AdminClient -> Default is Long.MAX_VALUE When I look at this, a special configuration parameter for the producer timeout doesn't really make sense, because it doesn't really seem to provide any value given all the other default values, and the total time to close can be much longer anyway because of the sequential handling. So basically any value >0 (e.g. DEFAULT_CLOSE_TIMEOUT_MS) for the producer and admin client should prevent the blocking behaviour and return after timeoutMs has passed, correct? 
> KafkaStreams does not pass timeout when closing Producer > > > Key: KAFKA-7996 > URL: https://issues.apache.org/jira/browse/KAFKA-7996 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.1.0 >Reporter: Patrik Kleindl >Assignee: Lee Dongjin >Priority: Major > Labels: needs-kip > > [https://confluentcommunity.slack.com/messages/C48AHTCUQ/convo/C48AHTCUQ-1550831721.026100/] > We are running 2.1 and have a case where the shutdown of a streams > application takes several minutes > I noticed that although we call streams.close with a timeout of 30 seconds > the log says > [Producer > clientId=…-8be49feb-8a2e-4088-bdd7-3c197f6107bb-StreamThread-1-producer] > Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. > Matthias J Sax [vor 3 Tagen] > I just checked the code, and yes, we don't provide a timeout for the producer > on close()... -- This message was sent by Atlassian JIRA (v7.6.3#76005)
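The simpler fix hinted at in the comment above can be sketched with a deadline helper (a hypothetical `CloseBudget` class, not the actual patch): compute one deadline from the caller's timeout and hand each sequential close step whatever time remains, instead of hard-coding Long.MAX_VALUE.

```java
public class CloseBudget {
    private final long deadlineMs;

    public CloseBudget(long nowMs, long timeoutMs) {
        // Saturate instead of overflowing when timeoutMs is Long.MAX_VALUE.
        long d = nowMs + timeoutMs;
        this.deadlineMs = d < nowMs ? Long.MAX_VALUE : d;
    }

    // Time left for the next close() call; clamped to zero once the deadline has passed,
    // so later steps fail fast instead of blocking.
    public long remaining(long nowMs) {
        return Math.max(0L, deadlineMs - nowMs);
    }

    public static void main(String[] args) {
        CloseBudget budget = new CloseBudget(System.currentTimeMillis(), 30_000L);
        // Each shutdown step would be called with budget.remaining(...) instead of Long.MAX_VALUE.
        System.out.println("remaining: " + budget.remaining(System.currentTimeMillis()) + " ms");
    }
}
```

With this wiring, the producer, consumers, and admin client closed in sequence would collectively respect the 30-second timeout passed to streams.close, rather than each step getting its own independent (or unbounded) budget.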
[jira] [Commented] (KAFKA-7918) Streams store cleanup: inline byte-store generic parameters
[ https://issues.apache.org/jira/browse/KAFKA-7918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16777426#comment-16777426 ] ASF GitHub Bot commented on KAFKA-7918: --- ableegoldman commented on pull request #6327: KAFKA-7918: Inline generic parameters Pt. 1: RocksDB Bytes Store and Memory LRU Caches URL: https://github.com/apache/kafka/pull/6327 Second PR in series to inline the generic parameters of the following bytes stores: [ Pt. I] InMemoryKeyValueStore [x] RocksDBWindowStore [x] RocksDBSessionStore [x] MemoryLRUCache [x] MemoryNavigableLRUCache [ ] InMemoryWindowStore ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Streams store cleanup: inline byte-store generic parameters > --- > > Key: KAFKA-7918 > URL: https://issues.apache.org/jira/browse/KAFKA-7918 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Assignee: Sophie Blee-Goldman >Priority: Major > > Currently, the fundamental layer of stores in Streams is the "bytes store". > The easiest way to identify this is in > `org.apache.kafka.streams.state.Stores`, all the `StoreBuilder`s require a > `XXBytesStoreSupplier`. > We provide several implementations of these bytes stores, typically an > in-memory one and a persistent one (aka RocksDB). > Inside these bytes stores, the key is always `Bytes` and the value is always > `byte[]` (serialization happens at a higher level). However, the store > implementations are generically typed, just `K` and `V`. > This is good for flexibility, but it makes the code a little harder to > understand. 
I think that we used to do serialization at a lower level, so the > generics are a hold-over from that. > It would simplify the code if we just inlined the actual k/v types and maybe > even renamed the classes from (e.g.) `InMemoryKeyValueStore` to > `InMemoryKeyValueBytesStore`, and so forth. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
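The refactor described above can be illustrated with a toy version (hypothetical stand-ins, not the actual Streams classes): the generic `KeyValueStore<K, V>` interface stays for the outer layers, but the bytes-layer implementation inlines `Bytes`/`byte[]` and is renamed accordingly.

```java
import java.util.Arrays;
import java.util.TreeMap;

public class InlinedBytesStoreDemo {
    // Minimal stand-in for org.apache.kafka.common.utils.Bytes: a comparable byte[] wrapper.
    record Bytes(byte[] data) implements Comparable<Bytes> {
        public int compareTo(Bytes o) { return Arrays.compare(data, o.data); }
        public boolean equals(Object o) { return o instanceof Bytes b && Arrays.equals(data, b.data); }
        public int hashCode() { return Arrays.hashCode(data); }
    }

    // Before: generically typed store, even though at the bytes layer K/V are
    // always Bytes/byte[] (serialization happens above this layer).
    interface KeyValueStore<K, V> { void put(K key, V value); V get(K key); }

    // After: generics inlined and the class renamed, which is what the ticket proposes.
    static class InMemoryKeyValueBytesStore implements KeyValueStore<Bytes, byte[]> {
        private final TreeMap<Bytes, byte[]> map = new TreeMap<>();
        public void put(Bytes key, byte[] value) { map.put(key, value); }
        public byte[] get(Bytes key) { return map.get(key); }
    }

    public static void main(String[] args) {
        InMemoryKeyValueBytesStore store = new InMemoryKeyValueBytesStore();
        store.put(new Bytes(new byte[]{1}), new byte[]{42});
        System.out.println(store.get(new Bytes(new byte[]{1}))[0]);
    }
}
```

The inlined version reads the same to callers but makes it obvious at a glance that no serialization happens at this layer.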
[jira] [Commented] (KAFKA-6582) Partitions get underreplicated, with a single ISR, and doesn't recover. Other brokers do not take over and we need to manually restart the broker.
[ https://issues.apache.org/jira/browse/KAFKA-6582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16777441#comment-16777441 ] Sebastian Schmitz commented on KAFKA-6582: -- So far, after 24 hours, no issue yet... But I think it's a bit too early to tell if it's fixed with the latest version. However, we found that balancing the leaders was not very good with default settings and we had to run the balancing manually to really change something. Still investigating that one... > Partitions get underreplicated, with a single ISR, and doesn't recover. Other > brokers do not take over and we need to manually restart the broker. > -- > > Key: KAFKA-6582 > URL: https://issues.apache.org/jira/browse/KAFKA-6582 > Project: Kafka > Issue Type: Bug > Components: network >Affects Versions: 1.0.0 > Environment: Ubuntu 16.04 > Linux kafka04 4.4.0-109-generic #132-Ubuntu SMP Tue Jan 9 19:52:39 UTC 2018 > x86_64 x86_64 x86_64 GNU/Linux > java version "9.0.1" > Java(TM) SE Runtime Environment (build 9.0.1+11) > Java HotSpot(TM) 64-Bit Server VM (build 9.0.1+11, mixed mode) > but also tried with the latest JVM 8 before with the same result. >Reporter: Jurriaan Pruis >Priority: Major > Attachments: Screenshot 2019-01-18 at 13.08.17.png, Screenshot > 2019-01-18 at 13.16.59.png > > > Partitions get underreplicated, with a single ISR, and doesn't recover. Other > brokers do not take over and we need to manually restart the 'single ISR' > broker (if you describe the partitions of replicated topic it is clear that > some partitions are only in sync on this broker). > This bug resembles KAFKA-4477 a lot, but since that issue is marked as > resolved this is probably something else but similar. > We have the same issue (or at least it looks pretty similar) on Kafka 1.0. > Since upgrading to Kafka 1.0 in November 2017 we've had these issues (we've > upgraded from Kafka 0.10.2.1). > This happens almost every 24-48 hours on a random broker. 
This is why we > currently have a cronjob which restarts every broker every 24 hours. > During this issue the ISR shows the following server log: > {code:java} > [2018-02-20 12:02:08,342] WARN Attempting to send response via channel for > which there is no open connection, connection id > 10.132.0.32:9092-10.14.148.20:56352-96708 (kafka.network.Processor) > [2018-02-20 12:02:08,364] WARN Attempting to send response via channel for > which there is no open connection, connection id > 10.132.0.32:9092-10.14.150.25:54412-96715 (kafka.network.Processor) > [2018-02-20 12:02:08,349] WARN Attempting to send response via channel for > which there is no open connection, connection id > 10.132.0.32:9092-10.14.149.18:35182-96705 (kafka.network.Processor) > [2018-02-20 12:02:08,379] WARN Attempting to send response via channel for > which there is no open connection, connection id > 10.132.0.32:9092-10.14.150.25:54456-96717 (kafka.network.Processor) > [2018-02-20 12:02:08,448] WARN Attempting to send response via channel for > which there is no open connection, connection id > 10.132.0.32:9092-10.14.159.20:36388-96720 (kafka.network.Processor) > [2018-02-20 12:02:08,683] WARN Attempting to send response via channel for > which there is no open connection, connection id > 10.132.0.32:9092-10.14.157.110:41922-96740 (kafka.network.Processor) > {code} > Also on the ISR broker, the controller log shows this: > {code:java} > [2018-02-20 12:02:14,927] INFO [Controller-3-to-broker-3-send-thread]: > Controller 3 connected to 10.132.0.32:9092 (id: 3 rack: null) for sending > state change requests (kafka.controller.RequestSendThread) > [2018-02-20 12:02:14,927] INFO [Controller-3-to-broker-0-send-thread]: > Controller 3 connected to 10.132.0.10:9092 (id: 0 rack: null) for sending > state change requests (kafka.controller.RequestSendThread) > [2018-02-20 12:02:14,928] INFO [Controller-3-to-broker-1-send-thread]: > Controller 3 connected to 10.132.0.12:9092 (id: 1 rack: null) for sending > 
state change requests (kafka.controller.RequestSendThread){code} > And the non-ISR brokers show these kind of errors: > > {code:java} > 2018-02-20 12:02:29,204] WARN [ReplicaFetcher replicaId=1, leaderId=3, > fetcherId=0] Error in fetch to broker 3, request (type=FetchRequest, > replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, > fetchData={..}, isolationLevel=READ_UNCOMMITTED) > (kafka.server.ReplicaFetcherThread) > java.io.IOException: Connection to 3 was disconnected before the response was > read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:95) > at > kafka.server.ReplicaFetcherBlockingS
[jira] [Created] (KAFKA-8001) Fetch from future replica stalls when local replica becomes a leader
Anna Povzner created KAFKA-8001: --- Summary: Fetch from future replica stalls when local replica becomes a leader Key: KAFKA-8001 URL: https://issues.apache.org/jira/browse/KAFKA-8001 Project: Kafka Issue Type: Bug Components: core Affects Versions: 2.1.1 Reporter: Anna Povzner With KIP-320, fetch from follower / future replica returns FENCED_LEADER_EPOCH if current leader epoch in the request is lower than the leader epoch known to the leader (or local replica in case of future replica fetching). In case of future replica fetching from the local replica, if local replica becomes the leader of the partition, the next fetch from future replica fails with FENCED_LEADER_EPOCH and fetching from future replica is stopped until the next leader change. Proposed solution: on local replica leader change, future replica should "become a follower" again, and go through the truncation phase. Or we could optimize it, and just update partition state of the future replica to reflect the updated current leader epoch. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
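The fencing behaviour described above can be modelled in a few lines (hypothetical names, not broker code): a fetch carrying a leader epoch older than the replica's current epoch gets FENCED_LEADER_EPOCH, so once the local replica becomes leader and bumps the epoch, the future replica's cached epoch must be refreshed before its fetches can succeed again, which is essentially what the proposed fix does.

```java
public class EpochFenceDemo {
    enum FetchResult { OK, FENCED_LEADER_EPOCH }

    static class LocalReplica {
        int leaderEpoch;

        LocalReplica(int epoch) { this.leaderEpoch = epoch; }

        // KIP-320-style check: reject fetches whose leader epoch is stale.
        FetchResult fetch(int requestEpoch) {
            return requestEpoch < leaderEpoch ? FetchResult.FENCED_LEADER_EPOCH : FetchResult.OK;
        }

        // Becoming leader bumps the epoch, which is what starts fencing the
        // future replica's fetches until its partition state is updated.
        void becomeLeader() { leaderEpoch++; }
    }

    public static void main(String[] args) {
        LocalReplica replica = new LocalReplica(5);
        replica.becomeLeader();
        System.out.println(replica.fetch(5)); // stale epoch after the leadership change
    }
}
```

In the bug report, nothing refreshes the future replica's epoch after the leadership change, so every subsequent fetch stays fenced until the next leader change.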
[jira] [Created] (KAFKA-8002) Replica reassignment to new log dir may not complete if future and current replicas segment files have different base offsets
Anna Povzner created KAFKA-8002: --- Summary: Replica reassignment to new log dir may not complete if future and current replicas segment files have different base offsets Key: KAFKA-8002 URL: https://issues.apache.org/jira/browse/KAFKA-8002 Project: Kafka Issue Type: Bug Components: core Affects Versions: 2.1.1 Reporter: Anna Povzner Once future replica fetches log end offset, the intended logic is to finish the move (and rename the future dir to current replica dir, etc). However, the check in Partition.maybeReplaceCurrentWithFutureReplica compares the whole LogOffsetMetadata vs. log end offset. The resulting behavior is that the re-assignment will not finish for topic partitions that were cleaned/ compacted such that base offset of the last segment is different for the current and future replica. The proposed fix is to compare only log end offsets of the current and future replica. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
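The bug described above comes down to which fields get compared. In the toy model below (field names follow the real LogOffsetMetadata, but this is a simplified stand-in), full-metadata comparison fails when cleaning/compaction gives the two replicas different last-segment base offsets, even though their log end offsets match.

```java
public class ReplicaSwapCheckDemo {
    // Simplified stand-in for kafka.server.LogOffsetMetadata.
    record LogOffsetMetadata(long messageOffset, long segmentBaseOffset, int relativePositionInSegment) {}

    // Current check: full-metadata equality, which the ticket says blocks the swap
    // when segment layouts differ between the current and future replica.
    static boolean caughtUpByMetadata(LogOffsetMetadata current, LogOffsetMetadata future) {
        return current.equals(future);
    }

    // Proposed fix: compare only the log end offsets.
    static boolean caughtUpByEndOffset(LogOffsetMetadata current, LogOffsetMetadata future) {
        return current.messageOffset() == future.messageOffset();
    }

    public static void main(String[] args) {
        LogOffsetMetadata cur = new LogOffsetMetadata(100L, 50L, 10);
        LogOffsetMetadata fut = new LogOffsetMetadata(100L, 80L, 3); // compacted differently
        System.out.println(caughtUpByMetadata(cur, fut) + " vs " + caughtUpByEndOffset(cur, fut));
    }
}
```

Same log end offset, different segment base offsets: the metadata check never passes, so the reassignment never finishes, while the end-offset check completes the move.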
[jira] [Commented] (KAFKA-7845) Kafka clients do not re-resolve ips when a broker is replaced.
[ https://issues.apache.org/jira/browse/KAFKA-7845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16777502#comment-16777502 ] Jennifer Thompson commented on KAFKA-7845: -- We just deployed Kafka 2.1.1 (Confluent 5.1.2), and everything is working fine. Thanks a lot! > Kafka clients do not re-resolve ips when a broker is replaced. > -- > > Key: KAFKA-7845 > URL: https://issues.apache.org/jira/browse/KAFKA-7845 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.1.0 >Reporter: Jennifer Thompson >Priority: Major > > When one of our Kafka brokers dies and a new one replaces it (via an aws > ASG), the clients that publish to Kafka still try to publish to the old > brokers. > We see errors like > {code:java} > 2019-01-18 20:16:16 WARN NetworkClient:721 - [Producer clientId=producer-1] > Connection to node 2 (/10.130.98.111:9092) could not be established. Broker > may not be available. > 2019-01-18 20:19:09 WARN Sender:596 - [Producer clientId=producer-1] Got > error produce response with correlation id 3414 on topic-partition aa.pga-2, > retrying (4 attempts left). Error: NOT_LEADER_FOR_PARTITION > 2019-01-18 20:19:09 WARN Sender:641 - [Producer clientId=producer-1] Received > invalid metadata error in produce request on partition aa.pga-2 due to > org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is > not the leader for that topic-partition.. Going to request metadata update now > 2019-01-18 20:21:19 WARN NetworkClient:721 - [Producer clientId=producer-1] > Connection to node 2 (/10.130.98.111:9092) could not be established. Broker > may not be available. 
> 2019-01-18 20:21:50 ERROR ProducerBatch:233 - Error executing user-provided > callback on message for topic-partition 'aa.test-liz-0'{code} > and > {code:java} > [2019-01-18 20:28:47,732] ERROR WorkerSourceTask{id=rabbit-vpc-2-kafka-1} > Failed to flush, timed out while waiting for producer to flush outstanding 27 > messages (org.apache.kafka.connect.runtime.WorkerSourceTask) > [2019-01-18 20:28:47,732] ERROR WorkerSourceTask{id=rabbit-vpc-2-kafka-1} > Failed to commit offsets > (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter) > {code} > The ip address referenced is for the broker that died. We have Kafka Manager > running as well, and that picks up the new broker. > We already set > {code:java} > networkaddress.cache.ttl=60{code} > in > {code:java} > jre/lib/security/java.security{code} > Our java version is "Java(TM) SE Runtime Environment (build 1.8.0_192-b12)" > This started happening after we upgraded to 2.1. When we had Kafka 1.1, brokers > could failover without a problem. > One thing that might be considered unusual about our deployment is that we > reuse the same broker id and EBS volume for the new broker, so that > partitions do not have to be reassigned. > In kafka-connect, the logs look like > {code} > [2019-01-28 22:11:02,364] WARN [Consumer clientId=consumer-1, > groupId=connect-cluster] Connection to node 3 (/10.130.153.120:9092) could > not be established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2019-01-28 22:11:02,365] INFO [Consumer clientId=consumer-1, > groupId=connect-cluster] Error sending fetch request (sessionId=201133590, > epoch=INITIAL) to node 3: org.apache.kafka.common.errors.DisconnectException. > (org.apache.kafka.clients.FetchSessionHandler) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
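For reference, `networkaddress.cache.ttl` mentioned in the report above is a JDK security property; besides editing `jre/lib/security/java.security` as the reporter did, it can be set programmatically, provided the call runs before the JVM performs its first name lookup:

```java
import java.security.Security;

public class DnsTtlConfig {
    public static void main(String[] args) {
        // Cache successful DNS lookups for at most 60 seconds so replaced
        // brokers get re-resolved. Must run before the first name lookup.
        Security.setProperty("networkaddress.cache.ttl", "60");
        System.out.println(Security.getProperty("networkaddress.cache.ttl"));
    }
}
```

Note this only shortens the JVM's own name cache; as the ticket shows, the 2.1.0 client could still hold on to a stale resolved address, which is why upgrading the client mattered.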
[jira] [Resolved] (KAFKA-7845) Kafka clients do not re-resolve ips when a broker is replaced.
[ https://issues.apache.org/jira/browse/KAFKA-7845?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jennifer Thompson resolved KAFKA-7845. -- Resolution: Fixed Fix Version/s: 2.1.1 > Kafka clients do not re-resolve ips when a broker is replaced. > -- > > Key: KAFKA-7845 > URL: https://issues.apache.org/jira/browse/KAFKA-7845 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.1.0 >Reporter: Jennifer Thompson >Priority: Major > Fix For: 2.1.1 > > > When one of our Kafka brokers dies and a new one replaces it (via an aws > ASG), the clients that publish to Kafka still try to publish to the old > brokers. > We see errors like > {code:java} > 2019-01-18 20:16:16 WARN NetworkClient:721 - [Producer clientId=producer-1] > Connection to node 2 (/10.130.98.111:9092) could not be established. Broker > may not be available. > 2019-01-18 20:19:09 WARN Sender:596 - [Producer clientId=producer-1] Got > error produce response with correlation id 3414 on topic-partition aa.pga-2, > retrying (4 attempts left). Error: NOT_LEADER_FOR_PARTITION > 2019-01-18 20:19:09 WARN Sender:641 - [Producer clientId=producer-1] Received > invalid metadata error in produce request on partition aa.pga-2 due to > org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is > not the leader for that topic-partition.. Going to request metadata update now > 2019-01-18 20:21:19 WARN NetworkClient:721 - [Producer clientId=producer-1] > Connection to node 2 (/10.130.98.111:9092) could not be established. Broker > may not be available. 
> 2019-01-18 20:21:50 ERROR ProducerBatch:233 - Error executing user-provided > callback on message for topic-partition 'aa.test-liz-0'{code} > and > {code:java} > [2019-01-18 20:28:47,732] ERROR WorkerSourceTask{id=rabbit-vpc-2-kafka-1} > Failed to flush, timed out while waiting for producer to flush outstanding 27 > messages (org.apache.kafka.connect.runtime.WorkerSourceTask) > [2019-01-18 20:28:47,732] ERROR WorkerSourceTask{id=rabbit-vpc-2-kafka-1} > Failed to commit offsets > (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter) > {code} > The ip address referenced is for the broker that died. We have Kafka Manager > running as well, and that picks up the new broker. > We already set > {code:java} > networkaddress.cache.ttl=60{code} > in > {code:java} > jre/lib/security/java.security{code} > Our java version is "Java(TM) SE Runtime Environment (build 1.8.0_192-b12)" > This started happening after we upgraded to 2.1. When had Kafka 1.1, brokers > could failover without a problem. > One thing that might be considered unusual about our deployment is that we > reuse the same broker id and EBS volume for the new broker, so that > partitions do not have to be reassigned. > In kafka-connect, the logs look like > {code} > [2019-01-28 22:11:02,364] WARN [Consumer clientId=consumer-1, > groupId=connect-cluster] Connection to node 3 (/10.130.153.120:9092) could > not be established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2019-01-28 22:11:02,365] INFO [Consumer clientId=consumer-1, > groupId=connect-cluster] Error sending fetch request (sessionId=201133590, > epoch=INITIAL) to node 3: org.apache.kafka.common.errors.DisconnectException. > (org.apache.kafka.clients.FetchSessionHandler) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7970) Missing topic causes service shutdown without exception
[ https://issues.apache.org/jira/browse/KAFKA-7970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16777504#comment-16777504 ] Matthias J. Sax commented on KAFKA-7970: [~JonnyHeavey], [~or] Thanks for the input. I double-checked with a colleague. It's intended that there is no exception, because KafkaStreams shuts down in a clean way. However, the state should transition to ERROR (or at least NOT_RUNNING, but I think ERROR is more appropriate). Not sure atm if setting the state listener to `null` is the root cause though; will need to dig into this in more detail. > Missing topic causes service shutdown without exception > --- > > Key: KAFKA-7970 > URL: https://issues.apache.org/jira/browse/KAFKA-7970 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.1.0 >Reporter: Jonny Heavey >Priority: Minor > > When launching a KafkaStreams application that depends on a topic that > doesn't exist, the streams application correctly logs an error such as: > " is unknown yet during rebalance, please make sure they have > been pre-created before starting the Streams application." > The stream is then shutdown, however, no exception is thrown indicating that > an error has occurred. > In our circumstances, we run our streams app inside a container. The streams > service is shutdown, but the process is not exited, meaning that the > container does not crash (reducing visibility of the issue). > As no exception is thrown in the missing topic scenario described above, our > application code has no way to determine that something is wrong that would > then allow it to terminate the process. > > Could the onPartitionsAssigned method in StreamThread.java throw an exception > when it decides to shutdown the stream (somewhere around line 264)? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
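A state listener is one way for the application itself to observe the shutdown and crash the container visibly. The sketch below is hypothetical wiring (the application id, broker address, and topic names are placeholders), and note that, per this report, on 2.1.0 the leader instance may never actually reach ERROR, so this only helps once the state transition is fixed:

```java
import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;

public class ExitOnStreamsError {
    public static void main(String[] args) {
        // Hypothetical config values; replace with your own.
        Properties props = new Properties();
        props.put("application.id", "exit-on-error-demo");
        props.put("bootstrap.servers", "localhost:9092");

        // Trivial placeholder topology.
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        // Register before start(): exit the JVM if Streams ends up in ERROR,
        // so a container orchestrator sees the crash and can restart the app.
        streams.setStateListener((newState, oldState) -> {
            if (newState == KafkaStreams.State.ERROR) {
                System.exit(1);
            }
        });
        streams.start();
    }
}
```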
[jira] [Updated] (KAFKA-8001) Fetch from future replica stalls when local replica becomes a leader
[ https://issues.apache.org/jira/browse/KAFKA-8001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-8001: --- Priority: Critical (was: Major) > Fetch from future replica stalls when local replica becomes a leader > > > Key: KAFKA-8001 > URL: https://issues.apache.org/jira/browse/KAFKA-8001 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.1.1 >Reporter: Anna Povzner >Priority: Critical > > With KIP-320, fetch from follower / future replica returns > FENCED_LEADER_EPOCH if current leader epoch in the request is lower than the > leader epoch known to the leader (or local replica in case of future replica > fetching). In case of future replica fetching from the local replica, if > local replica becomes the leader of the partition, the next fetch from future > replica fails with FENCED_LEADER_EPOCH and fetching from future replica is > stopped until the next leader change. > Proposed solution: on local replica leader change, future replica should > "become a follower" again, and go through the truncation phase. Or we could > optimize it, and just update partition state of the future replica to reflect > the updated current leader epoch. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8002) Replica reassignment to new log dir may not complete if future and current replicas segment files have different base offsets
[ https://issues.apache.org/jira/browse/KAFKA-8002?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-8002: --- Priority: Critical (was: Major) > Replica reassignment to new log dir may not complete if future and current > replicas segment files have different base offsets > - > > Key: KAFKA-8002 > URL: https://issues.apache.org/jira/browse/KAFKA-8002 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.1.1 >Reporter: Anna Povzner >Priority: Critical > > Once future replica fetches log end offset, the intended logic is to finish > the move (and rename the future dir to current replica dir, etc). However, > the check in Partition.maybeReplaceCurrentWithFutureReplica compares the > whole LogOffsetMetadata vs. log end offset. The resulting behavior is that > the re-assignment will not finish for topic partitions that were cleaned/ > compacted such that base offset of the last segment is different for the > current and future replica. > The proposed fix is to compare only log end offsets of the current and future > replica. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7967) Kafka Streams: some values in statestore rollback to old value
[ https://issues.apache.org/jira/browse/KAFKA-7967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16777517#comment-16777517 ] Guozhang Wang commented on KAFKA-7967: -- It is likely because of the same issue as in https://issues.apache.org/jira/browse/KAFKA-7652, i.e. the caching store flushing is doing an incorrect ordering between 1) flushing to the store and 2) writing to downstream. [~suiyuan2009] I'd suggest you turn off caching by setting `StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG` to zero, and see if this issue still persists. If it disappears, it may be due to the bug fixed in this PR: https://github.com/apache/kafka/pull/6191 > Kafka Streams: some values in statestore rollback to old value > -- > > Key: KAFKA-7967 > URL: https://issues.apache.org/jira/browse/KAFKA-7967 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.1.0 >Reporter: Ziming Dong >Priority: Critical > > We are using kafka streams 2.1.0, we use both persistentKeyValueStore > statestore and persistentWindowStore statestore. We found sometimes both > types of statestore could `fetch` old values instead of newly updated values. > We didn't find any logs except INFO level logs, no instance restart in the > period, also there is no rebalance log which indicates it's not a rebalance > bug. The bug happened no more than one time each week, but many records were > affected each time, and we didn't find a way to reproduce it manually. > For example, the issue may happen like this, note the changelog contains all > the `update`: > # got value 1 from key 1 > # update value 2 to key 1 > # got value 2 from key 1 > # update value 3 to key 1 > # got value 1 from key 1(something wrong!!) 
> # update value 2 to key 1 > there is only one type log as follow > > {code:java} > 2019-02-19x14:20:00x xx INFO > [org.apache.kafka.clients.FetchSessionHandler] > [xxx-streams-xx-xxx--xxx-xx-StreamThread-1] [Consumer > clientId=x--xxx-xxx--x-StreamThread-1-consumer, > groupId=x] Node 2 was unable to process the fetch request with > (sessionId=1998942517, epoch=4357): INVALID_FETCH_SESSION_EPOCH. > {code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
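The suggested experiment above amounts to a single config change. A minimal fragment (only the relevant property shown, the rest of the application wiring is assumed):

```java
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// Disable record caching so writes go straight through to the store and
// changelog; if the rollback symptom disappears, the cache flush-ordering
// bug referenced above (KAFKA-7652 / PR 6191) is the likely culprit.
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
```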
[jira] [Commented] (KAFKA-7996) KafkaStreams does not pass timeout when closing Producer
[ https://issues.apache.org/jira/browse/KAFKA-7996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16777521#comment-16777521 ] Matthias J. Sax commented on KAFKA-7996: > As the current behavior does not conform to this it should be corrected, not >"improved". Sure. But if it would require a public API change (as proposed on the PR), we still need a KIP, even for a bug-fix (from my understanding). Thanks a lot for the detailed analysis. I did not think about the details yet. I tend to agree with {quote}From a "user" perspective, if something is called a timeout then the expectation is that whatever is called returns a response no later than the given timeout. {quote} If we take into account that {quote} the total time to close can be much longer anyway because of the sequential handling {quote} We might want to consider the following: * all default timeouts should be set to "MAX_VALUE" * if we get a timeout in `KafkaStreams#close()` we split it up for all parts. I.e., we get a "start timestamp" and subtract the used time so far to compute the "remaining timeout" for the next call. This way, we can make sure that the overall timeout is met. However, I am frankly not sure what the impact of such an implementation might be. What I still don't understand (or maybe I misread the comment): why does "KafkaStreams#close()" not return after the specified timeout? As you mentioned correctly, we use a background thread for the actual shutdown calls and thus `close()` should respect the provided timeout. 
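The "remaining timeout" bookkeeping proposed above can be sketched as a small, self-contained helper (hypothetical names; this is not the actual Streams implementation, just the deadline arithmetic):

```java
import java.time.Duration;

public class TimeoutBudget {
    private final long deadlineMs;

    // Record an absolute deadline once, at the "start timestamp".
    public TimeoutBudget(long startMs, Duration total) {
        this.deadlineMs = startMs + total.toMillis();
    }

    /** Time left for the next sequential close() call, never negative. */
    public long remainingMs(long nowMs) {
        return Math.max(0L, deadlineMs - nowMs);
    }

    public static void main(String[] args) {
        TimeoutBudget budget = new TimeoutBudget(0L, Duration.ofSeconds(30));
        // After 12s spent closing the first client, 18s remain for the next.
        System.out.println(budget.remainingMs(12_000L)); // prints 18000
        // Once the deadline passes, callers get 0 rather than a negative timeout.
        System.out.println(budget.remainingMs(31_000L)); // prints 0
    }
}
```

Each client's `close()` would then be called with `remainingMs(now)`, so the sequential calls collectively respect the overall timeout.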
> KafkaStreams does not pass timeout when closing Producer > > > Key: KAFKA-7996 > URL: https://issues.apache.org/jira/browse/KAFKA-7996 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.1.0 >Reporter: Patrik Kleindl >Assignee: Lee Dongjin >Priority: Major > Labels: needs-kip > > [https://confluentcommunity.slack.com/messages/C48AHTCUQ/convo/C48AHTCUQ-1550831721.026100/] > We are running 2.1 and have a case where the shutdown of a streams > application takes several minutes > I noticed that although we call streams.close with a timeout of 30 seconds > the log says > [Producer > clientId=…-8be49feb-8a2e-4088-bdd7-3c197f6107bb-StreamThread-1-producer] > Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. > Matthias J Sax [vor 3 Tagen] > I just checked the code, and yes, we don't provide a timeout for the producer > on close()... -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7970) Missing topic causes service shutdown without exception
[ https://issues.apache.org/jira/browse/KAFKA-7970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16777524#comment-16777524 ] Guozhang Wang commented on KAFKA-7970: -- For now, I'd suggest you grep the client logs and trigger an alert / kill the process when you see this error log; admittedly it is not ideal, just a workaround. > Missing topic causes service shutdown without exception > --- > > Key: KAFKA-7970 > URL: https://issues.apache.org/jira/browse/KAFKA-7970 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.1.0 >Reporter: Jonny Heavey >Priority: Minor > > When launching a KafkaStreams application that depends on a topic that > doesn't exist, the streams application correctly logs an error such as: > " is unknown yet during rebalance, please make sure they have > been pre-created before starting the Streams application." > The stream is then shutdown, however, no exception is thrown indicating that > an error has occurred. > In our circumstances, we run our streams app inside a container. The streams > service is shutdown, but the process is not exited, meaning that the > container does not crash (reducing visibility of the issue). > As no exception is thrown in the missing topic scenario described above, our > application code has no way to determine that something is wrong that would > then allow it to terminate the process. > > Could the onPartitionsAssigned method in StreamThread.java throw an exception > when it decides to shutdown the stream (somewhere around line 264)? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7971) Producer in Streams environment
[ https://issues.apache.org/jira/browse/KAFKA-7971?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16777528#comment-16777528 ] Guozhang Wang commented on KAFKA-7971: -- I see: you have some logic which is not really data-driven but system-time driven. If you are already using the Processor API to construct your Topology, you can consider using punctuation functions within a transformer that is the parent of a sink node sending to the topic. In the punctuator you can access the store and send the result to the downstream sink node via `ProcessorContext#forward()`, in which case the sink node will be responsible for sending the data to the topic. Does that sound right to you? > Producer in Streams environment > --- > > Key: KAFKA-7971 > URL: https://issues.apache.org/jira/browse/KAFKA-7971 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Maciej Lizewski >Priority: Minor > Labels: newbie > > Would be nice to have Producers that can emit messages to topic just like any > producer but also have access to local stores from streams environment in > Spring. > consider case: I have event sourced ordering process like this: > [EVENTS QUEUE] -> [MERGING PROCESS] -> [ORDERS CHANGELOG/KTABLE] > Merging process uses local storage "opened orders" to easily apply new > changes. > Now I want to implement process of closing abandoned orders (orders that were > started, but for too long there was no change and they hang in beginning > status). Easiest way is to periodically scan "opened orders" store and > produce "abandon event" for every order that meets criteria. The only way > now is to create Transformer with punctuator and connect output to [EVENTS > QUEUE]. That is obvious, but Transformer must be also connected to some input > stream, but these events must be dropped as we want only the punctuator > results. 
This causes unnecessary overhead in processing input messages > (although they are just dropped) and it is not very elegant. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
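The transformer-with-punctuator approach suggested above could look roughly like this. The store name `"opened-orders"`, the value types, the 5-minute interval, and the abandonment criterion are all hypothetical placeholders:

```java
import java.time.Duration;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class AbandonedOrderScanner
        implements Transformer<String, String, KeyValue<String, String>> {

    private ProcessorContext context;
    private KeyValueStore<String, String> openedOrders;

    @SuppressWarnings("unchecked")
    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        this.openedOrders =
            (KeyValueStore<String, String>) context.getStateStore("opened-orders");
        // Wall-clock punctuator: periodically scan the store and forward an
        // "abandon" event for every matching order to the downstream sink.
        context.schedule(Duration.ofMinutes(5), PunctuationType.WALL_CLOCK_TIME,
            timestamp -> {
                try (KeyValueIterator<String, String> it = openedOrders.all()) {
                    while (it.hasNext()) {
                        KeyValue<String, String> order = it.next();
                        if (isAbandoned(order.value, timestamp)) {
                            context.forward(order.key, "ABANDONED");
                        }
                    }
                }
            });
    }

    @Override
    public KeyValue<String, String> transform(String key, String value) {
        return null; // drop upstream records; only punctuation output is emitted
    }

    @Override
    public void close() {}

    private boolean isAbandoned(String orderValue, long nowMs) {
        return false; // placeholder for the real "too long without change" check
    }
}
```

Wired in via something like `stream.transform(AbandonedOrderScanner::new, "opened-orders").to("events-queue")`, the sink node then handles the actual produce, which is the pattern the comment describes.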
[jira] [Commented] (KAFKA-7970) Missing topic causes service shutdown without exception
[ https://issues.apache.org/jira/browse/KAFKA-7970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16777523#comment-16777523 ] Guozhang Wang commented on KAFKA-7970: -- Hi [~JonnyHeavey], I've checked the source code after talking to [~mjsax], and I can confirm that what you've observed is indeed the case: the Streams instance does not transition its state correctly if it is the leader, and even the followers that receive the error code just transition to NOT_RUNNING. I'll prepare a PR trying to fix this issue. Thanks for reporting it! > Missing topic causes service shutdown without exception > --- > > Key: KAFKA-7970 > URL: https://issues.apache.org/jira/browse/KAFKA-7970 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.1.0 >Reporter: Jonny Heavey >Priority: Minor > > When launching a KafkaStreams application that depends on a topic that > doesn't exist, the streams application correctly logs an error such as: > " is unknown yet during rebalance, please make sure they have > been pre-created before starting the Streams application." > The stream is then shutdown, however, no exception is thrown indicating that > an error has occurred. > In our circumstances, we run our streams app inside a container. The streams > service is shutdown, but the process is not exited, meaning that the > container does not crash (reducing visibility of the issue). > As no exception is thrown in the missing topic scenario described above, our > application code has no way to determine that something is wrong that would > then allow it to terminate the process. > > Could the onPartitionsAssigned method in StreamThread.java throw an exception > when it decides to shutdown the stream (somewhere around line 264)? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7845) Kafka clients do not re-resolve ips when a broker is replaced.
[ https://issues.apache.org/jira/browse/KAFKA-7845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16777537#comment-16777537 ] Ismael Juma commented on KAFKA-7845: Thanks for confirming! > Kafka clients do not re-resolve ips when a broker is replaced. > -- > > Key: KAFKA-7845 > URL: https://issues.apache.org/jira/browse/KAFKA-7845 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.1.0 >Reporter: Jennifer Thompson >Priority: Major > Fix For: 2.1.1 > > > When one of our Kafka brokers dies and a new one replaces it (via an aws > ASG), the clients that publish to Kafka still try to publish to the old > brokers. > We see errors like > {code:java} > 2019-01-18 20:16:16 WARN NetworkClient:721 - [Producer clientId=producer-1] > Connection to node 2 (/10.130.98.111:9092) could not be established. Broker > may not be available. > 2019-01-18 20:19:09 WARN Sender:596 - [Producer clientId=producer-1] Got > error produce response with correlation id 3414 on topic-partition aa.pga-2, > retrying (4 attempts left). Error: NOT_LEADER_FOR_PARTITION > 2019-01-18 20:19:09 WARN Sender:641 - [Producer clientId=producer-1] Received > invalid metadata error in produce request on partition aa.pga-2 due to > org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is > not the leader for that topic-partition.. Going to request metadata update now > 2019-01-18 20:21:19 WARN NetworkClient:721 - [Producer clientId=producer-1] > Connection to node 2 (/10.130.98.111:9092) could not be established. Broker > may not be available. 
> 2019-01-18 20:21:50 ERROR ProducerBatch:233 - Error executing user-provided > callback on message for topic-partition 'aa.test-liz-0'{code} > and > {code:java} > [2019-01-18 20:28:47,732] ERROR WorkerSourceTask{id=rabbit-vpc-2-kafka-1} > Failed to flush, timed out while waiting for producer to flush outstanding 27 > messages (org.apache.kafka.connect.runtime.WorkerSourceTask) > [2019-01-18 20:28:47,732] ERROR WorkerSourceTask{id=rabbit-vpc-2-kafka-1} > Failed to commit offsets > (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter) > {code} > The ip address referenced is for the broker that died. We have Kafka Manager > running as well, and that picks up the new broker. > We already set > {code:java} > networkaddress.cache.ttl=60{code} > in > {code:java} > jre/lib/security/java.security{code} > Our java version is "Java(TM) SE Runtime Environment (build 1.8.0_192-b12)" > This started happening after we upgraded to 2.1. When had Kafka 1.1, brokers > could failover without a problem. > One thing that might be considered unusual about our deployment is that we > reuse the same broker id and EBS volume for the new broker, so that > partitions do not have to be reassigned. > In kafka-connect, the logs look like > {code} > [2019-01-28 22:11:02,364] WARN [Consumer clientId=consumer-1, > groupId=connect-cluster] Connection to node 3 (/10.130.153.120:9092) could > not be established. Broker may not be available. > (org.apache.kafka.clients.NetworkClient) > [2019-01-28 22:11:02,365] INFO [Consumer clientId=consumer-1, > groupId=connect-cluster] Error sending fetch request (sessionId=201133590, > epoch=INITIAL) to node 3: org.apache.kafka.common.errors.DisconnectException. > (org.apache.kafka.clients.FetchSessionHandler) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7918) Streams store cleanup: inline byte-store generic parameters
[ https://issues.apache.org/jira/browse/KAFKA-7918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16777550#comment-16777550 ] ASF GitHub Bot commented on KAFKA-7918: --- ableegoldman commented on pull request #6328: KAFKA-7918: Inline generic parameters Pt. III: in-memory window store URL: https://github.com/apache/kafka/pull/6328 Third (and final) PR in series to inline the generic parameters of the following bytes stores: [Pt. I] InMemoryKeyValueStore [Pt. II] RocksDBWindowStore [Pt. II] RocksDBSessionStore [Pt. II] MemoryLRUCache [Pt. II] MemoryNavigableLRUCache [x] InMemoryWindowStore ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Streams store cleanup: inline byte-store generic parameters > --- > > Key: KAFKA-7918 > URL: https://issues.apache.org/jira/browse/KAFKA-7918 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Assignee: Sophie Blee-Goldman >Priority: Major > > Currently, the fundamental layer of stores in Streams is the "bytes store". > The easiest way to identify this is in > `org.apache.kafka.streams.state.Stores`, all the `StoreBuilder`s require a > `XXBytesStoreSupplier`. > We provide several implementations of these bytes stores, typically an > in-memory one and a persistent one (aka RocksDB). > Inside these bytes stores, the key is always `Bytes` and the value is always > `byte[]` (serialization happens at a higher level). However, the store > implementations are generically typed, just `K` and `V`. 
> This is good for flexibility, but it makes the code a little harder to > understand. I think that we used to do serialization at a lower level, so the > generics are a hold-over from that. > It would simplify the code if we just inlined the actual k/v types and maybe > even renamed the classes from (e.g.) `InMemoryKeyValueStore` to > `InMemoryKeyValueBytesStore`, and so forth. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
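The proposed inlining, as a simplified before/after sketch (toy interfaces for illustration, not the actual store classes):

```java
import org.apache.kafka.common.utils.Bytes;

// Before: generically typed, even though at this layer the key is always
// Bytes and the value is always byte[] (serialization happens above).
interface GenericBytesStore<K, V> {
    void put(K key, V value);
    V get(K key);
}

// After: the actual types are inlined, making the "bytes store" role
// explicit and suggesting names like InMemoryKeyValueBytesStore.
interface InlinedBytesStore {
    void put(Bytes key, byte[] value);
    byte[] get(Bytes key);
}
```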
[jira] [Commented] (KAFKA-7918) Streams store cleanup: inline byte-store generic parameters
[ https://issues.apache.org/jira/browse/KAFKA-7918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16777554#comment-16777554 ] ASF GitHub Bot commented on KAFKA-7918: --- guozhangwang commented on pull request #6293: KAFKA-7918: Inline generic parameters Pt. I: in-memory key-value store URL: https://github.com/apache/kafka/pull/6293 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Streams store cleanup: inline byte-store generic parameters > --- > > Key: KAFKA-7918 > URL: https://issues.apache.org/jira/browse/KAFKA-7918 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Assignee: Sophie Blee-Goldman >Priority: Major > > Currently, the fundamental layer of stores in Streams is the "bytes store". > The easiest way to identify this is in > `org.apache.kafka.streams.state.Stores`, all the `StoreBuilder`s require a > `XXBytesStoreSupplier`. > We provide several implementations of these bytes stores, typically an > in-memory one and a persistent one (aka RocksDB). > Inside these bytes stores, the key is always `Bytes` and the value is always > `byte[]` (serialization happens at a higher level). However, the store > implementations are generically typed, just `K` and `V`. > This is good for flexibility, but it makes the code a little harder to > understand. I think that we used to do serialization at a lower level, so the > generics are a hold-over from that. > It would simplify the code if we just inlined the actual k/v types and maybe > even renamed the classes from (e.g.) `InMemoryKeyValueStore` to > `InMemoryKeyValueBytesStore`, and so forth. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7967) Kafka Streams: some values in statestore rollback to old value
[ https://issues.apache.org/jira/browse/KAFKA-7967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16777571#comment-16777571 ] Ziming Dong commented on KAFKA-7967: [~guozhang], since the value rolled back to an old value from several days ago, and our data is not small per partition, it should not be a cache problem? Also, [https://github.com/apache/kafka/pull/6191] says that [https://github.com/apache/kafka/pull/4331] fixed KV store, but we met this issue on both KV store and windowed KV store. Let's upgrade to [https://github.com/apache/kafka/releases/tag/2.2.0-rc0] to see what will happen since both KAFKA-7652 and KAFKA-7672 commits are included. > Kafka Streams: some values in statestore rollback to old value > -- > > Key: KAFKA-7967 > URL: https://issues.apache.org/jira/browse/KAFKA-7967 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.1.0 >Reporter: Ziming Dong >Priority: Critical > > We are using kafka streams 2.1.0, we use both persistentKeyValueStore > statestore and persistentWindowStore statestore. We found sometimes both > types of statestore could `fetch` old values instead of newly updated values. > We didn't find any logs except INFO level logs, no instance restart in the > period, also there is no rebalance log which indicates it's not a rebalance > bug. The bug happened no more than one time each week, but many records were > affected each time, and we didn't find a way to reproduce it manually. > For example, the issue may happen like this, note the changelog contains all > the `update`: > # got value 1 from key 1 > # update value 2 to key 1 > # got value 2 from key 1 > # update value 3 to key 1 > # got value 1 from key 1(something wrong!!) 
> # update value 2 to key 1 > there is only one type log as follow > > {code:java} > 2019-02-19x14:20:00x xx INFO > [org.apache.kafka.clients.FetchSessionHandler] > [xxx-streams-xx-xxx--xxx-xx-StreamThread-1] [Consumer > clientId=x--xxx-xxx--x-StreamThread-1-consumer, > groupId=x] Node 2 was unable to process the fetch request with > (sessionId=1998942517, epoch=4357): INVALID_FETCH_SESSION_EPOCH. > {code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-7967) Kafka Streams: some values in statestore rollback to old value
[ https://issues.apache.org/jira/browse/KAFKA-7967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16777571#comment-16777571 ] Ziming Dong edited comment on KAFKA-7967 at 2/26/19 4:31 AM: - [~guozhang], since the value rollback to an old value which is several days ago, and our data is not small per partition, it should not be an cache problem? Also, [https://github.com/apache/kafka/pull/6191] says that [https://github.com/apache/kafka/pull/4331] fixed KV store, but we met this issue on both KV store and windowed KV store. Let's upgrade to [https://github.com/apache/kafka/releases/tag/2.2.0-rc0] to see what will happen since both KAFKA-7652 and KAFKA-7672 commits are included..(we have some hack code for KAFKA-7672) was (Author: suiyuan2009): [~guozhang], since the value rollback to an old value which is several days ago, and our data is not small per partition, it should not be an cache problem? Also, [https://github.com/apache/kafka/pull/6191] says that [https://github.com/apache/kafka/pull/4331] fixed KV store, but we met this issue on both KV store and windowed KV store. Let's upgrade to [https://github.com/apache/kafka/releases/tag/2.2.0-rc0] to see what will happen since both KAFKA-7652 and KAFKA-7672 commits are included.. > Kafka Streams: some values in statestore rollback to old value > -- > > Key: KAFKA-7967 > URL: https://issues.apache.org/jira/browse/KAFKA-7967 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.1.0 >Reporter: Ziming Dong >Priority: Critical > > We are using kafka streams 2.1.0, we use both persistentKeyValueStore > statestore and persistentWindowStore statestore. We found sometimes both > types of statestore could `fetch` old values instead of newly updated values. > We didn't find any logs except INFO level logs, no instance restart in the > period, also there is no rebalance log which indicates it's not a rebalance > bug. 
The bug happened no more than one time each week, but many records were > affected each time, and we didn't find a way to reproduce it manually. > For example, the issue may happen like this, note the changelog contains all > the `update`: > # got value 1 from key 1 > # update value 2 to key 1 > # got value 2 from key 1 > # update value 3 to key 1 > # got value 1 from key 1(something wrong!!) > # update value 2 to key 1 > there is only one type log as follow > > {code:java} > 2019-02-19x14:20:00x xx INFO > [org.apache.kafka.clients.FetchSessionHandler] > [xxx-streams-xx-xxx--xxx-xx-StreamThread-1] [Consumer > clientId=x--xxx-xxx--x-StreamThread-1-consumer, > groupId=x] Node 2 was unable to process the fetch request with > (sessionId=1998942517, epoch=4357): INVALID_FETCH_SESSION_EPOCH. > {code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-7967) Kafka Streams: some values in statestore rollback to old value
[ https://issues.apache.org/jira/browse/KAFKA-7967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16777571#comment-16777571 ] Ziming Dong edited comment on KAFKA-7967 at 2/26/19 4:34 AM: - [~guozhang], since the value rollback to an old value which is several days ago(also some value rollback to the value just before the previous record), and our data is not small per partition, it should not be an cache problem? Also, [https://github.com/apache/kafka/pull/6191] says that [https://github.com/apache/kafka/pull/4331] fixed KV store, but we met this issue on both KV store and windowed KV store. Let's upgrade to [https://github.com/apache/kafka/releases/tag/2.2.0-rc0] to see what will happen since both KAFKA-7652 and KAFKA-7672 commits are included..(we have some hack code for KAFKA-7672) was (Author: suiyuan2009): [~guozhang], since the value rollback to an old value which is several days ago, and our data is not small per partition, it should not be an cache problem? Also, [https://github.com/apache/kafka/pull/6191] says that [https://github.com/apache/kafka/pull/4331] fixed KV store, but we met this issue on both KV store and windowed KV store. Let's upgrade to [https://github.com/apache/kafka/releases/tag/2.2.0-rc0] to see what will happen since both KAFKA-7652 and KAFKA-7672 commits are included..(we have some hack code for KAFKA-7672) > Kafka Streams: some values in statestore rollback to old value > -- > > Key: KAFKA-7967 > URL: https://issues.apache.org/jira/browse/KAFKA-7967 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.1.0 >Reporter: Ziming Dong >Priority: Critical > > We are using kafka streams 2.1.0, we use both persistentKeyValueStore > statestore and persistentWindowStore statestore. We found sometimes both > types of statestore could `fetch` old values instead of newly updated values. 
> We didn't find anything beyond INFO-level logs, no instance restarted in the > period, and there is no rebalance log, which indicates it is not a rebalance > bug. The bug happened no more than once a week, but many records were > affected each time, and we didn't find a way to reproduce it manually. > For example, the issue may happen like this; note the changelog contains all > the `update`s: > # got value 1 from key 1 > # update value 2 to key 1 > # got value 2 from key 1 > # update value 3 to key 1 > # got value 1 from key 1 (something wrong!!) > # update value 2 to key 1 > There is only one type of log, as follows: > > {code:java} > 2019-02-19x14:20:00x xx INFO > [org.apache.kafka.clients.FetchSessionHandler] > [xxx-streams-xx-xxx--xxx-xx-StreamThread-1] [Consumer > clientId=x--xxx-xxx--x-StreamThread-1-consumer, > groupId=x] Node 2 was unable to process the fetch request with > (sessionId=1998942517, epoch=4357): INVALID_FETCH_SESSION_EPOCH. > {code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
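The invariant that the sequence in the report above violates is read-your-writes: a fetch after an update must observe that update, yet in step 5 the store returns a value from two updates earlier. A minimal sketch, using a plain java.util.HashMap purely as a stand-in for the RocksDB-backed Streams store (this is an illustration of the expected behavior, not the actual store implementation):

```java
import java.util.HashMap;
import java.util.Map;

public class ReadYourWritesSketch {
    public static void main(String[] args) {
        // Stand-in for the state store: after each update, a fetch of the
        // same key must return the newly written value.
        Map<Integer, Integer> store = new HashMap<>();

        store.put(1, 1);                  // update value 1 to key 1
        store.put(1, 2);                  // update value 2 to key 1
        System.out.println(store.get(1)); // got value 2 from key 1 -> prints 2
        store.put(1, 3);                  // update value 3 to key 1
        // In the bug report, the next fetch returned 1 again ("something
        // wrong!!"), i.e. the store rolled back past the previous record.
        System.out.println(store.get(1)); // expected: prints 3
    }
}
```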
[jira] [Commented] (KAFKA-7652) Kafka Streams Session store performance degradation from 0.10.2.2 to 0.11.0.0
[ https://issues.apache.org/jira/browse/KAFKA-7652?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16777630#comment-16777630 ] Jonathan Gordon commented on KAFKA-7652: I tested with trunk as of Feb 22 (commit 0d461e4ea0a8353c358ae661837f471995943bb0) and we're still seeing the same performance issue. Aside from logging the output of the NamedCache stats, is there data I can provide to help further narrow down the issue? Any other ideas? > Kafka Streams Session store performance degradation from 0.10.2.2 to 0.11.0.0 > - > > Key: KAFKA-7652 > URL: https://issues.apache.org/jira/browse/KAFKA-7652 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.11.0.0, 0.11.0.1, 0.11.0.2, 0.11.0.3, 1.1.1, 2.0.0, > 2.0.1 >Reporter: Jonathan Gordon >Assignee: Guozhang Wang >Priority: Major > Labels: kip > Fix For: 2.2.0 > > Attachments: kafka_10_2_1_flushes.txt, kafka_11_0_3_flushes.txt > > > I'm creating this issue in response to [~guozhang]'s request on the mailing > list: > [https://lists.apache.org/thread.html/97d620f4fd76be070ca4e2c70e2fda53cafe051e8fc4505dbcca0321@%3Cusers.kafka.apache.org%3E] > We are attempting to upgrade our Kafka Streams application from 0.10.2.1 but > are experiencing a severe performance degradation. The highest amount of CPU > time seems to be spent in retrieving from the local cache. Here's an example > thread profile with 0.11.0.0: > [https://i.imgur.com/l5VEsC2.png] > When things are running smoothly we're gated by retrieving from the state > store with acceptable performance. Here's an example thread profile with > 0.10.2.1: > [https://i.imgur.com/IHxC2cZ.png] > Some investigation reveals that we appear to be performing about 3 orders of > magnitude more lookups on the NamedCache over a comparable time period. I've > attached the NamedCache flush logs for 0.10.2.1 and 0.11.0.3.
> We're using session windows and have the app configured for > commit.interval.ms = 30 * 1000 and cache.max.bytes.buffering = 10485760 > I'm happy to share more details if they would be helpful. Also happy to run > tests on our data. > I also found this issue, which seems like it may be related: > https://issues.apache.org/jira/browse/KAFKA-4904 > > KIP-420: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-420%3A+Add+Single+Value+Fetch+in+Session+Stores] > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
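For reference, the two settings quoted in the description map to the standard Kafka Streams config keys. A minimal sketch of how they would be set; in a real application the Properties object is passed to the KafkaStreams constructor, and the StreamsConfig constants would normally be used instead of string literals:

```java
import java.util.Properties;

public class SessionStoreCacheConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Commit (and therefore flush the record cache) every 30 seconds.
        props.put("commit.interval.ms", Long.toString(30 * 1000L));
        // 10485760 bytes = 10 MiB of record-cache space per Streams instance.
        props.put("cache.max.bytes.buffering", "10485760");

        System.out.println(props.getProperty("commit.interval.ms"));       // 30000
        System.out.println(props.getProperty("cache.max.bytes.buffering")); // 10485760
    }
}
```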
[jira] [Commented] (KAFKA-7186) Controller uses too much memory when sending out UpdateMetadataRequest that can cause OutOfMemoryError
[ https://issues.apache.org/jira/browse/KAFKA-7186?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16777623#comment-16777623 ] ASF GitHub Bot commented on KAFKA-7186: --- hzxa21 commented on pull request #5519: KAFKA-7186: Avoid re-instantiating UpdateMetadataRequest and struct objects to reduce controller memory usage URL: https://github.com/apache/kafka/pull/5519 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Controller uses too much memory when sending out UpdateMetadataRequest that > can cause OutOfMemoryError > -- > > Key: KAFKA-7186 > URL: https://issues.apache.org/jira/browse/KAFKA-7186 > Project: Kafka > Issue Type: Bug > Components: controller >Reporter: Zhanxiang (Patrick) Huang >Assignee: Zhanxiang (Patrick) Huang >Priority: Major > > During controller failover and broker changes, the controller sends an > UpdateMetadataRequest to every broker in the cluster containing the states of > all partitions and live brokers. The current implementation instantiates the > UpdateMetadataRequest object and its serialized form (Struct) once per > broker, which causes an OutOfMemoryError if the memory used exceeds the > configured JVM heap size. We have seen this issue multiple times in our > production environment. > > For example, if we have 100 brokers in the cluster and each broker is the > leader of 2k partitions, the extra memory usage introduced by the controller > sending out UpdateMetadataRequest is around: > <bytes per partition> * <# of brokers> * <# of leader partitions> > = 250B * 100 * 200k = 5GB -- This message was sent by Atlassian JIRA (v7.6.3#76005)
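The 5GB figure in the description above checks out as straightforward arithmetic; the 250B per-partition cost is the reporter's own estimate, not a value taken from the Kafka source:

```java
public class ControllerMemoryEstimate {
    public static void main(String[] args) {
        long bytesPerPartition = 250L;    // reporter's estimate of per-partition request state
        long brokers = 100L;              // request + Struct re-instantiated once per broker
        long leaderPartitions = 200_000L; // 100 brokers * 2k leader partitions each

        long totalBytes = bytesPerPartition * brokers * leaderPartitions;
        System.out.println(totalBytes);   // prints 5000000000, i.e. ~5GB of extra heap
    }
}
```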
[jira] [Commented] (KAFKA-7996) KafkaStreams does not pass timeout when closing Producer
[ https://issues.apache.org/jira/browse/KAFKA-7996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16777646#comment-16777646 ] Patrik Kleindl commented on KAFKA-7996: --- You are right, I must have taken a wrong turn somewhere. Let me check the logs and get back to you. One question: at which point will the resources from the old instance be released so that a newly started one could take over? > KafkaStreams does not pass timeout when closing Producer > > > Key: KAFKA-7996 > URL: https://issues.apache.org/jira/browse/KAFKA-7996 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.1.0 >Reporter: Patrik Kleindl >Assignee: Lee Dongjin >Priority: Major > Labels: needs-kip > > [https://confluentcommunity.slack.com/messages/C48AHTCUQ/convo/C48AHTCUQ-1550831721.026100/] > We are running 2.1 and have a case where the shutdown of a streams > application takes several minutes. > I noticed that although we call streams.close with a timeout of 30 seconds, > the log says > [Producer > clientId=…-8be49feb-8a2e-4088-bdd7-3c197f6107bb-StreamThread-1-producer] > Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. > Matthias J Sax [3 days ago] > I just checked the code, and yes, we don't provide a timeout for the producer > on close()... -- This message was sent by Atlassian JIRA (v7.6.3#76005)
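The timeoutMillis in the quoted log line is exactly Long.MAX_VALUE, which, per Matthias's comment, is what the producer ends up being closed with when no timeout is forwarded: effectively "wait forever" rather than the 30 seconds the application passed to streams.close:

```java
public class ProducerCloseTimeout {
    public static void main(String[] args) {
        long loggedTimeoutMs = 9223372036854775807L; // value from the log line
        // The logged value is exactly Long.MAX_VALUE: no real timeout at all.
        System.out.println(loggedTimeoutMs == Long.MAX_VALUE); // prints true

        long requestedTimeoutMs = 30_000L; // what streams.close was actually given
        System.out.println(requestedTimeoutMs < loggedTimeoutMs); // prints true
    }
}
```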