[jira] [Comment Edited] (KAFKA-7593) Infinite restart loop when failed to store big config for task
[ https://issues.apache.org/jira/browse/KAFKA-7593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16677704#comment-16677704 ]

Oleg Kuznetsov edited comment on KAFKA-7593 at 11/7/18 5:27 AM:

[~rhauch] Sure, the workaround is clear, but the ticket is more about giving the user a heads-up about the problem preventively, for instance by throwing an exception. Is it possible to get feedback from the broker that the payload is too big and simply stop working?

was (Author: olkuznsmith):
[~rhauch] Sure, workaround is clear, but the ticket is more about letting user heads up about the problem preventively. For instance, by throwing exception. Is it possible to have feedback from broker that this payload is too big and just stop working?

> Infinite restart loop when failed to store big config for task
> --
>
> Key: KAFKA-7593
> URL: https://issues.apache.org/jira/browse/KAFKA-7593
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 1.1.0
> Reporter: Oleg Kuznetsov
> Priority: Major
>
> When the config message for the config topic is larger than the Kafka broker
> allows to store, the source connector enters an infinite restart loop without
> any error indication.
> An exception could be thrown in this case, or big configs could be handled
> more gracefully.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
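The preventive feedback requested here amounts to checking the serialized task config against the broker's message size limit before producing it to the config topic. A minimal sketch of that idea — the 1 MB limit and the `validateConfigRecord` helper are illustrative assumptions, not Connect's actual code path:

```java
public class ConfigSizeGuard {
    // Illustrative limit: the broker's max.message.bytes defaults to roughly 1 MB.
    static final int MAX_MESSAGE_BYTES = 1_000_000;

    // Fail fast with a clear error instead of entering a silent restart loop
    // after the broker rejects the oversized record.
    static void validateConfigRecord(byte[] serializedConfig) {
        if (serializedConfig.length > MAX_MESSAGE_BYTES) {
            throw new IllegalStateException("Serialized task config is "
                + serializedConfig.length + " bytes, exceeding the config topic's "
                + MAX_MESSAGE_BYTES + "-byte limit");
        }
    }

    public static void main(String[] args) {
        validateConfigRecord(new byte[512]);           // small config: accepted
        validateConfigRecord(new byte[2_000_000]);     // big config: throws
    }
}
```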
[jira] [Commented] (KAFKA-3980) JmxReporter uses excessive memory causing OutOfMemoryException
[ https://issues.apache.org/jira/browse/KAFKA-3980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16677601#comment-16677601 ]

Jin Tianfan commented on KAFKA-3980:

[~rsivaram] Unfortunately, we did not open the JMX port, so we cannot analyze these metrics directly. But in the heap dump we found numerous strings like "client-id=admin-{anyNumber}" or "client-id=consumer-{anyNumber}", as in the picture below: !heap_img.png! However, the dump file is too big; even compressed it is still 800 MB, and I don't know how to share it.

> JmxReporter uses excessive memory causing OutOfMemoryException
> --
>
> Key: KAFKA-3980
> URL: https://issues.apache.org/jira/browse/KAFKA-3980
> Project: Kafka
> Issue Type: Bug
> Components: metrics
> Affects Versions: 0.9.0.1
> Reporter: Andrew Jorgensen
> Priority: Major
> Attachments: heap_img.png
>
> I have some nodes in a Kafka cluster that occasionally run out of memory
> whenever I restart the producers. I was able to take a heap dump from both a
> recently restarted Kafka node, which weighed in at about 20 MB, and a node
> that has been running for 2 months, which is using over 700 MB of memory.
> Looking at the heap dump, it looks like the JmxReporter is holding on to
> metrics and causing them to build up over time.
> !http://imgur.com/N6Cd0Ku.png!
> !http://imgur.com/kQBqA2j.png!
> The ultimate problem this causes is that there is a chance that when I
> restart the producers the node will experience a Java heap space exception
> and OOM. The nodes then fail to start up correctly and write a -1 as the
> leader number to the partitions they were responsible for, effectively
> resetting the offset and rendering those partitions unavailable. The Kafka
> process then needs to be restarted in order to re-assign the node to the
> partitions it owns.
> I have a few questions:
> 1. I am not quite sure why there are so many client id entries in that
> JmxReporter map.
> 2. Is there a way to have the JmxReporter release metrics after a set amount
> of time, or a way to turn certain high-cardinality metrics like these off?
> I can provide any logs or heap dumps if more information is needed.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
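The accumulation pattern described above — one metrics entry per auto-generated client id, never released — can be modeled with a minimal sketch. The class and map below are illustrative only, not Kafka's actual JmxReporter internals:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified model of how per-client-id metric registrations pile up:
// every restarted producer/consumer registers under a fresh
// "client-id=..." key, and nothing ever removes old entries.
public class MetricLeakSketch {
    private final Map<String, double[]> metricsByClientId = new ConcurrentHashMap<>();

    public void registerClient(String clientId) {
        // Hypothetical per-client payload; real metric objects hold far more state.
        metricsByClientId.putIfAbsent("client-id=" + clientId, new double[16]);
    }

    public int registeredCount() {
        return metricsByClientId.size();
    }

    public static void main(String[] args) {
        MetricLeakSketch reporter = new MetricLeakSketch();
        // Each producer restart with an auto-generated id adds a new entry,
        // so the map only ever grows over the broker's lifetime.
        for (int restart = 0; restart < 10_000; restart++) {
            reporter.registerClient("producer-" + restart);
        }
        System.out.println(reporter.registeredCount());
    }
}
```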
[jira] [Updated] (KAFKA-3980) JmxReporter uses excessive memory causing OutOfMemoryException
[ https://issues.apache.org/jira/browse/KAFKA-3980?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jin Tianfan updated KAFKA-3980:
Attachment: heap_img.png
[jira] [Resolved] (KAFKA-7313) StopReplicaRequest should attempt to remove future replica for the partition only if future replica exists
[ https://issues.apache.org/jira/browse/KAFKA-7313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dong Lin resolved KAFKA-7313.
Resolution: Fixed

> StopReplicaRequest should attempt to remove future replica for the partition
> only if future replica exists
> --
>
> Key: KAFKA-7313
> URL: https://issues.apache.org/jira/browse/KAFKA-7313
> Project: Kafka
> Issue Type: Improvement
> Reporter: Dong Lin
> Assignee: Dong Lin
> Priority: Major
> Fix For: 2.0.1, 2.1.0
>
> This patch fixes two issues:
> 1) Currently, if a broker receives two StopReplicaRequests with delete=true
> for the same offline replica, the first StopReplicaRequest will return
> KafkaStorageException and the second will return
> ReplicaNotAvailableException. This is because the first StopReplicaRequest
> removes the mapping (tp -> ReplicaManager.OfflinePartition) from
> ReplicaManager.allPartitions before returning KafkaStorageException, so the
> second StopReplicaRequest no longer finds this partition as offline.
> This result is inconsistent. And since the replica is already offline and
> the broker will not be able to delete files for this replica, the
> StopReplicaRequest should fail without making any change, and the broker
> should still remember that this replica is offline.
> 2) Currently, if a broker receives a StopReplicaRequest with delete=true, it
> attempts to remove the future replica for the partition, which causes
> KafkaStorageException in the StopReplicaResponse if the partition has no
> future replica. It is problematic to always return KafkaStorageException in
> the response when the future replica does not exist.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7481) Consider options for safer upgrade of offset commit value schema
[ https://issues.apache.org/jira/browse/KAFKA-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin updated KAFKA-7481: Fix Version/s: (was: 2.1.0) 2.2.0 > Consider options for safer upgrade of offset commit value schema > > > Key: KAFKA-7481 > URL: https://issues.apache.org/jira/browse/KAFKA-7481 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Blocker > Fix For: 2.2.0 > > > KIP-211 and KIP-320 add new versions of the offset commit value schema. The > use of the new schema version is controlled by the > `inter.broker.protocol.version` configuration. Once the new inter-broker > version is in use, it is not possible to downgrade since the older brokers > will not be able to parse the new schema. > The options at the moment are the following: > 1. Do nothing. Users can try the new version and keep > `inter.broker.protocol.version` locked to the old release. Downgrade will > still be possible, but users will not be able to test new capabilities which > depend on inter-broker protocol changes. > 2. Instead of using `inter.broker.protocol.version`, we could use > `message.format.version`. This would basically extend the use of this config > to apply to all persistent formats. The advantage is that it allows users to > upgrade the broker and begin using the new inter-broker protocol while still > allowing downgrade. But features which depend on the persistent format could > not be tested. > Any other options? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
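Option 1 above corresponds to pinning the inter-broker protocol on upgraded brokers; a sketch of the relevant server.properties entries (the version values here are illustrative, not a recommendation):

```
# server.properties on a broker upgraded to a newer binary, keeping downgrade possible:
# stay on the old inter-broker protocol so the new offset commit value schema is not written
inter.broker.protocol.version=2.0
# option 2 would instead key this behavior off the message format version
log.message.format.version=2.0
```

Once `inter.broker.protocol.version` is bumped to a version that writes the new schema, older brokers can no longer parse the committed offsets, which is exactly the downgrade hazard the ticket describes.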
[jira] [Resolved] (KAFKA-7537) Only include live brokers in the UpdateMetadataRequest sent to existing brokers if there is no change in the partition states
[ https://issues.apache.org/jira/browse/KAFKA-7537?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Rao resolved KAFKA-7537.
Resolution: Fixed
Fix Version/s: 2.2.0

Merged the PR to trunk.

> Only include live brokers in the UpdateMetadataRequest sent to existing
> brokers if there is no change in the partition states
> -
>
> Key: KAFKA-7537
> URL: https://issues.apache.org/jira/browse/KAFKA-7537
> Project: Kafka
> Issue Type: Improvement
> Components: controller
> Reporter: Zhanxiang (Patrick) Huang
> Assignee: Zhanxiang (Patrick) Huang
> Priority: Major
> Fix For: 2.2.0
>
> Currently, when brokers join or leave the cluster without any partition state
> changes, the controller sends out UpdateMetadataRequests containing the
> states of all partitions to all brokers. But for existing brokers in the
> cluster, the metadata diff between the controller and the broker should only
> be the "live_brokers" info. Only brokers with an empty metadata cache need
> the full UpdateMetadataRequest. Sending the full UpdateMetadataRequest to all
> brokers can place non-negligible memory pressure on the controller side.
> Say we have N brokers and M partitions in the cluster and we want to add 1
> brand-new broker. With RF=2, the memory footprint per partition in the
> UpdateMetadataRequest is ~200 bytes. In the current controller
> implementation, if each of the N RequestSendThreads serializes and sends out
> the UpdateMetadataRequest at roughly the same time (which is very likely the
> case), we end up using (N+1)*M*200B. In a large Kafka cluster, we can have:
> {noformat}
> N=99
> M=100k
> Memory usage to send out UpdateMetadataRequest to all brokers:
> 100 * 100K * 200B = 2G
>
> However, we only need to send the full UpdateMetadataRequest to the newly
> added broker. We only need to include live broker ids (4B * 100 brokers) in
> the UpdateMetadataRequest sent to the existing 99 brokers.
> So the amount of data that is actually needed will be:
> 1 * 100K * 200B + 99 * (100 * 4B) = ~21M
>
> We can potentially reduce the memory footprint by 2G / 21M = ~95x, as well
> as the data transferred over the network.{noformat}
> This issue hurts the scalability of a Kafka cluster. KIP-380 and KAFKA-7186
> also help to further reduce the controller memory footprint.
> In terms of implementation, we can keep some in-memory state on the
> controller side to differentiate existing brokers from uninitialized brokers
> (e.g. brand-new brokers), so that if there is no change in partition states
> we only send live broker info to existing brokers.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
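The back-of-the-envelope numbers in the {noformat} block above can be reproduced directly. The constants are the ticket's stated assumptions (99 existing brokers plus 1 new, 100k partitions, ~200 bytes per partition at RF=2, 4 bytes per broker id), not measurements:

```java
public class UpdateMetadataFootprint {
    // Current behavior: every broker receives the full partition state.
    static long fullRequestToAllBrokers(long brokers, long partitions, long bytesPerPartition) {
        return brokers * partitions * bytesPerPartition;
    }

    // Proposed behavior: full state only to new brokers; existing brokers
    // receive only the list of live broker ids.
    static long proposedRequests(long newBrokers, long existingBrokers, long partitions,
                                 long bytesPerPartition, long bytesPerBrokerId) {
        return newBrokers * partitions * bytesPerPartition
             + existingBrokers * ((newBrokers + existingBrokers) * bytesPerBrokerId);
    }

    public static void main(String[] args) {
        long full = fullRequestToAllBrokers(100, 100_000, 200);   // 2,000,000,000 B, the ticket's "2G"
        long proposed = proposedRequests(1, 99, 100_000, 200, 4); // ~20 MB, the ticket rounds to "~21M"
        System.out.println("full=" + full + "B proposed=" + proposed
            + "B reduction=~" + (full / proposed) + "x");
    }
}
```

The exact ratio works out slightly above the ticket's "~95x" because the quoted figures are rounded.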
[jira] [Commented] (KAFKA-7537) Only include live brokers in the UpdateMetadataRequest sent to existing brokers if there is no change in the partition states
[ https://issues.apache.org/jira/browse/KAFKA-7537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16677407#comment-16677407 ]

ASF GitHub Bot commented on KAFKA-7537:

junrao closed pull request #5869: KAFKA-7537: Avoid sending full UpdateMetadataRequest to existing brokers in the cluster on broker changes to reduce controller memory footprint
URL: https://github.com/apache/kafka/pull/5869

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 85da8b8c0b2..a11f5535bda 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -383,13 +383,8 @@ class ControllerBrokerRequestBatch(controller: KafkaController, stateChangeLogge
       }
     }
-    val givenPartitions = if (partitions.isEmpty)
-      controllerContext.partitionLeadershipInfo.keySet
-    else
-      partitions
-
     updateMetadataRequestBrokerSet ++= brokerIds.filter(_ >= 0)
-    givenPartitions.foreach(partition => updateMetadataRequestPartitionInfo(partition,
+    partitions.foreach(partition => updateMetadataRequestPartitionInfo(partition,
       beingDeleted = controller.topicDeletionManager.topicsToBeDeleted.contains(partition.topic)))
   }

diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 740ab7ff78c..a52f3f02363 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -254,7 +254,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     // they can process the LeaderAndIsrRequests that are generated by replicaStateMachine.startup() and
     // partitionStateMachine.startup().
     info("Sending update metadata request")
-    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
+    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set.empty)
     replicaStateMachine.startup()
     partitionStateMachine.startup()
@@ -357,11 +357,14 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     info(s"New broker startup callback for ${newBrokers.mkString(",")}")
     newBrokers.foreach(controllerContext.replicasOnOfflineDirs.remove)
     val newBrokersSet = newBrokers.toSet
-    // send update metadata request to all live and shutting down brokers. Old brokers will get to know of the new
-    // broker via this update.
+    val existingBrokers = controllerContext.liveOrShuttingDownBrokerIds -- newBrokers
+    // Send update metadata request to all the existing brokers in the cluster so that they know about the new brokers
+    // via this update. No need to include any partition states in the request since there are no partition state changes.
+    sendUpdateMetadataRequest(existingBrokers.toSeq, Set.empty)
+    // Send update metadata request to all the new brokers in the cluster with a full set of partition states for initialization.
     // In cases of controlled shutdown leaders will not be elected when a new broker comes up. So at least in the
-    // common controlled shutdown case, the metadata will reach the new brokers faster
-    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
+    // common controlled shutdown case, the metadata will reach the new brokers faster.
+    sendUpdateMetadataRequest(newBrokers, controllerContext.partitionLeadershipInfo.keySet)
     // the very first thing to do when a new broker comes up is send it the entire list of partitions that it is
     // supposed to host. Based on that the broker starts the high watermark threads for the input list of partitions
     val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet)
@@ -421,7 +424,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
   private def onBrokerUpdate(updatedBrokerId: Int) {
     info(s"Broker info update callback for $updatedBrokerId")
-    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
+    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set.empty)
   }

 /**
@@ -458,10 +461,10 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
     topicDeletionManager.failReplicaDeletion(newOfflineReplicasForDeletion)
   }
-    // If replica failure did not require leader re-election, inform brokers of
[jira] [Created] (KAFKA-7602) Improve usage of @see tag in Streams JavaDocs
Matthias J. Sax created KAFKA-7602:

Summary: Improve usage of @see tag in Streams JavaDocs
Key: KAFKA-7602
URL: https://issues.apache.org/jira/browse/KAFKA-7602
Project: Kafka
Issue Type: Improvement
Components: streams
Reporter: Matthias J. Sax

As discussed on this PR [https://github.com/apache/kafka/pull/5273/files/bd8410ed3d5be9ca89e963687aa05e953d712b62..e4e3eed141447baf1c70ff15e2dc0df4e9a33f12#r223510489] we use `@see` tags extensively in the Streams API JavaDocs. This ticket is about revisiting all public JavaDocs (KStream, KTable, KGroupedStream, KGroupedTable, etc.) and defining and documenting (in the wiki) a coherent strategy for the usage of the `@see` tag, with the goal of guiding users on how to use the API while not using `@see` so often that it causes confusion.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7481) Consider options for safer upgrade of offset commit value schema
[ https://issues.apache.org/jira/browse/KAFKA-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16677334#comment-16677334 ]

ASF GitHub Bot commented on KAFKA-7481:

lindong28 closed pull request #5857: KAFKA-7481; Add upgrade/downgrade notes for 2.1.x
URL: https://github.com/apache/kafka/pull/5857

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/docs/upgrade.html b/docs/upgrade.html
index 41e2277bb24..33d9964113a 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -20,6 +20,47 @@
[jira] [Reopened] (KAFKA-7481) Consider options for safer upgrade of offset commit value schema
[ https://issues.apache.org/jira/browse/KAFKA-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dong Lin reopened KAFKA-7481.
[jira] [Resolved] (KAFKA-7481) Consider options for safer upgrade of offset commit value schema
[ https://issues.apache.org/jira/browse/KAFKA-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dong Lin resolved KAFKA-7481.
Resolution: Fixed
[jira] [Assigned] (KAFKA-7481) Consider options for safer upgrade of offset commit value schema
[ https://issues.apache.org/jira/browse/KAFKA-7481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dong Lin reassigned KAFKA-7481:
Assignee: Jason Gustafson
[jira] [Commented] (KAFKA-6812) Async ConsoleProducer exits with 0 status even after data loss
[ https://issues.apache.org/jira/browse/KAFKA-6812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16677312#comment-16677312 ]

Kamal Kang commented on KAFKA-6812:

[~enether] Sure, I can write a KIP for this issue.

> Async ConsoleProducer exits with 0 status even after data loss
> --
>
> Key: KAFKA-6812
> URL: https://issues.apache.org/jira/browse/KAFKA-6812
> Project: Kafka
> Issue Type: Bug
> Components: tools
> Affects Versions: 1.1.0
> Reporter: Andras Beni
> Assignee: Stanislav Kozlovski
> Priority: Minor
>
> When {{ConsoleProducer}} is run without the {{--sync}} flag and one of the
> batches times out, {{ErrorLoggingCallback}} logs the error:
> {code:java}
> 18/04/21 04:23:01 WARN clients.NetworkClient: [Producer clientId=console-producer] Connection to node 10 could not be established. Broker may not be available.
> 18/04/21 04:23:02 ERROR internals.ErrorLoggingCallback: Error when sending message to topic my-topic with key: null, value: 8 bytes with error: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for my-topic-0: 1530 ms has passed since batch creation plus linger time{code}
> However, the tool exits with status code 0.
> In my opinion the tool should indicate in its exit status that data was
> lost. Maybe it's reasonable to exit after the first error.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
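The exit-status behavior proposed in the ticket can be sketched without a broker: remember any asynchronous send failure and surface it when the tool exits. The callback shape mirrors kafka-clients' `Callback.onCompletion(metadata, exception)`, but the class and method names below are hypothetical, not Kafka's actual `ErrorLoggingCallback`:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch: log each async send error (as ErrorLoggingCallback already does),
// but also record that a failure happened so the process can exit non-zero.
public class FailureTrackingSender {
    private final AtomicBoolean sawError = new AtomicBoolean(false);

    // Invoked once per record; a non-null exception means the send failed.
    public void onCompletion(Object metadata, Exception exception) {
        if (exception != null) {
            sawError.set(true);
            System.err.println("Error sending record: " + exception.getMessage());
        }
    }

    public int exitStatus() {
        return sawError.get() ? 1 : 0; // non-zero signals data loss to the caller
    }

    public static void main(String[] args) {
        FailureTrackingSender sender = new FailureTrackingSender();
        sender.onCompletion(null, null);                                   // successful send
        sender.onCompletion(null, new Exception("Expiring 1 record(s)"));  // timed-out batch
        System.exit(sender.exitStatus());                                  // exits 1, not 0
    }
}
```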
[jira] [Commented] (KAFKA-6522) Retrying leaderEpoch request for partition xxx as the leader reported an error: UNKNOWN_SERVER_ERROR
[ https://issues.apache.org/jira/browse/KAFKA-6522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16677302#comment-16677302 ]

Jaume M commented on KAFKA-6522:

I believe this is happening to me because there were some Kafka nodes sharing the same ZooKeeper cluster that I wasn't aware of. That would be consistent with what was said two comments before this one.

> Retrying leaderEpoch request for partition xxx as the leader reported an
> error: UNKNOWN_SERVER_ERROR
>
> Key: KAFKA-6522
> URL: https://issues.apache.org/jira/browse/KAFKA-6522
> Project: Kafka
> Issue Type: New Feature
> Components: core
> Affects Versions: 1.0.0
> Environment: Ubuntu 16.04 LTS 64bit-server
> Reporter: Wang Shuxiao
> Priority: Major
>
> We have 3 brokers in a Kafka cluster (broker ids: 401, 402, 403). Broker 403
> fails to fetch data from the leader:
> {code:java}
> [2018-02-02 08:58:26,861] INFO [ReplicaFetcher replicaId=403, leaderId=401, fetcherId=0] Retrying leaderEpoch request for partition sub_payone1hour-0 as the leader reported an error: UNKNOWN_SERVER_ERROR (kafka.server.ReplicaFetcherThread)
> [2018-02-02 08:58:26,865] WARN [ReplicaFetcher replicaId=403, leaderId=401, fetcherId=3] Error when sending leader epoch request for Map(sub_myshardSinfo-3 -> -1, sub_myshardUinfo-1 -> -1, sub_videoOnlineResourceType8Test-0 -> -1, pub_videoReportEevent-1 -> 9, sub_StreamNofity-3 -> -1, pub_RsVideoInfo-1 -> -1, pub_lidaTopic3-15 -> -1, pub_lidaTopic3-3 -> -1, sub_zwbtest-1 -> -1, sub_svAdminTagging-5 -> -1, pub_channelinfoupdate-1 -> -1, pub_RsPlayInfo-4 -> -1, sub_tinyVideoWatch-4 -> 14, __consumer_offsets-36 -> -1, pub_ybusAuditorChannel3-2 -> -1, pub_vipPush-4 -> -1, sub_LivingNotifyOnline-3 -> -1, sub_baseonline-4 -> -1, __consumer_offsets-24 -> -1, sub_lidaTopic-3 -> -1, sub_mobileGuessGameReward-0 -> -1, pub_lidaTopic-6 -> -1, sub_NewUserAlgo-0 -> -1, __consumer_offsets-48 -> -1, pub_RsUserBehavior-3 -> -1, sub_channelinfoupdate-0 -> -1, pub_tinyVideoComment-1 -> -1, pub_bulletin-2 -> -1, pub_RecordCompleteNotifition-6 -> -1, sub_lidaTopic2-3 -> -1, smsgateway-10 -> -1, __consumer_offsets-0 -> -1, pub_baseonlinetest-1 -> -1, __consumer_offsets-12 -> -1, pub_myshardUinfo-0 -> -1, pub_baseonline-3 -> -1, smsGatewayMarketDbInfo-6 -> -1, sub_tinyVideoComment-0 -> 14) (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to 401 was disconnected before the response was read
> at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:95)
> at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:96)
> at kafka.server.ReplicaFetcherThread.fetchEpochsFromLeader(ReplicaFetcherThread.scala:312)
> at kafka.server.AbstractFetcherThread.maybeTruncate(AbstractFetcherThread.scala:130)
> at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:102)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64){code}
>
> On the leader (broker 401) side, the log shows:
> {code:java}
> [2018-02-02 08:58:26,859] ERROR Closing socket for 192.168.100.101:9099-192.168.100.103:30476 because of error (kafka.network.Processor)
> org.apache.kafka.common.errors.InvalidRequestException: Error getting request for apiKey: 23 and apiVersion: 0
> Caused by: java.lang.IllegalArgumentException: Unexpected ApiKeys id `23`, it should be between `0` and `20` (inclusive)
> at org.apache.kafka.common.protocol.ApiKeys.forId(ApiKeys.java:73)
> at org.apache.kafka.common.requests.AbstractRequest.getRequest(AbstractRequest.java:39)
> at kafka.network.RequestChannel$Request.liftedTree2$1(RequestChannel.scala:96)
> at kafka.network.RequestChannel$Request.(RequestChannel.scala:91)
> at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:492)
> at kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:487)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at kafka.network.Processor.processCompletedReceives(SocketServer.scala:487)
> at kafka.network.Processor.run(SocketServer.scala:417)
> at java.lang.Thread.run(Thread.java:745){code}

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
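The leader-side IllegalArgumentException in the log above comes from the older broker rejecting an api key id outside the range it knows (0..20), while a newer replica fetcher sends api key 23 (OffsetsForLeaderEpoch); the rejection then surfaces to the fetcher as UNKNOWN_SERVER_ERROR. A simplified model of that range check, with the bounds taken from the log (not Kafka's actual ApiKeys class):

```java
// Simplified model of ApiKeys.forId on a broker whose newest known api key is 20:
// a request carrying id 23 trips this check and the connection is closed,
// which the sender observes as UNKNOWN_SERVER_ERROR.
public class ApiKeyCheck {
    static final int MIN_API_KEY = 0;
    static final int MAX_API_KEY = 20; // this broker's newest known api key

    static void forId(int id) {
        if (id < MIN_API_KEY || id > MAX_API_KEY) {
            throw new IllegalArgumentException(
                "Unexpected ApiKeys id `" + id + "`, it should be between `"
                + MIN_API_KEY + "` and `" + MAX_API_KEY + "` (inclusive)");
        }
    }

    public static void main(String[] args) {
        forId(3);   // an id the old broker knows: accepted
        forId(23);  // OffsetsForLeaderEpoch from a newer broker: throws
    }
}
```

This is why mixing broker versions behind one ZooKeeper cluster, as the commenter suspects, produces exactly this error pattern.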
[jira] [Commented] (KAFKA-7313) StopReplicaRequest should attempt to remove future replica for the partition only if future replica exists
[ https://issues.apache.org/jira/browse/KAFKA-7313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16677229#comment-16677229 ] ASF GitHub Bot commented on KAFKA-7313: --- lindong28 closed pull request #5533: KAFKA-7313; StopReplicaRequest should attempt to remove future replica for the partition only if future replica exists URL: https://github.com/apache/kafka/pull/5533 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 819da2cfd42..745c89a393b 100755 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -352,7 +352,8 @@ class Partition(val topicPartition: TopicPartition, leaderEpochStartOffsetOpt = None removePartitionMetrics() logManager.asyncDelete(topicPartition) - logManager.asyncDelete(topicPartition, isFuture = true) + if (logManager.getLog(topicPartition, isFuture = true).isDefined) +logManager.asyncDelete(topicPartition, isFuture = true) } } diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 39029b078d2..26bfbe9e0fe 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -845,7 +845,7 @@ class LogManager(logDirs: Seq[File], addLogToBeDeleted(removedLog) info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion") } else if (offlineLogDirs.nonEmpty) { - throw new KafkaStorageException("Failed to delete log for " + topicPartition + " because it may be in one of the offline directories " + offlineLogDirs.mkString(",")) + throw new 
KafkaStorageException(s"Failed to delete log for ${if (isFuture) "future" else ""} $topicPartition because it may be in one of the offline directories ${offlineLogDirs.mkString(",")}") } removedLog } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 1146befdc8e..443a5cfd08b 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -338,8 +338,10 @@ class ReplicaManager(val config: KafkaConfig, if (deletePartition) { val removedPartition = allPartitions.remove(topicPartition) - if (removedPartition eq ReplicaManager.OfflinePartition) + if (removedPartition eq ReplicaManager.OfflinePartition) { +allPartitions.put(topicPartition, ReplicaManager.OfflinePartition) throw new KafkaStorageException(s"Partition $topicPartition is on an offline disk") + } if (removedPartition != null) { val topicHasPartitions = allPartitions.values.exists(partition => topicPartition.topic == partition.topic) @@ -1402,7 +1404,8 @@ class ReplicaManager(val config: KafkaConfig, } // logDir should be an absolute path - def handleLogDirFailure(dir: String) { + // sendZkNotification is needed for unit test + def handleLogDirFailure(dir: String, sendZkNotification: Boolean = true) { if (!logManager.isLogDirOnline(dir)) return info(s"Stopping serving replicas in dir $dir") @@ -1438,7 +1441,9 @@ class ReplicaManager(val config: KafkaConfig, s"for partitions ${partitionsWithOfflineFutureReplica.mkString(",")} because they are in the failed log directory $dir.") } logManager.handleLogDirFailure(dir) -zkClient.propagateLogDirEvent(localBrokerId) + +if (sendZkNotification) + zkClient.propagateLogDirEvent(localBrokerId) info(s"Stopped serving replicas in dir $dir") } diff --git a/core/src/test/scala/unit/kafka/server/StopReplicaRequestTest.scala b/core/src/test/scala/unit/kafka/server/StopReplicaRequestTest.scala new file mode 100644 index 000..5df61ebe56e 
--- /dev/null +++ b/core/src/test/scala/unit/kafka/server/StopReplicaRequestTest.scala @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is
[jira] [Updated] (KAFKA-7313) StopReplicaRequest should attempt to remove future replica for the partition only if future replica exists
[ https://issues.apache.org/jira/browse/KAFKA-7313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin updated KAFKA-7313: Fix Version/s: (was: 2.1.1) 2.1.0 2.0.1 > StopReplicaRequest should attempt to remove future replica for the partition > only if future replica exists > -- > > Key: KAFKA-7313 > URL: https://issues.apache.org/jira/browse/KAFKA-7313 > Project: Kafka > Issue Type: Improvement >Reporter: Dong Lin >Assignee: Dong Lin >Priority: Major > Fix For: 2.0.1, 2.1.0 > > > This patch fixes two issues: > 1) Currently if a broker receives a StopReplicaRequest with delete=true for the > same offline replica, the first StopReplicaRequest will return > KafkaStorageException and the second StopReplicaRequest will return > ReplicaNotAvailableException. This is because the first StopReplicaRequest > will remove the mapping (tp -> ReplicaManager.OfflinePartition) from > ReplicaManager.allPartitions before returning KafkaStorageException, thus the > second StopReplicaRequest will not find this partition as offline. > This result appears to be inconsistent. And since the replica is already > offline and the broker will not be able to delete files for this replica, the > StopReplicaRequest should fail without making any change and the broker should > still remember that this replica is offline. > 2) Currently if a broker receives a StopReplicaRequest with delete=true, the > broker will attempt to remove the future replica for the partition, which will > cause KafkaStorageException in the StopReplicaResponse if this replica does > not have a future replica. It is problematic to always return > KafkaStorageException in the response if the future replica does not exist. > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
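The second fix above — only scheduling deletion of the future replica's log when one actually exists — can be illustrated with a small self-contained sketch. The names below (`LogManagerSketch`, `addLog`) are hypothetical stand-ins for illustration, not Kafka's actual `LogManager` API:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for LogManager: tracks a current and an optional
// "future" log per partition, keyed by a synthetic string.
class LogManagerSketch {
    private final Map<String, String> logs = new HashMap<>();

    private static String key(String tp, boolean isFuture) {
        return tp + (isFuture ? "#future" : "#current");
    }

    void addLog(String tp, boolean isFuture) {
        logs.put(key(tp, isFuture), "/data/" + tp + (isFuture ? ".future" : ""));
    }

    Optional<String> getLog(String tp, boolean isFuture) {
        return Optional.ofNullable(logs.get(key(tp, isFuture)));
    }

    boolean asyncDelete(String tp, boolean isFuture) {
        return logs.remove(key(tp, isFuture)) != null;
    }
}

public class FutureReplicaGuard {
    public static void main(String[] args) {
        LogManagerSketch lm = new LogManagerSketch();
        lm.addLog("topicA-0", false); // current replica only, no future replica

        lm.asyncDelete("topicA-0", false);
        // Guarded delete, mirroring the patch: only delete the future log
        // if it exists, instead of failing with a storage exception.
        if (lm.getLog("topicA-0", true).isPresent()) {
            lm.asyncDelete("topicA-0", true);
        }
        System.out.println("future-replica delete skipped cleanly");
    }
}
```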
[jira] [Resolved] (KAFKA-7559) ConnectStandaloneFileTest system tests do not pass
[ https://issues.apache.org/jira/browse/KAFKA-7559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin resolved KAFKA-7559. - Resolution: Fixed > ConnectStandaloneFileTest system tests do not pass > -- > > Key: KAFKA-7559 > URL: https://issues.apache.org/jira/browse/KAFKA-7559 > Project: Kafka > Issue Type: Improvement >Affects Versions: 2.1.0 >Reporter: Stanislav Kozlovski >Assignee: Randall Hauch >Priority: Major > Fix For: 2.0.1, 2.1.0 > > > Both tests `test_skip_and_log_to_dlq` and `test_file_source_and_sink` under > `kafkatest.tests.connect.connect_test.ConnectStandaloneFileTest` fail with > error messages similar to: > "TimeoutError: Kafka Connect failed to start on node: ducker@ducker04 in > condition mode: LISTEN" -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7559) ConnectStandaloneFileTest system tests do not pass
[ https://issues.apache.org/jira/browse/KAFKA-7559?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin updated KAFKA-7559: Fix Version/s: 2.1.0 2.0.1 > ConnectStandaloneFileTest system tests do not pass > -- > > Key: KAFKA-7559 > URL: https://issues.apache.org/jira/browse/KAFKA-7559 > Project: Kafka > Issue Type: Improvement >Affects Versions: 2.1.0 >Reporter: Stanislav Kozlovski >Assignee: Randall Hauch >Priority: Major > Fix For: 2.0.1, 2.1.0 > > > Both tests `test_skip_and_log_to_dlq` and `test_file_source_and_sink` under > `kafkatest.tests.connect.connect_test.ConnectStandaloneFileTest` fail with > error messages similar to: > "TimeoutError: Kafka Connect failed to start on node: ducker@ducker04 in > condition mode: LISTEN" -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6522) Retrying leaderEpoch request for partition xxx as the leader reported an error: UNKNOWN_SERVER_ERROR
[ https://issues.apache.org/jira/browse/KAFKA-6522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16677217#comment-16677217 ] Jaume M commented on KAFKA-6522: Same happens to me by the end of this tutorial if I follow it without any deviation: [https://data-flair.training/blogs/kafka-cluster/] . It goes away when I restart the nodes. > Retrying leaderEpoch request for partition xxx as the leader reported an > error: UNKNOWN_SERVER_ERROR > > > Key: KAFKA-6522 > URL: https://issues.apache.org/jira/browse/KAFKA-6522 > Project: Kafka > Issue Type: New Feature > Components: core >Affects Versions: 1.0.0 > Environment: Ubuntu 16.04 LTS 64bit-server >Reporter: Wang Shuxiao >Priority: Major > > we have 3 brokers in a kafka cluster(brokerid:401,402,403). The broker-403 > fails to fetch data from leader: > {code:java} > [2018-02-02 08:58:26,861] INFO [ReplicaFetcher replicaId=403, leaderId=401, > fetcherId=0] Retrying leaderEpoch request for partition sub_payone1hour-0 as > the leader reported an error: UNKNOWN_SERVER_ERROR > (kafka.server.ReplicaFetcherThread) > [2018-02-02 08:58:26,865] WARN [ReplicaFetcher replicaId=403, leaderId=401, > fetcherId=3] Error when sending leader epoch request for > Map(sub_myshardSinfo-3 -> -1, sub_myshardUinfo-1 -> -1, > sub_videoOnlineResourceType8Test-0 -> -1, pub_videoReportEevent-1 -> 9, > sub_StreamNofity-3 -> -1, pub_RsVideoInfo-1 -> -1, pub_lidaTopic3-15 -> -1, > pub_lidaTopic3-3 -> -1, sub_zwbtest-1 -> -1, sub_svAdminTagging-5 -> -1, > pub_channelinfoupdate-1 -> -1, pub_RsPlayInfo-4 -> -1, sub_tinyVideoWatch-4 > -> 14, __consumer_offsets-36 -> -1, pub_ybusAuditorChannel3-2 -> -1, > pub_vipPush-4 -> -1, sub_LivingNotifyOnline-3 -> -1, sub_baseonline-4 -> -1, > __consumer_offsets-24 -> -1, sub_lidaTopic-3 -> -1, > sub_mobileGuessGameReward-0 -> -1, pub_lidaTopic-6 -> -1, sub_NewUserAlgo-0 > -> -1, __consumer_offsets-48 -> -1, pub_RsUserBehavior-3 -> -1, > sub_channelinfoupdate-0 -> -1, pub_tinyVideoComment-1 -> 
-1, pub_bulletin-2 > -> -1, pub_RecordCompleteNotifition-6 -> -1, sub_lidaTopic2-3 -> -1, > smsgateway-10 -> -1, __consumer_offsets-0 -> -1, pub_baseonlinetest-1 -> -1, > __consumer_offsets-12 -> -1, pub_myshardUinfo-0 -> -1, pub_baseonline-3 -> > -1, smsGatewayMarketDbInfo-6 -> -1, sub_tinyVideoComment-0 -> 14) > (kafka.server.ReplicaFetcherThread) > java.io.IOException: Connection to 401 was disconnected before the response > was read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:95) > at > kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:96) > at > kafka.server.ReplicaFetcherThread.fetchEpochsFromLeader(ReplicaFetcherThread.scala:312) > at > kafka.server.AbstractFetcherThread.maybeTruncate(AbstractFetcherThread.scala:130) > at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:102) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64){code} > > on the leader(broker-401) side, the log shows: > {code:java} > [2018-02-02 08:58:26,859] ERROR Closing socket for > 192.168.100.101:9099-192.168.100.103:30476 because of error > (kafka.network.Processor) > org.apache.kafka.common.errors.InvalidRequestException: Error getting request > for apiKey: 23 and apiVersion: 0 > Caused by: java.lang.IllegalArgumentException: Unexpected ApiKeys id `23`, it > should be between `0` and `20` (inclusive) > at org.apache.kafka.common.protocol.ApiKeys.forId(ApiKeys.java:73) > at > org.apache.kafka.common.requests.AbstractRequest.getRequest(AbstractRequest.java:39) > at > kafka.network.RequestChannel$Request.liftedTree2$1(RequestChannel.scala:96) > at kafka.network.RequestChannel$Request.(RequestChannel.scala:91) > at > kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:492) > at > kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:487) > at scala.collection.Iterator$class.foreach(Iterator.scala:893) > at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1336) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at kafka.network.Processor.processCompletedReceives(SocketServer.scala:487) > at kafka.network.Processor.run(SocketServer.scala:417) > at java.lang.Thread.run(Thread.java:745){code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-6522) Retrying leaderEpoch request for partition xxx as the leader reported an error: UNKNOWN_SERVER_ERROR
[ https://issues.apache.org/jira/browse/KAFKA-6522?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16677217#comment-16677217 ] Jaume M edited comment on KAFKA-6522 at 11/6/18 7:33 PM: - Same happens to me by the end of this tutorial if I follow it without any deviation: [https://data-flair.training/blogs/kafka-cluster/] . It goes away when I restart the nodes. Using in my case \{{kafka_2.11-2.0.0.tgz}} for the three nodes. was (Author: jmarhuen): Same happens to me by the end of this tutorial if I follow it without any deviation: [https://data-flair.training/blogs/kafka-cluster/] . It goes away when I restart the nodes. > Retrying leaderEpoch request for partition xxx as the leader reported an > error: UNKNOWN_SERVER_ERROR > > > Key: KAFKA-6522 > URL: https://issues.apache.org/jira/browse/KAFKA-6522 > Project: Kafka > Issue Type: New Feature > Components: core >Affects Versions: 1.0.0 > Environment: Ubuntu 16.04 LTS 64bit-server >Reporter: Wang Shuxiao >Priority: Major > > we have 3 brokers in a kafka cluster(brokerid:401,402,403). 
The broker-403 > fails to fetch data from leader: > {code:java} > [2018-02-02 08:58:26,861] INFO [ReplicaFetcher replicaId=403, leaderId=401, > fetcherId=0] Retrying leaderEpoch request for partition sub_payone1hour-0 as > the leader reported an error: UNKNOWN_SERVER_ERROR > (kafka.server.ReplicaFetcherThread) > [2018-02-02 08:58:26,865] WARN [ReplicaFetcher replicaId=403, leaderId=401, > fetcherId=3] Error when sending leader epoch request for > Map(sub_myshardSinfo-3 -> -1, sub_myshardUinfo-1 -> -1, > sub_videoOnlineResourceType8Test-0 -> -1, pub_videoReportEevent-1 -> 9, > sub_StreamNofity-3 -> -1, pub_RsVideoInfo-1 -> -1, pub_lidaTopic3-15 -> -1, > pub_lidaTopic3-3 -> -1, sub_zwbtest-1 -> -1, sub_svAdminTagging-5 -> -1, > pub_channelinfoupdate-1 -> -1, pub_RsPlayInfo-4 -> -1, sub_tinyVideoWatch-4 > -> 14, __consumer_offsets-36 -> -1, pub_ybusAuditorChannel3-2 -> -1, > pub_vipPush-4 -> -1, sub_LivingNotifyOnline-3 -> -1, sub_baseonline-4 -> -1, > __consumer_offsets-24 -> -1, sub_lidaTopic-3 -> -1, > sub_mobileGuessGameReward-0 -> -1, pub_lidaTopic-6 -> -1, sub_NewUserAlgo-0 > -> -1, __consumer_offsets-48 -> -1, pub_RsUserBehavior-3 -> -1, > sub_channelinfoupdate-0 -> -1, pub_tinyVideoComment-1 -> -1, pub_bulletin-2 > -> -1, pub_RecordCompleteNotifition-6 -> -1, sub_lidaTopic2-3 -> -1, > smsgateway-10 -> -1, __consumer_offsets-0 -> -1, pub_baseonlinetest-1 -> -1, > __consumer_offsets-12 -> -1, pub_myshardUinfo-0 -> -1, pub_baseonline-3 -> > -1, smsGatewayMarketDbInfo-6 -> -1, sub_tinyVideoComment-0 -> 14) > (kafka.server.ReplicaFetcherThread) > java.io.IOException: Connection to 401 was disconnected before the response > was read > at > org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:95) > at > kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:96) > at > kafka.server.ReplicaFetcherThread.fetchEpochsFromLeader(ReplicaFetcherThread.scala:312) > at > 
kafka.server.AbstractFetcherThread.maybeTruncate(AbstractFetcherThread.scala:130) > at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:102) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64){code} > > on the leader(broker-401) side, the log shows: > {code:java} > [2018-02-02 08:58:26,859] ERROR Closing socket for > 192.168.100.101:9099-192.168.100.103:30476 because of error > (kafka.network.Processor) > org.apache.kafka.common.errors.InvalidRequestException: Error getting request > for apiKey: 23 and apiVersion: 0 > Caused by: java.lang.IllegalArgumentException: Unexpected ApiKeys id `23`, it > should be between `0` and `20` (inclusive) > at org.apache.kafka.common.protocol.ApiKeys.forId(ApiKeys.java:73) > at > org.apache.kafka.common.requests.AbstractRequest.getRequest(AbstractRequest.java:39) > at > kafka.network.RequestChannel$Request.liftedTree2$1(RequestChannel.scala:96) > at kafka.network.RequestChannel$Request.(RequestChannel.scala:91) > at > kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:492) > at > kafka.network.Processor$$anonfun$processCompletedReceives$1.apply(SocketServer.scala:487) > at scala.collection.Iterator$class.foreach(Iterator.scala:893) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at kafka.network.Processor.processCompletedReceives(SocketServer.scala:487) > at kafka.network.Processor.run(SocketServer.scala:417) > at java.lang.Thread.run(Thread.java:745){code}
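The "Unexpected ApiKeys id `23`" error above is the leader rejecting a request type it does not recognize: api key 23 is OffsetForLeaderEpoch, which was added in a later protocol version, so this error typically points to mixed broker versions or mismatched inter.broker.protocol.version settings in the cluster. A minimal sketch of that range validation (the `0..20` ceiling here is illustrative of an older broker, not read from any real one):

```java
// Illustrative re-creation of the api-key range check an older broker
// performs before parsing a request. MAX_SUPPORTED_ID = 20 models a broker
// whose protocol predates OffsetForLeaderEpoch (api key 23).
public class ApiKeyRangeCheck {
    static final short MIN_SUPPORTED_ID = 0;
    static final short MAX_SUPPORTED_ID = 20; // an older broker's ceiling

    static short forId(short id) {
        if (id < MIN_SUPPORTED_ID || id > MAX_SUPPORTED_ID) {
            throw new IllegalArgumentException(
                "Unexpected ApiKeys id `" + id + "`, it should be between `"
                + MIN_SUPPORTED_ID + "` and `" + MAX_SUPPORTED_ID + "` (inclusive)");
        }
        return id;
    }

    public static void main(String[] args) {
        forId((short) 3);      // Metadata: known to both old and new brokers
        try {
            forId((short) 23); // OffsetForLeaderEpoch: unknown to the old broker
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```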
[jira] [Commented] (KAFKA-7560) PushHttpMetricsReporter should not convert metric value to double
[ https://issues.apache.org/jira/browse/KAFKA-7560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16677207#comment-16677207 ] ASF GitHub Bot commented on KAFKA-7560: --- lindong28 opened a new pull request #5886: KAFKA-7560; PushHttpMetricsReporter should not convert metric value t… URL: https://github.com/apache/kafka/pull/5886 @ijuma @omkreddy would you have time to review this patch? It seems to be the last issue for the 2.1 system test. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > PushHttpMetricsReporter should not convert metric value to double > - > > Key: KAFKA-7560 > URL: https://issues.apache.org/jira/browse/KAFKA-7560 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.1.0 >Reporter: Stanislav Kozlovski >Assignee: Dong Lin >Priority: Blocker > > Currently PushHttpMetricsReporter will convert value from > KafkaMetric.metricValue() to double. This will not work for non-numerical > metrics such as version in AppInfoParser whose value can be string. 
This has > caused issue for PushHttpMetricsReporter which in turn caused system test > kafkatest.tests.client.quota_test.QuotaTest.test_quota to fail with the > following exception: > {code:java} > File "/opt/kafka-dev/tests/kafkatest/tests/client/quota_test.py", line 196, > in validate metric.value for k, metrics in > producer.metrics(group='producer-metrics', name='outgoing-byte-rate', > client_id=producer.client_id) for metric in metrics ValueError: max() arg is > an empty sequence > {code} > Since we allow metric value to be object, PushHttpMetricsReporter should also > read metric value as object and pass it to the http server. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7560) PushHttpMetricsReporter should not convert metric value to double
[ https://issues.apache.org/jira/browse/KAFKA-7560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin updated KAFKA-7560: Description: Currently PushHttpMetricsReporter will convert value from KafkaMetric.metricValue() to double. This will not work for non-numerical metrics such as version in AppInfoParser whose value can be string. This has caused issue for PushHttpMetricsReporter which in turn caused system test kafkatest.tests.client.quota_test.QuotaTest.test_quota to fail with the following exception: {code:java} File "/opt/kafka-dev/tests/kafkatest/tests/client/quota_test.py", line 196, in validate metric.value for k, metrics in producer.metrics(group='producer-metrics', name='outgoing-byte-rate', client_id=producer.client_id) for metric in metrics ValueError: max() arg is an empty sequence {code} Since we allow metric value to be object, PushHttpMetricsReporter should also read metric value as object and pass it to the http server. was: Currently metricValue The test under `kafkatest.tests.client.quota_test.QuotaTest.test_quota` fails when I run it locally. It produces the following error message: {code:java} File "/opt/kafka-dev/tests/kafkatest/tests/client/quota_test.py", line 196, in validate metric.value for k, metrics in producer.metrics(group='producer-metrics', name='outgoing-byte-rate', client_id=producer.client_id) for metric in metrics ValueError: max() arg is an empty sequence {code} I assume it cannot find the metric it's searching for > PushHttpMetricsReporter should not convert metric value to double > - > > Key: KAFKA-7560 > URL: https://issues.apache.org/jira/browse/KAFKA-7560 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.1.0 >Reporter: Stanislav Kozlovski >Assignee: Dong Lin >Priority: Blocker > > Currently PushHttpMetricsReporter will convert value from > KafkaMetric.metricValue() to double. This will not work for non-numerical > metrics such as version in AppInfoParser whose value can be string. 
This has > caused issue for PushHttpMetricsReporter which in turn caused system test > kafkatest.tests.client.quota_test.QuotaTest.test_quota to fail with the > following exception: > {code:java} > File "/opt/kafka-dev/tests/kafkatest/tests/client/quota_test.py", line 196, > in validate metric.value for k, metrics in > producer.metrics(group='producer-metrics', name='outgoing-byte-rate', > client_id=producer.client_id) for metric in metrics ValueError: max() arg is > an empty sequence > {code} > Since we allow metric value to be object, PushHttpMetricsReporter should also > read metric value as object and pass it to the http server. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
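The fix described above amounts to carrying the metric value as an Object all the way to the HTTP payload instead of forcing a double conversion. A self-contained sketch of that idea (`MetricSketch` is a hypothetical stand-in, not Kafka's real `KafkaMetric` class):

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch: a reporter that reads metric values as Objects, so non-numeric
// metrics (e.g. a string-valued "version" metric) survive instead of
// failing a cast to double. MetricSketch is a hypothetical stand-in.
public class MetricValueAsObject {
    interface MetricSketch {
        String name();
        Object metricValue(); // may be a Double, a String, ...
    }

    static MetricSketch metric(String name, Object value) {
        return new MetricSketch() {
            public String name() { return name; }
            public Object metricValue() { return value; }
        };
    }

    // Safe snapshot: values pass through as Objects, no numeric conversion.
    static Map<String, Object> snapshot(List<MetricSketch> metrics) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (MetricSketch m : metrics) {
            out.put(m.name(), m.metricValue());
        }
        return out;
    }

    public static void main(String[] args) {
        List<MetricSketch> metrics = Arrays.asList(
            metric("outgoing-byte-rate", 1024.5),
            metric("version", "2.1.0")); // would break a blind double cast
        System.out.println(snapshot(metrics));
    }
}
```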
[jira] [Commented] (KAFKA-7595) Kafka Streams: KTrable to KTable join introduces duplicates in downstream KTable
[ https://issues.apache.org/jira/browse/KAFKA-7595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16677173#comment-16677173 ] Matthias J. Sax commented on KAFKA-7595: Just read [~vvcephei]'s comments. This is not a bug but expected behavior. We have an example in the wiki about how to compute an average correctly: [https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns#KafkaStreamUsagePatterns-Howtocomputean(windowed)average?] > Kafka Streams: KTrable to KTable join introduces duplicates in downstream > KTable > > > Key: KAFKA-7595 > URL: https://issues.apache.org/jira/browse/KAFKA-7595 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.0 >Reporter: Vik Gamov >Priority: Major > > When performing a KTable-to-KTable join after aggregation, there are duplicates in > the resulting KTable. > 1. caching disabled, no materialized => duplicates > {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, > 0);}} > {{KTable ratingCounts = ratingsById.count();}} > {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}} > {{KTable ratingAverage = ratingSums.join(ratingCounts,}} > {{ (sum, count) -> sum / count.doubleValue());}} > 2. caching disabled, materialized => duplicates > {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, > 0);}}{{KTable ratingCounts = ratingsById.count();}} > {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}} > {{KTable ratingAverage = ratingSums.join(ratingCounts,}} > {{ (sum, count) -> sum / count.doubleValue(),}} > {{ Materialized.as("average-ratings"));}} > 3. 
caching enabled, materialized => all good > {{// Enable record cache of size 10 MB.}} > {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 > * 1024 * 1024L);}} > {{// Set commit interval to 1 second.}} > {{streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, > 1000);}}{{KTable ratingCounts = ratingsById.count();}} > {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}} > {{KTable ratingAverage = ratingSums.join(ratingCounts,}} > {{ (sum, count) -> sum / count.doubleValue(),}} > {{ Materialized.as("average-ratings"));}} > > Demo app > [https://github.com/tlberglund/streams-movie-demo/blob/master/streams/src/main/java/io/confluent/demo/StreamsDemo.java#L107] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-7595) Kafka Streams: KTrable to KTable join introduces duplicates in downstream KTable
[ https://issues.apache.org/jira/browse/KAFKA-7595?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-7595. Resolution: Not A Bug > Kafka Streams: KTrable to KTable join introduces duplicates in downstream > KTable > > > Key: KAFKA-7595 > URL: https://issues.apache.org/jira/browse/KAFKA-7595 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.0 >Reporter: Vik Gamov >Priority: Major > > When performing a KTable-to-KTable join after aggregation, there are duplicates in > the resulting KTable. > 1. caching disabled, no materialized => duplicates > {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, > 0);}} > {{KTable ratingCounts = ratingsById.count();}} > {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}} > {{KTable ratingAverage = ratingSums.join(ratingCounts,}} > {{ (sum, count) -> sum / count.doubleValue());}} > 2. caching disabled, materialized => duplicates > {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, > 0);}}{{KTable ratingCounts = ratingsById.count();}} > {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}} > {{KTable ratingAverage = ratingSums.join(ratingCounts,}} > {{ (sum, count) -> sum / count.doubleValue(),}} > {{ Materialized.as("average-ratings"));}} > 3. 
caching enabled, materialized => all good > {{// Enable record cache of size 10 MB.}} > {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 > * 1024 * 1024L);}} > {{// Set commit interval to 1 second.}} > {{streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, > 1000);}}{{KTable ratingCounts = ratingsById.count();}} > {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}} > {{KTable ratingAverage = ratingSums.join(ratingCounts,}} > {{ (sum, count) -> sum / count.doubleValue(),}} > {{ Materialized.as("average-ratings"));}} > > Demo app > [https://github.com/tlberglund/streams-movie-demo/blob/master/streams/src/main/java/io/confluent/demo/StreamsDemo.java#L107] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
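Why the uncached cases emit "duplicates" can be seen without any Streams code: each rating updates the count table and the sum table separately, and the join fires once per upstream update, so the downstream table sees an intermediate average (computed from a stale sum) followed by the final one. A plain-Java simulation of that update flow — a sketch of the KTable semantics, not Streams API code:

```java
import java.util.ArrayList;
import java.util.List;

// Simulates the update flow of an uncached KTable-KTable join computing an
// average from a count table and a sum table: the join fires on every
// upstream update, producing two emissions per input rating. With caching
// enabled, intermediate updates are folded before being forwarded.
public class JoinDuplicateSimulation {
    // Returns every value the joined "average" table would emit downstream.
    static List<Double> emittedAverages(double[] ratings) {
        List<Double> emitted = new ArrayList<>();
        long count = 0;
        double sum = 0.0;
        for (double r : ratings) {
            count += 1;               // update #1: count changes -> join fires
            emitted.add(sum / count); // sum not yet updated: stale average
            sum += r;                 // update #2: sum changes -> join fires again
            emitted.add(sum / count); // now the up-to-date average
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Two ratings produce four emissions; only every second one per
        // rating is the "final" average, hence the apparent duplicates.
        System.out.println(emittedAverages(new double[] {8.0, 6.0}));
        // prints [0.0, 8.0, 4.0, 7.0]
    }
}
```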
[jira] [Updated] (KAFKA-7560) PushHttpMetricsReporter should not convert metric value to double
[ https://issues.apache.org/jira/browse/KAFKA-7560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin updated KAFKA-7560: Description: Currently metricValue The test under `kafkatest.tests.client.quota_test.QuotaTest.test_quota` fails when I run it locally. It produces the following error message: {code:java} File "/opt/kafka-dev/tests/kafkatest/tests/client/quota_test.py", line 196, in validate metric.value for k, metrics in producer.metrics(group='producer-metrics', name='outgoing-byte-rate', client_id=producer.client_id) for metric in metrics ValueError: max() arg is an empty sequence {code} I assume it cannot find the metric it's searching for was: The test under `kafkatest.tests.client.quota_test.QuotaTest.test_quota` fails when I run it locally. It produces the following error message: {code:java} File "/opt/kafka-dev/tests/kafkatest/tests/client/quota_test.py", line 196, in validate metric.value for k, metrics in producer.metrics(group='producer-metrics', name='outgoing-byte-rate', client_id=producer.client_id) for metric in metrics ValueError: max() arg is an empty sequence {code} I assume it cannot find the metric it's searching for > PushHttpMetricsReporter should not convert metric value to double > - > > Key: KAFKA-7560 > URL: https://issues.apache.org/jira/browse/KAFKA-7560 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.1.0 >Reporter: Stanislav Kozlovski >Assignee: Dong Lin >Priority: Blocker > > Currently metricValue > > The test under `kafkatest.tests.client.quota_test.QuotaTest.test_quota` fails > when I run it locally. 
It produces the following error message: > {code:java} > File "/opt/kafka-dev/tests/kafkatest/tests/client/quota_test.py", line 196, > in validate metric.value for k, metrics in > producer.metrics(group='producer-metrics', name='outgoing-byte-rate', > client_id=producer.client_id) for metric in metrics ValueError: max() arg is > an empty sequence > {code} > I assume it cannot find the metric it's searching for -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7560) PushHttpMetricsReporter should not convert metric value to double
[ https://issues.apache.org/jira/browse/KAFKA-7560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin updated KAFKA-7560: Summary: PushHttpMetricsReporter should not convert metric value to double (was: Client Quota - system test failure) > PushHttpMetricsReporter should not convert metric value to double > - > > Key: KAFKA-7560 > URL: https://issues.apache.org/jira/browse/KAFKA-7560 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.1.0 >Reporter: Stanislav Kozlovski >Assignee: Dong Lin >Priority: Blocker > > The test under `kafkatest.tests.client.quota_test.QuotaTest.test_quota` fails > when I run it locally. It produces the following error message: > {code:java} > File "/opt/kafka-dev/tests/kafkatest/tests/client/quota_test.py", line 196, > in validate metric.value for k, metrics in > producer.metrics(group='producer-metrics', name='outgoing-byte-rate', > client_id=producer.client_id) for metric in metrics ValueError: max() arg is > an empty sequence > {code} > I assume it cannot find the metric it's searching for -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7531) NPE NullPointerException at TransactionCoordinator handleEndTransaction
[ https://issues.apache.org/jira/browse/KAFKA-7531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16677148#comment-16677148 ] Jason Gustafson commented on KAFKA-7531: [~spuzon] Thanks for the extra info. Yeah, let us know if can reproduce it. Just looking at the trace, it's tough to tell what is causing the NPE. Which timeout are you referring to specifically? > NPE NullPointerException at TransactionCoordinator handleEndTransaction > --- > > Key: KAFKA-7531 > URL: https://issues.apache.org/jira/browse/KAFKA-7531 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 2.0.0 >Reporter: Sebastian Puzoń >Priority: Critical > Fix For: 2.1.1, 2.0.2 > > > Kafka cluster with 4 brokers, 1 topic (20 partitions), 1 zookeeper. > Streams Application 4 instances, each has 5 Streams threads, total 20 stream > threads. > I observe NPE NullPointerException at coordinator broker which causes all > application stream threads shutdown, here's stack from broker: > {code:java} > [2018-10-22 21:50:46,755] INFO [GroupCoordinator 2]: Member > elog_agg-client-sswvlp6802-StreamThread-4-consumer-cbcd4704-a346-45ea-80f9-96f62fc2dabe > in group elo > g_agg has failed, removing it from the group > (kafka.coordinator.group.GroupCoordinator) > [2018-10-22 21:50:46,755] INFO [GroupCoordinator 2]: Preparing to rebalance > group elog_agg with old generation 49 (__consumer_offsets-21) > (kafka.coordinator.gro > up.GroupCoordinator) > [2018-10-22 21:51:17,519] INFO [GroupCoordinator 2]: Stabilized group > elog_agg generation 50 (__consumer_offsets-21) > (kafka.coordinator.group.GroupCoordinator) > [2018-10-22 21:51:17,524] INFO [GroupCoordinator 2]: Assignment received from > leader for group elog_agg for generation 50 > (kafka.coordinator.group.GroupCoordina > tor) > [2018-10-22 21:51:27,596] INFO [TransactionCoordinator id=2] Initialized > transactionalId elog_agg-0_14 with producerId 1001 and producer epoch 20 on > partition _ > _transaction_state-16 
(kafka.coordinator.transaction.TransactionCoordinator) > [ > [2018-10-22 21:52:00,920] ERROR [KafkaApi-2] Error when handling request > {transactional_id=elog_agg-0_3,producer_id=1004,producer_epoch=16,transaction_result=tr > ue} (kafka.server.KafkaApis) > java.lang.NullPointerException > at > kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$7(TransactionCoordinator.scala:393) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) > at > kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172) > at > kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$5(TransactionCoordinator.scala:383) > at scala.util.Either$RightProjection.flatMap(Either.scala:702) > at > kafka.coordinator.transaction.TransactionCoordinator.sendTxnMarkersCallback$1(TransactionCoordinator.scala:372) > at > kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12(TransactionCoordinator.scala:437) > at > kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12$adapted(TransactionCoordinator.scala:437) > at > kafka.coordinator.transaction.TransactionStateManager.updateCacheCallback$1(TransactionStateManager.scala:581) > at > kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15(TransactionStateManager.scala:619) > at > kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15$adapted(TransactionStateManager.scala:619) > at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:129) > at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70) > at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:110) > at > kafka.server.DelayedOperationPurgatory.tryCompleteElseWatch(DelayedOperation.scala:232) > at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:495) > at > 
kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$13(TransactionStateManager.scala:613) > at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) > at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257) > at > kafka.coordinator.transaction.TransactionStateManager.appendTransactionToLog(TransactionStateManager.scala:590) > at > kafka.coordinator.transaction.TransactionCoordinator.handleEndTransaction(TransactionCoordinator.scala:437) > at kafka.server.KafkaApis.handleEndTxnRequest(KafkaApis.scala:1653) > at kafka.server.KafkaApis.handle(KafkaApis.scala:132) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69) > at java.lang.Thread.run(Thread.java:745) > [2018-10-22
[jira] [Commented] (KAFKA-7599) Trogdor - Allow configuration for not throttling Benchmark Workers and expose messages per second in task status
[ https://issues.apache.org/jira/browse/KAFKA-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16677128#comment-16677128 ] Colin P. McCabe commented on KAFKA-7599: bq. I propose we allow for unbounded `targetMessagesPerSec` if the field is not present. I guess the reason for having a limit by default is that it tends to give a better "out of the box" experience. If you produce as fast as you can, you can often make the cluster less responsive for others, which can be annoying. But I don't feel that strongly about it, I guess. bq. Further, it would be very useful if some of these workers showed the `messagesPerSecond` they have been producing/consuming at. Yeah. We may want a long-run and short-run average as well. > Trogdor - Allow configuration for not throttling Benchmark Workers and expose > messages per second in task status > > > Key: KAFKA-7599 > URL: https://issues.apache.org/jira/browse/KAFKA-7599 > Project: Kafka > Issue Type: Improvement >Reporter: Stanislav Kozlovski >Assignee: Stanislav Kozlovski >Priority: Major > > In Trogdor, the ConsumeBench, ProduceBench and RoundTrip workers all take in > an argument called "targetMessagesPerSec". That argument works as an upper > bound on the number of messages that can be consumed/produced per second in > that worker. > It is useful to support infinite messages per second. Currently, if the > `targetMessagesPerSec` field is not present in the request, the > RoundTripWorker will raise an exception, whereas the ConsumeBench and > ProduceBench workers will work as if they had `targetMessagesPerSec=10`. > I propose we allow for unbounded `targetMessagesPerSec` if the field is not > present. > Further, it would be very useful if some of these workers showed the > `messagesPerSecond` they have been producing/consuming at. > Even now, giving the worker a `targetMessagesPerSec` does not guarantee that > the worker will reach the needed `targetMessagesPerSec`. 
There is no easy way > of knowing how the worker performed - you have to subtract the status fields > `startedMs` and `doneMs` to get the total duration of the task, convert to > seconds and then divide that by the `maxMessages` field. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
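The duration arithmetic described above can be sketched directly; the field names (`startedMs`, `doneMs`, `maxMessages`) come from the task status fields mentioned in the ticket, while the helper itself is illustrative:

```python
def achieved_messages_per_sec(status):
    """Approximate a worker's achieved rate from its task status.

    `startedMs` / `doneMs` are epoch milliseconds and `maxMessages` is the
    total message count, per the fields described in the ticket.
    """
    duration_sec = (status["doneMs"] - status["startedMs"]) / 1000.0
    if duration_sec <= 0:
        raise ValueError("task has not completed or timestamps are invalid")
    return status["maxMessages"] / duration_sec

# 10,000 messages over a 4-second run -> 2500 messages/sec
print(achieved_messages_per_sec(
    {"startedMs": 1_000, "doneMs": 5_000, "maxMessages": 10_000}))
```

Exposing a `messagesPerSecond` field in the status would spare users exactly this manual computation.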
[jira] [Commented] (KAFKA-7560) Client Quota - system test failure
[ https://issues.apache.org/jira/browse/KAFKA-7560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16677126#comment-16677126 ] Dong Lin commented on KAFKA-7560: - Never mind. I just found the issue. > Client Quota - system test failure > -- > > Key: KAFKA-7560 > URL: https://issues.apache.org/jira/browse/KAFKA-7560 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.1.0 >Reporter: Stanislav Kozlovski >Assignee: Dong Lin >Priority: Blocker > > The test under `kafkatest.tests.client.quota_test.QuotaTest.test_quota` fails > when I run it locally. It produces the following error message: > {code:java} > File "/opt/kafka-dev/tests/kafkatest/tests/client/quota_test.py", line 196, > in validate metric.value for k, metrics in > producer.metrics(group='producer-metrics', name='outgoing-byte-rate', > client_id=producer.client_id) for metric in metrics ValueError: max() arg is > an empty sequence > {code} > I assume it cannot find the metric it's searching for -- This message was sent by Atlassian JIRA (v7.6.3#76005)
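The `ValueError` in that traceback is standard Python behavior when `max()` receives an empty iterable, which happens here when no matching producer metrics are found. A minimal reproduction, with the `default=` keyword (Python 3.4+) as one possible guard:

```python
values = []  # e.g. no matching 'outgoing-byte-rate' metrics were found

try:
    max(metric for metric in values)
    assert False, "unreachable: max() raises on an empty iterable"
except ValueError as e:
    print(type(e).__name__)  # ValueError

# Guard: supply a default and handle the "no metrics" case explicitly
# instead of crashing the validation step.
result = max((metric for metric in values), default=None)
assert result is None
```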
[jira] [Updated] (KAFKA-7601) Handle message format downgrades during upgrade of message format version
[ https://issues.apache.org/jira/browse/KAFKA-7601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-7601: --- Description: During an upgrade of the message format, there is a short time during which the configured message format version is not consistent across all replicas of a partition. As long as all brokers are on the same binary version, this typically does not cause any problems. Followers will take whatever message format is used by the leader. However, it is possible for leadership to change several times between brokers which support the new format and those which support the old format. This can cause the version used in the log to flap between the different formats until the upgrade is complete. For example, suppose broker 1 has been updated to use v2 and broker 2 is still on v1. When broker 1 is the leader, all new messages will be written in the v2 format. When broker 2 is leader, v1 will be used. If there is any instability in the cluster or if completion of the update is delayed, then the log will be seen to switch back and forth between v1 and v2. Once the update is completed and broker 2 begins using v2, then the message format will stabilize and everything will generally be ok. Downgrades of the message format are problematic, even if they are just temporary. There are basically two issues: 1. We use the configured message format version to tell whether down-conversion is needed. We assume that this is always the maximum version used in the log, but that assumption fails in the case of a downgrade. In the worst case, old clients will see the new format and likely fail. 2. The logic we use for finding the truncation offset during the become-follower transition does not handle flapping between message formats. When the new format is used by the leader, then the epoch cache will be updated correctly. When the old format is in use, the epoch cache won't be updated. 
This can lead to an incorrect result for OffsetsForLeaderEpoch queries. We have actually observed the second problem. The scenario went something like this. Broker 1 is the leader of epoch 0 and writes some messages to the log using the v2 message format. Broker 2 then becomes the leader for epoch 1 and writes some messages in the v1 format. On broker 2, the last entry in the epoch cache is epoch 0. No entry is written for epoch 1 because it uses the old format. When broker 1 became a follower, it sent an OffsetsForLeaderEpoch query to broker 2 for epoch 0. Since epoch 0 was the last entry in the cache, the log end offset was returned. This resulted in localized log divergence. There are a few options to fix this problem. From a high level, we can either be stricter about preventing downgrades of the message format, or we can add additional logic to make downgrades safe. (Disallow downgrades): As an example of the first approach, the leader could always use the maximum of the last version written to the log and the configured message format version. (Allow downgrades): If we want to allow downgrades, then it makes sense to invalidate and remove all entries in the epoch cache following the message format downgrade. This would basically force us to revert to truncation to the high watermark, which is what you'd expect from a downgrade. We would also need a solution for the problem of detecting when down-conversion is needed for a fetch request. One option I've been thinking about is enforcing the invariant that each segment uses only one message format version. Whenever the message format changes, we need to roll a new segment. Then we can simply remember which format is in use by each segment to tell whether down-conversion is needed for a given fetch request. was: During an upgrade of the message format, there is a short time during which the configured message format version is not consistent across all replicas of a partition. 
As long as all brokers are on the same version, this typically does not cause any problems. Followers will take whatever message format is used by the leader. However, it is possible for leadership to change several times between brokers which support the new format and those which support the old format. This can cause the version used in the log to flap between the different formats until the upgrade is complete. For example, suppose broker 1 has been updated to use v2 and broker 2 is still on v1. When broker 1 is the leader, all new messages will be written in the v2 format. When broker 2 is leader, v1 will be used. If there is any instability in the cluster or if completion of the update is delayed, then the log will be seen to switch back and forth between v1 and v2. Once the update is completed and broker 1 begins using v2, then the message format will stabilize and everything will generally be ok. Downgrades of the message
[jira] [Created] (KAFKA-7601) Handle message format downgrades during upgrade of message format version
Jason Gustafson created KAFKA-7601: -- Summary: Handle message format downgrades during upgrade of message format version Key: KAFKA-7601 URL: https://issues.apache.org/jira/browse/KAFKA-7601 Project: Kafka Issue Type: Bug Reporter: Jason Gustafson During an upgrade of the message format, there is a short time during which the configured message format version is not consistent across all replicas of a partition. As long as all brokers are on the same version, this typically does not cause any problems. Followers will take whatever message format is used by the leader. However, it is possible for leadership to change several times between brokers which support the new format and those which support the old format. This can cause the version used in the log to flap between the different formats until the upgrade is complete. For example, suppose broker 1 has been updated to use v2 and broker 2 is still on v1. When broker 1 is the leader, all new messages will be written in the v2 format. When broker 2 is leader, v1 will be used. If there is any instability in the cluster or if completion of the update is delayed, then the log will be seen to switch back and forth between v1 and v2. Once the update is completed and broker 2 begins using v2, then the message format will stabilize and everything will generally be ok. Downgrades of the message format are problematic, even if they are just temporary. There are basically two issues: 1. We use the configured message format version to tell whether down-conversion is needed. We assume that this is always the maximum version used in the log, but that assumption fails in the case of a downgrade. In the worst case, old clients will see the new format and likely fail. 2. The logic we use for finding the truncation offset during the become-follower transition does not handle flapping between message formats. When the new format is used by the leader, then the epoch cache will be updated correctly. 
When the old format is in use, the epoch cache won't be updated. This can lead to an incorrect result for OffsetsForLeaderEpoch queries. For the second point, the specific case we observed is something like this. Broker 1 is the leader of epoch 0 and writes some messages to the log using the v2 message format. Broker 2 then becomes the leader for epoch 1 and writes some messages in the v1 format. On broker 2, the last entry in the epoch cache is epoch 0. No entry is written for epoch 1 because it uses the old format. When broker 1 became a follower, it sent an OffsetsForLeaderEpoch query to broker 2 for epoch 0. Since epoch 0 was the last entry in the cache, the log end offset was returned. This resulted in localized log divergence. There are a few options to fix this problem. From a high level, we can either be stricter about preventing downgrades of the message format, or we can add additional logic to make downgrades safe. (Disallow downgrades): As an example of the first approach, the leader could always use the maximum of the last version written to the log and the configured message format version. (Allow downgrades): If we want to allow downgrades, then it makes sense to invalidate and remove all entries in the epoch cache following the message format downgrade. We would also need a solution for the problem of detecting when down-conversion is needed for a fetch request. One option I've been thinking about is enforcing the invariant that each segment uses only one message format version. Whenever the message format changes, we need to roll a new segment. Then we can simply remember which format is in use by each segment to tell whether down-conversion is needed for a given fetch request. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
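The proposed invariant (one message format per segment, rolling a new segment when the format changes) can be sketched as a toy model; the class and method names below are illustrative, since the ticket does not specify an implementation:

```python
class Log:
    """Toy log that rolls a new segment whenever the message format changes,
    so each segment carries exactly one format version."""

    def __init__(self):
        # list of (base_offset, format_version) -- "remember which format
        # is in use by each segment"
        self.segments = []
        self.end_offset = 0

    def append(self, format_version):
        # roll a new segment on a format change (or on the first append)
        if not self.segments or self.segments[-1][1] != format_version:
            self.segments.append((self.end_offset, format_version))
        self.end_offset += 1

    def needs_down_conversion(self, fetch_offset, consumer_max_version):
        """Locate the segment containing fetch_offset and compare formats."""
        owning = None
        for base, version in self.segments:
            if base <= fetch_offset:
                owning = version
            else:
                break
        return owning is not None and owning > consumer_max_version

log = Log()
for v in (2, 2, 1, 1, 2):  # format flaps v2 -> v1 -> v2 across leader changes
    log.append(v)
print(log.segments)                     # [(0, 2), (2, 1), (4, 2)]
print(log.needs_down_conversion(3, 1))  # False: offset 3 sits in a v1 segment
print(log.needs_down_conversion(4, 1))  # True: offset 4 sits in a v2 segment
```

Note how this answers the down-conversion question per fetch offset, rather than from the (possibly downgraded) configured format version.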
[jira] [Commented] (KAFKA-6890) Add connector level configurability for producer/consumer client configs
[ https://issues.apache.org/jira/browse/KAFKA-6890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16677049#comment-16677049 ] Allen Tang commented on KAFKA-6890: --- I really appreciate your input, Randall! The KIP to accompany this JIRA is here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-296%3A+Connector+level+configurability+for+client+configs To respond to your concerns, I should also add that defining bootstrap.servers at the connector level and then subsequently changing the bootstrap.servers configuration at the worker level is actually something that we have performed in the past as it was the most graceful solution to what we were trying to achieve. We've had an occasion where we were tasked with provisioning an entirely new Kafka cluster, migrating all topics from the old cluster to the new cluster, and reconfiguring all producers and consumers that were interfacing with the old cluster to instead point to the new cluster -- Kafka Connect connectors included. In the absence of a bootstrap.servers overriding capability on a connector-by-connector basis, we would only have the worker bootstrap.servers to go by, which meant we would have to face a highly coordinated inter-departmental effort with over forty connectors within the Kafka Connect cluster to account for, along with all of their downstream business-facing real-time implications after flipping the cluster-wide switch. By providing configurability of bootstrap.servers at the connector-level, connectors became decoupled from one another, and they were no longer required to read/write data where the internal topics live, allowing for customers to migrate their connectors over to the new Kafka cluster at their own pace and on their own schedule. 
Eventually, the three internal topics used for Kafka Connect were mirrored over to the new Kafka cluster and the cluster-wide bootstrap.servers configuration change was applied, thereby fully decoupling Kafka Connect from the older Kafka cluster. If blacklisting overrides for specific client configs, like bootstrap.servers, is something you feel strongly about, we may be able to achieve this via whitelisting of client configs, defined by administrators of Kafka Connect clusters, at the cluster-level within worker properties. Let me know your thoughts on this. > Add connector level configurability for producer/consumer client configs > > > Key: KAFKA-6890 > URL: https://issues.apache.org/jira/browse/KAFKA-6890 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect >Reporter: Allen Tang >Priority: Minor > > Right now, each source connector and sink connector inherits their client > configurations from the worker properties. Within the worker properties, all > configurations that have a prefix of "producer." or "consumer." are applied > to all source connectors and sink connectors respectively. > We should also provide connector-level overrides whereby connector properties > that are prefixed with "producer." and "consumer." are used to feed into the > producer and consumer clients embedded within source and sink connectors > respectively. The prefixes will be removed via a String#substring() call, and > the remainder of the connector property key will be used as the client > configuration key. The value is fed directly to the client as the > configuration value. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
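The prefix-stripping scheme in the issue description can be sketched as follows (a Python rendering of the `String#substring` approach; the helper name and example properties are illustrative):

```python
def split_client_overrides(connector_props):
    """Split connector-level properties into producer and consumer client
    configs by stripping the 'producer.' / 'consumer.' prefixes; the
    remainder of each key becomes the client configuration key."""
    producer, consumer = {}, {}
    for key, value in connector_props.items():
        if key.startswith("producer."):
            producer[key[len("producer."):]] = value
        elif key.startswith("consumer."):
            consumer[key[len("consumer."):]] = value
    return producer, consumer

props = {
    "name": "my-connector",
    "producer.bootstrap.servers": "new-cluster:9092",
    "consumer.max.poll.records": "500",
}
print(split_client_overrides(props))
# ({'bootstrap.servers': 'new-cluster:9092'}, {'max.poll.records': '500'})
```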
[jira] [Commented] (KAFKA-7593) Infinite restart loop when failed to store big config for task
[ https://issues.apache.org/jira/browse/KAFKA-7593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16676944#comment-16676944 ] Randall Hauch commented on KAFKA-7593: -- [~olkuznsmith], have you tried manually setting the {{max.message.bytes}} config to a larger value on the {{connect-configs}} topic in the broker? If so, then this is a good workaround in case other users run into this. However, as you suggest, we should evaluate how to respond when attempts to write to the config topic fail. Logging might be easy (e.g., add a callback when the KafkaConfigBackingStore sends messages to the log), but it still might be difficult to prevent the loop. It might also be good to set {{max.message.bytes}} to a larger value when creating the topic (no KIP required). Or, we could expose a new property for the internal topics on the distributed worker to allow users to set this (requires a KIP). IMO, you can always create this topic ahead of time, so maybe simply increasing the default {{max.message.bytes}} topic config is sufficient. > Infinite restart loop when failed to store big config for task > -- > > Key: KAFKA-7593 > URL: https://issues.apache.org/jira/browse/KAFKA-7593 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.1.0 >Reporter: Oleg Kuznetsov >Priority: Major > > In case when config message for config topic is greater than kafka broker > allows to store, source connector starts infinite restart loop without any > error indication. > There could be an exception thrown in this case or a smarter handling of big > config. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
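A preventive check in the spirit of the ticket's request would compare the serialized task config against the config topic's message size limit before producing, and raise instead of silently looping. This is an illustrative sketch, not Connect's actual behavior; the 1000012-byte figure is Kafka's default `message.max.bytes`, and a real check should read the topic's effective config instead of assuming the default:

```python
import json

# Kafka's broker-side default (message.max.bytes); assumed here for
# illustration -- fetch the actual topic config in a real check.
DEFAULT_MAX_MESSAGE_BYTES = 1000012

def check_config_fits(task_config, max_message_bytes=DEFAULT_MAX_MESSAGE_BYTES):
    """Raise before producing when the serialized task config would exceed
    the config topic's message size limit, instead of entering a silent
    restart loop."""
    payload = json.dumps(task_config).encode("utf-8")
    if len(payload) > max_message_bytes:
        raise ValueError(
            f"task config is {len(payload)} bytes, exceeding the config "
            f"topic limit of {max_message_bytes} bytes")
    return len(payload)

print(check_config_fits({"connector.class": "FileStreamSource"}))
```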
[jira] [Commented] (KAFKA-3980) JmxReporter uses excessive memory causing OutOfMemoryException
[ https://issues.apache.org/jira/browse/KAFKA-3980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16676940#comment-16676940 ] Rajini Sivaram commented on KAFKA-3980: --- [~jintianfan] Do you have quotas enabled? If your broker is still running, you could attach jconsole and see if there are metrics which should have been expired. If not, we could take a look at the heap dump. > JmxReporter uses excessive memory causing OutOfMemoryException > -- > > Key: KAFKA-3980 > URL: https://issues.apache.org/jira/browse/KAFKA-3980 > Project: Kafka > Issue Type: Bug > Components: metrics >Affects Versions: 0.9.0.1 >Reporter: Andrew Jorgensen >Priority: Major > > I have some nodes in a kafka cluster that occasionally will run out of memory > whenever I restart the producers. I was able to take a heap dump from both a > recently restarted Kafka node which weighed in at about 20 MB and a node that > has been running for 2 months is using over 700MB of memory. Looking at the > heap dump it looks like the JmxReporter is holding on to metrics and causing > them to build up over time. > !http://imgur.com/N6Cd0Ku.png! > !http://imgur.com/kQBqA2j.png! > The ultimate problem this causes is that there is a chance when I restart the > producers it will cause the node to experience an Java heap space exception > and OOM. The nodes then fail to startup correctly and write a -1 as the > leader number to the partitions they were responsible for effectively > resetting the offset and rendering that partition unavailable. The kafka > process then needs to go be restarted in order to re-assign the node to the > partition that it owns. > I have a few questions: > 1. I am not quite sure why there are so many client id entries in that > JmxReporter map. > 2. Is there a way to have the JmxReporter release metrics after a set amount > of time or a way to turn certain high cardinality metrics like these off? > I can provide any logs or heap dumps if more information is needed. 
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
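The expiry behavior referred to in the comment can be illustrated with a toy time-based registry; this is a hypothetical sketch of metric expiration in general, not how JmxReporter is implemented:

```python
import time

class ExpiringMetrics:
    """Toy registry that drops metrics not touched within `ttl` seconds,
    bounding growth from high-cardinality keys such as per-client ids."""

    def __init__(self, ttl, clock=time.monotonic):
        self.ttl = ttl
        self.clock = clock          # injectable clock for testing
        self._metrics = {}          # name -> (value, last_touched)

    def record(self, name, value):
        self._metrics[name] = (value, self.clock())

    def expire(self):
        now = self.clock()
        stale = [n for n, (_, t) in self._metrics.items() if now - t > self.ttl]
        for name in stale:
            del self._metrics[name]
        return stale

fake_now = [0.0]
m = ExpiringMetrics(ttl=60, clock=lambda: fake_now[0])
m.record("producer-1.outgoing-byte-rate", 1024)
fake_now[0] = 120.0  # two minutes later, the producer is gone
print(m.expire())  # ['producer-1.outgoing-byte-rate']
```

Without something like this, every restarted producer's fresh client id leaves another immortal entry behind, which is the buildup visible in the heap dumps.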
[jira] [Commented] (KAFKA-7280) ConcurrentModificationException in FetchSessionHandler in heartbeat thread
[ https://issues.apache.org/jira/browse/KAFKA-7280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16676909#comment-16676909 ] Rajini Sivaram commented on KAFKA-7280: --- [~sachinu] The fix is available in 2.0.1 which is expected to be released very soon. The specific exception in this JIRA is in a code path that is used only when TRACE logging is enabled on the consumer. The fix in this JIRA makes the whole class thread-safe, but if you are seeing the exception without TRACE, can you include the stack trace, so that we can verify if the PR has fixed that too? Thanks. > ConcurrentModificationException in FetchSessionHandler in heartbeat thread > -- > > Key: KAFKA-7280 > URL: https://issues.apache.org/jira/browse/KAFKA-7280 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 1.1.1, 2.0.0 >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Critical > Fix For: 1.1.2, 2.0.1, 2.1.0 > > > Request/response handling in FetchSessionHandler is not thread-safe. But we > are using it in Kafka consumer without any synchronization even though poll() > from heartbeat thread can process responses. Heartbeat thread holds the > coordinator lock while processing its poll and responses, making other > operations involving the group coordinator safe. We also need to lock > FetchSessionHandler for the operations that update or read > FetchSessionHandler#sessionPartitions. 
> This exception is from a system test run on trunk of > TestSecurityRollingUpgrade.test_rolling_upgrade_sasl_mechanism_phase_two: > {quote}[2018-08-12 06:13:22,316] ERROR [Consumer clientId=console-consumer, > groupId=group] Heartbeat thread failed due to unexpected error > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator) > java.util.ConcurrentModificationException > at > java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719) > at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742) > at > org.apache.kafka.clients.FetchSessionHandler.responseDataToLogString(FetchSessionHandler.java:362) > at > org.apache.kafka.clients.FetchSessionHandler.handleResponse(FetchSessionHandler.java:424) > at > org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:216) > at > org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:206) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:575) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:389) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:304) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:996) > {quote} > > The logs just prior to the exception show that a partition was removed from > the session: > {quote}[2018-08-12 06:13:22,315] TRACE [Consumer clientId=console-consumer, > groupId=group] Skipping fetch for partition 
test_topic-1 because there is an > in-flight request to worker4:9095 (id: 3 rack: null) > (org.apache.kafka.clients.consumer.internals.Fetcher) > [2018-08-12 06:13:22,316] TRACE [Consumer clientId=console-consumer, > groupId=group] Completed receive from node 2 for FETCH with correlation id > 417, received > {throttle_time_ms=0,error_code=0,session_id=109800960,responses=[{topic=test_topic,partition_responses=[{partition_header= > Unknown macro: > \{partition=2,error_code=0,high_watermark=184,last_stable_offset=-1,log_start_offset=0,aborted_transactions=null} > ,record_set=[(record=DefaultRecord(offset=183, timestamp=1534054402327, key=0 > bytes, value=3 bytes))]}]}]} (org.apache.kafka.clients.NetworkClient) > [2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, > groupId=group] Added READ_UNCOMMITTED fetch request for partition > test_topic-0 at offset 189 to node worker3:9095 (id: 2 rack: null) > (org.apache.kafka.clients.consumer.internals.Fetcher) > [2018-08-12 06:13:22,316] DEBUG [Consumer clientId=console-consumer, > groupId=group] Built incremental fetch (sessionId=109800960, epoch=237) for > node 2. Added (), altered (), removed (test_topic-2) out of (test_topic-0) > (org.apache.kafka.clients.FetchSessionHandler) > [2018-08-12 06:13:22,316]
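The underlying failure mode is the classic iterate-while-modify hazard: one thread walks the session's partition map while the heartbeat thread mutates it. Python raises the analogous error, which makes for a compact illustration (the partition names are taken from the log excerpt above):

```python
# Two code paths in the consumer share this map: one builds a log string
# from it while response handling adds/removes partitions.
session_partitions = {"test_topic-0": 189, "test_topic-2": 184}

it = iter(session_partitions)  # in-progress iteration (e.g. TRACE logging)
next(it)

# Concurrent response handling removes a partition from the session...
del session_partitions["test_topic-2"]

try:
    next(it)  # ...and the live iterator fails fast
except RuntimeError as e:
    # Java's LinkedHashMap throws ConcurrentModificationException here
    print(type(e).__name__)  # RuntimeError
```

This is why the fix locks FetchSessionHandler around operations that read or update `sessionPartitions`, rather than relying on callers to never overlap.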
[jira] [Commented] (KAFKA-5503) Idempotent producer ignores shutdown while fetching ProducerId
[ https://issues.apache.org/jira/browse/KAFKA-5503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16676896#comment-16676896 ] ASF GitHub Bot commented on KAFKA-5503: --- layfe opened a new pull request #5881: KAFKA-5503 Idempotent producer ignores shutdown while fetching Produc… URL: https://github.com/apache/kafka/pull/5881 …erId Check running in `Sender.maybeWaitForProducerId` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Idempotent producer ignores shutdown while fetching ProducerId > -- > > Key: KAFKA-5503 > URL: https://issues.apache.org/jira/browse/KAFKA-5503 > Project: Kafka > Issue Type: Sub-task > Components: clients, core, producer >Affects Versions: 0.11.0.0 >Reporter: Jason Gustafson >Assignee: Evgeny Veretennikov >Priority: Major > Fix For: 2.2.0 > > > When using the idempotent producer, we initially block the sender thread > while we attempt to get the ProducerId. During this time, a concurrent call > to close() will be ignored. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-5503) Idempotent producer ignores shutdown while fetching ProducerId
[ https://issues.apache.org/jira/browse/KAFKA-5503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16676895#comment-16676895 ] ASF GitHub Bot commented on KAFKA-5503: --- layfe closed pull request #5881: KAFKA-5503 Idempotent producer ignores shutdown while fetching Produc… URL: https://github.com/apache/kafka/pull/5881 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 19d7af2e7a0..6df77677f89 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -492,7 +492,7 @@ private Node awaitLeastLoadedNodeReady(long remainingTimeMs) throws IOException } private void maybeWaitForProducerId() { -while (!transactionManager.hasProducerId() && !transactionManager.hasError()) { +while (running && !transactionManager.hasProducerId() && !transactionManager.hasError()) { Node node = null; try { node = awaitLeastLoadedNodeReady(requestTimeoutMs); This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org -- This message was sent by Atlassian JIRA (v7.6.3#76005)
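The one-line fix in the diff above adds a `running` check to the wait loop, so a concurrent close() can break the sender out of its ProducerId wait. A stripped-down sketch of that pattern (hypothetical class, not the real Sender or TransactionManager):

```java
public class ShutdownAwareWaiter {
    // Mirrors the PR's pattern: a volatile flag checked on every retry pass,
    // so close() from another thread ends the wait promptly.
    private volatile boolean running = true;
    private int attempts = 0;

    // Simulates maybeWaitForProducerId(): the id fetch never succeeds here,
    // so without the `running` check this loop would spin forever.
    public int waitForId(int closeAfterAttempts) {
        while (running && !hasProducerId()) {
            attempts++;
            if (attempts >= closeAfterAttempts) {
                close(); // stand-in for a concurrent close() call
            }
        }
        return attempts;
    }

    private boolean hasProducerId() { return false; } // fetch always fails in this sketch
    public void close() { running = false; }
}
```

The `volatile` qualifier matters: without it, the waiting thread may never observe the flag flip made by the closing thread.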
[jira] [Created] (KAFKA-7600) Provide capability to rename cluster id
Yeva Byzek created KAFKA-7600: - Summary: Provide capability to rename cluster id Key: KAFKA-7600 URL: https://issues.apache.org/jira/browse/KAFKA-7600 Project: Kafka Issue Type: Improvement Components: core Reporter: Yeva Byzek Enhancement suggestion: ability to configure the cluster id that is displayed in ZK {{/cluster/id}} to be something more human readable like {{datacenter1}} instead of random characters like {{YLD3M3faTFG7ftEvoDGn5Q}}. Value add: downstream clients that use the cluster id can present users with a more meaningful cluster identification Other relevant links: https://cwiki.apache.org/confluence/display/KAFKA/KIP-78%3A+Cluster+Id -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7599) Trogdor - Allow configuration for not throttling Benchmark Workers and expose messages per second in task status
[ https://issues.apache.org/jira/browse/KAFKA-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski updated KAFKA-7599: --- Summary: Trogdor - Allow configuration for not throttling Benchmark Workers and expose messages per second in task status (was: Trogdor - Allow configuration for not throttling Benchmark Workers and expose messages per second) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7599) Trogdor - Allow configuration for not throttling Benchmark Workers and expose messages per second
Stanislav Kozlovski created KAFKA-7599: -- Summary: Trogdor - Allow configuration for not throttling Benchmark Workers and expose messages per second Key: KAFKA-7599 URL: https://issues.apache.org/jira/browse/KAFKA-7599 Project: Kafka Issue Type: Improvement Reporter: Stanislav Kozlovski Assignee: Stanislav Kozlovski In Trogdor, the ConsumeBench, ProduceBench and RoundTrip workers all take in an argument called "targetMessagesPerSec". That argument works as an upper bound on the number of messages that can be consumed/produced per second in that worker. It is useful to support infinite messages per second. Currently, if the `targetMessagesPerSec` field is not present in the request, the RoundTripWorker will raise an exception, whereas the ConsumeBench and ProduceBench workers will work as if they had `targetMessagesPerSec=10`. I propose we allow for unbounded `targetMessagesPerSec` if the field is not present. Further, it would be very useful if some of these workers showed the `messagesPerSecond` they have been producing/consuming at. Even now, giving the worker a `targetMessagesPerSec` does not guarantee that the worker will reach the needed `targetMessagesPerSec`. There is no easy way of knowing how the worker performed - you have to subtract the status fields `startedMs` and `doneMs` to get the total duration of the task, convert to seconds and then divide that by the `maxMessages` field. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
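The by-hand calculation the ticket describes — subtract `startedMs` from `doneMs`, convert to seconds, divide `maxMessages` by that — can be sketched directly (field names taken from the ticket; this is the manual workaround, not a proposed Trogdor API):

```java
public class TaskThroughput {
    // Derives messages/sec from Trogdor task status fields, as the ticket
    // describes doing by hand when the worker does not report it.
    public static double messagesPerSec(long startedMs, long doneMs, long maxMessages) {
        double seconds = (doneMs - startedMs) / 1000.0;
        if (seconds <= 0) {
            throw new IllegalArgumentException("doneMs must be after startedMs");
        }
        return maxMessages / seconds;
    }

    public static void main(String[] args) {
        // A 10-second task that produced 50,000 messages.
        System.out.println(messagesPerSec(1_000L, 11_000L, 50_000L)); // prints 5000.0
    }
}
```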
[jira] [Commented] (KAFKA-5503) Idempotent producer ignores shutdown while fetching ProducerId
[ https://issues.apache.org/jira/browse/KAFKA-5503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16676835#comment-16676835 ] ASF GitHub Bot commented on KAFKA-5503: --- layfe opened a new pull request #5881: KAFKA-5503 Idempotent producer ignores shutdown while fetching Produc… URL: https://github.com/apache/kafka/pull/5881 …erId Check running in `Sender.maybeWaitForProducerId` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-5503) Idempotent producer ignores shutdown while fetching ProducerId
[ https://issues.apache.org/jira/browse/KAFKA-5503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16676834#comment-16676834 ] ASF GitHub Bot commented on KAFKA-5503: --- layfe closed pull request #5881: KAFKA-5503 Idempotent producer ignores shutdown while fetching Produc… URL: https://github.com/apache/kafka/pull/5881 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7598) SIGSEGV on scala library Set
[ https://issues.apache.org/jira/browse/KAFKA-7598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16676816#comment-16676816 ] Ismael Juma commented on KAFKA-7598: This is a JVM bug and you should report it to the OpenJDK project if it hasn't been reported already. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7597) Trogdor - Support transactions in ProduceBenchWorker
[ https://issues.apache.org/jira/browse/KAFKA-7597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16676787#comment-16676787 ] ASF GitHub Bot commented on KAFKA-7597: --- stanislavkozlovski opened a new pull request #5885: KAFKA-7597: Make Trogdor ProduceBenchWorker support transactions URL: https://github.com/apache/kafka/pull/5885 It now accepts a new "messagesPerTransaction" field, which, if present, will enable transactional functionality in the bench worker. The producer will open N transactions with X messages each (bounded by the mandatory "maxMessages" field) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7598) SIGSEGV on scala library Set
Antoine Tran created KAFKA-7598: --- Summary: SIGSEGV on scala library Set Key: KAFKA-7598 URL: https://issues.apache.org/jira/browse/KAFKA-7598 Project: Kafka Issue Type: Bug Components: core Affects Versions: 1.0.0 Environment: Docker CentOs image 7.3.1611 upgraded to 7.4.1708 (https://hub.docker.com/r/library/centos/tags/) java-1.8.0-openjdk-1.8.0.161-0.b14.el7_4.x86_64 Reporter: Antoine Tran We had a crash, that appears randomly, with a SIGSEGV to the scala Set library: {color:#FF}[2018-09-24 20:52:04,568] INFO Updated PartitionLeaderEpoch. New: \{epoch:0, offset:0}, Current: \{epoch:-1, offset-1} for Partition: fac---MTI2RTM-rtmForecast2d-RCFD_2017100_0260-4. Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache){color} {color:#FF} #{color} {color:#FF} # A fatal error has been detected by the Java Runtime Environment:{color} {color:#FF} #{color} {color:#FF} # SIGSEGV (0xb) at pc=0x7fdb3998c814, pid=1, tid=0x7fd9a4588700{color} {color:#FF} #{color} {color:#FF} # JRE version: OpenJDK Runtime Environment (8.0_161-b14) (build 1.8.0_161-b14){color} {color:#FF} # Java VM: OpenJDK 64-Bit Server VM (25.161-b14 mixed mode linux-amd64 compressed oops){color} {color:#FF} # Problematic frame:{color} {color:#FF} # J 6249 C2 scala.collection.immutable.Set$EmptySet$.$minus(Ljava/lang/Object;)Lscala/collection/generic/Subtractable; (6 bytes) @ 0x7fdb3998c814 [0x7fdb3998c7e0+0x34]{color} {color:#FF} #{color} {color:#FF} # Core dump written. Default location: //core or core.1{color} {color:#FF} #{color} {color:#FF} # An error report file with more information is saved as:{color} {color:#FF} # //hs_err_pid1.log{color} {color:#FF} #{color} {color:#FF} # If you would like to submit a bug report, please visit:{color} {color:#FF} # [http://bugreport.java.com/bugreport/crash.jsp]{color} {color:#FF} #{color} [error occurred during error reporting , id 0xb] I couldn't have the core dump for now, I asked our team for it next time. 
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7595) Kafka Streams: KTrable to KTable join introduces duplicates in downstream KTable
[ https://issues.apache.org/jira/browse/KAFKA-7595?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16676728#comment-16676728 ] Damian Guy commented on KAFKA-7595: --- [~vvcephei] your reasoning seems valid to me > Kafka Streams: KTrable to KTable join introduces duplicates in downstream > KTable > > > Key: KAFKA-7595 > URL: https://issues.apache.org/jira/browse/KAFKA-7595 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.0 >Reporter: Vik Gamov >Priority: Major > > When perform KTable to KTable join after aggregation, there are duplicates in > resulted KTable. > 1. caching disabled, no materialized => duplicates > {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, > 0);}} > {{KTable ratingCounts = ratingsById.count();}} > {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}} > {{KTable ratingAverage = ratingSums.join(ratingCounts,}} > {{ (sum, count) -> sum / count.doubleValue());}} > 2. caching disabled, materialized => duplicate > {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, > 0);}}{{KTable ratingCounts = ratingsById.count();}} > {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}} > {{KTable ratingAverage = ratingSums.join(ratingCounts,}} > {{ (sum, count) -> sum / count.doubleValue(),}} > {{ Materialized.as("average-ratings"));}} > 3. 
caching enabled, materialized => all good > {{// Enable record cache of size 10 MB.}} > {{streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 > * 1024 * 1024L);}} > {{// Set commit interval to 1 second.}} > {{streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, > 1000);}}{{KTable ratingCounts = ratingsById.count();}} > {{KTable ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);}} > {{KTable ratingAverage = ratingSums.join(ratingCounts,}} > {{ (sum, count) -> sum / count.doubleValue(),}} > {{ Materialized.as("average-ratings"));}} > > Demo app > [https://github.com/tlberglund/streams-movie-demo/blob/master/streams/src/main/java/io/confluent/demo/StreamsDemo.java#L107] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-3980) JmxReporter uses excessive memory causing OutOfMemoryException
[ https://issues.apache.org/jira/browse/KAFKA-3980?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16676724#comment-16676724 ] Jin Tianfan commented on KAFKA-3980: [~ijuma] [~rsivaram] I am hitting the same problem. My Kafka broker version is kafka_2.11-0.11.0.0. Running jmap -histo:live <pid> gives the following output:
 num     #instances         #bytes  class name
----------------------------------------------
   1:      10468626     1241525032  [C
   2:       5186139      448500296  [Ljava.util.HashMap$Node;
   3:      10468427      251242248  java.lang.String
   4:      10389530      249348720  javax.management.ObjectName$Property
   5:      10372951      249121296  [Ljavax.management.ObjectName$Property;
   6:       5179185      248600880  java.util.HashMap
   7:       5186476      207459040  javax.management.ObjectName
   8:       5240552      167697664  java.util.HashMap$Node
   9:          6302      139607712  [B
  10:       5176173      124228152  org.apache.kafka.common.metrics.JmxReporter$KafkaMbean
  11:       5176210       82819360  java.util.HashMap$EntrySet
  12:         90003        2160072  java.util.concurrent.ConcurrentSkipListMap$Node
  13:         84784        2034816  java.lang.Double
  14:         45662        1461184  java.util.concurrent.ConcurrentHashMap$Node
  15:         25106        1244408  [Ljava.lang.Object;
  16:         43453        1042872  java.util.concurrent.ConcurrentSkipListMap$Index
  17:         18418         736720  java.util.LinkedHashMap$Entry
Our JVM settings are: /data/program/java/bin/java -Xmx4G -Xms4G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true -Xloggc:/data/program/kafka/kafka_2.11-0.11.0.0/bin/../logs/kafkaServer-gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Dkafka.logs.dir=/data/program/kafka/kafka_2.11-0.11.0.0/bin/../logs -Dlog4j.configuration=
We found too much metrics data in our broker memory. The broker had been running healthily for nearly one year. Reviewing the code, only ReplicationQuotaManager and ClientQuotaManager set a sensor expiration time of one hour; all other sensors set a sensor expiration time of Long.MAX_VALUE. Is this what causes so many metrics in my heap? If you need it, I will send you my dump file. I hope to receive your reply as soon as possible. > JmxReporter uses excessive memory causing OutOfMemoryException > -- > > Key: KAFKA-3980 > URL: https://issues.apache.org/jira/browse/KAFKA-3980 > Project: Kafka > Issue Type: Bug > Components: metrics >Affects Versions: 0.9.0.1 >Reporter: Andrew Jorgensen >Priority: Major > > I have some nodes in a kafka cluster that occasionally will run out of memory > whenever I restart the producers. I was able to take a heap dump from both a > recently restarted Kafka node which weighed in at about 20 MB and a node that > has been running for 2 months is using over 700MB of memory. Looking at the > heap dump it looks like the JmxReporter is holding on to metrics and causing > them to build up over time. > !http://imgur.com/N6Cd0Ku.png! > !http://imgur.com/kQBqA2j.png! > The ultimate problem this causes is that there is a chance when I restart the > producers it will cause the node to experience an Java heap space exception > and OOM. The nodes then fail to startup correctly and write a -1 as the > leader number to the partitions they were responsible for effectively > resetting the offset and rendering that partition unavailable. The kafka > process then needs to go be restarted in order to re-assign the node to the > partition that it owns. > I have a few questions: > 1. I am not quite sure why there are so many client id entries in that > JmxReporter map. > 2. Is there a way to have the JmxReporter release metrics after a set amount > of time or a way to turn certain high cardinality metrics like these off? > I can provide any logs or heap dumps if more information is needed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
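The suspicion in the comment above — sensors that effectively never expire — can be illustrated with a toy registry. This is a simplified model to show why a Long.MAX_VALUE expiry accumulates entries forever; it is not Kafka's actual Metrics/Sensor code:

```java
import java.util.HashMap;
import java.util.Map;

public class ExpiringRegistry {
    // Toy model of sensor expiry: entries whose last-record timestamp is
    // older than expiryMs get purged. An expiryMs of Long.MAX_VALUE means
    // nothing is ever purged, so entries only accumulate.
    private final Map<String, Long> lastRecordMs = new HashMap<>();

    public void record(String sensorName, long nowMs) {
        lastRecordMs.put(sensorName, nowMs);
    }

    // Removes expired entries and returns how many were purged.
    public int purgeExpired(long nowMs, long expiryMs) {
        int before = lastRecordMs.size();
        lastRecordMs.values().removeIf(t -> nowMs - t > expiryMs);
        return before - lastRecordMs.size();
    }
}
```

With a finite expiry, stale entries (e.g. metrics for client ids that reconnect under new names) would be reclaimed; with Long.MAX_VALUE, every entry ever recorded stays in the heap.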
[jira] [Commented] (KAFKA-5503) Idempotent producer ignores shutdown while fetching ProducerId
[ https://issues.apache.org/jira/browse/KAFKA-5503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16676714#comment-16676714 ] ASF GitHub Bot commented on KAFKA-5503: --- layfe closed pull request #5881: KAFKA-5503 Idempotent producer ignores shutdown while fetching Produc… URL: https://github.com/apache/kafka/pull/5881 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-5503) Idempotent producer ignores shutdown while fetching ProducerId
[ https://issues.apache.org/jira/browse/KAFKA-5503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16676715#comment-16676715 ] ASF GitHub Bot commented on KAFKA-5503: --- layfe opened a new pull request #5881: KAFKA-5503 Idempotent producer ignores shutdown while fetching Produc… URL: https://github.com/apache/kafka/pull/5881 …erId Check running in `Sender.maybeWaitForProducerId` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7597) Trogdor - Support transactions in ProduceBenchWorker
Stanislav Kozlovski created KAFKA-7597: -- Summary: Trogdor - Support transactions in ProduceBenchWorker Key: KAFKA-7597 URL: https://issues.apache.org/jira/browse/KAFKA-7597 Project: Kafka Issue Type: Improvement Reporter: Stanislav Kozlovski Assignee: Stanislav Kozlovski The `ProduceBenchWorker` allows you to schedule a benchmark of a Kafka Producer. It would prove useful if we supported transactions in this producer, as to allow benchmarks with transactions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-5503) Idempotent producer ignores shutdown while fetching ProducerId
[ https://issues.apache.org/jira/browse/KAFKA-5503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16676621#comment-16676621 ] ASF GitHub Bot commented on KAFKA-5503: --- layfe closed pull request #5881: KAFKA-5503 Idempotent producer ignores shutdown while fetching Produc… URL: https://github.com/apache/kafka/pull/5881 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7560) Client Quota - system test failure
[ https://issues.apache.org/jira/browse/KAFKA-7560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16676512#comment-16676512 ] Dong Lin commented on KAFKA-7560: - Initially I thought the test failure was related to the quota logic in this test, and thus that I would be one of the best people to debug it. Now it seems that the test failed because the test suite is not able to read metrics from the producer using the solution developed in [https://github.com/apache/kafka/pull/4072]. More specifically, the log message shows that 5 messages are successfully produced and consumed, but do_POST in http.py is never called, and thus we get the exception shown in the Jira description. [~ewencp] [~apurva] could you take a look, since you are probably more familiar with the HTTP-based approach of sending metrics here? I will also try to debug further. > Client Quota - system test failure > -- > > Key: KAFKA-7560 > URL: https://issues.apache.org/jira/browse/KAFKA-7560 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.1.0 >Reporter: Stanislav Kozlovski >Assignee: Dong Lin >Priority: Blocker > > The test under `kafkatest.tests.client.quota_test.QuotaTest.test_quota` fails > when I run it locally. It produces the following error message: > {code:java} > File "/opt/kafka-dev/tests/kafkatest/tests/client/quota_test.py", line 196, > in validate metric.value for k, metrics in > producer.metrics(group='producer-metrics', name='outgoing-byte-rate', > client_id=producer.client_id) for metric in metrics ValueError: max() arg is > an empty sequence > {code} > I assume it cannot find the metric it's searching for -- This message was sent by Atlassian JIRA (v7.6.3#76005)
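The ValueError in the quoted test comes from taking a maximum over an empty collection of metric values; the generic hardening is a guarded reduction that makes "no metrics collected" an explicit empty result rather than a crash. A sketch of that guard (shown in Java for consistency with the other examples here; the failing test itself is Python, and the class/method names are hypothetical):

```java
import java.util.List;
import java.util.OptionalDouble;

public class SafeMax {
    // Returns an empty OptionalDouble when no metric values were collected,
    // instead of failing the way an unguarded max over an empty sequence does.
    public static OptionalDouble maxRate(List<Double> values) {
        return values.stream().mapToDouble(Double::doubleValue).max();
    }
}
```

The caller can then distinguish "metric missing" (empty Optional, i.e. the real bug here: metrics were never delivered) from "rate is zero".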
[jira] [Updated] (KAFKA-7560) Client Quota - system test failure
[ https://issues.apache.org/jira/browse/KAFKA-7560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin updated KAFKA-7560: Priority: Blocker (was: Major) -- This message was sent by Atlassian JIRA (v7.6.3#76005)